From 12cbb3af2413dc28838ed271351dda16df8f7bdb Mon Sep 17 00:00:00 2001 From: joe Date: Fri, 15 Sep 2017 06:22:10 -0400 Subject: Separating dht-client library from bittorrent package. --- src/Network/BitTorrent/Exchange/Download.hs | 296 ---------------------------- 1 file changed, 296 deletions(-) delete mode 100644 src/Network/BitTorrent/Exchange/Download.hs (limited to 'src/Network/BitTorrent/Exchange/Download.hs') diff --git a/src/Network/BitTorrent/Exchange/Download.hs b/src/Network/BitTorrent/Exchange/Download.hs deleted file mode 100644 index 981db2fb..00000000 --- a/src/Network/BitTorrent/Exchange/Download.hs +++ /dev/null @@ -1,296 +0,0 @@ --- | --- Copyright : (c) Sam Truzjan 2013 --- License : BSD3 --- Maintainer : pxqr.sta@gmail.com --- Stability : experimental --- Portability : portable --- --- --- -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE MultiParamTypeClasses #-} -{-# LANGUAGE FunctionalDependencies #-} -{-# LANGUAGE TemplateHaskell #-} -module Network.BitTorrent.Exchange.Download - ( -- * Downloading - Download (..) - , Updates - , runDownloadUpdates - - -- ** Metadata - -- $metadata-download - , MetadataDownload - , metadataDownload - - -- ** Content - -- $content-download - , ContentDownload - , contentDownload - ) where - -import Control.Applicative -import Control.Concurrent -import Control.Lens -import Control.Monad.State -import Data.BEncode as BE -import Data.ByteString as BS -import Data.ByteString.Lazy as BL -import Data.Default -import Data.List as L -import Data.Maybe -import Data.Map as M -import Data.Tuple - -import Data.Torrent as Torrent -import Network.Address -import Network.BitTorrent.Exchange.Bitfield as BF -import Network.BitTorrent.Exchange.Block as Block -import Network.BitTorrent.Exchange.Message as Msg -import System.Torrent.Storage (Storage, writePiece) - - -{----------------------------------------------------------------------- --- Class ------------------------------------------------------------------------} - -type Updates s a = StateT s IO a - -runDownloadUpdates :: MVar s -> Updates s a -> IO a -runDownloadUpdates var m = modifyMVar var (fmap swap . runStateT m) - -class Download s chunk | s -> chunk where - scheduleBlocks :: Int -> PeerAddr IP -> Bitfield -> Updates s [BlockIx] - - -- | - scheduleBlock :: PeerAddr IP -> Bitfield -> Updates s (Maybe BlockIx) - scheduleBlock addr bf = listToMaybe <$> scheduleBlocks 1 addr bf - - -- | Get number of sent requests to this peer. - getRequestQueueLength :: PeerAddr IP -> Updates s Int - - -- | Remove all pending block requests to the remote peer. May be used - -- when: - -- - -- * a peer closes connection; - -- - -- * remote peer choked this peer; - -- - -- * timeout expired. - -- - resetPending :: PeerAddr IP -> Updates s () - - -- | MAY write to storage, if a new piece have been completed. - -- - -- You should check if a returned by peer block is actually have - -- been requested and in-flight. This is needed to avoid "I send - -- random corrupted block" attacks. - pushBlock :: PeerAddr IP -> chunk -> Updates s (Maybe Bool) - -{----------------------------------------------------------------------- --- Metadata download ------------------------------------------------------------------------} --- $metadata-download --- TODO - -data MetadataDownload = MetadataDownload - { _pendingPieces :: [(PeerAddr IP, PieceIx)] - , _bucket :: Bucket - , _topic :: InfoHash - } - -makeLenses ''MetadataDownload - --- | Create a new scheduler for infodict of the given size. -metadataDownload :: Int -> InfoHash -> MetadataDownload -metadataDownload ps = MetadataDownload [] (Block.empty ps) - -instance Default MetadataDownload where - def = error "instance Default MetadataDownload" - ---cancelPending :: PieceIx -> Updates () -cancelPending pix = pendingPieces %= L.filter ((pix ==) . snd) - -instance Download MetadataDownload (Piece BS.ByteString) where - scheduleBlock addr bf = do - bkt <- use bucket - case spans metadataPieceSize bkt of - [] -> return Nothing - ((off, _ ) : _) -> do - let pix = off `div` metadataPieceSize - pendingPieces %= ((addr, pix) :) - return (Just (BlockIx pix 0 metadataPieceSize)) - - resetPending addr = pendingPieces %= L.filter ((addr ==) . fst) - - pushBlock addr Torrent.Piece {..} = do - p <- use pendingPieces - when ((addr, pieceIndex) `L.notElem` p) $ - error "not requested" - cancelPending pieceIndex - - bucket %= Block.insert (metadataPieceSize * pieceIndex) pieceData - b <- use bucket - case toPiece b of - Nothing -> return Nothing - Just chunks -> do - t <- use topic - case parseInfoDict (BL.toStrict chunks) t of - Right x -> do - pendingPieces .= [] - return undefined -- (Just x) - Left e -> do - pendingPieces .= [] - bucket .= Block.empty (Block.size b) - return undefined -- Nothing - where - -- todo use incremental parsing to avoid BS.concat call - parseInfoDict :: BS.ByteString -> InfoHash -> Result InfoDict - parseInfoDict chunk topic = - case BE.decode chunk of - Right (infodict @ InfoDict {..}) - | topic == idInfoHash -> return infodict - | otherwise -> Left "broken infodict" - Left err -> Left $ "unable to parse infodict " ++ err - -{----------------------------------------------------------------------- --- Content download ------------------------------------------------------------------------} --- $content-download --- --- A block can have one of the following status: --- --- 1) /not allowed/: Piece is not in download set. --- --- 2) /waiting/: (allowed?) Block have been allowed to download, --- but /this/ peer did not send any 'Request' message for this --- block. To allow some piece use --- 'Network.BitTorrent.Exchange.Selector' and then 'allowedSet' --- and 'allowPiece'. --- --- 3) /inflight/: (pending?) Block have been requested but --- /remote/ peer did not send any 'Piece' message for this block. --- Related functions 'markInflight' --- --- 4) /pending/: (stalled?) Block have have been downloaded --- Related functions 'insertBlock'. --- --- Piece status: --- --- 1) /assembled/: (downloaded?) All blocks in piece have been --- downloaded but the piece did not verified yet. --- --- * Valid: go to completed; --- --- * Invalid: go to waiting. --- --- 2) /corrupted/: --- --- 3) /downloaded/: (verified?) A piece have been successfully --- verified via the hash. Usually the piece should be stored to --- the 'System.Torrent.Storage' and /this/ peer should send 'Have' --- messages to the /remote/ peers. --- - -data PieceEntry = PieceEntry - { pending :: [(PeerAddr IP, BlockIx)] - , stalled :: Bucket - } - -pieceEntry :: PieceSize -> PieceEntry -pieceEntry s = PieceEntry [] (Block.empty s) - -isEmpty :: PieceEntry -> Bool -isEmpty PieceEntry {..} = L.null pending && Block.null stalled - -_holes :: PieceIx -> PieceEntry -> [BlockIx] -_holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled) - where - mkBlockIx (off, sz) = BlockIx pix off sz - -data ContentDownload = ContentDownload - { inprogress :: !(Map PieceIx PieceEntry) - , bitfield :: !Bitfield - , pieceSize :: !PieceSize - , contentStorage :: Storage - } - -contentDownload :: Bitfield -> PieceSize -> Storage -> ContentDownload -contentDownload = ContentDownload M.empty - ---modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> DownloadUpdates () -modifyEntry pix f = modify $ \ s @ ContentDownload {..} -> s - { inprogress = alter (g pieceSize) pix inprogress } - where - g s = h . f . fromMaybe (pieceEntry s) - h e - | isEmpty e = Nothing - | otherwise = Just e - -instance Download ContentDownload (Block BL.ByteString) where - scheduleBlocks n addr maskBF = do - ContentDownload {..} <- get - let wantPieces = maskBF `BF.difference` bitfield - let wantBlocks = L.concat $ M.elems $ M.mapWithKey _holes $ - M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces) - inprogress - - bixs <- if L.null wantBlocks - then do - mpix <- choosePiece wantPieces - case mpix of -- TODO return 'n' blocks - Nothing -> return [] - Just pix -> return [leadingBlock pix defaultTransferSize] - else chooseBlocks wantBlocks n - - forM_ bixs $ \ bix -> do - modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e - { pending = (addr, bix) : pending } - - return bixs - where - -- TODO choose block nearest to pending or stalled sets to reduce disk - -- seeks on remote machines - --chooseBlocks :: [BlockIx] -> Int -> DownloadUpdates [BlockIx] - chooseBlocks xs n = return (L.take n xs) - - -- TODO use selection strategies from Exchange.Selector - --choosePiece :: Bitfield -> DownloadUpdates (Maybe PieceIx) - choosePiece bf - | BF.null bf = return $ Nothing - | otherwise = return $ Just $ BF.findMin bf - - getRequestQueueLength addr = do - m <- gets (M.map (L.filter ((==) addr . fst) . pending) . inprogress) - return $ L.sum $ L.map L.length $ M.elems m - - resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) } - where - reset = fmap $ \ e -> e - { pending = L.filter (not . (==) addr . fst) (pending e) } - - pushBlock addr blk @ Block {..} = do - mpe <- gets (M.lookup blkPiece . inprogress) - case mpe of - Nothing -> return Nothing - Just (pe @ PieceEntry {..}) - | blockIx blk `L.notElem` fmap snd pending -> return Nothing - | otherwise -> do - let bkt' = Block.insertLazy blkOffset blkData stalled - case toPiece bkt' of - Nothing -> do - modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e - { pending = L.filter ((==) (blockIx blk) . snd) pending - , stalled = bkt' - } - return (Just False) - - Just pieceData -> do - -- TODO verify - storage <- gets contentStorage - liftIO $ writePiece (Torrent.Piece blkPiece pieceData) storage - modify $ \ s @ ContentDownload {..} -> s - { inprogress = M.delete blkPiece inprogress - , bitfield = BF.insert blkPiece bitfield - } - return (Just True) -- cgit v1.2.3