summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam Truzjan <pxqr.sta@gmail.com>2014-04-17 15:27:43 +0400
committerSam Truzjan <pxqr.sta@gmail.com>2014-04-17 15:27:43 +0400
commitcb75f50f4cae778d1dfc57edff771a5145dd9894 (patch)
tree0d8133e881866a10138c35eac058cbb09f0148b5
parent6bd297aee69a74d19f81f240d170d9bca81c96cf (diff)
[Exchange] Move all download stuff to single module
-rw-r--r--bittorrent.cabal6
-rw-r--r--src/Network/BitTorrent/Exchange/Assembler.hs168
-rw-r--r--src/Network/BitTorrent/Exchange/Download.hs376
-rw-r--r--src/Network/BitTorrent/Exchange/Session.hs17
-rw-r--r--src/Network/BitTorrent/Exchange/Session/Metadata.hs102
-rw-r--r--tests/Network/BitTorrent/Exchange/DownloadSpec.hs (renamed from tests/Network/BitTorrent/Exchange/Session/MetadataSpec.hs)17
6 files changed, 268 insertions, 418 deletions
diff --git a/bittorrent.cabal b/bittorrent.cabal
index d8cf0a01..881361f8 100644
--- a/bittorrent.cabal
+++ b/bittorrent.cabal
@@ -63,15 +63,13 @@ library
63 Network.BitTorrent.DHT.Session 63 Network.BitTorrent.DHT.Session
64 Network.BitTorrent.DHT.Token 64 Network.BitTorrent.DHT.Token
65 Network.BitTorrent.Exchange 65 Network.BitTorrent.Exchange
66 Network.BitTorrent.Exchange.Assembler
67 Network.BitTorrent.Exchange.Bitfield 66 Network.BitTorrent.Exchange.Bitfield
68 Network.BitTorrent.Exchange.Block 67 Network.BitTorrent.Exchange.Block
69 Network.BitTorrent.Exchange.Connection 68 Network.BitTorrent.Exchange.Connection
69 Network.BitTorrent.Exchange.Download
70 Network.BitTorrent.Exchange.Manager 70 Network.BitTorrent.Exchange.Manager
71 Network.BitTorrent.Exchange.Message 71 Network.BitTorrent.Exchange.Message
72 Network.BitTorrent.Exchange.Session 72 Network.BitTorrent.Exchange.Session
73 Network.BitTorrent.Exchange.Session.Metadata
74 Network.BitTorrent.Exchange.Session.Status
75 Network.BitTorrent.Tracker 73 Network.BitTorrent.Tracker
76 Network.BitTorrent.Tracker.List 74 Network.BitTorrent.Tracker.List
77 Network.BitTorrent.Tracker.Message 75 Network.BitTorrent.Tracker.Message
@@ -207,9 +205,9 @@ test-suite spec
207 Network.BitTorrent.Tracker.SessionSpec 205 Network.BitTorrent.Tracker.SessionSpec
208 Network.BitTorrent.Exchange.BitfieldSpec 206 Network.BitTorrent.Exchange.BitfieldSpec
209 Network.BitTorrent.Exchange.ConnectionSpec 207 Network.BitTorrent.Exchange.ConnectionSpec
208 Network.BitTorrent.Exchange.DownloadSpec
210 Network.BitTorrent.Exchange.MessageSpec 209 Network.BitTorrent.Exchange.MessageSpec
211 Network.BitTorrent.Exchange.SessionSpec 210 Network.BitTorrent.Exchange.SessionSpec
212 Network.BitTorrent.Exchange.Session.MetadataSpec
213 System.Torrent.StorageSpec 211 System.Torrent.StorageSpec
214 System.Torrent.FileMapSpec 212 System.Torrent.FileMapSpec
215 build-depends: base == 4.* 213 build-depends: base == 4.*
diff --git a/src/Network/BitTorrent/Exchange/Assembler.hs b/src/Network/BitTorrent/Exchange/Assembler.hs
deleted file mode 100644
index 7abb8ab0..00000000
--- a/src/Network/BitTorrent/Exchange/Assembler.hs
+++ /dev/null
@@ -1,168 +0,0 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- Assembler is used to build pieces from blocks. In general
9-- 'Assembler' should be used to handle 'Transfer' messages when
10--
11-- A block can have one of the following status:
12--
13-- 1) /not allowed/: Piece is not in download set. 'null' and 'empty'.
14--
15--
16-- 2) /waiting/: (allowed?) Block have been allowed to download,
17-- but /this/ peer did not send any 'Request' message for this
18-- block. To allow some piece use
19-- 'Network.BitTorrent.Exchange.Selector' and then 'allowedSet'
20-- and 'allowPiece'.
21--
22-- 3) /inflight/: (pending?) Block have been requested but
23-- /remote/ peer did not send any 'Piece' message for this block.
24-- Related functions 'markInflight'
25--
26-- 4) /pending/: (stalled?) Block have have been downloaded
27-- Related functions 'insertBlock'.
28--
29-- Piece status:
30--
31-- 1) /assembled/: (downloaded?) All blocks in piece have been
32-- downloaded but the piece did not verified yet.
33--
34-- * Valid: go to completed;
35--
36-- * Invalid: go to waiting.
37--
38-- 2) /corrupted/:
39--
40-- 3) /downloaded/: (verified?) A piece have been successfully
41-- verified via the hash. Usually the piece should be stored to
42-- the 'System.Torrent.Storage' and /this/ peer should send 'Have'
43-- messages to the /remote/ peers.
44--
45{-# LANGUAGE TemplateHaskell #-}
46module Network.BitTorrent.Exchange.Assembler
47 ( -- * Assembler
48 Assembler
49
50 -- * Query
51 , Network.BitTorrent.Exchange.Assembler.null
52 , Network.BitTorrent.Exchange.Assembler.size
53
54 -- *
55 , Network.BitTorrent.Exchange.Assembler.empty
56 , allowPiece
57
58 -- * Debugging
59 , Network.BitTorrent.Exchange.Assembler.valid
60 ) where
61
62import Control.Applicative
63import Control.Lens
64import Data.IntMap.Strict as IM
65import Data.List as L
66import Data.Map as M
67import Data.Maybe
68import Data.IP
69
70import Data.Torrent
71import Network.BitTorrent.Address
72import Network.BitTorrent.Exchange.Block as B
73
74{-----------------------------------------------------------------------
75-- Assembler
76-----------------------------------------------------------------------}
77
78type Timestamp = ()
79{-
80data BlockRequest = BlockRequest
81 { requestSent :: Timestamp
82 , requestedPeer :: PeerAddr IP
83 , requestedBlock :: BlockIx
84 }
85-}
86type BlockRange = (BlockOffset, BlockSize)
87type PieceMap = IntMap
88
89data Assembler = Assembler
90 { -- | A set of blocks that have been 'Request'ed but not yet acked.
91 _inflight :: Map (PeerAddr IP) (PieceMap [BlockRange])
92
93 -- | A set of blocks that but not yet assembled.
94 , _pending :: PieceMap Bucket
95
96 -- | Used for validation of assembled pieces.
97 , info :: PieceInfo
98 }
99
100$(makeLenses ''Assembler)
101
102
103valid :: Assembler -> Bool
104valid = undefined
105
106data Result a
107 = Completed (Piece a)
108 | Corrupted PieceIx
109 | NotRequested PieceIx
110 | Overlapped BlockIx
111
112null :: Assembler -> Bool
113null = undefined
114
115size :: Assembler -> Bool
116size = undefined
117
118empty :: PieceInfo -> Assembler
119empty = Assembler M.empty IM.empty
120
121allowPiece :: PieceIx -> Assembler -> Assembler
122allowPiece pix a @ Assembler {..} = over pending (IM.insert pix bkt) a
123 where
124 bkt = B.empty (piPieceLength info)
125
126allowedSet :: (PeerAddr IP) -> Assembler -> [BlockIx]
127allowedSet = undefined
128
129--inflight :: PeerAddr -> BlockIx -> Assembler -> Assembler
130--inflight = undefined
131
132-- You should check if a returned by peer block is actually have
133-- been requested and in-flight. This is needed to avoid "I send
134-- random corrupted block" attacks.
135insert :: PeerAddr IP -> Block a -> Assembler -> Assembler
136insert = undefined
137
138{-
139insert :: Block a -> Assembler a -> (Assembler a, Maybe (Result a))
140insert blk @ Block {..} a @ Assembler {..} = undefined
141{-
142 = let (pending, mpiece) = inserta blk piecePending
143 in (Assembler inflightSet pending pieceInfo, f <$> mpiece)
144 where
145 f p = undefined
146-- | checkPieceLazy pieceInfo p = Assembled p
147-- | otherwise = Corrupted ixPiece
148-}
149
150
151inflightPieces :: Assembler a -> [PieceIx]
152inflightPieces Assembler {..} = IM.keys piecePending
153
154completeBlocks :: PieceIx -> Assembler a -> [Block a]
155completeBlocks pix Assembler {..} = fromMaybe [] $ IM.lookup pix piecePending
156
157incompleteBlocks :: PieceIx -> Assembler a -> [BlockIx]
158incompleteBlocks = undefined
159
160nextBlock :: Assembler a -> Maybe (Assembler a, BlockIx)
161nextBlock Assembler {..} = undefined
162
163inserta :: Block a
164 -> PieceMap [Block a]
165 -> (PieceMap [Block a], Maybe (Piece a))
166inserta = undefined
167
168-}
diff --git a/src/Network/BitTorrent/Exchange/Download.hs b/src/Network/BitTorrent/Exchange/Download.hs
index fcc94485..9a6b5f91 100644
--- a/src/Network/BitTorrent/Exchange/Download.hs
+++ b/src/Network/BitTorrent/Exchange/Download.hs
@@ -1,44 +1,196 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8--
9--
10{-# LANGUAGE FlexibleInstances #-}
11{-# LANGUAGE MultiParamTypeClasses #-}
12{-# LANGUAGE FunctionalDependencies #-}
13{-# LANGUAGE TemplateHaskell #-}
1module Network.BitTorrent.Exchange.Download 14module Network.BitTorrent.Exchange.Download
2 ( -- * Environment 15 ( -- * Downloading
3 StatusUpdates 16 Download (..)
4 , runStatusUpdates 17 , Updates
5 18 , runDownloadUpdates
6 -- * Status 19
7 , SessionStatus 20 -- ** Metadata
8 , sessionStatus 21 -- $metadata-download
9 22 , MetadataDownload
10 -- * Query 23 , metadataDownload
11 , getBitfield 24
12 , getRequestQueueLength 25 -- ** Content
13 26 -- $content-download
14 -- * Control 27 , ContentDownload
15 , scheduleBlocks 28 , contentDownload
16 , resetPending
17 , pushBlock
18 ) where 29 ) where
19 30
20import Control.Applicative 31import Control.Applicative
21import Control.Concurrent 32import Control.Concurrent
33import Control.Lens
22import Control.Monad.State 34import Control.Monad.State
35import Data.BEncode as BE
36import Data.ByteString as BS
23import Data.ByteString.Lazy as BL 37import Data.ByteString.Lazy as BL
24import Data.Default 38import Data.Default
25import Data.List as L 39import Data.List as L
26import Data.Maybe 40import Data.Maybe
27import Data.Map as M 41import Data.Map as M
28import Data.Set as S
29import Data.Tuple 42import Data.Tuple
30 43
31import Data.Torrent 44import Data.Torrent as Torrent
32import Network.BitTorrent.Exchange.Bitfield as BF
33import Network.BitTorrent.Address 45import Network.BitTorrent.Address
34import Network.BitTorrent.Exchange.Block as Block 46import Network.BitTorrent.Exchange.Bitfield as BF
47import Network.BitTorrent.Exchange.Block as Block
48import Network.BitTorrent.Exchange.Message as Msg
35import System.Torrent.Storage (Storage, writePiece) 49import System.Torrent.Storage (Storage, writePiece)
36 50
37 51
38{----------------------------------------------------------------------- 52{-----------------------------------------------------------------------
39-- Piece entry 53-- Class
40-----------------------------------------------------------------------} 54-----------------------------------------------------------------------}
41 55
56type Updates s a = StateT s IO a
57
58runDownloadUpdates :: MVar s -> Updates s a -> IO a
59runDownloadUpdates var m = modifyMVar var (fmap swap . runStateT m)
60
61class Download s chunk | s -> chunk where
62 scheduleBlocks :: Int -> PeerAddr IP -> Bitfield -> Updates s [BlockIx]
63
64 -- |
65 scheduleBlock :: PeerAddr IP -> Bitfield -> Updates s (Maybe BlockIx)
66 scheduleBlock addr bf = listToMaybe <$> scheduleBlocks 1 addr bf
67
68 -- | Get number of sent requests to this peer.
69 getRequestQueueLength :: PeerAddr IP -> Updates s Int
70
71 -- | Remove all pending block requests to the remote peer. May be used
72 -- when:
73 --
74 -- * a peer closes connection;
75 --
76 -- * remote peer choked this peer;
77 --
78 -- * timeout expired.
79 --
80 resetPending :: PeerAddr IP -> Updates s ()
81
82 -- | MAY write to storage, if a new piece have been completed.
83 --
84 -- You should check if a returned by peer block is actually have
85 -- been requested and in-flight. This is needed to avoid "I send
86 -- random corrupted block" attacks.
87 pushBlock :: PeerAddr IP -> chunk -> Updates s (Maybe Bool)
88
89{-----------------------------------------------------------------------
90-- Metadata download
91-----------------------------------------------------------------------}
92-- $metadata-download
93-- TODO
94
95data MetadataDownload = MetadataDownload
96 { _pendingPieces :: [(PeerAddr IP, PieceIx)]
97 , _bucket :: Bucket
98 , _topic :: InfoHash
99 }
100
101makeLenses ''MetadataDownload
102
103-- | Create a new scheduler for infodict of the given size.
104metadataDownload :: Int -> InfoHash -> MetadataDownload
105metadataDownload ps = MetadataDownload [] (Block.empty ps)
106
107instance Default MetadataDownload where
108 def = error "instance Default MetadataDownload"
109
110--cancelPending :: PieceIx -> Updates ()
111cancelPending pix = pendingPieces %= L.filter ((pix ==) . snd)
112
113instance Download MetadataDownload (Piece BS.ByteString) where
114 scheduleBlock addr bf = do
115 bkt <- use bucket
116 case spans metadataPieceSize bkt of
117 [] -> return Nothing
118 ((off, _ ) : _) -> do
119 let pix = off `div` metadataPieceSize
120 pendingPieces %= ((addr, pix) :)
121 return (Just (BlockIx pix 0 metadataPieceSize))
122
123 resetPending addr = pendingPieces %= L.filter ((addr ==) . fst)
124
125 pushBlock addr Torrent.Piece {..} = do
126 p <- use pendingPieces
127 when ((addr, pieceIndex) `L.notElem` p) $
128 error "not requested"
129 cancelPending pieceIndex
130
131 bucket %= Block.insert (metadataPieceSize * pieceIndex) pieceData
132 b <- use bucket
133 case toPiece b of
134 Nothing -> return Nothing
135 Just chunks -> do
136 t <- use topic
137 case parseInfoDict (BL.toStrict chunks) t of
138 Right x -> do
139 pendingPieces .= []
140 return undefined -- (Just x)
141 Left e -> do
142 pendingPieces .= []
143 bucket .= Block.empty (Block.size b)
144 return undefined -- Nothing
145 where
146 -- todo use incremental parsing to avoid BS.concat call
147 parseInfoDict :: BS.ByteString -> InfoHash -> Result InfoDict
148 parseInfoDict chunk topic =
149 case BE.decode chunk of
150 Right (infodict @ InfoDict {..})
151 | topic == idInfoHash -> return infodict
152 | otherwise -> Left "broken infodict"
153 Left err -> Left $ "unable to parse infodict " ++ err
154
155{-----------------------------------------------------------------------
156-- Content download
157-----------------------------------------------------------------------}
158-- $content-download
159--
160-- A block can have one of the following status:
161--
162-- 1) /not allowed/: Piece is not in download set.
163--
164-- 2) /waiting/: (allowed?) Block have been allowed to download,
165-- but /this/ peer did not send any 'Request' message for this
166-- block. To allow some piece use
167-- 'Network.BitTorrent.Exchange.Selector' and then 'allowedSet'
168-- and 'allowPiece'.
169--
170-- 3) /inflight/: (pending?) Block have been requested but
171-- /remote/ peer did not send any 'Piece' message for this block.
172-- Related functions 'markInflight'
173--
174-- 4) /pending/: (stalled?) Block have have been downloaded
175-- Related functions 'insertBlock'.
176--
177-- Piece status:
178--
179-- 1) /assembled/: (downloaded?) All blocks in piece have been
180-- downloaded but the piece did not verified yet.
181--
182-- * Valid: go to completed;
183--
184-- * Invalid: go to waiting.
185--
186-- 2) /corrupted/:
187--
188-- 3) /downloaded/: (verified?) A piece have been successfully
189-- verified via the hash. Usually the piece should be stored to
190-- the 'System.Torrent.Storage' and /this/ peer should send 'Have'
191-- messages to the /remote/ peers.
192--
193
42data PieceEntry = PieceEntry 194data PieceEntry = PieceEntry
43 { pending :: [(PeerAddr IP, BlockIx)] 195 { pending :: [(PeerAddr IP, BlockIx)]
44 , stalled :: Bucket 196 , stalled :: Bucket
@@ -50,44 +202,23 @@ pieceEntry s = PieceEntry [] (Block.empty s)
50isEmpty :: PieceEntry -> Bool 202isEmpty :: PieceEntry -> Bool
51isEmpty PieceEntry {..} = L.null pending && Block.null stalled 203isEmpty PieceEntry {..} = L.null pending && Block.null stalled
52 204
53holes :: PieceIx -> PieceEntry -> [BlockIx] 205_holes :: PieceIx -> PieceEntry -> [BlockIx]
54holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled) 206_holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled)
55 where 207 where
56 mkBlockIx (off, sz) = BlockIx pix off sz 208 mkBlockIx (off, sz) = BlockIx pix off sz
57 209
58{----------------------------------------------------------------------- 210data ContentDownload = ContentDownload
59-- Session status 211 { inprogress :: !(Map PieceIx PieceEntry)
60-----------------------------------------------------------------------} 212 , bitfield :: !Bitfield
61 213 , pieceSize :: !PieceSize
62data SessionStatus = SessionStatus 214 , contentStorage :: Storage
63 { inprogress :: !(Map PieceIx PieceEntry)
64 , bitfield :: !Bitfield
65 , pieceSize :: !PieceSize
66 }
67
68sessionStatus :: Bitfield -> PieceSize -> SessionStatus
69sessionStatus bf ps = SessionStatus
70 { inprogress = M.empty
71 , bitfield = bf
72 , pieceSize = ps
73 } 215 }
74 216
75type StatusUpdates a = StateT SessionStatus IO a 217contentDownload :: Bitfield -> PieceSize -> Storage -> ContentDownload
76 218contentDownload = ContentDownload M.empty
77-- |
78runStatusUpdates :: MVar SessionStatus -> StatusUpdates a -> IO a
79runStatusUpdates var m = modifyMVar var (fmap swap . runStateT m)
80
81getBitfield :: MVar SessionStatus -> IO Bitfield
82getBitfield var = bitfield <$> readMVar var
83 219
84getRequestQueueLength :: PeerAddr IP -> StatusUpdates Int 220--modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> DownloadUpdates ()
85getRequestQueueLength addr = do 221modifyEntry pix f = modify $ \ s @ ContentDownload {..} -> s
86 m <- gets (M.elems . M.map (L.filter ((==) addr . fst) . pending) . inprogress)
87 return $ L.sum $ L.map L.length m
88
89modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> StatusUpdates ()
90modifyEntry pix f = modify $ \ s @ SessionStatus {..} -> s
91 { inprogress = alter (g pieceSize) pix inprogress } 222 { inprogress = alter (g pieceSize) pix inprogress }
92 where 223 where
93 g s = h . f . fromMaybe (pieceEntry s) 224 g s = h . f . fromMaybe (pieceEntry s)
@@ -95,81 +226,70 @@ modifyEntry pix f = modify $ \ s @ SessionStatus {..} -> s
95 | isEmpty e = Nothing 226 | isEmpty e = Nothing
96 | otherwise = Just e 227 | otherwise = Just e
97 228
98{----------------------------------------------------------------------- 229instance Download ContentDownload (Block BL.ByteString) where
99-- Piece download 230 scheduleBlocks n addr maskBF = do
100-----------------------------------------------------------------------} 231 ContentDownload {..} <- get
232 let wantPieces = maskBF `BF.difference` bitfield
233 let wantBlocks = L.concat $ M.elems $ M.mapWithKey _holes $
234 M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces)
235 inprogress
101 236
102-- TODO choose block nearest to pending or stalled sets to reduce disk 237 bixs <- if L.null wantBlocks
103-- seeks on remote machines 238 then do
104chooseBlocks :: [BlockIx] -> Int -> StatusUpdates [BlockIx] 239 mpix <- choosePiece wantPieces
105chooseBlocks xs n = return (L.take n xs) 240 case mpix of -- TODO return 'n' blocks
106 241 Nothing -> return []
107-- TODO use selection strategies from Exchange.Selector 242 Just pix -> return [leadingBlock pix defaultTransferSize]
108choosePiece :: Bitfield -> StatusUpdates (Maybe PieceIx) 243 else chooseBlocks wantBlocks n
109choosePiece bf 244
110 | BF.null bf = return $ Nothing 245 forM_ bixs $ \ bix -> do
111 | otherwise = return $ Just $ BF.findMin bf 246 modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e
112 247 { pending = (addr, bix) : pending }
113scheduleBlocks :: PeerAddr IP -> Bitfield -> Int -> StatusUpdates [BlockIx] 248
114scheduleBlocks addr maskBF n = do 249 return bixs
115 SessionStatus {..} <- get 250 where
116 let wantPieces = maskBF `BF.difference` bitfield 251 -- TODO choose block nearest to pending or stalled sets to reduce disk
117 let wantBlocks = L.concat $ M.elems $ M.mapWithKey holes $ 252 -- seeks on remote machines
118 M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces) inprogress 253 --chooseBlocks :: [BlockIx] -> Int -> DownloadUpdates [BlockIx]
119 254 chooseBlocks xs n = return (L.take n xs)
120 bixs <- if L.null wantBlocks 255
121 then do 256 -- TODO use selection strategies from Exchange.Selector
122 mpix <- choosePiece wantPieces 257 --choosePiece :: Bitfield -> DownloadUpdates (Maybe PieceIx)
123 case mpix of -- TODO return 'n' blocks 258 choosePiece bf
124 Nothing -> return [] 259 | BF.null bf = return $ Nothing
125 Just pix -> return [leadingBlock pix defaultTransferSize] 260 | otherwise = return $ Just $ BF.findMin bf
126 else chooseBlocks wantBlocks n 261
127 262 getRequestQueueLength addr = do
128 forM_ bixs $ \ bix -> do 263 m <- gets (M.map (L.filter ((==) addr . fst) . pending) . inprogress)
129 modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e 264 return $ L.sum $ L.map L.length $ M.elems m
130 { pending = (addr, bix) : pending } 265
131 266 resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) }
132 return bixs 267 where
133 268 reset = fmap $ \ e -> e
134
135-- | Remove all pending block requests to the remote peer. May be used
136-- when:
137--
138-- * a peer closes connection;
139--
140-- * remote peer choked this peer;
141--
142-- * timeout expired.
143--
144resetPending :: PeerAddr IP -> StatusUpdates ()
145resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) }
146 where
147 reset = fmap $ \ e -> e
148 { pending = L.filter (not . (==) addr . fst) (pending e) } 269 { pending = L.filter (not . (==) addr . fst) (pending e) }
149 270
150-- | MAY write to storage, if a new piece have been completed. 271 pushBlock addr blk @ Block {..} = do
151pushBlock :: Block BL.ByteString -> Storage -> StatusUpdates (Maybe Bool) 272 mpe <- gets (M.lookup blkPiece . inprogress)
152pushBlock blk @ Block {..} storage = do 273 case mpe of
153 mpe <- gets (M.lookup blkPiece . inprogress) 274 Nothing -> return Nothing
154 case mpe of 275 Just (pe @ PieceEntry {..})
155 Nothing -> return Nothing 276 | blockIx blk `L.notElem` fmap snd pending -> return Nothing
156 Just (pe @ PieceEntry {..}) 277 | otherwise -> do
157 | blockIx blk `L.notElem` fmap snd pending -> return Nothing 278 let bkt' = Block.insertLazy blkOffset blkData stalled
158 | otherwise -> do 279 case toPiece bkt' of
159 let bkt' = Block.insertLazy blkOffset blkData stalled 280 Nothing -> do
160 case toPiece bkt' of 281 modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e
161 Nothing -> do 282 { pending = L.filter ((==) (blockIx blk) . snd) pending
162 modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e 283 , stalled = bkt'
163 { pending = L.filter ((==) (blockIx blk) . snd) pending 284 }
164 , stalled = bkt' 285 return (Just False)
165 } 286
166 return (Just False) 287 Just pieceData -> do
167 288 -- TODO verify
168 Just pieceData -> do 289 storage <- gets contentStorage
169 -- TODO verify 290 liftIO $ writePiece (Torrent.Piece blkPiece pieceData) storage
170 liftIO $ writePiece (Piece blkPiece pieceData) storage 291 modify $ \ s @ ContentDownload {..} -> s
171 modify $ \ s @ SessionStatus {..} -> s 292 { inprogress = M.delete blkPiece inprogress
172 { inprogress = M.delete blkPiece inprogress 293 , bitfield = BF.insert blkPiece bitfield
173 , bitfield = BF.insert blkPiece bitfield 294 }
174 } 295 return (Just True)
175 return (Just True)
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs
index 49bff44f..30b7ed0e 100644
--- a/src/Network/BitTorrent/Exchange/Session.hs
+++ b/src/Network/BitTorrent/Exchange/Session.hs
@@ -51,9 +51,8 @@ import Network.BitTorrent.Address
51import Network.BitTorrent.Exchange.Bitfield as BF 51import Network.BitTorrent.Exchange.Bitfield as BF
52import Network.BitTorrent.Exchange.Block as Block 52import Network.BitTorrent.Exchange.Block as Block
53import Network.BitTorrent.Exchange.Connection 53import Network.BitTorrent.Exchange.Connection
54import Network.BitTorrent.Exchange.Download as SS 54import Network.BitTorrent.Exchange.Download as D
55import Network.BitTorrent.Exchange.Message as Message 55import Network.BitTorrent.Exchange.Message as Message
56import Network.BitTorrent.Exchange.Session.Metadata as Metadata
57import System.Torrent.Storage 56import System.Torrent.Storage
58 57
59{----------------------------------------------------------------------- 58{-----------------------------------------------------------------------
@@ -90,13 +89,13 @@ type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO ()
90 89
91data SessionState 90data SessionState
92 = WaitingMetadata 91 = WaitingMetadata
93 { metadataDownload :: MVar Metadata.Status 92 { metadataDownload :: MVar MetadataDownload
94 , metadataCompleted :: MVar InfoDict -- ^ used to unblock waiters 93 , metadataCompleted :: MVar InfoDict -- ^ used to unblock waiters
95 , contentRootPath :: FilePath 94 , contentRootPath :: FilePath
96 } 95 }
97 | HavingMetadata 96 | HavingMetadata
98 { metadataCache :: Cached InfoDict 97 { metadataCache :: Cached InfoDict
99 , contentDownload :: MVar SessionStatus 98 , contentDownload :: MVar ContentDownload
100 , contentStorage :: Storage 99 , contentStorage :: Storage
101 } 100 }
102 101
@@ -105,8 +104,9 @@ newSessionState rootPath (Left ih ) = do
105 WaitingMetadata <$> newMVar def <*> newEmptyMVar <*> pure rootPath 104 WaitingMetadata <$> newMVar def <*> newEmptyMVar <*> pure rootPath
106newSessionState rootPath (Right dict) = do 105newSessionState rootPath (Right dict) = do
107 storage <- openInfoDict ReadWriteEx rootPath dict 106 storage <- openInfoDict ReadWriteEx rootPath dict
108 download <- newMVar $ sessionStatus (BF.haveNone (totalPieces storage)) 107 download <- newMVar $ D.contentDownload (BF.haveNone (totalPieces storage))
109 (piPieceLength (idPieceInfo dict)) 108 (piPieceLength (idPieceInfo dict))
109 storage
110 return $ HavingMetadata (cache dict) download storage 110 return $ HavingMetadata (cache dict) download storage
111 111
112closeSessionState :: SessionState -> IO () 112closeSessionState :: SessionState -> IO ()
@@ -116,8 +116,9 @@ closeSessionState HavingMetadata {..} = close contentStorage
116haveMetadata :: InfoDict -> SessionState -> IO SessionState 116haveMetadata :: InfoDict -> SessionState -> IO SessionState
117haveMetadata dict WaitingMetadata {..} = do 117haveMetadata dict WaitingMetadata {..} = do
118 storage <- openInfoDict ReadWriteEx contentRootPath dict 118 storage <- openInfoDict ReadWriteEx contentRootPath dict
119 download <- newMVar $ sessionStatus (BF.haveNone (totalPieces storage)) 119 download <- newMVar $ D.contentDownload (BF.haveNone (totalPieces storage))
120 (piPieceLength (idPieceInfo dict)) 120 (piPieceLength (idPieceInfo dict))
121 storage
121 return HavingMetadata 122 return HavingMetadata
122 { metadataCache = cache dict 123 { metadataCache = cache dict
123 , contentDownload = download 124 , contentDownload = download
diff --git a/src/Network/BitTorrent/Exchange/Session/Metadata.hs b/src/Network/BitTorrent/Exchange/Session/Metadata.hs
deleted file mode 100644
index f08ebe00..00000000
--- a/src/Network/BitTorrent/Exchange/Session/Metadata.hs
+++ /dev/null
@@ -1,102 +0,0 @@
1{-# LANGUAGE TemplateHaskell #-}
2module Network.BitTorrent.Exchange.Session.Metadata
3 ( -- * Transfer state
4 Status
5 , nullStatus
6
7 -- * State updates
8 , Updates
9 , runUpdates
10
11 -- * Piece transfer control
12 , scheduleBlock
13 , resetPending
14 , cancelPending
15 , pushBlock
16 ) where
17
18import Control.Concurrent
19import Control.Lens
20import Control.Monad.Reader
21import Control.Monad.State
22import Data.ByteString as BS
23import Data.ByteString.Lazy as BL
24import Data.Default
25import Data.List as L
26import Data.Tuple
27
28import Data.BEncode as BE
29import Data.Torrent as Torrent
30import Network.BitTorrent.Address
31import Network.BitTorrent.Exchange.Block as Block
32import Network.BitTorrent.Exchange.Message as Message hiding (Status)
33
34
35-- | Current transfer status.
36data Status = Status
37 { _pending :: [(PeerAddr IP, PieceIx)]
38 , _bucket :: Bucket
39 }
40
41makeLenses ''Status
42
43instance Default Status where
44 def = error "default status"
45
46-- | Create a new scheduler for infodict of the given size.
47nullStatus :: Int -> Status
48nullStatus ps = Status [] (Block.empty ps)
49
50type Updates = ReaderT (PeerAddr IP) (State Status)
51
52runUpdates :: MVar Status -> PeerAddr IP -> Updates a -> IO a
53runUpdates v a m = modifyMVar v (return . swap . runState (runReaderT m a))
54
55scheduleBlock :: Updates (Maybe PieceIx)
56scheduleBlock = do
57 addr <- ask
58 bkt <- use bucket
59 case spans metadataPieceSize bkt of
60 [] -> return Nothing
61 ((off, _ ) : _) -> do
62 let pix = off `div` metadataPieceSize
63 pending %= ((addr, pix) :)
64 return (Just pix)
65
66cancelPending :: PieceIx -> Updates ()
67cancelPending pix = pending %= L.filter ((pix ==) . snd)
68
69resetPending :: Updates ()
70resetPending = do
71 addr <- ask
72 pending %= L.filter ((addr ==) . fst)
73
74parseInfoDict :: BS.ByteString -> InfoHash -> Result InfoDict
75parseInfoDict chunk topic =
76 case BE.decode chunk of
77 Right (infodict @ InfoDict {..})
78 | topic == idInfoHash -> return infodict
79 | otherwise -> Left "broken infodict"
80 Left err -> Left $ "unable to parse infodict " ++ err
81
82-- todo use incremental parsing to avoid BS.concat call
83pushBlock :: Torrent.Piece BS.ByteString -> InfoHash -> Updates (Maybe InfoDict)
84pushBlock Torrent.Piece {..} topic = do
85 addr <- ask
86 p <- use pending
87 when ((addr, pieceIndex) `L.notElem` p) $ error "not requested"
88 cancelPending pieceIndex
89
90 bucket %= Block.insert (metadataPieceSize * pieceIndex) pieceData
91 b <- use bucket
92 case toPiece b of
93 Nothing -> return Nothing
94 Just chunks ->
95 case parseInfoDict (BL.toStrict chunks) topic of
96 Right x -> do
97 pending .= []
98 return (Just x)
99 Left e -> do
100 pending .= []
101 bucket .= Block.empty (Block.size b)
102 return Nothing
diff --git a/tests/Network/BitTorrent/Exchange/Session/MetadataSpec.hs b/tests/Network/BitTorrent/Exchange/DownloadSpec.hs
index fc5236da..a0d40af3 100644
--- a/tests/Network/BitTorrent/Exchange/Session/MetadataSpec.hs
+++ b/tests/Network/BitTorrent/Exchange/DownloadSpec.hs
@@ -1,5 +1,5 @@
1{-# LANGUAGE RecordWildCards #-} 1{-# LANGUAGE RecordWildCards #-}
2module Network.BitTorrent.Exchange.Session.MetadataSpec (spec) where 2module Network.BitTorrent.Exchange.DownloadSpec (spec) where
3import Control.Concurrent 3import Control.Concurrent
4import Data.ByteString as BS 4import Data.ByteString as BS
5import Data.ByteString.Lazy as BL 5import Data.ByteString.Lazy as BL
@@ -9,8 +9,8 @@ import Test.QuickCheck
9import Data.BEncode as BE 9import Data.BEncode as BE
10import Data.Torrent as Torrent 10import Data.Torrent as Torrent
11import Network.BitTorrent.Address 11import Network.BitTorrent.Address
12import Network.BitTorrent.Exchange.Download
12import Network.BitTorrent.Exchange.Message 13import Network.BitTorrent.Exchange.Message
13import Network.BitTorrent.Exchange.Session.Metadata
14 14
15import Config 15import Config
16import Network.BitTorrent.CoreSpec () 16import Network.BitTorrent.CoreSpec ()
@@ -24,14 +24,15 @@ chunkBy s bs
24 | BS.null bs = [] 24 | BS.null bs = []
25 | otherwise = BS.take s bs : chunkBy s (BS.drop s bs) 25 | otherwise = BS.take s bs : chunkBy s (BS.drop s bs)
26 26
27withUpdates :: Updates a -> IO a 27withUpdates :: Updates s a -> IO a
28withUpdates m = do 28withUpdates m = do
29 Torrent {..} <- getTestTorrent 29 Torrent {..} <- getTestTorrent
30 let infoDictLen = fromIntegral $ BL.length $ BE.encode tInfoDict 30 let infoDictLen = fromIntegral $ BL.length $ BE.encode tInfoDict
31 mvar <- newMVar (nullStatus infoDictLen) 31 --mvar <- newMVar (nullStatus infoDictLen)
32 runUpdates mvar placeholderAddr m 32 --runUpdates mvar placeholderAddr m
33 undefined
33 34
34simulateFetch :: InfoDict -> Updates (Maybe InfoDict) 35simulateFetch :: InfoDict -> Updates s (Maybe InfoDict)
35simulateFetch dict = go 36simulateFetch dict = go
36 where 37 where
37 blocks = chunkBy metadataPieceSize (BL.toStrict (BE.encode dict)) 38 blocks = chunkBy metadataPieceSize (BL.toStrict (BE.encode dict))
@@ -39,11 +40,11 @@ simulateFetch dict = go
39 ih = idInfoHash dict 40 ih = idInfoHash dict
40 41
41 go = do 42 go = do
42 mix <- scheduleBlock 43 mix <- scheduleBlock undefined undefined
43 case mix of 44 case mix of
44 Nothing -> return Nothing 45 Nothing -> return Nothing
45 Just ix -> do 46 Just ix -> do
46 mdict <- pushBlock (packPiece ix) ih 47 mdict <- pushBlock undefined (packPiece ix)
47 maybe go (return . Just) mdict 48 maybe go (return . Just) mdict
48 49
49spec :: Spec 50spec :: Spec