summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/Exchange/Download.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/BitTorrent/Exchange/Download.hs')
-rw-r--r--src/Network/BitTorrent/Exchange/Download.hs295
1 files changed, 295 insertions, 0 deletions
diff --git a/src/Network/BitTorrent/Exchange/Download.hs b/src/Network/BitTorrent/Exchange/Download.hs
new file mode 100644
index 00000000..9a6b5f91
--- /dev/null
+++ b/src/Network/BitTorrent/Exchange/Download.hs
@@ -0,0 +1,295 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8--
9--
10{-# LANGUAGE FlexibleInstances #-}
11{-# LANGUAGE MultiParamTypeClasses #-}
12{-# LANGUAGE FunctionalDependencies #-}
13{-# LANGUAGE TemplateHaskell #-}
14module Network.BitTorrent.Exchange.Download
15 ( -- * Downloading
16 Download (..)
17 , Updates
18 , runDownloadUpdates
19
20 -- ** Metadata
21 -- $metadata-download
22 , MetadataDownload
23 , metadataDownload
24
25 -- ** Content
26 -- $content-download
27 , ContentDownload
28 , contentDownload
29 ) where
30
31import Control.Applicative
32import Control.Concurrent
33import Control.Lens
34import Control.Monad.State
35import Data.BEncode as BE
36import Data.ByteString as BS
37import Data.ByteString.Lazy as BL
38import Data.Default
39import Data.List as L
40import Data.Maybe
41import Data.Map as M
42import Data.Tuple
43
44import Data.Torrent as Torrent
45import Network.BitTorrent.Address
46import Network.BitTorrent.Exchange.Bitfield as BF
47import Network.BitTorrent.Exchange.Block as Block
48import Network.BitTorrent.Exchange.Message as Msg
49import System.Torrent.Storage (Storage, writePiece)
50
51
52{-----------------------------------------------------------------------
53-- Class
54-----------------------------------------------------------------------}
55
56type Updates s a = StateT s IO a
57
58runDownloadUpdates :: MVar s -> Updates s a -> IO a
59runDownloadUpdates var m = modifyMVar var (fmap swap . runStateT m)
60
61class Download s chunk | s -> chunk where
62 scheduleBlocks :: Int -> PeerAddr IP -> Bitfield -> Updates s [BlockIx]
63
64 -- |
65 scheduleBlock :: PeerAddr IP -> Bitfield -> Updates s (Maybe BlockIx)
66 scheduleBlock addr bf = listToMaybe <$> scheduleBlocks 1 addr bf
67
68 -- | Get number of sent requests to this peer.
69 getRequestQueueLength :: PeerAddr IP -> Updates s Int
70
71 -- | Remove all pending block requests to the remote peer. May be used
72 -- when:
73 --
74 -- * a peer closes connection;
75 --
76 -- * remote peer choked this peer;
77 --
78 -- * timeout expired.
79 --
80 resetPending :: PeerAddr IP -> Updates s ()
81
82 -- | MAY write to storage, if a new piece have been completed.
83 --
84 -- You should check if a returned by peer block is actually have
85 -- been requested and in-flight. This is needed to avoid "I send
86 -- random corrupted block" attacks.
87 pushBlock :: PeerAddr IP -> chunk -> Updates s (Maybe Bool)
88
89{-----------------------------------------------------------------------
90-- Metadata download
91-----------------------------------------------------------------------}
92-- $metadata-download
93-- TODO
94
95data MetadataDownload = MetadataDownload
96 { _pendingPieces :: [(PeerAddr IP, PieceIx)]
97 , _bucket :: Bucket
98 , _topic :: InfoHash
99 }
100
101makeLenses ''MetadataDownload
102
103-- | Create a new scheduler for infodict of the given size.
104metadataDownload :: Int -> InfoHash -> MetadataDownload
105metadataDownload ps = MetadataDownload [] (Block.empty ps)
106
107instance Default MetadataDownload where
108 def = error "instance Default MetadataDownload"
109
110--cancelPending :: PieceIx -> Updates ()
111cancelPending pix = pendingPieces %= L.filter ((pix ==) . snd)
112
113instance Download MetadataDownload (Piece BS.ByteString) where
114 scheduleBlock addr bf = do
115 bkt <- use bucket
116 case spans metadataPieceSize bkt of
117 [] -> return Nothing
118 ((off, _ ) : _) -> do
119 let pix = off `div` metadataPieceSize
120 pendingPieces %= ((addr, pix) :)
121 return (Just (BlockIx pix 0 metadataPieceSize))
122
123 resetPending addr = pendingPieces %= L.filter ((addr ==) . fst)
124
125 pushBlock addr Torrent.Piece {..} = do
126 p <- use pendingPieces
127 when ((addr, pieceIndex) `L.notElem` p) $
128 error "not requested"
129 cancelPending pieceIndex
130
131 bucket %= Block.insert (metadataPieceSize * pieceIndex) pieceData
132 b <- use bucket
133 case toPiece b of
134 Nothing -> return Nothing
135 Just chunks -> do
136 t <- use topic
137 case parseInfoDict (BL.toStrict chunks) t of
138 Right x -> do
139 pendingPieces .= []
140 return undefined -- (Just x)
141 Left e -> do
142 pendingPieces .= []
143 bucket .= Block.empty (Block.size b)
144 return undefined -- Nothing
145 where
146 -- todo use incremental parsing to avoid BS.concat call
147 parseInfoDict :: BS.ByteString -> InfoHash -> Result InfoDict
148 parseInfoDict chunk topic =
149 case BE.decode chunk of
150 Right (infodict @ InfoDict {..})
151 | topic == idInfoHash -> return infodict
152 | otherwise -> Left "broken infodict"
153 Left err -> Left $ "unable to parse infodict " ++ err
154
155{-----------------------------------------------------------------------
156-- Content download
157-----------------------------------------------------------------------}
158-- $content-download
159--
160-- A block can have one of the following status:
161--
162-- 1) /not allowed/: Piece is not in download set.
163--
164-- 2) /waiting/: (allowed?) Block have been allowed to download,
165-- but /this/ peer did not send any 'Request' message for this
166-- block. To allow some piece use
167-- 'Network.BitTorrent.Exchange.Selector' and then 'allowedSet'
168-- and 'allowPiece'.
169--
170-- 3) /inflight/: (pending?) Block have been requested but
171-- /remote/ peer did not send any 'Piece' message for this block.
172-- Related functions 'markInflight'
173--
174-- 4) /pending/: (stalled?) Block have have been downloaded
175-- Related functions 'insertBlock'.
176--
177-- Piece status:
178--
179-- 1) /assembled/: (downloaded?) All blocks in piece have been
180-- downloaded but the piece did not verified yet.
181--
182-- * Valid: go to completed;
183--
184-- * Invalid: go to waiting.
185--
186-- 2) /corrupted/:
187--
188-- 3) /downloaded/: (verified?) A piece have been successfully
189-- verified via the hash. Usually the piece should be stored to
190-- the 'System.Torrent.Storage' and /this/ peer should send 'Have'
191-- messages to the /remote/ peers.
192--
193
194data PieceEntry = PieceEntry
195 { pending :: [(PeerAddr IP, BlockIx)]
196 , stalled :: Bucket
197 }
198
199pieceEntry :: PieceSize -> PieceEntry
200pieceEntry s = PieceEntry [] (Block.empty s)
201
202isEmpty :: PieceEntry -> Bool
203isEmpty PieceEntry {..} = L.null pending && Block.null stalled
204
205_holes :: PieceIx -> PieceEntry -> [BlockIx]
206_holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled)
207 where
208 mkBlockIx (off, sz) = BlockIx pix off sz
209
210data ContentDownload = ContentDownload
211 { inprogress :: !(Map PieceIx PieceEntry)
212 , bitfield :: !Bitfield
213 , pieceSize :: !PieceSize
214 , contentStorage :: Storage
215 }
216
217contentDownload :: Bitfield -> PieceSize -> Storage -> ContentDownload
218contentDownload = ContentDownload M.empty
219
220--modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> DownloadUpdates ()
221modifyEntry pix f = modify $ \ s @ ContentDownload {..} -> s
222 { inprogress = alter (g pieceSize) pix inprogress }
223 where
224 g s = h . f . fromMaybe (pieceEntry s)
225 h e
226 | isEmpty e = Nothing
227 | otherwise = Just e
228
229instance Download ContentDownload (Block BL.ByteString) where
230 scheduleBlocks n addr maskBF = do
231 ContentDownload {..} <- get
232 let wantPieces = maskBF `BF.difference` bitfield
233 let wantBlocks = L.concat $ M.elems $ M.mapWithKey _holes $
234 M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces)
235 inprogress
236
237 bixs <- if L.null wantBlocks
238 then do
239 mpix <- choosePiece wantPieces
240 case mpix of -- TODO return 'n' blocks
241 Nothing -> return []
242 Just pix -> return [leadingBlock pix defaultTransferSize]
243 else chooseBlocks wantBlocks n
244
245 forM_ bixs $ \ bix -> do
246 modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e
247 { pending = (addr, bix) : pending }
248
249 return bixs
250 where
251 -- TODO choose block nearest to pending or stalled sets to reduce disk
252 -- seeks on remote machines
253 --chooseBlocks :: [BlockIx] -> Int -> DownloadUpdates [BlockIx]
254 chooseBlocks xs n = return (L.take n xs)
255
256 -- TODO use selection strategies from Exchange.Selector
257 --choosePiece :: Bitfield -> DownloadUpdates (Maybe PieceIx)
258 choosePiece bf
259 | BF.null bf = return $ Nothing
260 | otherwise = return $ Just $ BF.findMin bf
261
262 getRequestQueueLength addr = do
263 m <- gets (M.map (L.filter ((==) addr . fst) . pending) . inprogress)
264 return $ L.sum $ L.map L.length $ M.elems m
265
266 resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) }
267 where
268 reset = fmap $ \ e -> e
269 { pending = L.filter (not . (==) addr . fst) (pending e) }
270
271 pushBlock addr blk @ Block {..} = do
272 mpe <- gets (M.lookup blkPiece . inprogress)
273 case mpe of
274 Nothing -> return Nothing
275 Just (pe @ PieceEntry {..})
276 | blockIx blk `L.notElem` fmap snd pending -> return Nothing
277 | otherwise -> do
278 let bkt' = Block.insertLazy blkOffset blkData stalled
279 case toPiece bkt' of
280 Nothing -> do
281 modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e
282 { pending = L.filter ((==) (blockIx blk) . snd) pending
283 , stalled = bkt'
284 }
285 return (Just False)
286
287 Just pieceData -> do
288 -- TODO verify
289 storage <- gets contentStorage
290 liftIO $ writePiece (Torrent.Piece blkPiece pieceData) storage
291 modify $ \ s @ ContentDownload {..} -> s
292 { inprogress = M.delete blkPiece inprogress
293 , bitfield = BF.insert blkPiece bitfield
294 }
295 return (Just True)