diff options
Diffstat (limited to 'src/Network/BitTorrent/Exchange/Download.hs')
-rw-r--r-- | src/Network/BitTorrent/Exchange/Download.hs | 295 |
1 files changed, 295 insertions, 0 deletions
diff --git a/src/Network/BitTorrent/Exchange/Download.hs b/src/Network/BitTorrent/Exchange/Download.hs new file mode 100644 index 00000000..9a6b5f91 --- /dev/null +++ b/src/Network/BitTorrent/Exchange/Download.hs | |||
@@ -0,0 +1,295 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- | ||
9 | -- | ||
10 | {-# LANGUAGE FlexibleInstances #-} | ||
11 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
12 | {-# LANGUAGE FunctionalDependencies #-} | ||
13 | {-# LANGUAGE TemplateHaskell #-} | ||
14 | module Network.BitTorrent.Exchange.Download | ||
15 | ( -- * Downloading | ||
16 | Download (..) | ||
17 | , Updates | ||
18 | , runDownloadUpdates | ||
19 | |||
20 | -- ** Metadata | ||
21 | -- $metadata-download | ||
22 | , MetadataDownload | ||
23 | , metadataDownload | ||
24 | |||
25 | -- ** Content | ||
26 | -- $content-download | ||
27 | , ContentDownload | ||
28 | , contentDownload | ||
29 | ) where | ||
30 | |||
31 | import Control.Applicative | ||
32 | import Control.Concurrent | ||
33 | import Control.Lens | ||
34 | import Control.Monad.State | ||
35 | import Data.BEncode as BE | ||
36 | import Data.ByteString as BS | ||
37 | import Data.ByteString.Lazy as BL | ||
38 | import Data.Default | ||
39 | import Data.List as L | ||
40 | import Data.Maybe | ||
41 | import Data.Map as M | ||
42 | import Data.Tuple | ||
43 | |||
44 | import Data.Torrent as Torrent | ||
45 | import Network.BitTorrent.Address | ||
46 | import Network.BitTorrent.Exchange.Bitfield as BF | ||
47 | import Network.BitTorrent.Exchange.Block as Block | ||
48 | import Network.BitTorrent.Exchange.Message as Msg | ||
49 | import System.Torrent.Storage (Storage, writePiece) | ||
50 | |||
51 | |||
52 | {----------------------------------------------------------------------- | ||
53 | -- Class | ||
54 | -----------------------------------------------------------------------} | ||
55 | |||
56 | type Updates s a = StateT s IO a | ||
57 | |||
58 | runDownloadUpdates :: MVar s -> Updates s a -> IO a | ||
59 | runDownloadUpdates var m = modifyMVar var (fmap swap . runStateT m) | ||
60 | |||
61 | class Download s chunk | s -> chunk where | ||
62 | scheduleBlocks :: Int -> PeerAddr IP -> Bitfield -> Updates s [BlockIx] | ||
63 | |||
64 | -- | | ||
65 | scheduleBlock :: PeerAddr IP -> Bitfield -> Updates s (Maybe BlockIx) | ||
66 | scheduleBlock addr bf = listToMaybe <$> scheduleBlocks 1 addr bf | ||
67 | |||
68 | -- | Get number of sent requests to this peer. | ||
69 | getRequestQueueLength :: PeerAddr IP -> Updates s Int | ||
70 | |||
71 | -- | Remove all pending block requests to the remote peer. May be used | ||
72 | -- when: | ||
73 | -- | ||
74 | -- * a peer closes connection; | ||
75 | -- | ||
76 | -- * remote peer choked this peer; | ||
77 | -- | ||
78 | -- * timeout expired. | ||
79 | -- | ||
80 | resetPending :: PeerAddr IP -> Updates s () | ||
81 | |||
82 | -- | MAY write to storage, if a new piece have been completed. | ||
83 | -- | ||
84 | -- You should check if a returned by peer block is actually have | ||
85 | -- been requested and in-flight. This is needed to avoid "I send | ||
86 | -- random corrupted block" attacks. | ||
87 | pushBlock :: PeerAddr IP -> chunk -> Updates s (Maybe Bool) | ||
88 | |||
89 | {----------------------------------------------------------------------- | ||
90 | -- Metadata download | ||
91 | -----------------------------------------------------------------------} | ||
92 | -- $metadata-download | ||
93 | -- TODO | ||
94 | |||
95 | data MetadataDownload = MetadataDownload | ||
96 | { _pendingPieces :: [(PeerAddr IP, PieceIx)] | ||
97 | , _bucket :: Bucket | ||
98 | , _topic :: InfoHash | ||
99 | } | ||
100 | |||
101 | makeLenses ''MetadataDownload | ||
102 | |||
103 | -- | Create a new scheduler for infodict of the given size. | ||
104 | metadataDownload :: Int -> InfoHash -> MetadataDownload | ||
105 | metadataDownload ps = MetadataDownload [] (Block.empty ps) | ||
106 | |||
107 | instance Default MetadataDownload where | ||
108 | def = error "instance Default MetadataDownload" | ||
109 | |||
110 | --cancelPending :: PieceIx -> Updates () | ||
111 | cancelPending pix = pendingPieces %= L.filter ((pix ==) . snd) | ||
112 | |||
113 | instance Download MetadataDownload (Piece BS.ByteString) where | ||
114 | scheduleBlock addr bf = do | ||
115 | bkt <- use bucket | ||
116 | case spans metadataPieceSize bkt of | ||
117 | [] -> return Nothing | ||
118 | ((off, _ ) : _) -> do | ||
119 | let pix = off `div` metadataPieceSize | ||
120 | pendingPieces %= ((addr, pix) :) | ||
121 | return (Just (BlockIx pix 0 metadataPieceSize)) | ||
122 | |||
123 | resetPending addr = pendingPieces %= L.filter ((addr ==) . fst) | ||
124 | |||
125 | pushBlock addr Torrent.Piece {..} = do | ||
126 | p <- use pendingPieces | ||
127 | when ((addr, pieceIndex) `L.notElem` p) $ | ||
128 | error "not requested" | ||
129 | cancelPending pieceIndex | ||
130 | |||
131 | bucket %= Block.insert (metadataPieceSize * pieceIndex) pieceData | ||
132 | b <- use bucket | ||
133 | case toPiece b of | ||
134 | Nothing -> return Nothing | ||
135 | Just chunks -> do | ||
136 | t <- use topic | ||
137 | case parseInfoDict (BL.toStrict chunks) t of | ||
138 | Right x -> do | ||
139 | pendingPieces .= [] | ||
140 | return undefined -- (Just x) | ||
141 | Left e -> do | ||
142 | pendingPieces .= [] | ||
143 | bucket .= Block.empty (Block.size b) | ||
144 | return undefined -- Nothing | ||
145 | where | ||
146 | -- todo use incremental parsing to avoid BS.concat call | ||
147 | parseInfoDict :: BS.ByteString -> InfoHash -> Result InfoDict | ||
148 | parseInfoDict chunk topic = | ||
149 | case BE.decode chunk of | ||
150 | Right (infodict @ InfoDict {..}) | ||
151 | | topic == idInfoHash -> return infodict | ||
152 | | otherwise -> Left "broken infodict" | ||
153 | Left err -> Left $ "unable to parse infodict " ++ err | ||
154 | |||
155 | {----------------------------------------------------------------------- | ||
156 | -- Content download | ||
157 | -----------------------------------------------------------------------} | ||
158 | -- $content-download | ||
159 | -- | ||
160 | -- A block can have one of the following status: | ||
161 | -- | ||
162 | -- 1) /not allowed/: Piece is not in download set. | ||
163 | -- | ||
164 | -- 2) /waiting/: (allowed?) Block have been allowed to download, | ||
165 | -- but /this/ peer did not send any 'Request' message for this | ||
166 | -- block. To allow some piece use | ||
167 | -- 'Network.BitTorrent.Exchange.Selector' and then 'allowedSet' | ||
168 | -- and 'allowPiece'. | ||
169 | -- | ||
170 | -- 3) /inflight/: (pending?) Block have been requested but | ||
171 | -- /remote/ peer did not send any 'Piece' message for this block. | ||
172 | -- Related functions 'markInflight' | ||
173 | -- | ||
174 | -- 4) /pending/: (stalled?) Block have have been downloaded | ||
175 | -- Related functions 'insertBlock'. | ||
176 | -- | ||
177 | -- Piece status: | ||
178 | -- | ||
179 | -- 1) /assembled/: (downloaded?) All blocks in piece have been | ||
180 | -- downloaded but the piece did not verified yet. | ||
181 | -- | ||
182 | -- * Valid: go to completed; | ||
183 | -- | ||
184 | -- * Invalid: go to waiting. | ||
185 | -- | ||
186 | -- 2) /corrupted/: | ||
187 | -- | ||
188 | -- 3) /downloaded/: (verified?) A piece have been successfully | ||
189 | -- verified via the hash. Usually the piece should be stored to | ||
190 | -- the 'System.Torrent.Storage' and /this/ peer should send 'Have' | ||
191 | -- messages to the /remote/ peers. | ||
192 | -- | ||
193 | |||
194 | data PieceEntry = PieceEntry | ||
195 | { pending :: [(PeerAddr IP, BlockIx)] | ||
196 | , stalled :: Bucket | ||
197 | } | ||
198 | |||
199 | pieceEntry :: PieceSize -> PieceEntry | ||
200 | pieceEntry s = PieceEntry [] (Block.empty s) | ||
201 | |||
202 | isEmpty :: PieceEntry -> Bool | ||
203 | isEmpty PieceEntry {..} = L.null pending && Block.null stalled | ||
204 | |||
205 | _holes :: PieceIx -> PieceEntry -> [BlockIx] | ||
206 | _holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled) | ||
207 | where | ||
208 | mkBlockIx (off, sz) = BlockIx pix off sz | ||
209 | |||
210 | data ContentDownload = ContentDownload | ||
211 | { inprogress :: !(Map PieceIx PieceEntry) | ||
212 | , bitfield :: !Bitfield | ||
213 | , pieceSize :: !PieceSize | ||
214 | , contentStorage :: Storage | ||
215 | } | ||
216 | |||
217 | contentDownload :: Bitfield -> PieceSize -> Storage -> ContentDownload | ||
218 | contentDownload = ContentDownload M.empty | ||
219 | |||
220 | --modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> DownloadUpdates () | ||
221 | modifyEntry pix f = modify $ \ s @ ContentDownload {..} -> s | ||
222 | { inprogress = alter (g pieceSize) pix inprogress } | ||
223 | where | ||
224 | g s = h . f . fromMaybe (pieceEntry s) | ||
225 | h e | ||
226 | | isEmpty e = Nothing | ||
227 | | otherwise = Just e | ||
228 | |||
229 | instance Download ContentDownload (Block BL.ByteString) where | ||
230 | scheduleBlocks n addr maskBF = do | ||
231 | ContentDownload {..} <- get | ||
232 | let wantPieces = maskBF `BF.difference` bitfield | ||
233 | let wantBlocks = L.concat $ M.elems $ M.mapWithKey _holes $ | ||
234 | M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces) | ||
235 | inprogress | ||
236 | |||
237 | bixs <- if L.null wantBlocks | ||
238 | then do | ||
239 | mpix <- choosePiece wantPieces | ||
240 | case mpix of -- TODO return 'n' blocks | ||
241 | Nothing -> return [] | ||
242 | Just pix -> return [leadingBlock pix defaultTransferSize] | ||
243 | else chooseBlocks wantBlocks n | ||
244 | |||
245 | forM_ bixs $ \ bix -> do | ||
246 | modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e | ||
247 | { pending = (addr, bix) : pending } | ||
248 | |||
249 | return bixs | ||
250 | where | ||
251 | -- TODO choose block nearest to pending or stalled sets to reduce disk | ||
252 | -- seeks on remote machines | ||
253 | --chooseBlocks :: [BlockIx] -> Int -> DownloadUpdates [BlockIx] | ||
254 | chooseBlocks xs n = return (L.take n xs) | ||
255 | |||
256 | -- TODO use selection strategies from Exchange.Selector | ||
257 | --choosePiece :: Bitfield -> DownloadUpdates (Maybe PieceIx) | ||
258 | choosePiece bf | ||
259 | | BF.null bf = return $ Nothing | ||
260 | | otherwise = return $ Just $ BF.findMin bf | ||
261 | |||
262 | getRequestQueueLength addr = do | ||
263 | m <- gets (M.map (L.filter ((==) addr . fst) . pending) . inprogress) | ||
264 | return $ L.sum $ L.map L.length $ M.elems m | ||
265 | |||
266 | resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) } | ||
267 | where | ||
268 | reset = fmap $ \ e -> e | ||
269 | { pending = L.filter (not . (==) addr . fst) (pending e) } | ||
270 | |||
271 | pushBlock addr blk @ Block {..} = do | ||
272 | mpe <- gets (M.lookup blkPiece . inprogress) | ||
273 | case mpe of | ||
274 | Nothing -> return Nothing | ||
275 | Just (pe @ PieceEntry {..}) | ||
276 | | blockIx blk `L.notElem` fmap snd pending -> return Nothing | ||
277 | | otherwise -> do | ||
278 | let bkt' = Block.insertLazy blkOffset blkData stalled | ||
279 | case toPiece bkt' of | ||
280 | Nothing -> do | ||
281 | modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e | ||
282 | { pending = L.filter ((==) (blockIx blk) . snd) pending | ||
283 | , stalled = bkt' | ||
284 | } | ||
285 | return (Just False) | ||
286 | |||
287 | Just pieceData -> do | ||
288 | -- TODO verify | ||
289 | storage <- gets contentStorage | ||
290 | liftIO $ writePiece (Torrent.Piece blkPiece pieceData) storage | ||
291 | modify $ \ s @ ContentDownload {..} -> s | ||
292 | { inprogress = M.delete blkPiece inprogress | ||
293 | , bitfield = BF.insert blkPiece bitfield | ||
294 | } | ||
295 | return (Just True) | ||