From cb75f50f4cae778d1dfc57edff771a5145dd9894 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Thu, 17 Apr 2014 15:27:43 +0400 Subject: [Exchange] Move all download stuff to single module --- src/Network/BitTorrent/Exchange/Assembler.hs | 168 --------- src/Network/BitTorrent/Exchange/Download.hs | 376 ++++++++++++++------- src/Network/BitTorrent/Exchange/Session.hs | 17 +- .../BitTorrent/Exchange/Session/Metadata.hs | 102 ------ 4 files changed, 257 insertions(+), 406 deletions(-) delete mode 100644 src/Network/BitTorrent/Exchange/Assembler.hs delete mode 100644 src/Network/BitTorrent/Exchange/Session/Metadata.hs (limited to 'src/Network') diff --git a/src/Network/BitTorrent/Exchange/Assembler.hs b/src/Network/BitTorrent/Exchange/Assembler.hs deleted file mode 100644 index 7abb8ab0..00000000 --- a/src/Network/BitTorrent/Exchange/Assembler.hs +++ /dev/null @@ -1,168 +0,0 @@ --- | --- Copyright : (c) Sam Truzjan 2013 --- License : BSD3 --- Maintainer : pxqr.sta@gmail.com --- Stability : experimental --- Portability : portable --- --- Assembler is used to build pieces from blocks. In general --- 'Assembler' should be used to handle 'Transfer' messages when --- --- A block can have one of the following status: --- --- 1) /not allowed/: Piece is not in download set. 'null' and 'empty'. --- --- --- 2) /waiting/: (allowed?) Block have been allowed to download, --- but /this/ peer did not send any 'Request' message for this --- block. To allow some piece use --- 'Network.BitTorrent.Exchange.Selector' and then 'allowedSet' --- and 'allowPiece'. --- --- 3) /inflight/: (pending?) Block have been requested but --- /remote/ peer did not send any 'Piece' message for this block. --- Related functions 'markInflight' --- --- 4) /pending/: (stalled?) Block have have been downloaded --- Related functions 'insertBlock'. --- --- Piece status: --- --- 1) /assembled/: (downloaded?) All blocks in piece have been --- downloaded but the piece did not verified yet. --- --- * Valid: go to completed; --- --- * Invalid: go to waiting. --- --- 2) /corrupted/: --- --- 3) /downloaded/: (verified?) A piece have been successfully --- verified via the hash. Usually the piece should be stored to --- the 'System.Torrent.Storage' and /this/ peer should send 'Have' --- messages to the /remote/ peers. --- -{-# LANGUAGE TemplateHaskell #-} -module Network.BitTorrent.Exchange.Assembler - ( -- * Assembler - Assembler - - -- * Query - , Network.BitTorrent.Exchange.Assembler.null - , Network.BitTorrent.Exchange.Assembler.size - - -- * - , Network.BitTorrent.Exchange.Assembler.empty - , allowPiece - - -- * Debugging - , Network.BitTorrent.Exchange.Assembler.valid - ) where - -import Control.Applicative -import Control.Lens -import Data.IntMap.Strict as IM -import Data.List as L -import Data.Map as M -import Data.Maybe -import Data.IP - -import Data.Torrent -import Network.BitTorrent.Address -import Network.BitTorrent.Exchange.Block as B - -{----------------------------------------------------------------------- --- Assembler ------------------------------------------------------------------------} - -type Timestamp = () -{- -data BlockRequest = BlockRequest - { requestSent :: Timestamp - , requestedPeer :: PeerAddr IP - , requestedBlock :: BlockIx - } --} -type BlockRange = (BlockOffset, BlockSize) -type PieceMap = IntMap - -data Assembler = Assembler - { -- | A set of blocks that have been 'Request'ed but not yet acked. - _inflight :: Map (PeerAddr IP) (PieceMap [BlockRange]) - - -- | A set of blocks that but not yet assembled. - , _pending :: PieceMap Bucket - - -- | Used for validation of assembled pieces. - , info :: PieceInfo - } - -$(makeLenses ''Assembler) - - -valid :: Assembler -> Bool -valid = undefined - -data Result a - = Completed (Piece a) - | Corrupted PieceIx - | NotRequested PieceIx - | Overlapped BlockIx - -null :: Assembler -> Bool -null = undefined - -size :: Assembler -> Bool -size = undefined - -empty :: PieceInfo -> Assembler -empty = Assembler M.empty IM.empty - -allowPiece :: PieceIx -> Assembler -> Assembler -allowPiece pix a @ Assembler {..} = over pending (IM.insert pix bkt) a - where - bkt = B.empty (piPieceLength info) - -allowedSet :: (PeerAddr IP) -> Assembler -> [BlockIx] -allowedSet = undefined - ---inflight :: PeerAddr -> BlockIx -> Assembler -> Assembler ---inflight = undefined - --- You should check if a returned by peer block is actually have --- been requested and in-flight. This is needed to avoid "I send --- random corrupted block" attacks. -insert :: PeerAddr IP -> Block a -> Assembler -> Assembler -insert = undefined - -{- -insert :: Block a -> Assembler a -> (Assembler a, Maybe (Result a)) -insert blk @ Block {..} a @ Assembler {..} = undefined -{- - = let (pending, mpiece) = inserta blk piecePending - in (Assembler inflightSet pending pieceInfo, f <$> mpiece) - where - f p = undefined --- | checkPieceLazy pieceInfo p = Assembled p --- | otherwise = Corrupted ixPiece --} - - -inflightPieces :: Assembler a -> [PieceIx] -inflightPieces Assembler {..} = IM.keys piecePending - -completeBlocks :: PieceIx -> Assembler a -> [Block a] -completeBlocks pix Assembler {..} = fromMaybe [] $ IM.lookup pix piecePending - -incompleteBlocks :: PieceIx -> Assembler a -> [BlockIx] -incompleteBlocks = undefined - -nextBlock :: Assembler a -> Maybe (Assembler a, BlockIx) -nextBlock Assembler {..} = undefined - -inserta :: Block a - -> PieceMap [Block a] - -> (PieceMap [Block a], Maybe (Piece a)) -inserta = undefined - --} diff --git a/src/Network/BitTorrent/Exchange/Download.hs b/src/Network/BitTorrent/Exchange/Download.hs index fcc94485..9a6b5f91 100644 --- a/src/Network/BitTorrent/Exchange/Download.hs +++ b/src/Network/BitTorrent/Exchange/Download.hs @@ -1,44 +1,196 @@ +-- | +-- Copyright : (c) Sam Truzjan 2013 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- +-- +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE FunctionalDependencies #-} +{-# LANGUAGE TemplateHaskell #-} module Network.BitTorrent.Exchange.Download - ( -- * Environment - StatusUpdates - , runStatusUpdates - - -- * Status - , SessionStatus - , sessionStatus - - -- * Query - , getBitfield - , getRequestQueueLength - - -- * Control - , scheduleBlocks - , resetPending - , pushBlock + ( -- * Downloading + Download (..) + , Updates + , runDownloadUpdates + + -- ** Metadata + -- $metadata-download + , MetadataDownload + , metadataDownload + + -- ** Content + -- $content-download + , ContentDownload + , contentDownload ) where import Control.Applicative import Control.Concurrent +import Control.Lens import Control.Monad.State +import Data.BEncode as BE +import Data.ByteString as BS import Data.ByteString.Lazy as BL import Data.Default import Data.List as L import Data.Maybe import Data.Map as M -import Data.Set as S import Data.Tuple -import Data.Torrent -import Network.BitTorrent.Exchange.Bitfield as BF +import Data.Torrent as Torrent import Network.BitTorrent.Address -import Network.BitTorrent.Exchange.Block as Block +import Network.BitTorrent.Exchange.Bitfield as BF +import Network.BitTorrent.Exchange.Block as Block +import Network.BitTorrent.Exchange.Message as Msg import System.Torrent.Storage (Storage, writePiece) {----------------------------------------------------------------------- --- Piece entry +-- Class -----------------------------------------------------------------------} +type Updates s a = StateT s IO a + +runDownloadUpdates :: MVar s -> Updates s a -> IO a +runDownloadUpdates var m = modifyMVar var (fmap swap . runStateT m) + +class Download s chunk | s -> chunk where + scheduleBlocks :: Int -> PeerAddr IP -> Bitfield -> Updates s [BlockIx] + + -- | + scheduleBlock :: PeerAddr IP -> Bitfield -> Updates s (Maybe BlockIx) + scheduleBlock addr bf = listToMaybe <$> scheduleBlocks 1 addr bf + + -- | Get number of sent requests to this peer. + getRequestQueueLength :: PeerAddr IP -> Updates s Int + + -- | Remove all pending block requests to the remote peer. May be used + -- when: + -- + -- * a peer closes connection; + -- + -- * remote peer choked this peer; + -- + -- * timeout expired. + -- + resetPending :: PeerAddr IP -> Updates s () + + -- | MAY write to storage, if a new piece have been completed. + -- + -- You should check if a returned by peer block is actually have + -- been requested and in-flight. This is needed to avoid "I send + -- random corrupted block" attacks. + pushBlock :: PeerAddr IP -> chunk -> Updates s (Maybe Bool) + +{----------------------------------------------------------------------- +-- Metadata download +-----------------------------------------------------------------------} +-- $metadata-download +-- TODO + +data MetadataDownload = MetadataDownload + { _pendingPieces :: [(PeerAddr IP, PieceIx)] + , _bucket :: Bucket + , _topic :: InfoHash + } + +makeLenses ''MetadataDownload + +-- | Create a new scheduler for infodict of the given size. +metadataDownload :: Int -> InfoHash -> MetadataDownload +metadataDownload ps = MetadataDownload [] (Block.empty ps) + +instance Default MetadataDownload where + def = error "instance Default MetadataDownload" + +--cancelPending :: PieceIx -> Updates () +cancelPending pix = pendingPieces %= L.filter ((pix ==) . snd) + +instance Download MetadataDownload (Piece BS.ByteString) where + scheduleBlock addr bf = do + bkt <- use bucket + case spans metadataPieceSize bkt of + [] -> return Nothing + ((off, _ ) : _) -> do + let pix = off `div` metadataPieceSize + pendingPieces %= ((addr, pix) :) + return (Just (BlockIx pix 0 metadataPieceSize)) + + resetPending addr = pendingPieces %= L.filter ((addr ==) . fst) + + pushBlock addr Torrent.Piece {..} = do + p <- use pendingPieces + when ((addr, pieceIndex) `L.notElem` p) $ + error "not requested" + cancelPending pieceIndex + + bucket %= Block.insert (metadataPieceSize * pieceIndex) pieceData + b <- use bucket + case toPiece b of + Nothing -> return Nothing + Just chunks -> do + t <- use topic + case parseInfoDict (BL.toStrict chunks) t of + Right x -> do + pendingPieces .= [] + return undefined -- (Just x) + Left e -> do + pendingPieces .= [] + bucket .= Block.empty (Block.size b) + return undefined -- Nothing + where + -- todo use incremental parsing to avoid BS.concat call + parseInfoDict :: BS.ByteString -> InfoHash -> Result InfoDict + parseInfoDict chunk topic = + case BE.decode chunk of + Right (infodict @ InfoDict {..}) + | topic == idInfoHash -> return infodict + | otherwise -> Left "broken infodict" + Left err -> Left $ "unable to parse infodict " ++ err + +{----------------------------------------------------------------------- +-- Content download +-----------------------------------------------------------------------} +-- $content-download +-- +-- A block can have one of the following status: +-- +-- 1) /not allowed/: Piece is not in download set. +-- +-- 2) /waiting/: (allowed?) Block have been allowed to download, +-- but /this/ peer did not send any 'Request' message for this +-- block. To allow some piece use +-- 'Network.BitTorrent.Exchange.Selector' and then 'allowedSet' +-- and 'allowPiece'. +-- +-- 3) /inflight/: (pending?) Block have been requested but +-- /remote/ peer did not send any 'Piece' message for this block. +-- Related functions 'markInflight' +-- +-- 4) /pending/: (stalled?) Block have have been downloaded +-- Related functions 'insertBlock'. +-- +-- Piece status: +-- +-- 1) /assembled/: (downloaded?) All blocks in piece have been +-- downloaded but the piece did not verified yet. +-- +-- * Valid: go to completed; +-- +-- * Invalid: go to waiting. +-- +-- 2) /corrupted/: +-- +-- 3) /downloaded/: (verified?) A piece have been successfully +-- verified via the hash. Usually the piece should be stored to +-- the 'System.Torrent.Storage' and /this/ peer should send 'Have' +-- messages to the /remote/ peers. +-- + data PieceEntry = PieceEntry { pending :: [(PeerAddr IP, BlockIx)] , stalled :: Bucket @@ -50,44 +202,23 @@ pieceEntry s = PieceEntry [] (Block.empty s) isEmpty :: PieceEntry -> Bool isEmpty PieceEntry {..} = L.null pending && Block.null stalled -holes :: PieceIx -> PieceEntry -> [BlockIx] -holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled) +_holes :: PieceIx -> PieceEntry -> [BlockIx] +_holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled) where mkBlockIx (off, sz) = BlockIx pix off sz -{----------------------------------------------------------------------- --- Session status ------------------------------------------------------------------------} - -data SessionStatus = SessionStatus - { inprogress :: !(Map PieceIx PieceEntry) - , bitfield :: !Bitfield - , pieceSize :: !PieceSize - } - -sessionStatus :: Bitfield -> PieceSize -> SessionStatus -sessionStatus bf ps = SessionStatus - { inprogress = M.empty - , bitfield = bf - , pieceSize = ps +data ContentDownload = ContentDownload + { inprogress :: !(Map PieceIx PieceEntry) + , bitfield :: !Bitfield + , pieceSize :: !PieceSize + , contentStorage :: Storage } -type StatusUpdates a = StateT SessionStatus IO a - --- | -runStatusUpdates :: MVar SessionStatus -> StatusUpdates a -> IO a -runStatusUpdates var m = modifyMVar var (fmap swap . runStateT m) - -getBitfield :: MVar SessionStatus -> IO Bitfield -getBitfield var = bitfield <$> readMVar var +contentDownload :: Bitfield -> PieceSize -> Storage -> ContentDownload +contentDownload = ContentDownload M.empty -getRequestQueueLength :: PeerAddr IP -> StatusUpdates Int -getRequestQueueLength addr = do - m <- gets (M.elems . M.map (L.filter ((==) addr . fst) . pending) . inprogress) - return $ L.sum $ L.map L.length m - -modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> StatusUpdates () -modifyEntry pix f = modify $ \ s @ SessionStatus {..} -> s +--modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> DownloadUpdates () +modifyEntry pix f = modify $ \ s @ ContentDownload {..} -> s { inprogress = alter (g pieceSize) pix inprogress } where g s = h . f . fromMaybe (pieceEntry s) @@ -95,81 +226,70 @@ modifyEntry pix f = modify $ \ s @ SessionStatus {..} -> s | isEmpty e = Nothing | otherwise = Just e -{----------------------------------------------------------------------- --- Piece download ------------------------------------------------------------------------} +instance Download ContentDownload (Block BL.ByteString) where + scheduleBlocks n addr maskBF = do + ContentDownload {..} <- get + let wantPieces = maskBF `BF.difference` bitfield + let wantBlocks = L.concat $ M.elems $ M.mapWithKey _holes $ + M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces) + inprogress --- TODO choose block nearest to pending or stalled sets to reduce disk --- seeks on remote machines -chooseBlocks :: [BlockIx] -> Int -> StatusUpdates [BlockIx] -chooseBlocks xs n = return (L.take n xs) - --- TODO use selection strategies from Exchange.Selector -choosePiece :: Bitfield -> StatusUpdates (Maybe PieceIx) -choosePiece bf - | BF.null bf = return $ Nothing - | otherwise = return $ Just $ BF.findMin bf - -scheduleBlocks :: PeerAddr IP -> Bitfield -> Int -> StatusUpdates [BlockIx] -scheduleBlocks addr maskBF n = do - SessionStatus {..} <- get - let wantPieces = maskBF `BF.difference` bitfield - let wantBlocks = L.concat $ M.elems $ M.mapWithKey holes $ - M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces) inprogress - - bixs <- if L.null wantBlocks - then do - mpix <- choosePiece wantPieces - case mpix of -- TODO return 'n' blocks - Nothing -> return [] - Just pix -> return [leadingBlock pix defaultTransferSize] - else chooseBlocks wantBlocks n - - forM_ bixs $ \ bix -> do - modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e - { pending = (addr, bix) : pending } - - return bixs - - --- | Remove all pending block requests to the remote peer. May be used --- when: --- --- * a peer closes connection; --- --- * remote peer choked this peer; --- --- * timeout expired. --- -resetPending :: PeerAddr IP -> StatusUpdates () -resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) } - where - reset = fmap $ \ e -> e + bixs <- if L.null wantBlocks + then do + mpix <- choosePiece wantPieces + case mpix of -- TODO return 'n' blocks + Nothing -> return [] + Just pix -> return [leadingBlock pix defaultTransferSize] + else chooseBlocks wantBlocks n + + forM_ bixs $ \ bix -> do + modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e + { pending = (addr, bix) : pending } + + return bixs + where + -- TODO choose block nearest to pending or stalled sets to reduce disk + -- seeks on remote machines + --chooseBlocks :: [BlockIx] -> Int -> DownloadUpdates [BlockIx] + chooseBlocks xs n = return (L.take n xs) + + -- TODO use selection strategies from Exchange.Selector + --choosePiece :: Bitfield -> DownloadUpdates (Maybe PieceIx) + choosePiece bf + | BF.null bf = return $ Nothing + | otherwise = return $ Just $ BF.findMin bf + + getRequestQueueLength addr = do + m <- gets (M.map (L.filter ((==) addr . fst) . pending) . inprogress) + return $ L.sum $ L.map L.length $ M.elems m + + resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) } + where + reset = fmap $ \ e -> e { pending = L.filter (not . (==) addr . fst) (pending e) } --- | MAY write to storage, if a new piece have been completed. -pushBlock :: Block BL.ByteString -> Storage -> StatusUpdates (Maybe Bool) -pushBlock blk @ Block {..} storage = do - mpe <- gets (M.lookup blkPiece . inprogress) - case mpe of - Nothing -> return Nothing - Just (pe @ PieceEntry {..}) - | blockIx blk `L.notElem` fmap snd pending -> return Nothing - | otherwise -> do - let bkt' = Block.insertLazy blkOffset blkData stalled - case toPiece bkt' of - Nothing -> do - modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e - { pending = L.filter ((==) (blockIx blk) . snd) pending - , stalled = bkt' - } - return (Just False) - - Just pieceData -> do - -- TODO verify - liftIO $ writePiece (Piece blkPiece pieceData) storage - modify $ \ s @ SessionStatus {..} -> s - { inprogress = M.delete blkPiece inprogress - , bitfield = BF.insert blkPiece bitfield - } - return (Just True) + pushBlock addr blk @ Block {..} = do + mpe <- gets (M.lookup blkPiece . inprogress) + case mpe of + Nothing -> return Nothing + Just (pe @ PieceEntry {..}) + | blockIx blk `L.notElem` fmap snd pending -> return Nothing + | otherwise -> do + let bkt' = Block.insertLazy blkOffset blkData stalled + case toPiece bkt' of + Nothing -> do + modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e + { pending = L.filter ((==) (blockIx blk) . snd) pending + , stalled = bkt' + } + return (Just False) + + Just pieceData -> do + -- TODO verify + storage <- gets contentStorage + liftIO $ writePiece (Torrent.Piece blkPiece pieceData) storage + modify $ \ s @ ContentDownload {..} -> s + { inprogress = M.delete blkPiece inprogress + , bitfield = BF.insert blkPiece bitfield + } + return (Just True) diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs index 49bff44f..30b7ed0e 100644 --- a/src/Network/BitTorrent/Exchange/Session.hs +++ b/src/Network/BitTorrent/Exchange/Session.hs @@ -51,9 +51,8 @@ import Network.BitTorrent.Address import Network.BitTorrent.Exchange.Bitfield as BF import Network.BitTorrent.Exchange.Block as Block import Network.BitTorrent.Exchange.Connection -import Network.BitTorrent.Exchange.Download as SS +import Network.BitTorrent.Exchange.Download as D import Network.BitTorrent.Exchange.Message as Message -import Network.BitTorrent.Exchange.Session.Metadata as Metadata import System.Torrent.Storage {----------------------------------------------------------------------- @@ -90,13 +89,13 @@ type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () data SessionState = WaitingMetadata - { metadataDownload :: MVar Metadata.Status + { metadataDownload :: MVar MetadataDownload , metadataCompleted :: MVar InfoDict -- ^ used to unblock waiters , contentRootPath :: FilePath } | HavingMetadata { metadataCache :: Cached InfoDict - , contentDownload :: MVar SessionStatus + , contentDownload :: MVar ContentDownload , contentStorage :: Storage } @@ -105,8 +104,9 @@ newSessionState rootPath (Left ih ) = do WaitingMetadata <$> newMVar def <*> newEmptyMVar <*> pure rootPath newSessionState rootPath (Right dict) = do storage <- openInfoDict ReadWriteEx rootPath dict - download <- newMVar $ sessionStatus (BF.haveNone (totalPieces storage)) - (piPieceLength (idPieceInfo dict)) + download <- newMVar $ D.contentDownload (BF.haveNone (totalPieces storage)) + (piPieceLength (idPieceInfo dict)) + storage return $ HavingMetadata (cache dict) download storage closeSessionState :: SessionState -> IO () @@ -116,8 +116,9 @@ closeSessionState HavingMetadata {..} = close contentStorage haveMetadata :: InfoDict -> SessionState -> IO SessionState haveMetadata dict WaitingMetadata {..} = do storage <- openInfoDict ReadWriteEx contentRootPath dict - download <- newMVar $ sessionStatus (BF.haveNone (totalPieces storage)) - (piPieceLength (idPieceInfo dict)) + download <- newMVar $ D.contentDownload (BF.haveNone (totalPieces storage)) + (piPieceLength (idPieceInfo dict)) + storage return HavingMetadata { metadataCache = cache dict , contentDownload = download diff --git a/src/Network/BitTorrent/Exchange/Session/Metadata.hs b/src/Network/BitTorrent/Exchange/Session/Metadata.hs deleted file mode 100644 index f08ebe00..00000000 --- a/src/Network/BitTorrent/Exchange/Session/Metadata.hs +++ /dev/null @@ -1,102 +0,0 @@ -{-# LANGUAGE TemplateHaskell #-} -module Network.BitTorrent.Exchange.Session.Metadata - ( -- * Transfer state - Status - , nullStatus - - -- * State updates - , Updates - , runUpdates - - -- * Piece transfer control - , scheduleBlock - , resetPending - , cancelPending - , pushBlock - ) where - -import Control.Concurrent -import Control.Lens -import Control.Monad.Reader -import Control.Monad.State -import Data.ByteString as BS -import Data.ByteString.Lazy as BL -import Data.Default -import Data.List as L -import Data.Tuple - -import Data.BEncode as BE -import Data.Torrent as Torrent -import Network.BitTorrent.Address -import Network.BitTorrent.Exchange.Block as Block -import Network.BitTorrent.Exchange.Message as Message hiding (Status) - - --- | Current transfer status. -data Status = Status - { _pending :: [(PeerAddr IP, PieceIx)] - , _bucket :: Bucket - } - -makeLenses ''Status - -instance Default Status where - def = error "default status" - --- | Create a new scheduler for infodict of the given size. -nullStatus :: Int -> Status -nullStatus ps = Status [] (Block.empty ps) - -type Updates = ReaderT (PeerAddr IP) (State Status) - -runUpdates :: MVar Status -> PeerAddr IP -> Updates a -> IO a -runUpdates v a m = modifyMVar v (return . swap . runState (runReaderT m a)) - -scheduleBlock :: Updates (Maybe PieceIx) -scheduleBlock = do - addr <- ask - bkt <- use bucket - case spans metadataPieceSize bkt of - [] -> return Nothing - ((off, _ ) : _) -> do - let pix = off `div` metadataPieceSize - pending %= ((addr, pix) :) - return (Just pix) - -cancelPending :: PieceIx -> Updates () -cancelPending pix = pending %= L.filter ((pix ==) . snd) - -resetPending :: Updates () -resetPending = do - addr <- ask - pending %= L.filter ((addr ==) . fst) - -parseInfoDict :: BS.ByteString -> InfoHash -> Result InfoDict -parseInfoDict chunk topic = - case BE.decode chunk of - Right (infodict @ InfoDict {..}) - | topic == idInfoHash -> return infodict - | otherwise -> Left "broken infodict" - Left err -> Left $ "unable to parse infodict " ++ err - --- todo use incremental parsing to avoid BS.concat call -pushBlock :: Torrent.Piece BS.ByteString -> InfoHash -> Updates (Maybe InfoDict) -pushBlock Torrent.Piece {..} topic = do - addr <- ask - p <- use pending - when ((addr, pieceIndex) `L.notElem` p) $ error "not requested" - cancelPending pieceIndex - - bucket %= Block.insert (metadataPieceSize * pieceIndex) pieceData - b <- use bucket - case toPiece b of - Nothing -> return Nothing - Just chunks -> - case parseInfoDict (BL.toStrict chunks) topic of - Right x -> do - pending .= [] - return (Just x) - Left e -> do - pending .= [] - bucket .= Block.empty (Block.size b) - return Nothing -- cgit v1.2.3