diff options
Diffstat (limited to 'src/Network/BitTorrent/Exchange/Download.hs')
-rw-r--r-- | src/Network/BitTorrent/Exchange/Download.hs | 175 |
1 files changed, 175 insertions, 0 deletions
diff --git a/src/Network/BitTorrent/Exchange/Download.hs b/src/Network/BitTorrent/Exchange/Download.hs new file mode 100644 index 00000000..fcc94485 --- /dev/null +++ b/src/Network/BitTorrent/Exchange/Download.hs | |||
@@ -0,0 +1,175 @@ | |||
1 | module Network.BitTorrent.Exchange.Download | ||
2 | ( -- * Environment | ||
3 | StatusUpdates | ||
4 | , runStatusUpdates | ||
5 | |||
6 | -- * Status | ||
7 | , SessionStatus | ||
8 | , sessionStatus | ||
9 | |||
10 | -- * Query | ||
11 | , getBitfield | ||
12 | , getRequestQueueLength | ||
13 | |||
14 | -- * Control | ||
15 | , scheduleBlocks | ||
16 | , resetPending | ||
17 | , pushBlock | ||
18 | ) where | ||
19 | |||
20 | import Control.Applicative | ||
21 | import Control.Concurrent | ||
22 | import Control.Monad.State | ||
23 | import Data.ByteString.Lazy as BL | ||
24 | import Data.Default | ||
25 | import Data.List as L | ||
26 | import Data.Maybe | ||
27 | import Data.Map as M | ||
28 | import Data.Set as S | ||
29 | import Data.Tuple | ||
30 | |||
31 | import Data.Torrent | ||
32 | import Network.BitTorrent.Exchange.Bitfield as BF | ||
33 | import Network.BitTorrent.Address | ||
34 | import Network.BitTorrent.Exchange.Block as Block | ||
35 | import System.Torrent.Storage (Storage, writePiece) | ||
36 | |||
37 | |||
38 | {----------------------------------------------------------------------- | ||
39 | -- Piece entry | ||
40 | -----------------------------------------------------------------------} | ||
41 | |||
42 | data PieceEntry = PieceEntry | ||
43 | { pending :: [(PeerAddr IP, BlockIx)] | ||
44 | , stalled :: Bucket | ||
45 | } | ||
46 | |||
47 | pieceEntry :: PieceSize -> PieceEntry | ||
48 | pieceEntry s = PieceEntry [] (Block.empty s) | ||
49 | |||
50 | isEmpty :: PieceEntry -> Bool | ||
51 | isEmpty PieceEntry {..} = L.null pending && Block.null stalled | ||
52 | |||
53 | holes :: PieceIx -> PieceEntry -> [BlockIx] | ||
54 | holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled) | ||
55 | where | ||
56 | mkBlockIx (off, sz) = BlockIx pix off sz | ||
57 | |||
58 | {----------------------------------------------------------------------- | ||
59 | -- Session status | ||
60 | -----------------------------------------------------------------------} | ||
61 | |||
62 | data SessionStatus = SessionStatus | ||
63 | { inprogress :: !(Map PieceIx PieceEntry) | ||
64 | , bitfield :: !Bitfield | ||
65 | , pieceSize :: !PieceSize | ||
66 | } | ||
67 | |||
68 | sessionStatus :: Bitfield -> PieceSize -> SessionStatus | ||
69 | sessionStatus bf ps = SessionStatus | ||
70 | { inprogress = M.empty | ||
71 | , bitfield = bf | ||
72 | , pieceSize = ps | ||
73 | } | ||
74 | |||
75 | type StatusUpdates a = StateT SessionStatus IO a | ||
76 | |||
77 | -- | | ||
78 | runStatusUpdates :: MVar SessionStatus -> StatusUpdates a -> IO a | ||
79 | runStatusUpdates var m = modifyMVar var (fmap swap . runStateT m) | ||
80 | |||
81 | getBitfield :: MVar SessionStatus -> IO Bitfield | ||
82 | getBitfield var = bitfield <$> readMVar var | ||
83 | |||
84 | getRequestQueueLength :: PeerAddr IP -> StatusUpdates Int | ||
85 | getRequestQueueLength addr = do | ||
86 | m <- gets (M.elems . M.map (L.filter ((==) addr . fst) . pending) . inprogress) | ||
87 | return $ L.sum $ L.map L.length m | ||
88 | |||
89 | modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> StatusUpdates () | ||
90 | modifyEntry pix f = modify $ \ s @ SessionStatus {..} -> s | ||
91 | { inprogress = alter (g pieceSize) pix inprogress } | ||
92 | where | ||
93 | g s = h . f . fromMaybe (pieceEntry s) | ||
94 | h e | ||
95 | | isEmpty e = Nothing | ||
96 | | otherwise = Just e | ||
97 | |||
98 | {----------------------------------------------------------------------- | ||
99 | -- Piece download | ||
100 | -----------------------------------------------------------------------} | ||
101 | |||
102 | -- TODO choose block nearest to pending or stalled sets to reduce disk | ||
103 | -- seeks on remote machines | ||
104 | chooseBlocks :: [BlockIx] -> Int -> StatusUpdates [BlockIx] | ||
105 | chooseBlocks xs n = return (L.take n xs) | ||
106 | |||
107 | -- TODO use selection strategies from Exchange.Selector | ||
108 | choosePiece :: Bitfield -> StatusUpdates (Maybe PieceIx) | ||
109 | choosePiece bf | ||
110 | | BF.null bf = return $ Nothing | ||
111 | | otherwise = return $ Just $ BF.findMin bf | ||
112 | |||
113 | scheduleBlocks :: PeerAddr IP -> Bitfield -> Int -> StatusUpdates [BlockIx] | ||
114 | scheduleBlocks addr maskBF n = do | ||
115 | SessionStatus {..} <- get | ||
116 | let wantPieces = maskBF `BF.difference` bitfield | ||
117 | let wantBlocks = L.concat $ M.elems $ M.mapWithKey holes $ | ||
118 | M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces) inprogress | ||
119 | |||
120 | bixs <- if L.null wantBlocks | ||
121 | then do | ||
122 | mpix <- choosePiece wantPieces | ||
123 | case mpix of -- TODO return 'n' blocks | ||
124 | Nothing -> return [] | ||
125 | Just pix -> return [leadingBlock pix defaultTransferSize] | ||
126 | else chooseBlocks wantBlocks n | ||
127 | |||
128 | forM_ bixs $ \ bix -> do | ||
129 | modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e | ||
130 | { pending = (addr, bix) : pending } | ||
131 | |||
132 | return bixs | ||
133 | |||
134 | |||
135 | -- | Remove all pending block requests to the remote peer. May be used | ||
136 | -- when: | ||
137 | -- | ||
138 | -- * a peer closes connection; | ||
139 | -- | ||
140 | -- * remote peer choked this peer; | ||
141 | -- | ||
142 | -- * timeout expired. | ||
143 | -- | ||
144 | resetPending :: PeerAddr IP -> StatusUpdates () | ||
145 | resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) } | ||
146 | where | ||
147 | reset = fmap $ \ e -> e | ||
148 | { pending = L.filter (not . (==) addr . fst) (pending e) } | ||
149 | |||
150 | -- | MAY write to storage, if a new piece have been completed. | ||
151 | pushBlock :: Block BL.ByteString -> Storage -> StatusUpdates (Maybe Bool) | ||
152 | pushBlock blk @ Block {..} storage = do | ||
153 | mpe <- gets (M.lookup blkPiece . inprogress) | ||
154 | case mpe of | ||
155 | Nothing -> return Nothing | ||
156 | Just (pe @ PieceEntry {..}) | ||
157 | | blockIx blk `L.notElem` fmap snd pending -> return Nothing | ||
158 | | otherwise -> do | ||
159 | let bkt' = Block.insertLazy blkOffset blkData stalled | ||
160 | case toPiece bkt' of | ||
161 | Nothing -> do | ||
162 | modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e | ||
163 | { pending = L.filter ((==) (blockIx blk) . snd) pending | ||
164 | , stalled = bkt' | ||
165 | } | ||
166 | return (Just False) | ||
167 | |||
168 | Just pieceData -> do | ||
169 | -- TODO verify | ||
170 | liftIO $ writePiece (Piece blkPiece pieceData) storage | ||
171 | modify $ \ s @ SessionStatus {..} -> s | ||
172 | { inprogress = M.delete blkPiece inprogress | ||
173 | , bitfield = BF.insert blkPiece bitfield | ||
174 | } | ||
175 | return (Just True) | ||