diff options
author | Sam Truzjan <pxqr.sta@gmail.com> | 2014-04-17 15:27:43 +0400 |
---|---|---|
committer | Sam Truzjan <pxqr.sta@gmail.com> | 2014-04-17 15:27:43 +0400 |
commit | cb75f50f4cae778d1dfc57edff771a5145dd9894 (patch) | |
tree | 0d8133e881866a10138c35eac058cbb09f0148b5 /src | |
parent | 6bd297aee69a74d19f81f240d170d9bca81c96cf (diff) |
[Exchange] Move all download stuff to single module
Diffstat (limited to 'src')
-rw-r--r-- | src/Network/BitTorrent/Exchange/Assembler.hs | 168 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange/Download.hs | 376 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange/Session.hs | 17 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange/Session/Metadata.hs | 102 |
4 files changed, 257 insertions, 406 deletions
diff --git a/src/Network/BitTorrent/Exchange/Assembler.hs b/src/Network/BitTorrent/Exchange/Assembler.hs deleted file mode 100644 index 7abb8ab0..00000000 --- a/src/Network/BitTorrent/Exchange/Assembler.hs +++ /dev/null | |||
@@ -1,168 +0,0 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- Assembler is used to build pieces from blocks. In general | ||
9 | -- 'Assembler' should be used to handle 'Transfer' messages when | ||
10 | -- | ||
11 | -- A block can have one of the following status: | ||
12 | -- | ||
13 | -- 1) /not allowed/: Piece is not in download set. 'null' and 'empty'. | ||
14 | -- | ||
15 | -- | ||
16 | -- 2) /waiting/: (allowed?) Block have been allowed to download, | ||
17 | -- but /this/ peer did not send any 'Request' message for this | ||
18 | -- block. To allow some piece use | ||
19 | -- 'Network.BitTorrent.Exchange.Selector' and then 'allowedSet' | ||
20 | -- and 'allowPiece'. | ||
21 | -- | ||
22 | -- 3) /inflight/: (pending?) Block have been requested but | ||
23 | -- /remote/ peer did not send any 'Piece' message for this block. | ||
24 | -- Related functions 'markInflight' | ||
25 | -- | ||
26 | -- 4) /pending/: (stalled?) Block have have been downloaded | ||
27 | -- Related functions 'insertBlock'. | ||
28 | -- | ||
29 | -- Piece status: | ||
30 | -- | ||
31 | -- 1) /assembled/: (downloaded?) All blocks in piece have been | ||
32 | -- downloaded but the piece did not verified yet. | ||
33 | -- | ||
34 | -- * Valid: go to completed; | ||
35 | -- | ||
36 | -- * Invalid: go to waiting. | ||
37 | -- | ||
38 | -- 2) /corrupted/: | ||
39 | -- | ||
40 | -- 3) /downloaded/: (verified?) A piece have been successfully | ||
41 | -- verified via the hash. Usually the piece should be stored to | ||
42 | -- the 'System.Torrent.Storage' and /this/ peer should send 'Have' | ||
43 | -- messages to the /remote/ peers. | ||
44 | -- | ||
45 | {-# LANGUAGE TemplateHaskell #-} | ||
46 | module Network.BitTorrent.Exchange.Assembler | ||
47 | ( -- * Assembler | ||
48 | Assembler | ||
49 | |||
50 | -- * Query | ||
51 | , Network.BitTorrent.Exchange.Assembler.null | ||
52 | , Network.BitTorrent.Exchange.Assembler.size | ||
53 | |||
54 | -- * | ||
55 | , Network.BitTorrent.Exchange.Assembler.empty | ||
56 | , allowPiece | ||
57 | |||
58 | -- * Debugging | ||
59 | , Network.BitTorrent.Exchange.Assembler.valid | ||
60 | ) where | ||
61 | |||
62 | import Control.Applicative | ||
63 | import Control.Lens | ||
64 | import Data.IntMap.Strict as IM | ||
65 | import Data.List as L | ||
66 | import Data.Map as M | ||
67 | import Data.Maybe | ||
68 | import Data.IP | ||
69 | |||
70 | import Data.Torrent | ||
71 | import Network.BitTorrent.Address | ||
72 | import Network.BitTorrent.Exchange.Block as B | ||
73 | |||
74 | {----------------------------------------------------------------------- | ||
75 | -- Assembler | ||
76 | -----------------------------------------------------------------------} | ||
77 | |||
78 | type Timestamp = () | ||
79 | {- | ||
80 | data BlockRequest = BlockRequest | ||
81 | { requestSent :: Timestamp | ||
82 | , requestedPeer :: PeerAddr IP | ||
83 | , requestedBlock :: BlockIx | ||
84 | } | ||
85 | -} | ||
86 | type BlockRange = (BlockOffset, BlockSize) | ||
87 | type PieceMap = IntMap | ||
88 | |||
89 | data Assembler = Assembler | ||
90 | { -- | A set of blocks that have been 'Request'ed but not yet acked. | ||
91 | _inflight :: Map (PeerAddr IP) (PieceMap [BlockRange]) | ||
92 | |||
93 | -- | A set of blocks that but not yet assembled. | ||
94 | , _pending :: PieceMap Bucket | ||
95 | |||
96 | -- | Used for validation of assembled pieces. | ||
97 | , info :: PieceInfo | ||
98 | } | ||
99 | |||
100 | $(makeLenses ''Assembler) | ||
101 | |||
102 | |||
103 | valid :: Assembler -> Bool | ||
104 | valid = undefined | ||
105 | |||
106 | data Result a | ||
107 | = Completed (Piece a) | ||
108 | | Corrupted PieceIx | ||
109 | | NotRequested PieceIx | ||
110 | | Overlapped BlockIx | ||
111 | |||
112 | null :: Assembler -> Bool | ||
113 | null = undefined | ||
114 | |||
115 | size :: Assembler -> Bool | ||
116 | size = undefined | ||
117 | |||
118 | empty :: PieceInfo -> Assembler | ||
119 | empty = Assembler M.empty IM.empty | ||
120 | |||
121 | allowPiece :: PieceIx -> Assembler -> Assembler | ||
122 | allowPiece pix a @ Assembler {..} = over pending (IM.insert pix bkt) a | ||
123 | where | ||
124 | bkt = B.empty (piPieceLength info) | ||
125 | |||
126 | allowedSet :: (PeerAddr IP) -> Assembler -> [BlockIx] | ||
127 | allowedSet = undefined | ||
128 | |||
129 | --inflight :: PeerAddr -> BlockIx -> Assembler -> Assembler | ||
130 | --inflight = undefined | ||
131 | |||
132 | -- You should check if a returned by peer block is actually have | ||
133 | -- been requested and in-flight. This is needed to avoid "I send | ||
134 | -- random corrupted block" attacks. | ||
135 | insert :: PeerAddr IP -> Block a -> Assembler -> Assembler | ||
136 | insert = undefined | ||
137 | |||
138 | {- | ||
139 | insert :: Block a -> Assembler a -> (Assembler a, Maybe (Result a)) | ||
140 | insert blk @ Block {..} a @ Assembler {..} = undefined | ||
141 | {- | ||
142 | = let (pending, mpiece) = inserta blk piecePending | ||
143 | in (Assembler inflightSet pending pieceInfo, f <$> mpiece) | ||
144 | where | ||
145 | f p = undefined | ||
146 | -- | checkPieceLazy pieceInfo p = Assembled p | ||
147 | -- | otherwise = Corrupted ixPiece | ||
148 | -} | ||
149 | |||
150 | |||
151 | inflightPieces :: Assembler a -> [PieceIx] | ||
152 | inflightPieces Assembler {..} = IM.keys piecePending | ||
153 | |||
154 | completeBlocks :: PieceIx -> Assembler a -> [Block a] | ||
155 | completeBlocks pix Assembler {..} = fromMaybe [] $ IM.lookup pix piecePending | ||
156 | |||
157 | incompleteBlocks :: PieceIx -> Assembler a -> [BlockIx] | ||
158 | incompleteBlocks = undefined | ||
159 | |||
160 | nextBlock :: Assembler a -> Maybe (Assembler a, BlockIx) | ||
161 | nextBlock Assembler {..} = undefined | ||
162 | |||
163 | inserta :: Block a | ||
164 | -> PieceMap [Block a] | ||
165 | -> (PieceMap [Block a], Maybe (Piece a)) | ||
166 | inserta = undefined | ||
167 | |||
168 | -} | ||
diff --git a/src/Network/BitTorrent/Exchange/Download.hs b/src/Network/BitTorrent/Exchange/Download.hs index fcc94485..9a6b5f91 100644 --- a/src/Network/BitTorrent/Exchange/Download.hs +++ b/src/Network/BitTorrent/Exchange/Download.hs | |||
@@ -1,44 +1,196 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- | ||
9 | -- | ||
10 | {-# LANGUAGE FlexibleInstances #-} | ||
11 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
12 | {-# LANGUAGE FunctionalDependencies #-} | ||
13 | {-# LANGUAGE TemplateHaskell #-} | ||
1 | module Network.BitTorrent.Exchange.Download | 14 | module Network.BitTorrent.Exchange.Download |
2 | ( -- * Environment | 15 | ( -- * Downloading |
3 | StatusUpdates | 16 | Download (..) |
4 | , runStatusUpdates | 17 | , Updates |
5 | 18 | , runDownloadUpdates | |
6 | -- * Status | 19 | |
7 | , SessionStatus | 20 | -- ** Metadata |
8 | , sessionStatus | 21 | -- $metadata-download |
9 | 22 | , MetadataDownload | |
10 | -- * Query | 23 | , metadataDownload |
11 | , getBitfield | 24 | |
12 | , getRequestQueueLength | 25 | -- ** Content |
13 | 26 | -- $content-download | |
14 | -- * Control | 27 | , ContentDownload |
15 | , scheduleBlocks | 28 | , contentDownload |
16 | , resetPending | ||
17 | , pushBlock | ||
18 | ) where | 29 | ) where |
19 | 30 | ||
20 | import Control.Applicative | 31 | import Control.Applicative |
21 | import Control.Concurrent | 32 | import Control.Concurrent |
33 | import Control.Lens | ||
22 | import Control.Monad.State | 34 | import Control.Monad.State |
35 | import Data.BEncode as BE | ||
36 | import Data.ByteString as BS | ||
23 | import Data.ByteString.Lazy as BL | 37 | import Data.ByteString.Lazy as BL |
24 | import Data.Default | 38 | import Data.Default |
25 | import Data.List as L | 39 | import Data.List as L |
26 | import Data.Maybe | 40 | import Data.Maybe |
27 | import Data.Map as M | 41 | import Data.Map as M |
28 | import Data.Set as S | ||
29 | import Data.Tuple | 42 | import Data.Tuple |
30 | 43 | ||
31 | import Data.Torrent | 44 | import Data.Torrent as Torrent |
32 | import Network.BitTorrent.Exchange.Bitfield as BF | ||
33 | import Network.BitTorrent.Address | 45 | import Network.BitTorrent.Address |
34 | import Network.BitTorrent.Exchange.Block as Block | 46 | import Network.BitTorrent.Exchange.Bitfield as BF |
47 | import Network.BitTorrent.Exchange.Block as Block | ||
48 | import Network.BitTorrent.Exchange.Message as Msg | ||
35 | import System.Torrent.Storage (Storage, writePiece) | 49 | import System.Torrent.Storage (Storage, writePiece) |
36 | 50 | ||
37 | 51 | ||
38 | {----------------------------------------------------------------------- | 52 | {----------------------------------------------------------------------- |
39 | -- Piece entry | 53 | -- Class |
40 | -----------------------------------------------------------------------} | 54 | -----------------------------------------------------------------------} |
41 | 55 | ||
56 | type Updates s a = StateT s IO a | ||
57 | |||
58 | runDownloadUpdates :: MVar s -> Updates s a -> IO a | ||
59 | runDownloadUpdates var m = modifyMVar var (fmap swap . runStateT m) | ||
60 | |||
61 | class Download s chunk | s -> chunk where | ||
62 | scheduleBlocks :: Int -> PeerAddr IP -> Bitfield -> Updates s [BlockIx] | ||
63 | |||
64 | -- | | ||
65 | scheduleBlock :: PeerAddr IP -> Bitfield -> Updates s (Maybe BlockIx) | ||
66 | scheduleBlock addr bf = listToMaybe <$> scheduleBlocks 1 addr bf | ||
67 | |||
68 | -- | Get number of sent requests to this peer. | ||
69 | getRequestQueueLength :: PeerAddr IP -> Updates s Int | ||
70 | |||
71 | -- | Remove all pending block requests to the remote peer. May be used | ||
72 | -- when: | ||
73 | -- | ||
74 | -- * a peer closes connection; | ||
75 | -- | ||
76 | -- * remote peer choked this peer; | ||
77 | -- | ||
78 | -- * timeout expired. | ||
79 | -- | ||
80 | resetPending :: PeerAddr IP -> Updates s () | ||
81 | |||
82 | -- | MAY write to storage, if a new piece have been completed. | ||
83 | -- | ||
84 | -- You should check if a returned by peer block is actually have | ||
85 | -- been requested and in-flight. This is needed to avoid "I send | ||
86 | -- random corrupted block" attacks. | ||
87 | pushBlock :: PeerAddr IP -> chunk -> Updates s (Maybe Bool) | ||
88 | |||
89 | {----------------------------------------------------------------------- | ||
90 | -- Metadata download | ||
91 | -----------------------------------------------------------------------} | ||
92 | -- $metadata-download | ||
93 | -- TODO | ||
94 | |||
95 | data MetadataDownload = MetadataDownload | ||
96 | { _pendingPieces :: [(PeerAddr IP, PieceIx)] | ||
97 | , _bucket :: Bucket | ||
98 | , _topic :: InfoHash | ||
99 | } | ||
100 | |||
101 | makeLenses ''MetadataDownload | ||
102 | |||
103 | -- | Create a new scheduler for infodict of the given size. | ||
104 | metadataDownload :: Int -> InfoHash -> MetadataDownload | ||
105 | metadataDownload ps = MetadataDownload [] (Block.empty ps) | ||
106 | |||
107 | instance Default MetadataDownload where | ||
108 | def = error "instance Default MetadataDownload" | ||
109 | |||
110 | --cancelPending :: PieceIx -> Updates () | ||
111 | cancelPending pix = pendingPieces %= L.filter ((pix ==) . snd) | ||
112 | |||
113 | instance Download MetadataDownload (Piece BS.ByteString) where | ||
114 | scheduleBlock addr bf = do | ||
115 | bkt <- use bucket | ||
116 | case spans metadataPieceSize bkt of | ||
117 | [] -> return Nothing | ||
118 | ((off, _ ) : _) -> do | ||
119 | let pix = off `div` metadataPieceSize | ||
120 | pendingPieces %= ((addr, pix) :) | ||
121 | return (Just (BlockIx pix 0 metadataPieceSize)) | ||
122 | |||
123 | resetPending addr = pendingPieces %= L.filter ((addr ==) . fst) | ||
124 | |||
125 | pushBlock addr Torrent.Piece {..} = do | ||
126 | p <- use pendingPieces | ||
127 | when ((addr, pieceIndex) `L.notElem` p) $ | ||
128 | error "not requested" | ||
129 | cancelPending pieceIndex | ||
130 | |||
131 | bucket %= Block.insert (metadataPieceSize * pieceIndex) pieceData | ||
132 | b <- use bucket | ||
133 | case toPiece b of | ||
134 | Nothing -> return Nothing | ||
135 | Just chunks -> do | ||
136 | t <- use topic | ||
137 | case parseInfoDict (BL.toStrict chunks) t of | ||
138 | Right x -> do | ||
139 | pendingPieces .= [] | ||
140 | return undefined -- (Just x) | ||
141 | Left e -> do | ||
142 | pendingPieces .= [] | ||
143 | bucket .= Block.empty (Block.size b) | ||
144 | return undefined -- Nothing | ||
145 | where | ||
146 | -- todo use incremental parsing to avoid BS.concat call | ||
147 | parseInfoDict :: BS.ByteString -> InfoHash -> Result InfoDict | ||
148 | parseInfoDict chunk topic = | ||
149 | case BE.decode chunk of | ||
150 | Right (infodict @ InfoDict {..}) | ||
151 | | topic == idInfoHash -> return infodict | ||
152 | | otherwise -> Left "broken infodict" | ||
153 | Left err -> Left $ "unable to parse infodict " ++ err | ||
154 | |||
155 | {----------------------------------------------------------------------- | ||
156 | -- Content download | ||
157 | -----------------------------------------------------------------------} | ||
158 | -- $content-download | ||
159 | -- | ||
160 | -- A block can have one of the following status: | ||
161 | -- | ||
162 | -- 1) /not allowed/: Piece is not in download set. | ||
163 | -- | ||
164 | -- 2) /waiting/: (allowed?) Block have been allowed to download, | ||
165 | -- but /this/ peer did not send any 'Request' message for this | ||
166 | -- block. To allow some piece use | ||
167 | -- 'Network.BitTorrent.Exchange.Selector' and then 'allowedSet' | ||
168 | -- and 'allowPiece'. | ||
169 | -- | ||
170 | -- 3) /inflight/: (pending?) Block have been requested but | ||
171 | -- /remote/ peer did not send any 'Piece' message for this block. | ||
172 | -- Related functions 'markInflight' | ||
173 | -- | ||
174 | -- 4) /pending/: (stalled?) Block have have been downloaded | ||
175 | -- Related functions 'insertBlock'. | ||
176 | -- | ||
177 | -- Piece status: | ||
178 | -- | ||
179 | -- 1) /assembled/: (downloaded?) All blocks in piece have been | ||
180 | -- downloaded but the piece did not verified yet. | ||
181 | -- | ||
182 | -- * Valid: go to completed; | ||
183 | -- | ||
184 | -- * Invalid: go to waiting. | ||
185 | -- | ||
186 | -- 2) /corrupted/: | ||
187 | -- | ||
188 | -- 3) /downloaded/: (verified?) A piece have been successfully | ||
189 | -- verified via the hash. Usually the piece should be stored to | ||
190 | -- the 'System.Torrent.Storage' and /this/ peer should send 'Have' | ||
191 | -- messages to the /remote/ peers. | ||
192 | -- | ||
193 | |||
42 | data PieceEntry = PieceEntry | 194 | data PieceEntry = PieceEntry |
43 | { pending :: [(PeerAddr IP, BlockIx)] | 195 | { pending :: [(PeerAddr IP, BlockIx)] |
44 | , stalled :: Bucket | 196 | , stalled :: Bucket |
@@ -50,44 +202,23 @@ pieceEntry s = PieceEntry [] (Block.empty s) | |||
50 | isEmpty :: PieceEntry -> Bool | 202 | isEmpty :: PieceEntry -> Bool |
51 | isEmpty PieceEntry {..} = L.null pending && Block.null stalled | 203 | isEmpty PieceEntry {..} = L.null pending && Block.null stalled |
52 | 204 | ||
53 | holes :: PieceIx -> PieceEntry -> [BlockIx] | 205 | _holes :: PieceIx -> PieceEntry -> [BlockIx] |
54 | holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled) | 206 | _holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled) |
55 | where | 207 | where |
56 | mkBlockIx (off, sz) = BlockIx pix off sz | 208 | mkBlockIx (off, sz) = BlockIx pix off sz |
57 | 209 | ||
58 | {----------------------------------------------------------------------- | 210 | data ContentDownload = ContentDownload |
59 | -- Session status | 211 | { inprogress :: !(Map PieceIx PieceEntry) |
60 | -----------------------------------------------------------------------} | 212 | , bitfield :: !Bitfield |
61 | 213 | , pieceSize :: !PieceSize | |
62 | data SessionStatus = SessionStatus | 214 | , contentStorage :: Storage |
63 | { inprogress :: !(Map PieceIx PieceEntry) | ||
64 | , bitfield :: !Bitfield | ||
65 | , pieceSize :: !PieceSize | ||
66 | } | ||
67 | |||
68 | sessionStatus :: Bitfield -> PieceSize -> SessionStatus | ||
69 | sessionStatus bf ps = SessionStatus | ||
70 | { inprogress = M.empty | ||
71 | , bitfield = bf | ||
72 | , pieceSize = ps | ||
73 | } | 215 | } |
74 | 216 | ||
75 | type StatusUpdates a = StateT SessionStatus IO a | 217 | contentDownload :: Bitfield -> PieceSize -> Storage -> ContentDownload |
76 | 218 | contentDownload = ContentDownload M.empty | |
77 | -- | | ||
78 | runStatusUpdates :: MVar SessionStatus -> StatusUpdates a -> IO a | ||
79 | runStatusUpdates var m = modifyMVar var (fmap swap . runStateT m) | ||
80 | |||
81 | getBitfield :: MVar SessionStatus -> IO Bitfield | ||
82 | getBitfield var = bitfield <$> readMVar var | ||
83 | 219 | ||
84 | getRequestQueueLength :: PeerAddr IP -> StatusUpdates Int | 220 | --modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> DownloadUpdates () |
85 | getRequestQueueLength addr = do | 221 | modifyEntry pix f = modify $ \ s @ ContentDownload {..} -> s |
86 | m <- gets (M.elems . M.map (L.filter ((==) addr . fst) . pending) . inprogress) | ||
87 | return $ L.sum $ L.map L.length m | ||
88 | |||
89 | modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> StatusUpdates () | ||
90 | modifyEntry pix f = modify $ \ s @ SessionStatus {..} -> s | ||
91 | { inprogress = alter (g pieceSize) pix inprogress } | 222 | { inprogress = alter (g pieceSize) pix inprogress } |
92 | where | 223 | where |
93 | g s = h . f . fromMaybe (pieceEntry s) | 224 | g s = h . f . fromMaybe (pieceEntry s) |
@@ -95,81 +226,70 @@ modifyEntry pix f = modify $ \ s @ SessionStatus {..} -> s | |||
95 | | isEmpty e = Nothing | 226 | | isEmpty e = Nothing |
96 | | otherwise = Just e | 227 | | otherwise = Just e |
97 | 228 | ||
98 | {----------------------------------------------------------------------- | 229 | instance Download ContentDownload (Block BL.ByteString) where |
99 | -- Piece download | 230 | scheduleBlocks n addr maskBF = do |
100 | -----------------------------------------------------------------------} | 231 | ContentDownload {..} <- get |
232 | let wantPieces = maskBF `BF.difference` bitfield | ||
233 | let wantBlocks = L.concat $ M.elems $ M.mapWithKey _holes $ | ||
234 | M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces) | ||
235 | inprogress | ||
101 | 236 | ||
102 | -- TODO choose block nearest to pending or stalled sets to reduce disk | 237 | bixs <- if L.null wantBlocks |
103 | -- seeks on remote machines | 238 | then do |
104 | chooseBlocks :: [BlockIx] -> Int -> StatusUpdates [BlockIx] | 239 | mpix <- choosePiece wantPieces |
105 | chooseBlocks xs n = return (L.take n xs) | 240 | case mpix of -- TODO return 'n' blocks |
106 | 241 | Nothing -> return [] | |
107 | -- TODO use selection strategies from Exchange.Selector | 242 | Just pix -> return [leadingBlock pix defaultTransferSize] |
108 | choosePiece :: Bitfield -> StatusUpdates (Maybe PieceIx) | 243 | else chooseBlocks wantBlocks n |
109 | choosePiece bf | 244 | |
110 | | BF.null bf = return $ Nothing | 245 | forM_ bixs $ \ bix -> do |
111 | | otherwise = return $ Just $ BF.findMin bf | 246 | modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e |
112 | 247 | { pending = (addr, bix) : pending } | |
113 | scheduleBlocks :: PeerAddr IP -> Bitfield -> Int -> StatusUpdates [BlockIx] | 248 | |
114 | scheduleBlocks addr maskBF n = do | 249 | return bixs |
115 | SessionStatus {..} <- get | 250 | where |
116 | let wantPieces = maskBF `BF.difference` bitfield | 251 | -- TODO choose block nearest to pending or stalled sets to reduce disk |
117 | let wantBlocks = L.concat $ M.elems $ M.mapWithKey holes $ | 252 | -- seeks on remote machines |
118 | M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces) inprogress | 253 | --chooseBlocks :: [BlockIx] -> Int -> DownloadUpdates [BlockIx] |
119 | 254 | chooseBlocks xs n = return (L.take n xs) | |
120 | bixs <- if L.null wantBlocks | 255 | |
121 | then do | 256 | -- TODO use selection strategies from Exchange.Selector |
122 | mpix <- choosePiece wantPieces | 257 | --choosePiece :: Bitfield -> DownloadUpdates (Maybe PieceIx) |
123 | case mpix of -- TODO return 'n' blocks | 258 | choosePiece bf |
124 | Nothing -> return [] | 259 | | BF.null bf = return $ Nothing |
125 | Just pix -> return [leadingBlock pix defaultTransferSize] | 260 | | otherwise = return $ Just $ BF.findMin bf |
126 | else chooseBlocks wantBlocks n | 261 | |
127 | 262 | getRequestQueueLength addr = do | |
128 | forM_ bixs $ \ bix -> do | 263 | m <- gets (M.map (L.filter ((==) addr . fst) . pending) . inprogress) |
129 | modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e | 264 | return $ L.sum $ L.map L.length $ M.elems m |
130 | { pending = (addr, bix) : pending } | 265 | |
131 | 266 | resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) } | |
132 | return bixs | 267 | where |
133 | 268 | reset = fmap $ \ e -> e | |
134 | |||
135 | -- | Remove all pending block requests to the remote peer. May be used | ||
136 | -- when: | ||
137 | -- | ||
138 | -- * a peer closes connection; | ||
139 | -- | ||
140 | -- * remote peer choked this peer; | ||
141 | -- | ||
142 | -- * timeout expired. | ||
143 | -- | ||
144 | resetPending :: PeerAddr IP -> StatusUpdates () | ||
145 | resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) } | ||
146 | where | ||
147 | reset = fmap $ \ e -> e | ||
148 | { pending = L.filter (not . (==) addr . fst) (pending e) } | 269 | { pending = L.filter (not . (==) addr . fst) (pending e) } |
149 | 270 | ||
150 | -- | MAY write to storage, if a new piece have been completed. | 271 | pushBlock addr blk @ Block {..} = do |
151 | pushBlock :: Block BL.ByteString -> Storage -> StatusUpdates (Maybe Bool) | 272 | mpe <- gets (M.lookup blkPiece . inprogress) |
152 | pushBlock blk @ Block {..} storage = do | 273 | case mpe of |
153 | mpe <- gets (M.lookup blkPiece . inprogress) | 274 | Nothing -> return Nothing |
154 | case mpe of | 275 | Just (pe @ PieceEntry {..}) |
155 | Nothing -> return Nothing | 276 | | blockIx blk `L.notElem` fmap snd pending -> return Nothing |
156 | Just (pe @ PieceEntry {..}) | 277 | | otherwise -> do |
157 | | blockIx blk `L.notElem` fmap snd pending -> return Nothing | 278 | let bkt' = Block.insertLazy blkOffset blkData stalled |
158 | | otherwise -> do | 279 | case toPiece bkt' of |
159 | let bkt' = Block.insertLazy blkOffset blkData stalled | 280 | Nothing -> do |
160 | case toPiece bkt' of | 281 | modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e |
161 | Nothing -> do | 282 | { pending = L.filter ((==) (blockIx blk) . snd) pending |
162 | modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e | 283 | , stalled = bkt' |
163 | { pending = L.filter ((==) (blockIx blk) . snd) pending | 284 | } |
164 | , stalled = bkt' | 285 | return (Just False) |
165 | } | 286 | |
166 | return (Just False) | 287 | Just pieceData -> do |
167 | 288 | -- TODO verify | |
168 | Just pieceData -> do | 289 | storage <- gets contentStorage |
169 | -- TODO verify | 290 | liftIO $ writePiece (Torrent.Piece blkPiece pieceData) storage |
170 | liftIO $ writePiece (Piece blkPiece pieceData) storage | 291 | modify $ \ s @ ContentDownload {..} -> s |
171 | modify $ \ s @ SessionStatus {..} -> s | 292 | { inprogress = M.delete blkPiece inprogress |
172 | { inprogress = M.delete blkPiece inprogress | 293 | , bitfield = BF.insert blkPiece bitfield |
173 | , bitfield = BF.insert blkPiece bitfield | 294 | } |
174 | } | 295 | return (Just True) |
175 | return (Just True) | ||
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs index 49bff44f..30b7ed0e 100644 --- a/src/Network/BitTorrent/Exchange/Session.hs +++ b/src/Network/BitTorrent/Exchange/Session.hs | |||
@@ -51,9 +51,8 @@ import Network.BitTorrent.Address | |||
51 | import Network.BitTorrent.Exchange.Bitfield as BF | 51 | import Network.BitTorrent.Exchange.Bitfield as BF |
52 | import Network.BitTorrent.Exchange.Block as Block | 52 | import Network.BitTorrent.Exchange.Block as Block |
53 | import Network.BitTorrent.Exchange.Connection | 53 | import Network.BitTorrent.Exchange.Connection |
54 | import Network.BitTorrent.Exchange.Download as SS | 54 | import Network.BitTorrent.Exchange.Download as D |
55 | import Network.BitTorrent.Exchange.Message as Message | 55 | import Network.BitTorrent.Exchange.Message as Message |
56 | import Network.BitTorrent.Exchange.Session.Metadata as Metadata | ||
57 | import System.Torrent.Storage | 56 | import System.Torrent.Storage |
58 | 57 | ||
59 | {----------------------------------------------------------------------- | 58 | {----------------------------------------------------------------------- |
@@ -90,13 +89,13 @@ type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () | |||
90 | 89 | ||
91 | data SessionState | 90 | data SessionState |
92 | = WaitingMetadata | 91 | = WaitingMetadata |
93 | { metadataDownload :: MVar Metadata.Status | 92 | { metadataDownload :: MVar MetadataDownload |
94 | , metadataCompleted :: MVar InfoDict -- ^ used to unblock waiters | 93 | , metadataCompleted :: MVar InfoDict -- ^ used to unblock waiters |
95 | , contentRootPath :: FilePath | 94 | , contentRootPath :: FilePath |
96 | } | 95 | } |
97 | | HavingMetadata | 96 | | HavingMetadata |
98 | { metadataCache :: Cached InfoDict | 97 | { metadataCache :: Cached InfoDict |
99 | , contentDownload :: MVar SessionStatus | 98 | , contentDownload :: MVar ContentDownload |
100 | , contentStorage :: Storage | 99 | , contentStorage :: Storage |
101 | } | 100 | } |
102 | 101 | ||
@@ -105,8 +104,9 @@ newSessionState rootPath (Left ih ) = do | |||
105 | WaitingMetadata <$> newMVar def <*> newEmptyMVar <*> pure rootPath | 104 | WaitingMetadata <$> newMVar def <*> newEmptyMVar <*> pure rootPath |
106 | newSessionState rootPath (Right dict) = do | 105 | newSessionState rootPath (Right dict) = do |
107 | storage <- openInfoDict ReadWriteEx rootPath dict | 106 | storage <- openInfoDict ReadWriteEx rootPath dict |
108 | download <- newMVar $ sessionStatus (BF.haveNone (totalPieces storage)) | 107 | download <- newMVar $ D.contentDownload (BF.haveNone (totalPieces storage)) |
109 | (piPieceLength (idPieceInfo dict)) | 108 | (piPieceLength (idPieceInfo dict)) |
109 | storage | ||
110 | return $ HavingMetadata (cache dict) download storage | 110 | return $ HavingMetadata (cache dict) download storage |
111 | 111 | ||
112 | closeSessionState :: SessionState -> IO () | 112 | closeSessionState :: SessionState -> IO () |
@@ -116,8 +116,9 @@ closeSessionState HavingMetadata {..} = close contentStorage | |||
116 | haveMetadata :: InfoDict -> SessionState -> IO SessionState | 116 | haveMetadata :: InfoDict -> SessionState -> IO SessionState |
117 | haveMetadata dict WaitingMetadata {..} = do | 117 | haveMetadata dict WaitingMetadata {..} = do |
118 | storage <- openInfoDict ReadWriteEx contentRootPath dict | 118 | storage <- openInfoDict ReadWriteEx contentRootPath dict |
119 | download <- newMVar $ sessionStatus (BF.haveNone (totalPieces storage)) | 119 | download <- newMVar $ D.contentDownload (BF.haveNone (totalPieces storage)) |
120 | (piPieceLength (idPieceInfo dict)) | 120 | (piPieceLength (idPieceInfo dict)) |
121 | storage | ||
121 | return HavingMetadata | 122 | return HavingMetadata |
122 | { metadataCache = cache dict | 123 | { metadataCache = cache dict |
123 | , contentDownload = download | 124 | , contentDownload = download |
diff --git a/src/Network/BitTorrent/Exchange/Session/Metadata.hs b/src/Network/BitTorrent/Exchange/Session/Metadata.hs deleted file mode 100644 index f08ebe00..00000000 --- a/src/Network/BitTorrent/Exchange/Session/Metadata.hs +++ /dev/null | |||
@@ -1,102 +0,0 @@ | |||
1 | {-# LANGUAGE TemplateHaskell #-} | ||
2 | module Network.BitTorrent.Exchange.Session.Metadata | ||
3 | ( -- * Transfer state | ||
4 | Status | ||
5 | , nullStatus | ||
6 | |||
7 | -- * State updates | ||
8 | , Updates | ||
9 | , runUpdates | ||
10 | |||
11 | -- * Piece transfer control | ||
12 | , scheduleBlock | ||
13 | , resetPending | ||
14 | , cancelPending | ||
15 | , pushBlock | ||
16 | ) where | ||
17 | |||
18 | import Control.Concurrent | ||
19 | import Control.Lens | ||
20 | import Control.Monad.Reader | ||
21 | import Control.Monad.State | ||
22 | import Data.ByteString as BS | ||
23 | import Data.ByteString.Lazy as BL | ||
24 | import Data.Default | ||
25 | import Data.List as L | ||
26 | import Data.Tuple | ||
27 | |||
28 | import Data.BEncode as BE | ||
29 | import Data.Torrent as Torrent | ||
30 | import Network.BitTorrent.Address | ||
31 | import Network.BitTorrent.Exchange.Block as Block | ||
32 | import Network.BitTorrent.Exchange.Message as Message hiding (Status) | ||
33 | |||
34 | |||
35 | -- | Current transfer status. | ||
36 | data Status = Status | ||
37 | { _pending :: [(PeerAddr IP, PieceIx)] | ||
38 | , _bucket :: Bucket | ||
39 | } | ||
40 | |||
41 | makeLenses ''Status | ||
42 | |||
43 | instance Default Status where | ||
44 | def = error "default status" | ||
45 | |||
46 | -- | Create a new scheduler for infodict of the given size. | ||
47 | nullStatus :: Int -> Status | ||
48 | nullStatus ps = Status [] (Block.empty ps) | ||
49 | |||
50 | type Updates = ReaderT (PeerAddr IP) (State Status) | ||
51 | |||
52 | runUpdates :: MVar Status -> PeerAddr IP -> Updates a -> IO a | ||
53 | runUpdates v a m = modifyMVar v (return . swap . runState (runReaderT m a)) | ||
54 | |||
55 | scheduleBlock :: Updates (Maybe PieceIx) | ||
56 | scheduleBlock = do | ||
57 | addr <- ask | ||
58 | bkt <- use bucket | ||
59 | case spans metadataPieceSize bkt of | ||
60 | [] -> return Nothing | ||
61 | ((off, _ ) : _) -> do | ||
62 | let pix = off `div` metadataPieceSize | ||
63 | pending %= ((addr, pix) :) | ||
64 | return (Just pix) | ||
65 | |||
66 | cancelPending :: PieceIx -> Updates () | ||
67 | cancelPending pix = pending %= L.filter ((pix ==) . snd) | ||
68 | |||
69 | resetPending :: Updates () | ||
70 | resetPending = do | ||
71 | addr <- ask | ||
72 | pending %= L.filter ((addr ==) . fst) | ||
73 | |||
74 | parseInfoDict :: BS.ByteString -> InfoHash -> Result InfoDict | ||
75 | parseInfoDict chunk topic = | ||
76 | case BE.decode chunk of | ||
77 | Right (infodict @ InfoDict {..}) | ||
78 | | topic == idInfoHash -> return infodict | ||
79 | | otherwise -> Left "broken infodict" | ||
80 | Left err -> Left $ "unable to parse infodict " ++ err | ||
81 | |||
82 | -- todo use incremental parsing to avoid BS.concat call | ||
83 | pushBlock :: Torrent.Piece BS.ByteString -> InfoHash -> Updates (Maybe InfoDict) | ||
84 | pushBlock Torrent.Piece {..} topic = do | ||
85 | addr <- ask | ||
86 | p <- use pending | ||
87 | when ((addr, pieceIndex) `L.notElem` p) $ error "not requested" | ||
88 | cancelPending pieceIndex | ||
89 | |||
90 | bucket %= Block.insert (metadataPieceSize * pieceIndex) pieceData | ||
91 | b <- use bucket | ||
92 | case toPiece b of | ||
93 | Nothing -> return Nothing | ||
94 | Just chunks -> | ||
95 | case parseInfoDict (BL.toStrict chunks) topic of | ||
96 | Right x -> do | ||
97 | pending .= [] | ||
98 | return (Just x) | ||
99 | Left e -> do | ||
100 | pending .= [] | ||
101 | bucket .= Block.empty (Block.size b) | ||
102 | return Nothing | ||