summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/Exchange/Download.hs
diff options
context:
space:
mode:
authorSam Truzjan <pxqr.sta@gmail.com>2014-04-17 15:27:43 +0400
committerSam Truzjan <pxqr.sta@gmail.com>2014-04-17 15:27:43 +0400
commitcb75f50f4cae778d1dfc57edff771a5145dd9894 (patch)
tree0d8133e881866a10138c35eac058cbb09f0148b5 /src/Network/BitTorrent/Exchange/Download.hs
parent6bd297aee69a74d19f81f240d170d9bca81c96cf (diff)
[Exchange] Move all download stuff to single module
Diffstat (limited to 'src/Network/BitTorrent/Exchange/Download.hs')
-rw-r--r--src/Network/BitTorrent/Exchange/Download.hs376
1 files changed, 248 insertions, 128 deletions
diff --git a/src/Network/BitTorrent/Exchange/Download.hs b/src/Network/BitTorrent/Exchange/Download.hs
index fcc94485..9a6b5f91 100644
--- a/src/Network/BitTorrent/Exchange/Download.hs
+++ b/src/Network/BitTorrent/Exchange/Download.hs
@@ -1,44 +1,196 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8--
9--
10{-# LANGUAGE FlexibleInstances #-}
11{-# LANGUAGE MultiParamTypeClasses #-}
12{-# LANGUAGE FunctionalDependencies #-}
13{-# LANGUAGE TemplateHaskell #-}
1module Network.BitTorrent.Exchange.Download 14module Network.BitTorrent.Exchange.Download
2 ( -- * Environment 15 ( -- * Downloading
3 StatusUpdates 16 Download (..)
4 , runStatusUpdates 17 , Updates
5 18 , runDownloadUpdates
6 -- * Status 19
7 , SessionStatus 20 -- ** Metadata
8 , sessionStatus 21 -- $metadata-download
9 22 , MetadataDownload
10 -- * Query 23 , metadataDownload
11 , getBitfield 24
12 , getRequestQueueLength 25 -- ** Content
13 26 -- $content-download
14 -- * Control 27 , ContentDownload
15 , scheduleBlocks 28 , contentDownload
16 , resetPending
17 , pushBlock
18 ) where 29 ) where
19 30
20import Control.Applicative 31import Control.Applicative
21import Control.Concurrent 32import Control.Concurrent
33import Control.Lens
22import Control.Monad.State 34import Control.Monad.State
35import Data.BEncode as BE
36import Data.ByteString as BS
23import Data.ByteString.Lazy as BL 37import Data.ByteString.Lazy as BL
24import Data.Default 38import Data.Default
25import Data.List as L 39import Data.List as L
26import Data.Maybe 40import Data.Maybe
27import Data.Map as M 41import Data.Map as M
28import Data.Set as S
29import Data.Tuple 42import Data.Tuple
30 43
31import Data.Torrent 44import Data.Torrent as Torrent
32import Network.BitTorrent.Exchange.Bitfield as BF
33import Network.BitTorrent.Address 45import Network.BitTorrent.Address
34import Network.BitTorrent.Exchange.Block as Block 46import Network.BitTorrent.Exchange.Bitfield as BF
47import Network.BitTorrent.Exchange.Block as Block
48import Network.BitTorrent.Exchange.Message as Msg
35import System.Torrent.Storage (Storage, writePiece) 49import System.Torrent.Storage (Storage, writePiece)
36 50
37 51
38{----------------------------------------------------------------------- 52{-----------------------------------------------------------------------
39-- Piece entry 53-- Class
40-----------------------------------------------------------------------} 54-----------------------------------------------------------------------}
41 55
56type Updates s a = StateT s IO a
57
58runDownloadUpdates :: MVar s -> Updates s a -> IO a
59runDownloadUpdates var m = modifyMVar var (fmap swap . runStateT m)
60
61class Download s chunk | s -> chunk where
62 scheduleBlocks :: Int -> PeerAddr IP -> Bitfield -> Updates s [BlockIx]
63
64 -- |
65 scheduleBlock :: PeerAddr IP -> Bitfield -> Updates s (Maybe BlockIx)
66 scheduleBlock addr bf = listToMaybe <$> scheduleBlocks 1 addr bf
67
68 -- | Get number of sent requests to this peer.
69 getRequestQueueLength :: PeerAddr IP -> Updates s Int
70
71 -- | Remove all pending block requests to the remote peer. May be used
72 -- when:
73 --
74 -- * a peer closes connection;
75 --
76 -- * remote peer choked this peer;
77 --
78 -- * timeout expired.
79 --
80 resetPending :: PeerAddr IP -> Updates s ()
81
82 -- | MAY write to storage, if a new piece have been completed.
83 --
84 -- You should check if a returned by peer block is actually have
85 -- been requested and in-flight. This is needed to avoid "I send
86 -- random corrupted block" attacks.
87 pushBlock :: PeerAddr IP -> chunk -> Updates s (Maybe Bool)
88
89{-----------------------------------------------------------------------
90-- Metadata download
91-----------------------------------------------------------------------}
92-- $metadata-download
93-- TODO
94
95data MetadataDownload = MetadataDownload
96 { _pendingPieces :: [(PeerAddr IP, PieceIx)]
97 , _bucket :: Bucket
98 , _topic :: InfoHash
99 }
100
101makeLenses ''MetadataDownload
102
103-- | Create a new scheduler for infodict of the given size.
104metadataDownload :: Int -> InfoHash -> MetadataDownload
105metadataDownload ps = MetadataDownload [] (Block.empty ps)
106
107instance Default MetadataDownload where
108 def = error "instance Default MetadataDownload"
109
110--cancelPending :: PieceIx -> Updates ()
111cancelPending pix = pendingPieces %= L.filter ((pix ==) . snd)
112
113instance Download MetadataDownload (Piece BS.ByteString) where
114 scheduleBlock addr bf = do
115 bkt <- use bucket
116 case spans metadataPieceSize bkt of
117 [] -> return Nothing
118 ((off, _ ) : _) -> do
119 let pix = off `div` metadataPieceSize
120 pendingPieces %= ((addr, pix) :)
121 return (Just (BlockIx pix 0 metadataPieceSize))
122
123 resetPending addr = pendingPieces %= L.filter ((addr ==) . fst)
124
125 pushBlock addr Torrent.Piece {..} = do
126 p <- use pendingPieces
127 when ((addr, pieceIndex) `L.notElem` p) $
128 error "not requested"
129 cancelPending pieceIndex
130
131 bucket %= Block.insert (metadataPieceSize * pieceIndex) pieceData
132 b <- use bucket
133 case toPiece b of
134 Nothing -> return Nothing
135 Just chunks -> do
136 t <- use topic
137 case parseInfoDict (BL.toStrict chunks) t of
138 Right x -> do
139 pendingPieces .= []
140 return undefined -- (Just x)
141 Left e -> do
142 pendingPieces .= []
143 bucket .= Block.empty (Block.size b)
144 return undefined -- Nothing
145 where
146 -- todo use incremental parsing to avoid BS.concat call
147 parseInfoDict :: BS.ByteString -> InfoHash -> Result InfoDict
148 parseInfoDict chunk topic =
149 case BE.decode chunk of
150 Right (infodict @ InfoDict {..})
151 | topic == idInfoHash -> return infodict
152 | otherwise -> Left "broken infodict"
153 Left err -> Left $ "unable to parse infodict " ++ err
154
155{-----------------------------------------------------------------------
156-- Content download
157-----------------------------------------------------------------------}
158-- $content-download
159--
160-- A block can have one of the following status:
161--
162-- 1) /not allowed/: Piece is not in download set.
163--
164-- 2) /waiting/: (allowed?) Block have been allowed to download,
165-- but /this/ peer did not send any 'Request' message for this
166-- block. To allow some piece use
167-- 'Network.BitTorrent.Exchange.Selector' and then 'allowedSet'
168-- and 'allowPiece'.
169--
170-- 3) /inflight/: (pending?) Block have been requested but
171-- /remote/ peer did not send any 'Piece' message for this block.
172-- Related functions 'markInflight'
173--
174-- 4) /pending/: (stalled?) Block have have been downloaded
175-- Related functions 'insertBlock'.
176--
177-- Piece status:
178--
179-- 1) /assembled/: (downloaded?) All blocks in piece have been
180-- downloaded but the piece did not verified yet.
181--
182-- * Valid: go to completed;
183--
184-- * Invalid: go to waiting.
185--
186-- 2) /corrupted/:
187--
188-- 3) /downloaded/: (verified?) A piece have been successfully
189-- verified via the hash. Usually the piece should be stored to
190-- the 'System.Torrent.Storage' and /this/ peer should send 'Have'
191-- messages to the /remote/ peers.
192--
193
42data PieceEntry = PieceEntry 194data PieceEntry = PieceEntry
43 { pending :: [(PeerAddr IP, BlockIx)] 195 { pending :: [(PeerAddr IP, BlockIx)]
44 , stalled :: Bucket 196 , stalled :: Bucket
@@ -50,44 +202,23 @@ pieceEntry s = PieceEntry [] (Block.empty s)
50isEmpty :: PieceEntry -> Bool 202isEmpty :: PieceEntry -> Bool
51isEmpty PieceEntry {..} = L.null pending && Block.null stalled 203isEmpty PieceEntry {..} = L.null pending && Block.null stalled
52 204
53holes :: PieceIx -> PieceEntry -> [BlockIx] 205_holes :: PieceIx -> PieceEntry -> [BlockIx]
54holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled) 206_holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled)
55 where 207 where
56 mkBlockIx (off, sz) = BlockIx pix off sz 208 mkBlockIx (off, sz) = BlockIx pix off sz
57 209
58{----------------------------------------------------------------------- 210data ContentDownload = ContentDownload
59-- Session status 211 { inprogress :: !(Map PieceIx PieceEntry)
60-----------------------------------------------------------------------} 212 , bitfield :: !Bitfield
61 213 , pieceSize :: !PieceSize
62data SessionStatus = SessionStatus 214 , contentStorage :: Storage
63 { inprogress :: !(Map PieceIx PieceEntry)
64 , bitfield :: !Bitfield
65 , pieceSize :: !PieceSize
66 }
67
68sessionStatus :: Bitfield -> PieceSize -> SessionStatus
69sessionStatus bf ps = SessionStatus
70 { inprogress = M.empty
71 , bitfield = bf
72 , pieceSize = ps
73 } 215 }
74 216
75type StatusUpdates a = StateT SessionStatus IO a 217contentDownload :: Bitfield -> PieceSize -> Storage -> ContentDownload
76 218contentDownload = ContentDownload M.empty
77-- |
78runStatusUpdates :: MVar SessionStatus -> StatusUpdates a -> IO a
79runStatusUpdates var m = modifyMVar var (fmap swap . runStateT m)
80
81getBitfield :: MVar SessionStatus -> IO Bitfield
82getBitfield var = bitfield <$> readMVar var
83 219
84getRequestQueueLength :: PeerAddr IP -> StatusUpdates Int 220--modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> DownloadUpdates ()
85getRequestQueueLength addr = do 221modifyEntry pix f = modify $ \ s @ ContentDownload {..} -> s
86 m <- gets (M.elems . M.map (L.filter ((==) addr . fst) . pending) . inprogress)
87 return $ L.sum $ L.map L.length m
88
89modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> StatusUpdates ()
90modifyEntry pix f = modify $ \ s @ SessionStatus {..} -> s
91 { inprogress = alter (g pieceSize) pix inprogress } 222 { inprogress = alter (g pieceSize) pix inprogress }
92 where 223 where
93 g s = h . f . fromMaybe (pieceEntry s) 224 g s = h . f . fromMaybe (pieceEntry s)
@@ -95,81 +226,70 @@ modifyEntry pix f = modify $ \ s @ SessionStatus {..} -> s
95 | isEmpty e = Nothing 226 | isEmpty e = Nothing
96 | otherwise = Just e 227 | otherwise = Just e
97 228
98{----------------------------------------------------------------------- 229instance Download ContentDownload (Block BL.ByteString) where
99-- Piece download 230 scheduleBlocks n addr maskBF = do
100-----------------------------------------------------------------------} 231 ContentDownload {..} <- get
232 let wantPieces = maskBF `BF.difference` bitfield
233 let wantBlocks = L.concat $ M.elems $ M.mapWithKey _holes $
234 M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces)
235 inprogress
101 236
102-- TODO choose block nearest to pending or stalled sets to reduce disk 237 bixs <- if L.null wantBlocks
103-- seeks on remote machines 238 then do
104chooseBlocks :: [BlockIx] -> Int -> StatusUpdates [BlockIx] 239 mpix <- choosePiece wantPieces
105chooseBlocks xs n = return (L.take n xs) 240 case mpix of -- TODO return 'n' blocks
106 241 Nothing -> return []
107-- TODO use selection strategies from Exchange.Selector 242 Just pix -> return [leadingBlock pix defaultTransferSize]
108choosePiece :: Bitfield -> StatusUpdates (Maybe PieceIx) 243 else chooseBlocks wantBlocks n
109choosePiece bf 244
110 | BF.null bf = return $ Nothing 245 forM_ bixs $ \ bix -> do
111 | otherwise = return $ Just $ BF.findMin bf 246 modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e
112 247 { pending = (addr, bix) : pending }
113scheduleBlocks :: PeerAddr IP -> Bitfield -> Int -> StatusUpdates [BlockIx] 248
114scheduleBlocks addr maskBF n = do 249 return bixs
115 SessionStatus {..} <- get 250 where
116 let wantPieces = maskBF `BF.difference` bitfield 251 -- TODO choose block nearest to pending or stalled sets to reduce disk
117 let wantBlocks = L.concat $ M.elems $ M.mapWithKey holes $ 252 -- seeks on remote machines
118 M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces) inprogress 253 --chooseBlocks :: [BlockIx] -> Int -> DownloadUpdates [BlockIx]
119 254 chooseBlocks xs n = return (L.take n xs)
120 bixs <- if L.null wantBlocks 255
121 then do 256 -- TODO use selection strategies from Exchange.Selector
122 mpix <- choosePiece wantPieces 257 --choosePiece :: Bitfield -> DownloadUpdates (Maybe PieceIx)
123 case mpix of -- TODO return 'n' blocks 258 choosePiece bf
124 Nothing -> return [] 259 | BF.null bf = return $ Nothing
125 Just pix -> return [leadingBlock pix defaultTransferSize] 260 | otherwise = return $ Just $ BF.findMin bf
126 else chooseBlocks wantBlocks n 261
127 262 getRequestQueueLength addr = do
128 forM_ bixs $ \ bix -> do 263 m <- gets (M.map (L.filter ((==) addr . fst) . pending) . inprogress)
129 modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e 264 return $ L.sum $ L.map L.length $ M.elems m
130 { pending = (addr, bix) : pending } 265
131 266 resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) }
132 return bixs 267 where
133 268 reset = fmap $ \ e -> e
134
135-- | Remove all pending block requests to the remote peer. May be used
136-- when:
137--
138-- * a peer closes connection;
139--
140-- * remote peer choked this peer;
141--
142-- * timeout expired.
143--
144resetPending :: PeerAddr IP -> StatusUpdates ()
145resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) }
146 where
147 reset = fmap $ \ e -> e
148 { pending = L.filter (not . (==) addr . fst) (pending e) } 269 { pending = L.filter (not . (==) addr . fst) (pending e) }
149 270
150-- | MAY write to storage, if a new piece have been completed. 271 pushBlock addr blk @ Block {..} = do
151pushBlock :: Block BL.ByteString -> Storage -> StatusUpdates (Maybe Bool) 272 mpe <- gets (M.lookup blkPiece . inprogress)
152pushBlock blk @ Block {..} storage = do 273 case mpe of
153 mpe <- gets (M.lookup blkPiece . inprogress) 274 Nothing -> return Nothing
154 case mpe of 275 Just (pe @ PieceEntry {..})
155 Nothing -> return Nothing 276 | blockIx blk `L.notElem` fmap snd pending -> return Nothing
156 Just (pe @ PieceEntry {..}) 277 | otherwise -> do
157 | blockIx blk `L.notElem` fmap snd pending -> return Nothing 278 let bkt' = Block.insertLazy blkOffset blkData stalled
158 | otherwise -> do 279 case toPiece bkt' of
159 let bkt' = Block.insertLazy blkOffset blkData stalled 280 Nothing -> do
160 case toPiece bkt' of 281 modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e
161 Nothing -> do 282 { pending = L.filter ((==) (blockIx blk) . snd) pending
162 modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e 283 , stalled = bkt'
163 { pending = L.filter ((==) (blockIx blk) . snd) pending 284 }
164 , stalled = bkt' 285 return (Just False)
165 } 286
166 return (Just False) 287 Just pieceData -> do
167 288 -- TODO verify
168 Just pieceData -> do 289 storage <- gets contentStorage
169 -- TODO verify 290 liftIO $ writePiece (Torrent.Piece blkPiece pieceData) storage
170 liftIO $ writePiece (Piece blkPiece pieceData) storage 291 modify $ \ s @ ContentDownload {..} -> s
171 modify $ \ s @ SessionStatus {..} -> s 292 { inprogress = M.delete blkPiece inprogress
172 { inprogress = M.delete blkPiece inprogress 293 , bitfield = BF.insert blkPiece bitfield
173 , bitfield = BF.insert blkPiece bitfield 294 }
174 } 295 return (Just True)
175 return (Just True)