diff options
author | Sam Truzjan <pxqr.sta@gmail.com> | 2014-02-14 22:28:15 +0400 |
---|---|---|
committer | Sam Truzjan <pxqr.sta@gmail.com> | 2014-02-14 22:28:15 +0400 |
commit | 933e7d37aeafac38eae806fb4556d59803a03270 (patch) | |
tree | bfe885dbe83fb0b23e456b75d70ab5c4bb88d1ae /src/Network/BitTorrent | |
parent | c3594d389a0caba85ae4b5c3c97339c5705551c0 (diff) |
Move piece manager to separate module
Diffstat (limited to 'src/Network/BitTorrent')
-rw-r--r-- | src/Network/BitTorrent/Client/Handle.hs | 12 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange/Session.hs | 285 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange/Session/Status.hs | 175 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange/Wire.hs | 45 |
4 files changed, 418 insertions, 99 deletions
diff --git a/src/Network/BitTorrent/Client/Handle.hs b/src/Network/BitTorrent/Client/Handle.hs index 7aaaf5aa..f539d53a 100644 --- a/src/Network/BitTorrent/Client/Handle.hs +++ b/src/Network/BitTorrent/Client/Handle.hs | |||
@@ -22,6 +22,7 @@ import Control.Applicative | |||
22 | import Control.Concurrent | 22 | import Control.Concurrent |
23 | import Control.Monad | 23 | import Control.Monad |
24 | import Control.Monad.Trans | 24 | import Control.Monad.Trans |
25 | import Data.List as L | ||
25 | import Data.HashMap.Strict as HM | 26 | import Data.HashMap.Strict as HM |
26 | 27 | ||
27 | import Data.Torrent | 28 | import Data.Torrent |
@@ -77,7 +78,8 @@ openTorrent rootPath t @ Torrent {..} = do | |||
77 | allocHandle ih $ do | 78 | allocHandle ih $ do |
78 | c @ Client {..} <- getClient | 79 | c @ Client {..} <- getClient |
79 | tses <- liftIO $ Tracker.newSession ih (trackerList t) | 80 | tses <- liftIO $ Tracker.newSession ih (trackerList t) |
80 | eses <- liftIO $ Exchange.newSession (externalAddr c) rootPath tInfoDict | 81 | eses <- liftIO $ Exchange.newSession clientLogger (externalAddr c) rootPath |
82 | tInfoDict | ||
81 | return $ Handle ih (idPrivate tInfoDict) tses eses | 83 | return $ Handle ih (idPrivate tInfoDict) tses eses |
82 | 84 | ||
83 | -- | Use 'nullMagnet' to open handle from 'InfoHash'. | 85 | -- | Use 'nullMagnet' to open handle from 'InfoHash'. |
@@ -109,9 +111,11 @@ start Handle {..} = do | |||
109 | liftIO $ Tracker.notify trackerManager trackers Tracker.Started | 111 | liftIO $ Tracker.notify trackerManager trackers Tracker.Started |
110 | unless private $ do | 112 | unless private $ do |
111 | liftDHT $ DHT.insert topic undefined | 113 | liftDHT $ DHT.insert topic undefined |
112 | peers <- liftIO $ askPeers trackerManager trackers | 114 | liftIO $ do |
113 | forM_ peers $ \ peer -> do | 115 | peers <- askPeers trackerManager trackers |
114 | liftIO $ Exchange.insert peer exchange | 116 | print $ "got: " ++ show (L.length peers) ++ " peers" |
117 | forM_ peers $ \ peer -> do | ||
118 | Exchange.insert peer exchange | ||
115 | 119 | ||
116 | -- | Stop downloading this torrent. | 120 | -- | Stop downloading this torrent. |
117 | pause :: Handle -> BitTorrent () | 121 | pause :: Handle -> BitTorrent () |
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs index 565ef7ab..0d4f3d02 100644 --- a/src/Network/BitTorrent/Exchange/Session.hs +++ b/src/Network/BitTorrent/Exchange/Session.hs | |||
@@ -1,7 +1,9 @@ | |||
1 | {-# LANGUAGE TemplateHaskell #-} | 1 | {-# LANGUAGE FlexibleInstances #-} |
2 | {-# LANGUAGE DeriveDataTypeable #-} | 2 | {-# LANGUAGE TemplateHaskell #-} |
3 | {-# LANGUAGE DeriveDataTypeable #-} | ||
3 | module Network.BitTorrent.Exchange.Session | 4 | module Network.BitTorrent.Exchange.Session |
4 | ( Session | 5 | ( Session |
6 | , LogFun | ||
5 | , newSession | 7 | , newSession |
6 | , closeSession | 8 | , closeSession |
7 | 9 | ||
@@ -12,76 +14,141 @@ import Control.Applicative | |||
12 | import Control.Concurrent | 14 | import Control.Concurrent |
13 | import Control.Exception | 15 | import Control.Exception |
14 | import Control.Lens | 16 | import Control.Lens |
17 | import Control.Monad.Logger | ||
15 | import Control.Monad.Reader | 18 | import Control.Monad.Reader |
16 | import Control.Monad.State | 19 | import Control.Monad.State |
20 | import Data.ByteString as BS | ||
21 | import Data.ByteString.Lazy as BL | ||
22 | import Data.Conduit | ||
17 | import Data.Function | 23 | import Data.Function |
18 | import Data.IORef | 24 | import Data.IORef |
25 | import Data.List as L | ||
19 | import Data.Maybe | 26 | import Data.Maybe |
20 | import Data.Map as M | 27 | import Data.Map as M |
28 | import Data.Monoid | ||
21 | import Data.Ord | 29 | import Data.Ord |
30 | import Data.Set as S | ||
31 | import Data.Text as T | ||
22 | import Data.Typeable | 32 | import Data.Typeable |
23 | import Text.PrettyPrint | 33 | import Text.PrettyPrint hiding ((<>)) |
34 | import Text.PrettyPrint.Class | ||
35 | import System.Log.FastLogger (LogStr, ToLogStr (..)) | ||
24 | 36 | ||
25 | import Data.Torrent (InfoDict (..)) | 37 | import Data.Torrent (InfoDict (..)) |
26 | import Data.Torrent.Bitfield as BF | 38 | import Data.Torrent.Bitfield as BF |
27 | import Data.Torrent.InfoHash | 39 | import Data.Torrent.InfoHash |
40 | import Data.Torrent.Piece (pieceData, piPieceLength) | ||
41 | import qualified Data.Torrent.Piece as Torrent (Piece (Piece)) | ||
28 | import Network.BitTorrent.Core | 42 | import Network.BitTorrent.Core |
29 | import Network.BitTorrent.Exchange.Assembler | 43 | import Network.BitTorrent.Exchange.Assembler |
30 | import Network.BitTorrent.Exchange.Block | 44 | import Network.BitTorrent.Exchange.Block as Block |
31 | import Network.BitTorrent.Exchange.Message | 45 | import Network.BitTorrent.Exchange.Message |
46 | import Network.BitTorrent.Exchange.Session.Status as SS | ||
32 | import Network.BitTorrent.Exchange.Status | 47 | import Network.BitTorrent.Exchange.Status |
33 | import Network.BitTorrent.Exchange.Wire | 48 | import Network.BitTorrent.Exchange.Wire |
34 | import System.Torrent.Storage | 49 | import System.Torrent.Storage |
35 | 50 | ||
51 | {----------------------------------------------------------------------- | ||
52 | -- Exceptions | ||
53 | -----------------------------------------------------------------------} | ||
54 | |||
55 | data ExchangeError | ||
56 | = InvalidRequest BlockIx StorageFailure | ||
57 | | CorruptedPiece PieceIx | ||
58 | deriving (Show, Typeable) | ||
59 | |||
60 | instance Exception ExchangeError | ||
61 | |||
62 | packException :: Exception e => (e -> ExchangeError) -> IO a -> IO a | ||
63 | packException f m = try m >>= either (throwIO . f) return | ||
64 | |||
65 | {----------------------------------------------------------------------- | ||
66 | -- Session | ||
67 | -----------------------------------------------------------------------} | ||
68 | |||
69 | data ConnectionEntry = ConnectionEntry | ||
70 | { initiatedBy :: !ChannelSide | ||
71 | , connection :: !(Connection Session) | ||
72 | } | ||
36 | 73 | ||
37 | data Session = Session | 74 | data Session = Session |
38 | { tpeerId :: PeerId | 75 | { tpeerId :: PeerId |
39 | , infohash :: InfoHash | 76 | , infohash :: InfoHash |
40 | , bitfield :: Bitfield | 77 | , storage :: Storage |
41 | , assembler :: Assembler | 78 | , status :: MVar SessionStatus |
42 | , storage :: Storage | 79 | , unchoked :: [PeerAddr IP] |
43 | , unchoked :: [PeerAddr IP] | 80 | , connections :: MVar (Map (PeerAddr IP) ConnectionEntry) |
44 | , connections :: MVar (Map (PeerAddr IP) (Connection Session)) | 81 | , broadcast :: Chan Message |
45 | , broadcast :: Chan Message | 82 | , logger :: LogFun |
46 | } | 83 | } |
47 | 84 | ||
48 | newSession :: PeerAddr (Maybe IP) -- ^ /external/ address of this peer; | 85 | -- | Logger function. |
86 | type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () | ||
87 | |||
88 | newSession :: LogFun | ||
89 | -> PeerAddr (Maybe IP) -- ^ /external/ address of this peer; | ||
49 | -> FilePath -- ^ root directory for content files; | 90 | -> FilePath -- ^ root directory for content files; |
50 | -> InfoDict -- ^ torrent info dictionary; | 91 | -> InfoDict -- ^ torrent info dictionary; |
51 | -> IO Session -- ^ | 92 | -> IO Session -- ^ |
52 | newSession addr rootPath dict = do | 93 | newSession logFun addr rootPath dict = do |
53 | connVar <- newMVar M.empty | 94 | connVar <- newMVar M.empty |
54 | store <- openInfoDict ReadWriteEx rootPath dict | 95 | store <- openInfoDict ReadWriteEx rootPath dict |
55 | chan <- newChan | 96 | statusVar <- newMVar $ sessionStatus (BF.haveNone (totalPieces store)) |
97 | (piPieceLength (idPieceInfo dict)) | ||
98 | chan <- newChan | ||
56 | return Session | 99 | return Session |
57 | { tpeerId = fromMaybe (error "newSession: impossible") (peerId addr) | 100 | { tpeerId = fromMaybe (error "newSession: impossible") (peerId addr) |
58 | , infohash = idInfoHash dict | 101 | , infohash = idInfoHash dict |
59 | , bitfield = BF.haveNone (totalPieces store) | 102 | , status = statusVar |
60 | , assembler = error "newSession" | ||
61 | , storage = store | 103 | , storage = store |
62 | , unchoked = [] | 104 | , unchoked = [] |
63 | , connections = connVar | 105 | , connections = connVar |
64 | , broadcast = chan | 106 | , broadcast = chan |
107 | , logger = logFun | ||
65 | } | 108 | } |
66 | 109 | ||
67 | closeSession :: Session -> IO () | 110 | closeSession :: Session -> IO () |
68 | closeSession = undefined | 111 | closeSession = undefined |
69 | 112 | ||
113 | instance MonadIO m => MonadLogger (Connected Session m) where | ||
114 | monadLoggerLog loc src lvl msg = do | ||
115 | conn <- ask | ||
116 | ses <- asks connSession | ||
117 | addr <- asks connRemoteAddr | ||
118 | let addrSrc = src <> " @ " <> T.pack (render (pretty addr)) | ||
119 | liftIO $ logger ses loc addrSrc lvl (toLogStr msg) | ||
120 | |||
121 | logMessage :: Message -> Wire Session () | ||
122 | logMessage msg = logDebugN $ T.pack (render (pretty msg)) | ||
123 | |||
124 | logEvent :: Text -> Wire Session () | ||
125 | logEvent = logInfoN | ||
126 | |||
127 | {----------------------------------------------------------------------- | ||
128 | -- Connections | ||
129 | -----------------------------------------------------------------------} | ||
130 | -- TODO unmap storage on zero connections | ||
131 | |||
70 | insert :: PeerAddr IP | 132 | insert :: PeerAddr IP |
71 | -> {- Maybe Socket | 133 | -> {- Maybe Socket |
72 | -> -} Session -> IO () | 134 | -> -} Session -> IO () |
73 | insert addr ses @ Session {..} = do | 135 | insert addr ses @ Session {..} = do |
74 | forkIO $ do | 136 | forkIO $ do |
75 | let caps = def | 137 | action `finally` runStatusUpdates status (resetPending addr) |
76 | let ecaps = def | 138 | return () |
77 | let hs = Handshake def caps infohash tpeerId | 139 | where |
78 | chan <- dupChan broadcast | 140 | action = do |
79 | connectWire ses hs addr ecaps chan $ do | 141 | let caps = def |
80 | conn <- getConnection | 142 | let ecaps = def |
143 | let hs = Handshake def caps infohash tpeerId | ||
144 | chan <- dupChan broadcast | ||
145 | connectWire ses hs addr ecaps chan $ do | ||
146 | conn <- getConnection | ||
81 | -- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn | 147 | -- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn |
82 | exchange | 148 | resizeBitfield (totalPieces storage) |
149 | logEvent "Connection established" | ||
150 | exchange | ||
83 | -- liftIO $ modifyMVar_ connections $ pure . M.delete addr | 151 | -- liftIO $ modifyMVar_ connections $ pure . M.delete addr |
84 | return () | ||
85 | 152 | ||
86 | delete :: PeerAddr IP -> Session -> IO () | 153 | delete :: PeerAddr IP -> Session -> IO () |
87 | delete = undefined | 154 | delete = undefined |
@@ -90,81 +157,125 @@ deleteAll :: Session -> IO () | |||
90 | deleteAll = undefined | 157 | deleteAll = undefined |
91 | 158 | ||
92 | {----------------------------------------------------------------------- | 159 | {----------------------------------------------------------------------- |
93 | -- Query | 160 | -- Helpers |
94 | -----------------------------------------------------------------------} | 161 | -----------------------------------------------------------------------} |
95 | 162 | ||
163 | withStatusUpdates :: StatusUpdates a -> Wire Session a | ||
164 | withStatusUpdates m = do | ||
165 | Session {..} <- getSession | ||
166 | liftIO $ runStatusUpdates status m | ||
167 | |||
96 | getThisBitfield :: Wire Session Bitfield | 168 | getThisBitfield :: Wire Session Bitfield |
97 | getThisBitfield = undefined | 169 | getThisBitfield = do |
170 | ses <- getSession | ||
171 | liftIO $ SS.getBitfield (status ses) | ||
172 | |||
173 | readBlock :: BlockIx -> Storage -> IO (Block BL.ByteString) | ||
174 | readBlock bix @ BlockIx {..} s = do | ||
175 | p <- packException (InvalidRequest bix) $ do readPiece ixPiece s | ||
176 | let chunk = BL.take (fromIntegral ixLength) $ | ||
177 | BL.drop (fromIntegral ixOffset) (pieceData p) | ||
178 | if BL.length chunk == fromIntegral ixLength | ||
179 | then return $ Block ixPiece ixOffset chunk | ||
180 | else throwIO $ InvalidRequest bix (InvalidSize ixLength) | ||
181 | |||
182 | sendBroadcast :: PeerMessage msg => msg -> Wire Session () | ||
183 | sendBroadcast msg = do | ||
184 | Session {..} <- getSession | ||
185 | ecaps <- getExtCaps | ||
186 | liftIO $ writeChan broadcast (envelop ecaps msg) | ||
187 | |||
188 | {----------------------------------------------------------------------- | ||
189 | -- Triggers | ||
190 | -----------------------------------------------------------------------} | ||
98 | 191 | ||
99 | {- | 192 | fillRequestQueue :: Wire Session () |
100 | data PendingSet = PendingSet (Map (PeerAddr IP) [BlockIx]) | 193 | fillRequestQueue = do |
194 | maxN <- getAdvertisedQueueLength | ||
195 | rbf <- getRemoteBitfield | ||
196 | addr <- connRemoteAddr <$> getConnection | ||
197 | blks <- withStatusUpdates $ do | ||
198 | n <- getRequestQueueLength addr | ||
199 | scheduleBlocks addr rbf (maxN - n) | ||
200 | mapM_ (sendMessage . Request) blks | ||
101 | 201 | ||
102 | empty :: PendingSet | 202 | tryFillRequestQueue :: Wire Session () |
103 | empty = undefined | 203 | tryFillRequestQueue = do |
204 | allowed <- canDownload <$> getStatus | ||
205 | when allowed $ do | ||
206 | fillRequestQueue | ||
104 | 207 | ||
105 | member :: PeerAddr IP -> BlockIx -> PendingSet -> Bool | 208 | interesting :: Wire Session () |
106 | member addr bix = undefined | 209 | interesting = do |
210 | addr <- connRemoteAddr <$> getConnection | ||
211 | logMessage (Status (Interested True)) | ||
212 | sendMessage (Interested True) | ||
213 | logMessage (Status (Choking False)) | ||
214 | sendMessage (Choking False) | ||
215 | tryFillRequestQueue | ||
107 | 216 | ||
108 | insert :: PeerAddr IP -> BlockIx -> PendingSet -> PendingSet | ||
109 | insert addr bix = undefined | ||
110 | -} | ||
111 | {----------------------------------------------------------------------- | 217 | {----------------------------------------------------------------------- |
112 | -- Event loop | 218 | -- Incoming message handling |
113 | -----------------------------------------------------------------------} | 219 | -----------------------------------------------------------------------} |
114 | {- | ||
115 | data ExchangeError | ||
116 | = InvalidRequest BlockIx StorageFailure | ||
117 | | CorruptedPiece PieceIx | ||
118 | 220 | ||
119 | packException :: Exception e => (e -> ExchangeError) -> IO a -> IO a | 221 | handleStatus :: StatusUpdate -> Wire Session () |
120 | packException f m = try >>= either (throwIO . f) m | 222 | handleStatus s = do |
223 | updateConnStatus RemotePeer s | ||
224 | case s of | ||
225 | Interested _ -> return () | ||
226 | Choking True -> do | ||
227 | addr <- connRemoteAddr <$> getConnection | ||
228 | withStatusUpdates (resetPending addr) | ||
229 | Choking False -> tryFillRequestQueue | ||
121 | 230 | ||
122 | readBlock :: BlockIx -> Storage -> IO (Block ByteString) | 231 | handleAvailable :: Available -> Wire Session () |
123 | readBlock bix @ BlockIx {..} s = do | 232 | handleAvailable msg = do |
124 | p <- packException (InvalidRequest bix) $ do readPiece ixPiece storage | 233 | updateRemoteBitfield $ case msg of |
125 | let chunk = BS.take ixLength $ BS.drop ixOffset p | 234 | Have ix -> BF.insert ix |
126 | if BS.length chunk == ixLength | 235 | Bitfield bf -> const bf |
127 | then return chunk | 236 | |
128 | else throwIO $ InvalidRequest bix (InvalidSize ixLength) | 237 | thisBf <- getThisBitfield |
129 | -} | 238 | case msg of |
239 | Have ix | ||
240 | | ix `BF.member` thisBf -> return () | ||
241 | | otherwise -> interesting | ||
242 | Bitfield bf | ||
243 | | bf `BF.isSubsetOf` thisBf -> return () | ||
244 | | otherwise -> interesting | ||
130 | 245 | ||
131 | handleTransfer :: Transfer -> Wire Session () | 246 | handleTransfer :: Transfer -> Wire Session () |
132 | handleTransfer (Request bix) = do | 247 | handleTransfer (Request bix) = do |
133 | -- Session {..} <- getSession | 248 | Session {..} <- getSession |
134 | -- addr <- getRemoteAddr | 249 | bitfield <- getThisBitfield |
135 | -- when (addr `elem` unchoked && ixPiece bix `BF.member` bitfield) $ do | 250 | upload <- canUpload <$> getStatus |
136 | -- blk <- liftIO $ readBlock bix storage | 251 | when (upload && ixPiece bix `BF.member` bitfield) $ do |
137 | -- sendMsg (Piece blk) | 252 | blk <- liftIO $ readBlock bix storage |
138 | return () | 253 | sendMessage (Piece blk) |
139 | 254 | ||
140 | handleTransfer (Piece blk) = do | 255 | handleTransfer (Piece blk) = do |
141 | {- | 256 | Session {..} <- getSession |
142 | Session {..} <- getSession | 257 | isSuccess <- withStatusUpdates (pushBlock blk storage) |
143 | when (blockIx blk `PS.member` pendingSet) $ do | 258 | case isSuccess of |
144 | insert blk stalledSet | 259 | Nothing -> liftIO $ throwIO $ userError "block is not requested" |
145 | sendBroadcast have | 260 | Just isCompleted -> do |
146 | maybe send not interested | 261 | when isCompleted $ do |
147 | -} | 262 | sendBroadcast (Have (blkPiece blk)) |
148 | return () | 263 | -- maybe send not interested |
264 | tryFillRequestQueue | ||
149 | 265 | ||
150 | handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix)) | 266 | handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix)) |
151 | where | 267 | where |
152 | transferResponse bix (Transfer (Piece blk)) = blockIx blk == bix | 268 | transferResponse bix (Transfer (Piece blk)) = blockIx blk == bix |
153 | transferResponse _ _ = False | 269 | transferResponse _ _ = False |
154 | 270 | ||
271 | {----------------------------------------------------------------------- | ||
272 | -- Event loop | ||
273 | -----------------------------------------------------------------------} | ||
274 | |||
155 | handleMessage :: Message -> Wire Session () | 275 | handleMessage :: Message -> Wire Session () |
156 | handleMessage KeepAlive = return () | 276 | handleMessage KeepAlive = return () |
157 | handleMessage (Status s) = undefined | 277 | handleMessage (Status s) = handleStatus s |
158 | handleMessage (Available msg) = do | 278 | handleMessage (Available msg) = handleAvailable msg |
159 | thisBf <- getThisBitfield | ||
160 | case msg of | ||
161 | Have ix | ||
162 | | ix `BF.member` thisBf -> return () | ||
163 | | otherwise -> undefined | ||
164 | Bitfield bf | ||
165 | | bf `BF.isSubsetOf` thisBf -> return () | ||
166 | | otherwise -> undefined | ||
167 | |||
168 | handleMessage (Transfer msg) = handleTransfer msg | 279 | handleMessage (Transfer msg) = handleTransfer msg |
169 | handleMessage (Port n) = undefined | 280 | handleMessage (Port n) = undefined |
170 | handleMessage (Fast _) = undefined | 281 | handleMessage (Fast _) = undefined |
@@ -172,19 +283,17 @@ handleMessage (Extended _) = undefined | |||
172 | 283 | ||
173 | exchange :: Wire Session () | 284 | exchange :: Wire Session () |
174 | exchange = do | 285 | exchange = do |
175 | e <- recvMessage | 286 | bf <- getThisBitfield |
176 | liftIO $ print e | 287 | sendMessage (Bitfield bf) |
177 | 288 | awaitForever $ \ msg -> do | |
178 | type Exchange = StateT Session (ReaderT (Connection Session) IO) | 289 | logMessage msg |
179 | 290 | handleMessage msg | |
180 | --runExchange :: Exchange () -> [PeerAddr] -> IO () | ||
181 | --runExchange exchange peers = do | ||
182 | -- forM_ peers $ \ peer -> do | ||
183 | -- forkIO $ runReaderT (runStateT exchange session ) | ||
184 | 291 | ||
185 | data Event = NewMessage (PeerAddr IP) Message | 292 | data Event = NewMessage (PeerAddr IP) Message |
186 | | Timeout -- for scheduling | 293 | | Timeout -- for scheduling |
187 | 294 | ||
295 | type Exchange a = Wire Session a | ||
296 | |||
188 | awaitEvent :: Exchange Event | 297 | awaitEvent :: Exchange Event |
189 | awaitEvent = undefined | 298 | awaitEvent = undefined |
190 | 299 | ||
diff --git a/src/Network/BitTorrent/Exchange/Session/Status.hs b/src/Network/BitTorrent/Exchange/Session/Status.hs new file mode 100644 index 00000000..565c3bf3 --- /dev/null +++ b/src/Network/BitTorrent/Exchange/Session/Status.hs | |||
@@ -0,0 +1,175 @@ | |||
1 | module Network.BitTorrent.Exchange.Session.Status | ||
2 | ( -- * Environment | ||
3 | StatusUpdates | ||
4 | , runStatusUpdates | ||
5 | |||
6 | -- * Status | ||
7 | , SessionStatus | ||
8 | , sessionStatus | ||
9 | |||
10 | -- * Query | ||
11 | , getBitfield | ||
12 | , getRequestQueueLength | ||
13 | |||
14 | -- * Control | ||
15 | , scheduleBlocks | ||
16 | , resetPending | ||
17 | , pushBlock | ||
18 | ) where | ||
19 | |||
20 | import Control.Applicative | ||
21 | import Control.Concurrent | ||
22 | import Control.Monad.State | ||
23 | import Data.ByteString.Lazy as BL | ||
24 | import Data.Default | ||
25 | import Data.List as L | ||
26 | import Data.Maybe | ||
27 | import Data.Map as M | ||
28 | import Data.Set as S | ||
29 | import Data.Tuple | ||
30 | |||
31 | import Data.Torrent.Piece | ||
32 | import Data.Torrent.Bitfield as BF | ||
33 | import Network.BitTorrent.Core | ||
34 | import Network.BitTorrent.Exchange.Block as Block | ||
35 | import System.Torrent.Storage (Storage, writePiece) | ||
36 | |||
37 | |||
38 | {----------------------------------------------------------------------- | ||
39 | -- Piece entry | ||
40 | -----------------------------------------------------------------------} | ||
41 | |||
42 | data PieceEntry = PieceEntry | ||
43 | { pending :: [(PeerAddr IP, BlockIx)] | ||
44 | , stalled :: Bucket | ||
45 | } | ||
46 | |||
47 | pieceEntry :: PieceSize -> PieceEntry | ||
48 | pieceEntry s = PieceEntry [] (Block.empty s) | ||
49 | |||
50 | isEmpty :: PieceEntry -> Bool | ||
51 | isEmpty PieceEntry {..} = L.null pending && Block.null stalled | ||
52 | |||
53 | holes :: PieceIx -> PieceEntry -> [BlockIx] | ||
54 | holes pix PieceEntry {..} = fmap mkBlockIx (spans defaultTransferSize stalled) | ||
55 | where | ||
56 | mkBlockIx (off, sz) = BlockIx pix off sz | ||
57 | |||
58 | {----------------------------------------------------------------------- | ||
59 | -- Session status | ||
60 | -----------------------------------------------------------------------} | ||
61 | |||
62 | data SessionStatus = SessionStatus | ||
63 | { inprogress :: !(Map PieceIx PieceEntry) | ||
64 | , bitfield :: !Bitfield | ||
65 | , pieceSize :: !PieceSize | ||
66 | } | ||
67 | |||
68 | sessionStatus :: Bitfield -> PieceSize -> SessionStatus | ||
69 | sessionStatus bf ps = SessionStatus | ||
70 | { inprogress = M.empty | ||
71 | , bitfield = bf | ||
72 | , pieceSize = ps | ||
73 | } | ||
74 | |||
75 | type StatusUpdates a = StateT SessionStatus IO a | ||
76 | |||
77 | -- | | ||
78 | runStatusUpdates :: MVar SessionStatus -> StatusUpdates a -> IO a | ||
79 | runStatusUpdates var m = modifyMVar var (fmap swap . runStateT m) | ||
80 | |||
81 | getBitfield :: MVar SessionStatus -> IO Bitfield | ||
82 | getBitfield var = bitfield <$> readMVar var | ||
83 | |||
84 | getRequestQueueLength :: PeerAddr IP -> StatusUpdates Int | ||
85 | getRequestQueueLength addr = do | ||
86 | m <- gets (M.elems . M.map (L.filter ((==) addr . fst) . pending) . inprogress) | ||
87 | return $ L.sum $ L.map L.length m | ||
88 | |||
89 | modifyEntry :: PieceIx -> (PieceEntry -> PieceEntry) -> StatusUpdates () | ||
90 | modifyEntry pix f = modify $ \ s @ SessionStatus {..} -> s | ||
91 | { inprogress = alter (g pieceSize) pix inprogress } | ||
92 | where | ||
93 | g s = h . f . fromMaybe (pieceEntry s) | ||
94 | h e | ||
95 | | isEmpty e = Nothing | ||
96 | | otherwise = Just e | ||
97 | |||
98 | {----------------------------------------------------------------------- | ||
99 | -- Piece download | ||
100 | -----------------------------------------------------------------------} | ||
101 | |||
102 | -- TODO choose block nearest to pending or stalled sets to reduce disk | ||
103 | -- seeks on remote machines | ||
104 | chooseBlocks :: [BlockIx] -> Int -> StatusUpdates [BlockIx] | ||
105 | chooseBlocks xs n = return (L.take n xs) | ||
106 | |||
107 | -- TODO use selection strategies from Exchange.Selector | ||
108 | choosePiece :: Bitfield -> StatusUpdates (Maybe PieceIx) | ||
109 | choosePiece bf | ||
110 | | BF.null bf = return $ Nothing | ||
111 | | otherwise = return $ Just $ BF.findMin bf | ||
112 | |||
113 | scheduleBlocks :: PeerAddr IP -> Bitfield -> Int -> StatusUpdates [BlockIx] | ||
114 | scheduleBlocks addr maskBF n = do | ||
115 | SessionStatus {..} <- get | ||
116 | let wantPieces = maskBF `BF.difference` bitfield | ||
117 | let wantBlocks = L.concat $ M.elems $ M.mapWithKey holes $ | ||
118 | M.filterWithKey (\ pix _ -> pix `BF.member` wantPieces) inprogress | ||
119 | |||
120 | bixs <- if L.null wantBlocks | ||
121 | then do | ||
122 | mpix <- choosePiece wantPieces | ||
123 | case mpix of -- TODO return 'n' blocks | ||
124 | Nothing -> return [] | ||
125 | Just pix -> return [leadingBlock pix defaultTransferSize] | ||
126 | else chooseBlocks wantBlocks n | ||
127 | |||
128 | forM_ bixs $ \ bix -> do | ||
129 | modifyEntry (ixPiece bix) $ \ e @ PieceEntry {..} -> e | ||
130 | { pending = (addr, bix) : pending } | ||
131 | |||
132 | return bixs | ||
133 | |||
134 | |||
135 | -- | Remove all pending block requests to the remote peer. May be used | ||
136 | -- when: | ||
137 | -- | ||
138 | -- * a peer closes connection; | ||
139 | -- | ||
140 | -- * remote peer choked this peer; | ||
141 | -- | ||
142 | -- * timeout expired. | ||
143 | -- | ||
144 | resetPending :: PeerAddr IP -> StatusUpdates () | ||
145 | resetPending addr = modify $ \ s -> s { inprogress = reset (inprogress s) } | ||
146 | where | ||
147 | reset = fmap $ \ e -> e | ||
148 | { pending = L.filter (not . (==) addr . fst) (pending e) } | ||
149 | |||
150 | -- | MAY write to storage, if a new piece have been completed. | ||
151 | pushBlock :: Block BL.ByteString -> Storage -> StatusUpdates (Maybe Bool) | ||
152 | pushBlock blk @ Block {..} storage = do | ||
153 | mpe <- gets (M.lookup blkPiece . inprogress) | ||
154 | case mpe of | ||
155 | Nothing -> return Nothing | ||
156 | Just (pe @ PieceEntry {..}) | ||
157 | | blockIx blk `L.notElem` fmap snd pending -> return Nothing | ||
158 | | otherwise -> do | ||
159 | let bkt' = Block.insertLazy blkOffset blkData stalled | ||
160 | case toPiece bkt' of | ||
161 | Nothing -> do | ||
162 | modifyEntry blkPiece $ \ e @ PieceEntry {..} -> e | ||
163 | { pending = L.filter ((==) (blockIx blk) . snd) pending | ||
164 | , stalled = bkt' | ||
165 | } | ||
166 | return (Just False) | ||
167 | |||
168 | Just pieceData -> do | ||
169 | -- TODO verify | ||
170 | liftIO $ writePiece (Piece blkPiece pieceData) storage | ||
171 | modify $ \ s @ SessionStatus {..} -> s | ||
172 | { inprogress = M.delete blkPiece inprogress | ||
173 | , bitfield = BF.insert blkPiece bitfield | ||
174 | } | ||
175 | return (Just True) | ||
diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs index 1fca6a66..8873546d 100644 --- a/src/Network/BitTorrent/Exchange/Wire.hs +++ b/src/Network/BitTorrent/Exchange/Wire.hs | |||
@@ -15,7 +15,8 @@ | |||
15 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | 15 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} |
16 | module Network.BitTorrent.Exchange.Wire | 16 | module Network.BitTorrent.Exchange.Wire |
17 | ( -- * Wire | 17 | ( -- * Wire |
18 | Wire | 18 | Connected |
19 | , Wire | ||
19 | 20 | ||
20 | -- ** Exceptions | 21 | -- ** Exceptions |
21 | , ChannelSide (..) | 22 | , ChannelSide (..) |
@@ -38,12 +39,14 @@ module Network.BitTorrent.Exchange.Wire | |||
38 | 39 | ||
39 | -- ** Connection | 40 | -- ** Connection |
40 | , Connection | 41 | , Connection |
42 | , connRemoteAddr | ||
41 | , connProtocol | 43 | , connProtocol |
42 | , connCaps | 44 | , connCaps |
43 | , connTopic | 45 | , connTopic |
44 | , connRemotePeerId | 46 | , connRemotePeerId |
45 | , connThisPeerId | 47 | , connThisPeerId |
46 | , connOptions | 48 | , connOptions |
49 | , connSession | ||
47 | 50 | ||
48 | -- ** Setup | 51 | -- ** Setup |
49 | , runWire | 52 | , runWire |
@@ -55,6 +58,7 @@ module Network.BitTorrent.Exchange.Wire | |||
55 | , recvMessage | 58 | , recvMessage |
56 | , sendMessage | 59 | , sendMessage |
57 | , filterQueue | 60 | , filterQueue |
61 | , getAdvertisedQueueLength | ||
58 | 62 | ||
59 | -- ** Query | 63 | -- ** Query |
60 | , getConnection | 64 | , getConnection |
@@ -93,6 +97,7 @@ import Network.Socket.ByteString as BS | |||
93 | import Text.PrettyPrint as PP hiding (($$), (<>)) | 97 | import Text.PrettyPrint as PP hiding (($$), (<>)) |
94 | import Text.PrettyPrint.Class | 98 | import Text.PrettyPrint.Class |
95 | import Text.Show.Functions | 99 | import Text.Show.Functions |
100 | import System.Log.FastLogger (ToLogStr(..)) | ||
96 | import System.Timeout | 101 | import System.Timeout |
97 | 102 | ||
98 | import Data.BEncode as BE | 103 | import Data.BEncode as BE |
@@ -190,13 +195,15 @@ errorPenalty (DisallowedMessage _ _) = 1 | |||
190 | 195 | ||
191 | -- | Exceptions used to interrupt the current P2P session. | 196 | -- | Exceptions used to interrupt the current P2P session. |
192 | data WireFailure | 197 | data WireFailure |
198 | = ConnectionRefused IOError | ||
199 | |||
193 | -- | Force termination of wire connection. | 200 | -- | Force termination of wire connection. |
194 | -- | 201 | -- |
195 | -- Normally you should throw only this exception from event loop | 202 | -- Normally you should throw only this exception from event loop |
196 | -- using 'disconnectPeer', other exceptions are thrown | 203 | -- using 'disconnectPeer', other exceptions are thrown |
197 | -- automatically by functions from this module. | 204 | -- automatically by functions from this module. |
198 | -- | 205 | -- |
199 | = DisconnectPeer | 206 | | DisconnectPeer |
200 | 207 | ||
201 | -- | A peer not responding and did not send a 'KeepAlive' message | 208 | -- | A peer not responding and did not send a 'KeepAlive' message |
202 | -- for a specified period of time. | 209 | -- for a specified period of time. |
@@ -464,10 +471,12 @@ makeLenses ''ConnectionState | |||
464 | 471 | ||
465 | -- | Connection keep various info about both peers. | 472 | -- | Connection keep various info about both peers. |
466 | data Connection s = Connection | 473 | data Connection s = Connection |
467 | { -- | /Both/ peers handshaked with this protocol string. The only | 474 | { connRemoteAddr :: !(PeerAddr IP) |
475 | |||
476 | -- | /Both/ peers handshaked with this protocol string. The only | ||
468 | -- value is \"Bittorrent Protocol\" but this can be changed in | 477 | -- value is \"Bittorrent Protocol\" but this can be changed in |
469 | -- future. | 478 | -- future. |
470 | connProtocol :: !ProtocolName | 479 | , connProtocol :: !ProtocolName |
471 | 480 | ||
472 | -- | Set of enabled core extensions, i.e. the pre BEP10 extension | 481 | -- | Set of enabled core extensions, i.e. the pre BEP10 extension |
473 | -- mechanism. This value is used to check if a message is allowed | 482 | -- mechanism. This value is used to check if a message is allowed |
@@ -503,6 +512,17 @@ data Connection s = Connection | |||
503 | instance Pretty (Connection s) where | 512 | instance Pretty (Connection s) where |
504 | pretty Connection {..} = "Connection" | 513 | pretty Connection {..} = "Connection" |
505 | 514 | ||
515 | instance ToLogStr (Connection s) where | ||
516 | toLogStr Connection {..} = mconcat | ||
517 | [ toLogStr (show connRemoteAddr) | ||
518 | , toLogStr (show connProtocol) | ||
519 | , toLogStr (show connCaps) | ||
520 | , toLogStr (show connTopic) | ||
521 | , toLogStr (show connRemotePeerId) | ||
522 | , toLogStr (show connThisPeerId) | ||
523 | , toLogStr (show connOptions) | ||
524 | ] | ||
525 | |||
506 | -- TODO check extended messages too | 526 | -- TODO check extended messages too |
507 | isAllowed :: Connection s -> Message -> Bool | 527 | isAllowed :: Connection s -> Message -> Bool |
508 | isAllowed Connection {..} msg | 528 | isAllowed Connection {..} msg |
@@ -592,6 +612,15 @@ getConnection = lift ask | |||
592 | getSession :: Wire s s | 612 | getSession :: Wire s s |
593 | getSession = lift (asks connSession) | 613 | getSession = lift (asks connSession) |
594 | 614 | ||
615 | -- TODO configurable | ||
616 | defQueueLength :: Int | ||
617 | defQueueLength = 1 | ||
618 | |||
619 | getAdvertisedQueueLength :: Wire s Int | ||
620 | getAdvertisedQueueLength = do | ||
621 | ExtendedHandshake {..} <- getRemoteEhs | ||
622 | return $ fromMaybe defQueueLength ehsQueueLength | ||
623 | |||
595 | {----------------------------------------------------------------------- | 624 | {----------------------------------------------------------------------- |
596 | -- Wrapper | 625 | -- Wrapper |
597 | -----------------------------------------------------------------------} | 626 | -----------------------------------------------------------------------} |
@@ -685,8 +714,9 @@ reconnect = undefined | |||
685 | -- | 714 | -- |
686 | connectWire :: s -> Handshake -> PeerAddr IP -> ExtendedCaps -> Chan Message | 715 | connectWire :: s -> Handshake -> PeerAddr IP -> ExtendedCaps -> Chan Message |
687 | -> Wire s () -> IO () | 716 | -> Wire s () -> IO () |
688 | connectWire session hs addr extCaps chan wire = | 717 | connectWire session hs addr extCaps chan wire = do |
689 | bracket (peerSocket Stream addr) close $ \ sock -> do | 718 | let catchRefusal m = try m >>= either (throwIO . ConnectionRefused) return |
719 | bracket (catchRefusal (peerSocket Stream addr)) close $ \ sock -> do | ||
690 | hs' <- initiateHandshake sock hs | 720 | hs' <- initiateHandshake sock hs |
691 | 721 | ||
692 | Prelude.mapM_ (\(t,e) -> unless t $ throwIO $ ProtocolError e) [ | 722 | Prelude.mapM_ (\(t,e) -> unless t $ throwIO $ ProtocolError e) [ |
@@ -723,7 +753,8 @@ connectWire session hs addr extCaps chan wire = | |||
723 | (forkIO $ sourceChan kaInterval chan $= conduitPut S.put $$ sinkSocket sock) | 753 | (forkIO $ sourceChan kaInterval chan $= conduitPut S.put $$ sinkSocket sock) |
724 | (killThread) $ \ _ -> | 754 | (killThread) $ \ _ -> |
725 | runWire wire' sock chan $ Connection | 755 | runWire wire' sock chan $ Connection |
726 | { connProtocol = hsProtocol hs | 756 | { connRemoteAddr = addr |
757 | , connProtocol = hsProtocol hs | ||
727 | , connCaps = caps | 758 | , connCaps = caps |
728 | , connTopic = hsInfoHash hs | 759 | , connTopic = hsInfoHash hs |
729 | , connRemotePeerId = hsPeerId hs' | 760 | , connRemotePeerId = hsPeerId hs' |