diff options
Diffstat (limited to 'src/Network/BitTorrent/Exchange/Download.hs')
-rw-r--r-- | src/Network/BitTorrent/Exchange/Download.hs | 376 |
1 files changed, 248 insertions, 128 deletions
diff --git a/src/Network/BitTorrent/Exchange/Download.hs b/src/Network/BitTorrent/Exchange/Download.hs index fcc94485..9a6b5f91 100644 --- a/src/Network/BitTorrent/Exchange/Download.hs +++ b/src/Network/BitTorrent/Exchange/Download.hs | |||
@@ -1,44 +1,196 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- | ||
9 | -- | ||
10 | {-# LANGUAGE FlexibleInstances #-} | ||
11 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
12 | {-# LANGUAGE FunctionalDependencies #-} | ||
13 | {-# LANGUAGE TemplateHaskell #-} | ||
1 | module Network.BitTorrent.Exchange.Download | 14 | module Network.BitTorrent.Exchange.Download |
2 | ( -- * Environment | 15 | ( -- * Downloading |
3 | StatusUpdates | 16 | Download (..) |
4 | , runStatusUpdates | 17 | , Updates |
5 | 18 | , runDownloadUpdates | |
6 | -- * Status | 19 | |
7 | , SessionStatus | 20 | -- ** Metadata |
8 | , sessionStatus | 21 | -- $metadata-download |
9 | 22 | , MetadataDownload | |
10 | -- * Query | 23 | , metadataDownload |
11 | , getBitfield | 24 | |
12 | , getRequestQueueLength | 25 | -- ** Content |
13 | 26 | -- $content-download | |
14 | -- * Control | 27 | , ContentDownload |
15 | , scheduleBlocks | 28 | , contentDownload |
16 | , resetPending | ||
17 | , pushBlock | ||
18 | ) where | 29 | ) where |
19 | 30 | ||
20 | import Control.Applicative | 31 | import Control.Applicative |
21 | import Control.Concurrent | 32 | import Control.Concurrent |
33 | import Control.Lens | ||
22 | import Control.Monad.State | 34 | import Control.Monad.State |
35 | import Data.BEncode as BE | ||
36 | import Data.ByteString as BS | ||
23 | import Data.ByteString.Lazy as BL | 37 | import Data.ByteString.Lazy as BL |
24 | import Data.Default | 38 | import Data.Default |
25 | import Data.List as L | 39 | import Data.List as L |
26 | import Data.Maybe | 40 | import Data.Maybe |
27 | import Data.Map as M | 41 | import Data.Map as M |
28 | import Data.Set as S | ||
29 | import Data.Tuple | 42 | import Data.Tuple |
30 | 43 | ||
31 | import Data.Torrent | 44 | import Data.Torrent as Torrent |
32 | import Network.BitTorrent.Exchange.Bitfield as BF | ||
33 | import Network.BitTorrent.Address | 45 | import Network.BitTorrent.Address |
34 | import Network.BitTorrent.Exchange.Block as Block | 46 | import Network.BitTorrent.Exchange.Bitfield as BF |
47 | import Network.BitTorrent.Exchange.Block as Block | ||
48 | import Network.BitTorrent.Exchange.Message as Msg | ||
35 | import System.Torrent.Storage (Storage, writePiece) | 49 | import System.Torrent.Storage (Storage, writePiece) |
36 | 50 | ||
37 | 51 | ||
38 | {----------------------------------------------------------------------- | 52 | {----------------------------------------------------------------------- |
39 | -- Piece entry | 53 | -- Class |
40 | -----------------------------------------------------------------------} | 54 | -----------------------------------------------------------------------} |
41 | 55 | ||
56 | type Updates s a = StateT s IO a | ||
57 | |||
58 | runDownloadUpdates :: MVar s -> Updates s a -> IO a | ||
59 | runDownloadUpdates var m = modifyMVar var (fmap swap . runStateT m) | ||
60 | |||
61 | class Download s chunk | s -> chunk where | ||
62 | scheduleBlocks :: Int -> PeerAddr IP -> Bitfield -> Updates s [BlockIx] | ||
63 | |||
64 | -- | | ||
65 | scheduleBlock :: PeerAddr IP -> Bitfield -> Updates s (Maybe BlockIx) | ||
66 | scheduleBlock addr bf = listToMaybe <$> scheduleBlocks 1 addr bf | ||
67 | |||
68 | -- | Get number of sent requests to this peer. | ||
69 | getRequestQueueLength :: PeerAddr IP -> Updates s Int | ||
70 | |||
71 | -- | Remove all pending block requests to the remote peer. May be used | ||
72 | -- when: | ||
73 | -- | ||
74 | -- * a peer closes connection; | ||
75 | -- | ||
76 | -- * remote peer choked this peer; | ||
77 | -- | ||
78 | -- * timeout expired. | ||
79 | -- | ||
80 | resetPending :: PeerAddr IP -> Updates s () | ||
81 | |||
82 | -- | MAY write to storage, if a new piece have been completed. | ||
83 | -- | ||
84 | -- You should check if a returned by peer block is actually have | ||
85 | -- been requested and in-flight. This is needed to avoid "I send | ||
86 | -- random corrupted block" attacks. | ||
87 | pushBlock :: PeerAddr IP -> chunk -> Updates s (Maybe Bool) | ||
88 | |||
89 | {----------------------------------------------------------------------- | ||
90 | -- Metadata download | ||
91 | -----------------------------------------------------------------------} | ||
92 | -- $metadata-download | ||
93 | -- TODO | ||
94 | |||
95 | data MetadataDownload = MetadataDownload | ||
96 | { _pendingPieces :: [(PeerAddr IP, PieceIx)] | ||
97 | , _bucket :: Bucket | ||
98 | , _topic :: InfoHash | ||
99 | } | ||
100 | |||
101 | makeLenses ''MetadataDownload | ||
102 | |||
103 | -- | Create a new scheduler for infodict of the given size. | ||
104 | metadataDownload :: Int -> InfoHash -> MetadataDownload | ||
105 | metadataDownload ps = MetadataDownload [] (Block.empty ps) | ||
106 | |||
107 | instance Default MetadataDownload where | ||
108 | def = error "instance Default MetadataDownload" | ||
109 | |||
110 | --cancelPending :: PieceIx -> Updates () | ||
111 | cancelPending pix = pendingPieces %= L.filter ((pix ==) . snd) | ||
112 | |||
113 | instance Download MetadataDownload (Piece BS.ByteString) where | ||
114 | scheduleBlock addr bf = do | ||
115 | bkt <- use bucket | ||
116 | case spans metadataPieceSize bkt of | ||
117 | [] -> return Nothing | ||
118 | ((off, _ ) : _) -> do | ||
119 | let pix = off `div` metadataPieceSize | ||
120 | pendingPieces %= ((addr, pix) :) | ||
121 | return (Just (BlockIx pix 0 metadataPieceSize)) | ||
122 | |||
123 | resetPending addr = pendingPieces %= L.filter ((addr ==) . fst) | ||
124 | |||
125 | pushBlock addr Torrent.Piece {..} = do | ||
126 | p <- use pendingPieces | ||
127 | when ((addr, pieceIndex) `L.notElem` p) $ | ||
128 | error "not requested" | ||
129 | cancelPending pieceIndex | ||
130 | |||
131 | bucket %= Block.insert (metadataPieceSize * pieceIndex) pieceData | ||
132 | b <- use bucket | ||
133 | case toPiece b of | ||
134 | Nothing -> return Nothing | ||
135 | Just chunks -> do | ||
136 | t <- use topic | ||
137 | case parseInfoDict (BL.toStrict chunks) t of | ||
138 | Right x -> do | ||
139 | pendingPieces .= [] | ||
140 | return undefined -- (Just x) | ||
141 | Left e -> do | ||
142 | pendingPieces .= [] | ||
143 | bucket .= Block.empty (Block.size b) | ||
144 | return undefined -- Nothing | ||
145 | where | ||
146 | -- todo use incremental parsing to avoid BS.concat call | ||
147 | parseInfoDict :: BS.ByteString -> InfoHash -> Result InfoDict | ||
148 | parseInfoDict chunk topic = | ||
149 | case BE.decode chunk of | ||
150 | Right (infodict @ InfoDict {..}) | ||
151 | | topic == idInfoHash -> return infodict | ||
152 | | otherwise -> Left "broken infodict" | ||
153 | Left err -> Left $ "unable to parse infodict " ++ err | ||
154 | |||
155 | {----------------------------------------------------------------------- | ||
156 | -- Content download | ||
157 | -----------------------------------------------------------------------} | ||
158 | -- $content-download | ||
159 | -- | ||
160 | -- A block can have one of the following status: | ||
161 | -- | ||
162 | -- 1) /not allowed/: Piece is not in download set. | ||
163 | -- | ||
164 | -- 2) /waiting/: (allowed?) Block have been allowed to download, | ||
165 | -- but /this/ peer did not send any 'Request' message for this | ||
166 | -- block. To allow some piece use | ||
167 | -- 'Network.BitTorrent.Exchange.Selector' and then 'allowedSet' | ||
168 | -- and 'allowPiece'. | ||
169 | -- | ||
170 | -- 3) /inflight/: (pending?) Block have been requested but | ||
171 | -- /remote/ peer did not send any 'Piece' message for this block. | ||
172 | -- Related functions 'markInflight' | ||
173 | -- | ||
174 | -- 4) /pending/: (stalled?) Block have have been downloaded | ||
175 | -- Related functions 'insertBlock'. | ||
176 | -- | ||
177 | -- Piece status: | ||
178 | -- | ||
179 | -- 1) /assembled/: (downloaded?) All blocks in piece have been | ||
180 | -- downloaded but the piece did not verified yet. | ||
181 | -- | ||
182 | -- * Valid: go to completed; | ||
183 | -- | ||
184 | -- * Invalid: go to waiting. | ||
185 | -- | ||
186 | -- 2) /corrupted/: | ||
187 | -- | ||
188 | -- 3) /downloaded/: (verified?) A piece have been successfully | ||
189 | -- verified via the hash. Usually the piece should be stored to | ||
190 | -- the 'System.Torrent.Storage' and /this/ peer should send 'Have' | ||
191 | -- messages to the /remote/ peers. | ||
192 | -- | ||
193 | |||
42 | data PieceEntry = PieceEntry | 194 | data PieceEntry = PieceEntry |
43 | { pending :: [(PeerAddr IP, BlockIx)] | 195 | { pending :: [(PeerAddr IP, BlockIx)] |
44 | , stalled :: Bucket | 196 | , stalled :: Bucket |
@@ -50,44 +202,23 @@ pieceEntry s = PieceEntry [] (Block.empty s) | |||
50 | isEmpty :: PieceEntry -> Bool | 202 | isEmpty :: PieceEntry -> Bool |
51 | isEmpty PieceEntry {..} = L.null pending && Block.null stalled | 203 | isEmpty PieceEntry {..} = L.null pending && Block.null stalled |
52 | 204 | ||
53 | holes :: PieceIx -> PieceEntry -> [BlockIx] | 205 | _holes :: PieceIx -> PieceEntry -> [BlockIx] |
54 | holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled) | 206 | _holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled) |
55 | where | 207 | where |
56 | mkBlockIx (off, sz) = BlockIx pix off sz | 208 | mkBlockIx (off, sz) = BlockIx pix off sz |
57 | 209 | ||
58 | {----------------------------------------------------------------------- | 210 | data ContentDownload = ContentDownload |
59 | -- Session status | 211 | { inprogress :: !(Map PieceIx PieceEntry) |
60 | -----------------------------------------------------------------------} | 212 | , bitfield :: !Bitfield |
61 | 213 | , pieceSize :: !PieceSize | |
62 | data SessionStatus = SessionStatus | 214 | , contentStorage :: Storage |
63 | { inprogress :: !(Map PieceIx PieceEntry) | ||
64 | , bitfield :: !Bitfield | ||
65 | , pieceSize :: !PieceSize | ||
66 | } | ||
67 | |||
68 | sessionStatus :: Bitfield -> PieceSize -> SessionStatus | ||
69 | sessionStatus bf ps = SessionStatus | ||
70 | { inprogress = M.empty | ||
71 | , bitfield = bf | ||
72 | , pieceSize = ps | ||
73 | } | 215 | } |
74 | 216 | ||
75 | type StatusUpdates a = StateT SessionStatus IO a | 217 | contentDownload :: Bitfield -> PieceSize -> Storage -> ContentDownload |
76 | 218 | contentDownload = ContentDownload M.empty | |
77 | -- | | ||
78 | runStatusUpdates :: MVar SessionStatus -> StatusUpdates a -> IO a | ||
79 | runStatusUpdates var m = modifyMVar var (fmap swap . runStateT m) | ||
80 | |||
81 | getBitfield :: MVar SessionStatus -> IO Bitfield | ||
82 | getBitfield var = bitfield <$> readMVar var | ||
83 | 219 | ||
84 | getRequestQueueLength :: PeerAddr IP -> StatusUpdates Int | 220 | --modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> DownloadUpdates () |
85 | getRequestQueueLength addr = do | 221 | modifyEntry pix f = modify $ \ s @ ContentDownload {..} -> s |
86 | m <- gets (M.elems . M.map (L.filter ((==) addr . fst) . pending) . inprogress) | ||
87 | return $ L.sum $ L.map L.length m | ||
88 | |||
89 | modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> StatusUpdates () | ||
90 | modifyEntry pix f = modify $ \ s @ SessionStatus {..} -> s | ||
91 | { inprogress = alter (g pieceSize) pix inprogress } | 222 | { inprogress = alter (g pieceSize) pix inprogress } |
92 | where | 223 | where |
93 | g s = h . f . fromMaybe (pieceEntry s) | 224 | g s = h . f . fromMaybe (pieceEntry s) |
@@ -95,81 +226,70 @@ modifyEntry pix f = modify $ \ s @ SessionStatus {..} -> s | |||
95 | | isEmpty e = Nothing | 226 | | isEmpty e = Nothing |
96 | | otherwise = Just e | 227 | | otherwise = Just e |
97 | 228 | ||
98 | {----------------------------------------------------------------------- | 229 | instance Download ContentDownload (Block BL.ByteString) where |
99 | -- Piece download | 230 | scheduleBlocks n addr maskBF = do |
100 | -----------------------------------------------------------------------} | 231 | ContentDownload {..} <- get |
232 | let wantPieces = maskBF `BF.difference` bitfield | ||
233 | let wantBlocks = L.concat $ M.elems $ M.mapWithKey _holes $ | ||
234 | M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces) | ||
235 | inprogress | ||
101 | 236 | ||
102 | -- TODO choose block nearest to pending or stalled sets to reduce disk | 237 | bixs <- if L.null wantBlocks |
103 | -- seeks on remote machines | 238 | then do |
104 | chooseBlocks :: [BlockIx] -> Int -> StatusUpdates [BlockIx] | 239 | mpix <- choosePiece wantPieces |
105 | chooseBlocks xs n = return (L.take n xs) | 240 | case mpix of -- TODO return 'n' blocks |
106 | 241 | Nothing -> return [] | |
107 | -- TODO use selection strategies from Exchange.Selector | 242 | Just pix -> return [leadingBlock pix defaultTransferSize] |
108 | choosePiece :: Bitfield -> StatusUpdates (Maybe PieceIx) | 243 | else chooseBlocks wantBlocks n |
109 | choosePiece bf | 244 | |
110 | | BF.null bf = return $ Nothing | 245 | forM_ bixs $ \ bix -> do |
111 | | otherwise = return $ Just $ BF.findMin bf | 246 | modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e |
112 | 247 | { pending = (addr, bix) : pending } | |
113 | scheduleBlocks :: PeerAddr IP -> Bitfield -> Int -> StatusUpdates [BlockIx] | 248 | |
114 | scheduleBlocks addr maskBF n = do | 249 | return bixs |
115 | SessionStatus {..} <- get | 250 | where |
116 | let wantPieces = maskBF `BF.difference` bitfield | 251 | -- TODO choose block nearest to pending or stalled sets to reduce disk |
117 | let wantBlocks = L.concat $ M.elems $ M.mapWithKey holes $ | 252 | -- seeks on remote machines |
118 | M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces) inprogress | 253 | --chooseBlocks :: [BlockIx] -> Int -> DownloadUpdates [BlockIx] |
119 | 254 | chooseBlocks xs n = return (L.take n xs) | |
120 | bixs <- if L.null wantBlocks | 255 | |
121 | then do | 256 | -- TODO use selection strategies from Exchange.Selector |
122 | mpix <- choosePiece wantPieces | 257 | --choosePiece :: Bitfield -> DownloadUpdates (Maybe PieceIx) |
123 | case mpix of -- TODO return 'n' blocks | 258 | choosePiece bf |
124 | Nothing -> return [] | 259 | | BF.null bf = return $ Nothing |
125 | Just pix -> return [leadingBlock pix defaultTransferSize] | 260 | | otherwise = return $ Just $ BF.findMin bf |
126 | else chooseBlocks wantBlocks n | 261 | |
127 | 262 | getRequestQueueLength addr = do | |
128 | forM_ bixs $ \ bix -> do | 263 | m <- gets (M.map (L.filter ((==) addr . fst) . pending) . inprogress) |
129 | modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e | 264 | return $ L.sum $ L.map L.length $ M.elems m |
130 | { pending = (addr, bix) : pending } | 265 | |
131 | 266 | resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) } | |
132 | return bixs | 267 | where |
133 | 268 | reset = fmap $ \ e -> e | |
134 | |||
135 | -- | Remove all pending block requests to the remote peer. May be used | ||
136 | -- when: | ||
137 | -- | ||
138 | -- * a peer closes connection; | ||
139 | -- | ||
140 | -- * remote peer choked this peer; | ||
141 | -- | ||
142 | -- * timeout expired. | ||
143 | -- | ||
144 | resetPending :: PeerAddr IP -> StatusUpdates () | ||
145 | resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) } | ||
146 | where | ||
147 | reset = fmap $ \ e -> e | ||
148 | { pending = L.filter (not . (==) addr . fst) (pending e) } | 269 | { pending = L.filter (not . (==) addr . fst) (pending e) } |
149 | 270 | ||
150 | -- | MAY write to storage, if a new piece have been completed. | 271 | pushBlock addr blk @ Block {..} = do |
151 | pushBlock :: Block BL.ByteString -> Storage -> StatusUpdates (Maybe Bool) | 272 | mpe <- gets (M.lookup blkPiece . inprogress) |
152 | pushBlock blk @ Block {..} storage = do | 273 | case mpe of |
153 | mpe <- gets (M.lookup blkPiece . inprogress) | 274 | Nothing -> return Nothing |
154 | case mpe of | 275 | Just (pe @ PieceEntry {..}) |
155 | Nothing -> return Nothing | 276 | | blockIx blk `L.notElem` fmap snd pending -> return Nothing |
156 | Just (pe @ PieceEntry {..}) | 277 | | otherwise -> do |
157 | | blockIx blk `L.notElem` fmap snd pending -> return Nothing | 278 | let bkt' = Block.insertLazy blkOffset blkData stalled |
158 | | otherwise -> do | 279 | case toPiece bkt' of |
159 | let bkt' = Block.insertLazy blkOffset blkData stalled | 280 | Nothing -> do |
160 | case toPiece bkt' of | 281 | modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e |
161 | Nothing -> do | 282 | { pending = L.filter ((==) (blockIx blk) . snd) pending |
162 | modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e | 283 | , stalled = bkt' |
163 | { pending = L.filter ((==) (blockIx blk) . snd) pending | 284 | } |
164 | , stalled = bkt' | 285 | return (Just False) |
165 | } | 286 | |
166 | return (Just False) | 287 | Just pieceData -> do |
167 | 288 | -- TODO verify | |
168 | Just pieceData -> do | 289 | storage <- gets contentStorage |
169 | -- TODO verify | 290 | liftIO $ writePiece (Torrent.Piece blkPiece pieceData) storage |
170 | liftIO $ writePiece (Piece blkPiece pieceData) storage | 291 | modify $ \ s @ ContentDownload {..} -> s |
171 | modify $ \ s @ SessionStatus {..} -> s | 292 | { inprogress = M.delete blkPiece inprogress |
172 | { inprogress = M.delete blkPiece inprogress | 293 | , bitfield = BF.insert blkPiece bitfield |
173 | , bitfield = BF.insert blkPiece bitfield | 294 | } |
174 | } | 295 | return (Just True) |
175 | return (Just True) | ||