diff options
author | James Crayne <jim.crayne@gmail.com> | 2019-09-28 13:43:29 -0400 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-01 19:27:53 -0500 |
commit | 11987749fc6e6d3e53ea737d46d5ab13a16faeb8 (patch) | |
tree | 5716463275c2d3e902889db619908ded2a73971c /dht/bittorrent/src/Network/BitTorrent/Exchange/Download.hs | |
parent | add2c76bced51fde5e9917e7449ef52be70faf87 (diff) |
Factor out some new libraries
word64-map:
Data.Word64Map
network-addr:
Network.Address
tox-crypto:
Crypto.Tox
lifted-concurrent:
Control.Concurrent.Lifted.Instrument
Control.Concurrent.Async.Lifted.Instrument
psq-wrap:
Data.Wrapper.PSQInt
Data.Wrapper.PSQ
minmax-psq:
Data.MinMaxPSQ
tasks:
Control.Concurrent.Tasks
kad:
Network.Kademlia
Network.Kademlia.Bootstrap
Network.Kademlia.Routing
Network.Kademlia.CommonAPI
Network.Kademlia.Persistence
Network.Kademlia.Search
Diffstat (limited to 'dht/bittorrent/src/Network/BitTorrent/Exchange/Download.hs')
-rw-r--r-- | dht/bittorrent/src/Network/BitTorrent/Exchange/Download.hs | 296 |
1 files changed, 296 insertions, 0 deletions
diff --git a/dht/bittorrent/src/Network/BitTorrent/Exchange/Download.hs b/dht/bittorrent/src/Network/BitTorrent/Exchange/Download.hs new file mode 100644 index 00000000..981db2fb --- /dev/null +++ b/dht/bittorrent/src/Network/BitTorrent/Exchange/Download.hs | |||
@@ -0,0 +1,296 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- | ||
9 | -- | ||
10 | {-# LANGUAGE FlexibleContexts #-} | ||
11 | {-# LANGUAGE FlexibleInstances #-} | ||
12 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
13 | {-# LANGUAGE FunctionalDependencies #-} | ||
14 | {-# LANGUAGE TemplateHaskell #-} | ||
15 | module Network.BitTorrent.Exchange.Download | ||
16 | ( -- * Downloading | ||
17 | Download (..) | ||
18 | , Updates | ||
19 | , runDownloadUpdates | ||
20 | |||
21 | -- ** Metadata | ||
22 | -- $metadata-download | ||
23 | , MetadataDownload | ||
24 | , metadataDownload | ||
25 | |||
26 | -- ** Content | ||
27 | -- $content-download | ||
28 | , ContentDownload | ||
29 | , contentDownload | ||
30 | ) where | ||
31 | |||
32 | import Control.Applicative | ||
33 | import Control.Concurrent | ||
34 | import Control.Lens | ||
35 | import Control.Monad.State | ||
36 | import Data.BEncode as BE | ||
37 | import Data.ByteString as BS | ||
38 | import Data.ByteString.Lazy as BL | ||
39 | import Data.Default | ||
40 | import Data.List as L | ||
41 | import Data.Maybe | ||
42 | import Data.Map as M | ||
43 | import Data.Tuple | ||
44 | |||
45 | import Data.Torrent as Torrent | ||
46 | import Network.Address | ||
47 | import Network.BitTorrent.Exchange.Bitfield as BF | ||
48 | import Network.BitTorrent.Exchange.Block as Block | ||
49 | import Network.BitTorrent.Exchange.Message as Msg | ||
50 | import System.Torrent.Storage (Storage, writePiece) | ||
51 | |||
52 | |||
53 | {----------------------------------------------------------------------- | ||
54 | -- Class | ||
55 | -----------------------------------------------------------------------} | ||
56 | |||
57 | type Updates s a = StateT s IO a | ||
58 | |||
59 | runDownloadUpdates :: MVar s -> Updates s a -> IO a | ||
60 | runDownloadUpdates var m = modifyMVar var (fmap swap . runStateT m) | ||
61 | |||
62 | class Download s chunk | s -> chunk where | ||
63 | scheduleBlocks :: Int -> PeerAddr IP -> Bitfield -> Updates s [BlockIx] | ||
64 | |||
65 | -- | | ||
66 | scheduleBlock :: PeerAddr IP -> Bitfield -> Updates s (Maybe BlockIx) | ||
67 | scheduleBlock addr bf = listToMaybe <$> scheduleBlocks 1 addr bf | ||
68 | |||
69 | -- | Get number of sent requests to this peer. | ||
70 | getRequestQueueLength :: PeerAddr IP -> Updates s Int | ||
71 | |||
72 | -- | Remove all pending block requests to the remote peer. May be used | ||
73 | -- when: | ||
74 | -- | ||
75 | -- * a peer closes connection; | ||
76 | -- | ||
77 | -- * remote peer choked this peer; | ||
78 | -- | ||
79 | -- * timeout expired. | ||
80 | -- | ||
81 | resetPending :: PeerAddr IP -> Updates s () | ||
82 | |||
83 | -- | MAY write to storage, if a new piece have been completed. | ||
84 | -- | ||
85 | -- You should check if a returned by peer block is actually have | ||
86 | -- been requested and in-flight. This is needed to avoid "I send | ||
87 | -- random corrupted block" attacks. | ||
88 | pushBlock :: PeerAddr IP -> chunk -> Updates s (Maybe Bool) | ||
89 | |||
90 | {----------------------------------------------------------------------- | ||
91 | -- Metadata download | ||
92 | -----------------------------------------------------------------------} | ||
93 | -- $metadata-download | ||
94 | -- TODO | ||
95 | |||
96 | data MetadataDownload = MetadataDownload | ||
97 | { _pendingPieces :: [(PeerAddr IP, PieceIx)] | ||
98 | , _bucket :: Bucket | ||
99 | , _topic :: InfoHash | ||
100 | } | ||
101 | |||
102 | makeLenses ''MetadataDownload | ||
103 | |||
104 | -- | Create a new scheduler for infodict of the given size. | ||
105 | metadataDownload :: Int -> InfoHash -> MetadataDownload | ||
106 | metadataDownload ps = MetadataDownload [] (Block.empty ps) | ||
107 | |||
108 | instance Default MetadataDownload where | ||
109 | def = error "instance Default MetadataDownload" | ||
110 | |||
111 | --cancelPending :: PieceIx -> Updates () | ||
112 | cancelPending pix = pendingPieces %= L.filter ((pix ==) . snd) | ||
113 | |||
114 | instance Download MetadataDownload (Piece BS.ByteString) where | ||
115 | scheduleBlock addr bf = do | ||
116 | bkt <- use bucket | ||
117 | case spans metadataPieceSize bkt of | ||
118 | [] -> return Nothing | ||
119 | ((off, _ ) : _) -> do | ||
120 | let pix = off `div` metadataPieceSize | ||
121 | pendingPieces %= ((addr, pix) :) | ||
122 | return (Just (BlockIx pix 0 metadataPieceSize)) | ||
123 | |||
124 | resetPending addr = pendingPieces %= L.filter ((addr ==) . fst) | ||
125 | |||
126 | pushBlock addr Torrent.Piece {..} = do | ||
127 | p <- use pendingPieces | ||
128 | when ((addr, pieceIndex) `L.notElem` p) $ | ||
129 | error "not requested" | ||
130 | cancelPending pieceIndex | ||
131 | |||
132 | bucket %= Block.insert (metadataPieceSize * pieceIndex) pieceData | ||
133 | b <- use bucket | ||
134 | case toPiece b of | ||
135 | Nothing -> return Nothing | ||
136 | Just chunks -> do | ||
137 | t <- use topic | ||
138 | case parseInfoDict (BL.toStrict chunks) t of | ||
139 | Right x -> do | ||
140 | pendingPieces .= [] | ||
141 | return undefined -- (Just x) | ||
142 | Left e -> do | ||
143 | pendingPieces .= [] | ||
144 | bucket .= Block.empty (Block.size b) | ||
145 | return undefined -- Nothing | ||
146 | where | ||
147 | -- todo use incremental parsing to avoid BS.concat call | ||
148 | parseInfoDict :: BS.ByteString -> InfoHash -> Result InfoDict | ||
149 | parseInfoDict chunk topic = | ||
150 | case BE.decode chunk of | ||
151 | Right (infodict @ InfoDict {..}) | ||
152 | | topic == idInfoHash -> return infodict | ||
153 | | otherwise -> Left "broken infodict" | ||
154 | Left err -> Left $ "unable to parse infodict " ++ err | ||
155 | |||
156 | {----------------------------------------------------------------------- | ||
157 | -- Content download | ||
158 | -----------------------------------------------------------------------} | ||
159 | -- $content-download | ||
160 | -- | ||
161 | -- A block can have one of the following status: | ||
162 | -- | ||
163 | -- 1) /not allowed/: Piece is not in download set. | ||
164 | -- | ||
165 | -- 2) /waiting/: (allowed?) Block have been allowed to download, | ||
166 | -- but /this/ peer did not send any 'Request' message for this | ||
167 | -- block. To allow some piece use | ||
168 | -- 'Network.BitTorrent.Exchange.Selector' and then 'allowedSet' | ||
169 | -- and 'allowPiece'. | ||
170 | -- | ||
171 | -- 3) /inflight/: (pending?) Block have been requested but | ||
172 | -- /remote/ peer did not send any 'Piece' message for this block. | ||
173 | -- Related functions 'markInflight' | ||
174 | -- | ||
175 | -- 4) /pending/: (stalled?) Block have have been downloaded | ||
176 | -- Related functions 'insertBlock'. | ||
177 | -- | ||
178 | -- Piece status: | ||
179 | -- | ||
180 | -- 1) /assembled/: (downloaded?) All blocks in piece have been | ||
181 | -- downloaded but the piece did not verified yet. | ||
182 | -- | ||
183 | -- * Valid: go to completed; | ||
184 | -- | ||
185 | -- * Invalid: go to waiting. | ||
186 | -- | ||
187 | -- 2) /corrupted/: | ||
188 | -- | ||
189 | -- 3) /downloaded/: (verified?) A piece have been successfully | ||
190 | -- verified via the hash. Usually the piece should be stored to | ||
191 | -- the 'System.Torrent.Storage' and /this/ peer should send 'Have' | ||
192 | -- messages to the /remote/ peers. | ||
193 | -- | ||
194 | |||
195 | data PieceEntry = PieceEntry | ||
196 | { pending :: [(PeerAddr IP, BlockIx)] | ||
197 | , stalled :: Bucket | ||
198 | } | ||
199 | |||
200 | pieceEntry :: PieceSize -> PieceEntry | ||
201 | pieceEntry s = PieceEntry [] (Block.empty s) | ||
202 | |||
203 | isEmpty :: PieceEntry -> Bool | ||
204 | isEmpty PieceEntry {..} = L.null pending && Block.null stalled | ||
205 | |||
206 | _holes :: PieceIx -> PieceEntry -> [BlockIx] | ||
207 | _holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled) | ||
208 | where | ||
209 | mkBlockIx (off, sz) = BlockIx pix off sz | ||
210 | |||
211 | data ContentDownload = ContentDownload | ||
212 | { inprogress :: !(Map PieceIx PieceEntry) | ||
213 | , bitfield :: !Bitfield | ||
214 | , pieceSize :: !PieceSize | ||
215 | , contentStorage :: Storage | ||
216 | } | ||
217 | |||
218 | contentDownload :: Bitfield -> PieceSize -> Storage -> ContentDownload | ||
219 | contentDownload = ContentDownload M.empty | ||
220 | |||
221 | --modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> DownloadUpdates () | ||
222 | modifyEntry pix f = modify $ \ s @ ContentDownload {..} -> s | ||
223 | { inprogress = alter (g pieceSize) pix inprogress } | ||
224 | where | ||
225 | g s = h . f . fromMaybe (pieceEntry s) | ||
226 | h e | ||
227 | | isEmpty e = Nothing | ||
228 | | otherwise = Just e | ||
229 | |||
230 | instance Download ContentDownload (Block BL.ByteString) where | ||
231 | scheduleBlocks n addr maskBF = do | ||
232 | ContentDownload {..} <- get | ||
233 | let wantPieces = maskBF `BF.difference` bitfield | ||
234 | let wantBlocks = L.concat $ M.elems $ M.mapWithKey _holes $ | ||
235 | M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces) | ||
236 | inprogress | ||
237 | |||
238 | bixs <- if L.null wantBlocks | ||
239 | then do | ||
240 | mpix <- choosePiece wantPieces | ||
241 | case mpix of -- TODO return 'n' blocks | ||
242 | Nothing -> return [] | ||
243 | Just pix -> return [leadingBlock pix defaultTransferSize] | ||
244 | else chooseBlocks wantBlocks n | ||
245 | |||
246 | forM_ bixs $ \ bix -> do | ||
247 | modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e | ||
248 | { pending = (addr, bix) : pending } | ||
249 | |||
250 | return bixs | ||
251 | where | ||
252 | -- TODO choose block nearest to pending or stalled sets to reduce disk | ||
253 | -- seeks on remote machines | ||
254 | --chooseBlocks :: [BlockIx] -> Int -> DownloadUpdates [BlockIx] | ||
255 | chooseBlocks xs n = return (L.take n xs) | ||
256 | |||
257 | -- TODO use selection strategies from Exchange.Selector | ||
258 | --choosePiece :: Bitfield -> DownloadUpdates (Maybe PieceIx) | ||
259 | choosePiece bf | ||
260 | | BF.null bf = return $ Nothing | ||
261 | | otherwise = return $ Just $ BF.findMin bf | ||
262 | |||
263 | getRequestQueueLength addr = do | ||
264 | m <- gets (M.map (L.filter ((==) addr . fst) . pending) . inprogress) | ||
265 | return $ L.sum $ L.map L.length $ M.elems m | ||
266 | |||
267 | resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) } | ||
268 | where | ||
269 | reset = fmap $ \ e -> e | ||
270 | { pending = L.filter (not . (==) addr . fst) (pending e) } | ||
271 | |||
272 | pushBlock addr blk @ Block {..} = do | ||
273 | mpe <- gets (M.lookup blkPiece . inprogress) | ||
274 | case mpe of | ||
275 | Nothing -> return Nothing | ||
276 | Just (pe @ PieceEntry {..}) | ||
277 | | blockIx blk `L.notElem` fmap snd pending -> return Nothing | ||
278 | | otherwise -> do | ||
279 | let bkt' = Block.insertLazy blkOffset blkData stalled | ||
280 | case toPiece bkt' of | ||
281 | Nothing -> do | ||
282 | modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e | ||
283 | { pending = L.filter ((==) (blockIx blk) . snd) pending | ||
284 | , stalled = bkt' | ||
285 | } | ||
286 | return (Just False) | ||
287 | |||
288 | Just pieceData -> do | ||
289 | -- TODO verify | ||
290 | storage <- gets contentStorage | ||
291 | liftIO $ writePiece (Torrent.Piece blkPiece pieceData) storage | ||
292 | modify $ \ s @ ContentDownload {..} -> s | ||
293 | { inprogress = M.delete blkPiece inprogress | ||
294 | , bitfield = BF.insert blkPiece bitfield | ||
295 | } | ||
296 | return (Just True) | ||