summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/Exchange/Session/Status.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/BitTorrent/Exchange/Session/Status.hs')
-rw-r--r--src/Network/BitTorrent/Exchange/Session/Status.hs175
1 files changed, 175 insertions, 0 deletions
diff --git a/src/Network/BitTorrent/Exchange/Session/Status.hs b/src/Network/BitTorrent/Exchange/Session/Status.hs
new file mode 100644
index 00000000..565c3bf3
--- /dev/null
+++ b/src/Network/BitTorrent/Exchange/Session/Status.hs
@@ -0,0 +1,175 @@
1module Network.BitTorrent.Exchange.Session.Status
2 ( -- * Environment
3 StatusUpdates
4 , runStatusUpdates
5
6 -- * Status
7 , SessionStatus
8 , sessionStatus
9
10 -- * Query
11 , getBitfield
12 , getRequestQueueLength
13
14 -- * Control
15 , scheduleBlocks
16 , resetPending
17 , pushBlock
18 ) where
19
20import Control.Applicative
21import Control.Concurrent
22import Control.Monad.State
23import Data.ByteString.Lazy as BL
24import Data.Default
25import Data.List as L
26import Data.Maybe
27import Data.Map as M
28import Data.Set as S
29import Data.Tuple
30
31import Data.Torrent.Piece
32import Data.Torrent.Bitfield as BF
33import Network.BitTorrent.Core
34import Network.BitTorrent.Exchange.Block as Block
35import System.Torrent.Storage (Storage, writePiece)
36
37
38{-----------------------------------------------------------------------
39-- Piece entry
40-----------------------------------------------------------------------}
41
42data PieceEntry = PieceEntry
43 { pending :: [(PeerAddr IP, BlockIx)]
44 , stalled :: Bucket
45 }
46
47pieceEntry :: PieceSize -> PieceEntry
48pieceEntry s = PieceEntry [] (Block.empty s)
49
50isEmpty :: PieceEntry -> Bool
51isEmpty PieceEntry {..} = L.null pending && Block.null stalled
52
53holes :: PieceIx -> PieceEntry -> [BlockIx]
54holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled)
55 where
56 mkBlockIx (off, sz) = BlockIx pix off sz
57
58{-----------------------------------------------------------------------
59-- Session status
60-----------------------------------------------------------------------}
61
62data SessionStatus = SessionStatus
63 { inprogress :: !(Map PieceIx PieceEntry)
64 , bitfield :: !Bitfield
65 , pieceSize :: !PieceSize
66 }
67
68sessionStatus :: Bitfield -> PieceSize -> SessionStatus
69sessionStatus bf ps = SessionStatus
70 { inprogress = M.empty
71 , bitfield = bf
72 , pieceSize = ps
73 }
74
75type StatusUpdates a = StateT SessionStatus IO a
76
77-- |
78runStatusUpdates :: MVar SessionStatus -> StatusUpdates a -> IO a
79runStatusUpdates var m = modifyMVar var (fmap swap . runStateT m)
80
81getBitfield :: MVar SessionStatus -> IO Bitfield
82getBitfield var = bitfield <$> readMVar var
83
84getRequestQueueLength :: PeerAddr IP -> StatusUpdates Int
85getRequestQueueLength addr = do
86 m <- gets (M.elems . M.map (L.filter ((==) addr . fst) . pending) . inprogress)
87 return $ L.sum $ L.map L.length m
88
89modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> StatusUpdates ()
90modifyEntry pix f = modify $ \ s @ SessionStatus {..} -> s
91 { inprogress = alter (g pieceSize) pix inprogress }
92 where
93 g s = h . f . fromMaybe (pieceEntry s)
94 h e
95 | isEmpty e = Nothing
96 | otherwise = Just e
97
98{-----------------------------------------------------------------------
99-- Piece download
100-----------------------------------------------------------------------}
101
102-- TODO choose block nearest to pending or stalled sets to reduce disk
103-- seeks on remote machines
104chooseBlocks :: [BlockIx] -> Int -> StatusUpdates [BlockIx]
105chooseBlocks xs n = return (L.take n xs)
106
107-- TODO use selection strategies from Exchange.Selector
108choosePiece :: Bitfield -> StatusUpdates (Maybe PieceIx)
109choosePiece bf
110 | BF.null bf = return $ Nothing
111 | otherwise = return $ Just $ BF.findMin bf
112
113scheduleBlocks :: PeerAddr IP -> Bitfield -> Int -> StatusUpdates [BlockIx]
114scheduleBlocks addr maskBF n = do
115 SessionStatus {..} <- get
116 let wantPieces = maskBF `BF.difference` bitfield
117 let wantBlocks = L.concat $ M.elems $ M.mapWithKey holes $
118 M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces) inprogress
119
120 bixs <- if L.null wantBlocks
121 then do
122 mpix <- choosePiece wantPieces
123 case mpix of -- TODO return 'n' blocks
124 Nothing -> return []
125 Just pix -> return [leadingBlock pix defaultTransferSize]
126 else chooseBlocks wantBlocks n
127
128 forM_ bixs $ \ bix -> do
129 modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e
130 { pending = (addr, bix) : pending }
131
132 return bixs
133
134
135-- | Remove all pending block requests to the remote peer. May be used
136-- when:
137--
138-- * a peer closes connection;
139--
140-- * remote peer choked this peer;
141--
142-- * timeout expired.
143--
144resetPending :: PeerAddr IP -> StatusUpdates ()
145resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) }
146 where
147 reset = fmap $ \ e -> e
148 { pending = L.filter (not . (==) addr . fst) (pending e) }
149
150-- | MAY write to storage, if a new piece have been completed.
151pushBlock :: Block BL.ByteString -> Storage -> StatusUpdates (Maybe Bool)
152pushBlock blk @ Block {..} storage = do
153 mpe <- gets (M.lookup blkPiece . inprogress)
154 case mpe of
155 Nothing -> return Nothing
156 Just (pe @ PieceEntry {..})
157 | blockIx blk `L.notElem` fmap snd pending -> return Nothing
158 | otherwise -> do
159 let bkt' = Block.insertLazy blkOffset blkData stalled
160 case toPiece bkt' of
161 Nothing -> do
162 modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e
163 { pending = L.filter ((==) (blockIx blk) . snd) pending
164 , stalled = bkt'
165 }
166 return (Just False)
167
168 Just pieceData -> do
169 -- TODO verify
170 liftIO $ writePiece (Piece blkPiece pieceData) storage
171 modify $ \ s @ SessionStatus {..} -> s
172 { inprogress = M.delete blkPiece inprogress
173 , bitfield = BF.insert blkPiece bitfield
174 }
175 return (Just True)