1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
|
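-- |
--   Copyright   :  (c) Sam Truzjan 2013
--   License     :  BSD3
--   Maintainer  :  pxqr.sta@gmail.com
--   Stability   :  experimental
--   Portability :  portable
--
--   Bookkeeping for block requests while downloading torrent metadata
--   (the infodict) and torrent content.
--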
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FunctionalDependencies #-}
{-# LANGUAGE TemplateHaskell #-}
module Network.BitTorrent.Exchange.Download
( -- * Downloading
Download (..)
, Updates
, runDownloadUpdates
-- ** Metadata
-- $metadata-download
, MetadataDownload
, metadataDownload
-- ** Content
-- $content-download
, ContentDownload
, contentDownload
) where
import Control.Applicative
import Control.Concurrent
import Control.Lens
import Control.Monad.State
import Data.BEncode as BE
import Data.ByteString as BS
import Data.ByteString.Lazy as BL
import Data.Default
import Data.List as L
import Data.Maybe
import Data.Map as M
import Data.Tuple
import Data.Torrent as Torrent
import Network.BitTorrent.Address
import Network.BitTorrent.Exchange.Bitfield as BF
import Network.BitTorrent.Exchange.Block as Block
import Network.BitTorrent.Exchange.Message as Msg
import System.Torrent.Storage (Storage, writePiece)
{-----------------------------------------------------------------------
-- Class
-----------------------------------------------------------------------}
type Updates s a = StateT s IO a
runDownloadUpdates :: MVar s -> Updates s a -> IO a
runDownloadUpdates var m = modifyMVar var (fmap swap . runStateT m)
class Download s chunk | s -> chunk where
scheduleBlocks :: Int -> PeerAddr IP -> Bitfield -> Updates s [BlockIx]
-- |
scheduleBlock :: PeerAddr IP -> Bitfield -> Updates s (Maybe BlockIx)
scheduleBlock addr bf = listToMaybe <$> scheduleBlocks 1 addr bf
-- | Get number of sent requests to this peer.
getRequestQueueLength :: PeerAddr IP -> Updates s Int
-- | Remove all pending block requests to the remote peer. May be used
-- when:
--
-- * a peer closes connection;
--
-- * remote peer choked this peer;
--
-- * timeout expired.
--
resetPending :: PeerAddr IP -> Updates s ()
-- | MAY write to storage, if a new piece have been completed.
--
-- You should check if a returned by peer block is actually have
-- been requested and in-flight. This is needed to avoid "I send
-- random corrupted block" attacks.
pushBlock :: PeerAddr IP -> chunk -> Updates s (Maybe Bool)
{-----------------------------------------------------------------------
-- Metadata download
-----------------------------------------------------------------------}
-- $metadata-download
-- TODO
data MetadataDownload = MetadataDownload
{ _pendingPieces :: [(PeerAddr IP, PieceIx)]
, _bucket :: Bucket
, _topic :: InfoHash
}
makeLenses ''MetadataDownload
-- | Create a new scheduler for infodict of the given size.
metadataDownload :: Int -> InfoHash -> MetadataDownload
metadataDownload ps = MetadataDownload [] (Block.empty ps)
instance Default MetadataDownload where
def = error "instance Default MetadataDownload"
--cancelPending :: PieceIx -> Updates ()
cancelPending pix = pendingPieces %= L.filter ((pix ==) . snd)
instance Download MetadataDownload (Piece BS.ByteString) where
scheduleBlock addr bf = do
bkt <- use bucket
case spans metadataPieceSize bkt of
[] -> return Nothing
((off, _ ) : _) -> do
let pix = off `div` metadataPieceSize
pendingPieces %= ((addr, pix) :)
return (Just (BlockIx pix 0 metadataPieceSize))
resetPending addr = pendingPieces %= L.filter ((addr ==) . fst)
pushBlock addr Torrent.Piece {..} = do
p <- use pendingPieces
when ((addr, pieceIndex) `L.notElem` p) $
error "not requested"
cancelPending pieceIndex
bucket %= Block.insert (metadataPieceSize * pieceIndex) pieceData
b <- use bucket
case toPiece b of
Nothing -> return Nothing
Just chunks -> do
t <- use topic
case parseInfoDict (BL.toStrict chunks) t of
Right x -> do
pendingPieces .= []
return undefined -- (Just x)
Left e -> do
pendingPieces .= []
bucket .= Block.empty (Block.size b)
return undefined -- Nothing
where
-- todo use incremental parsing to avoid BS.concat call
parseInfoDict :: BS.ByteString -> InfoHash -> Result InfoDict
parseInfoDict chunk topic =
case BE.decode chunk of
Right (infodict @ InfoDict {..})
| topic == idInfoHash -> return infodict
| otherwise -> Left "broken infodict"
Left err -> Left $ "unable to parse infodict " ++ err
{-----------------------------------------------------------------------
-- Content download
-----------------------------------------------------------------------}
-- $content-download
--
-- A block can have one of the following statuses:
--
-- 1) /not allowed/: The piece is not in the download set.
--
-- 2) /waiting/: (allowed?) The block has been allowed for download,
-- but /this/ peer has not sent a 'Request' message for this
-- block yet. To allow a piece use
-- 'Network.BitTorrent.Exchange.Selector' and then 'allowedSet'
-- and 'allowPiece'.
--
-- 3) /inflight/: (pending?) The block has been requested but the
-- /remote/ peer has not sent a 'Piece' message for this block yet.
-- Related functions: 'markInflight'.
--
-- 4) /pending/: (stalled?) The block has been downloaded.
-- Related functions: 'insertBlock'.
--
-- Piece status:
--
-- 1) /assembled/: (downloaded?) All blocks of the piece have been
-- downloaded but the piece has not been verified yet.
--
-- * Valid: go to completed;
--
-- * Invalid: go to waiting.
--
-- 2) /corrupted/:
--
-- 3) /downloaded/: (verified?) The piece has been successfully
-- verified via its hash. Usually the piece should be stored to
-- the 'System.Torrent.Storage' and /this/ peer should send 'Have'
-- messages to the /remote/ peers.
--
data PieceEntry = PieceEntry
{ pending :: [(PeerAddr IP, BlockIx)]
, stalled :: Bucket
}
pieceEntry :: PieceSize -> PieceEntry
pieceEntry s = PieceEntry [] (Block.empty s)
isEmpty :: PieceEntry -> Bool
isEmpty PieceEntry {..} = L.null pending && Block.null stalled
_holes :: PieceIx -> PieceEntry -> [BlockIx]
_holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled)
where
mkBlockIx (off, sz) = BlockIx pix off sz
data ContentDownload = ContentDownload
{ inprogress :: !(Map PieceIx PieceEntry)
, bitfield :: !Bitfield
, pieceSize :: !PieceSize
, contentStorage :: Storage
}
contentDownload :: Bitfield -> PieceSize -> Storage -> ContentDownload
contentDownload = ContentDownload M.empty
--modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> DownloadUpdates ()
modifyEntry pix f = modify $ \ s @ ContentDownload {..} -> s
{ inprogress = alter (g pieceSize) pix inprogress }
where
g s = h . f . fromMaybe (pieceEntry s)
h e
| isEmpty e = Nothing
| otherwise = Just e
instance Download ContentDownload (Block BL.ByteString) where
scheduleBlocks n addr maskBF = do
ContentDownload {..} <- get
let wantPieces = maskBF `BF.difference` bitfield
let wantBlocks = L.concat $ M.elems $ M.mapWithKey _holes $
M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces)
inprogress
bixs <- if L.null wantBlocks
then do
mpix <- choosePiece wantPieces
case mpix of -- TODO return 'n' blocks
Nothing -> return []
Just pix -> return [leadingBlock pix defaultTransferSize]
else chooseBlocks wantBlocks n
forM_ bixs $ \ bix -> do
modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e
{ pending = (addr, bix) : pending }
return bixs
where
-- TODO choose block nearest to pending or stalled sets to reduce disk
-- seeks on remote machines
--chooseBlocks :: [BlockIx] -> Int -> DownloadUpdates [BlockIx]
chooseBlocks xs n = return (L.take n xs)
-- TODO use selection strategies from Exchange.Selector
--choosePiece :: Bitfield -> DownloadUpdates (Maybe PieceIx)
choosePiece bf
| BF.null bf = return $ Nothing
| otherwise = return $ Just $ BF.findMin bf
getRequestQueueLength addr = do
m <- gets (M.map (L.filter ((==) addr . fst) . pending) . inprogress)
return $ L.sum $ L.map L.length $ M.elems m
resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) }
where
reset = fmap $ \ e -> e
{ pending = L.filter (not . (==) addr . fst) (pending e) }
pushBlock addr blk @ Block {..} = do
mpe <- gets (M.lookup blkPiece . inprogress)
case mpe of
Nothing -> return Nothing
Just (pe @ PieceEntry {..})
| blockIx blk `L.notElem` fmap snd pending -> return Nothing
| otherwise -> do
let bkt' = Block.insertLazy blkOffset blkData stalled
case toPiece bkt' of
Nothing -> do
modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e
{ pending = L.filter ((==) (blockIx blk) . snd) pending
, stalled = bkt'
}
return (Just False)
Just pieceData -> do
-- TODO verify
storage <- gets contentStorage
liftIO $ writePiece (Torrent.Piece blkPiece pieceData) storage
modify $ \ s @ ContentDownload {..} -> s
{ inprogress = M.delete blkPiece inprogress
, bitfield = BF.insert blkPiece bitfield
}
return (Just True)
|