diff options
Diffstat (limited to 'src/Network/BitTorrent/Exchange/Session.hs')
-rw-r--r-- | src/Network/BitTorrent/Exchange/Session.hs | 154 |
1 files changed, 124 insertions, 30 deletions
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs index 1e72ba96..f10f601e 100644 --- a/src/Network/BitTorrent/Exchange/Session.hs +++ b/src/Network/BitTorrent/Exchange/Session.hs | |||
@@ -12,7 +12,7 @@ module Network.BitTorrent.Exchange.Session | |||
12 | 12 | ||
13 | import Control.Applicative | 13 | import Control.Applicative |
14 | import Control.Concurrent | 14 | import Control.Concurrent |
15 | import Control.Exception | 15 | import Control.Exception hiding (Handler) |
16 | import Control.Lens | 16 | import Control.Lens |
17 | import Control.Monad.Logger | 17 | import Control.Monad.Logger |
18 | import Control.Monad.Reader | 18 | import Control.Monad.Reader |
@@ -20,6 +20,7 @@ import Control.Monad.State | |||
20 | import Data.ByteString as BS | 20 | import Data.ByteString as BS |
21 | import Data.ByteString.Lazy as BL | 21 | import Data.ByteString.Lazy as BL |
22 | import Data.Conduit | 22 | import Data.Conduit |
23 | import Data.Conduit.List as CL (iterM) | ||
23 | import Data.Function | 24 | import Data.Function |
24 | import Data.IORef | 25 | import Data.IORef |
25 | import Data.List as L | 26 | import Data.List as L |
@@ -34,16 +35,18 @@ import Text.PrettyPrint hiding ((<>)) | |||
34 | import Text.PrettyPrint.Class | 35 | import Text.PrettyPrint.Class |
35 | import System.Log.FastLogger (LogStr, ToLogStr (..)) | 36 | import System.Log.FastLogger (LogStr, ToLogStr (..)) |
36 | 37 | ||
38 | import Data.BEncode as BE | ||
37 | import Data.Torrent (InfoDict (..)) | 39 | import Data.Torrent (InfoDict (..)) |
38 | import Data.Torrent.Bitfield as BF | 40 | import Data.Torrent.Bitfield as BF |
39 | import Data.Torrent.InfoHash | 41 | import Data.Torrent.InfoHash |
40 | import Data.Torrent.Piece (pieceData, piPieceLength) | 42 | import Data.Torrent.Piece |
41 | import qualified Data.Torrent.Piece as Torrent (Piece (Piece)) | 43 | import qualified Data.Torrent.Piece as Torrent (Piece (Piece)) |
42 | import Network.BitTorrent.Core | 44 | import Network.BitTorrent.Core |
43 | import Network.BitTorrent.Exchange.Assembler | 45 | import Network.BitTorrent.Exchange.Assembler |
44 | import Network.BitTorrent.Exchange.Block as Block | 46 | import Network.BitTorrent.Exchange.Block as Block |
45 | import Network.BitTorrent.Exchange.Message | 47 | import Network.BitTorrent.Exchange.Message as Message |
46 | import Network.BitTorrent.Exchange.Session.Status as SS | 48 | import Network.BitTorrent.Exchange.Session.Metadata as Metadata |
49 | import Network.BitTorrent.Exchange.Session.Status as SS | ||
47 | import Network.BitTorrent.Exchange.Status | 50 | import Network.BitTorrent.Exchange.Status |
48 | import Network.BitTorrent.Exchange.Wire | 51 | import Network.BitTorrent.Exchange.Wire |
49 | import System.Torrent.Storage | 52 | import System.Torrent.Storage |
@@ -66,6 +69,14 @@ packException f m = try m >>= either (throwIO . f) return | |||
66 | -- Session | 69 | -- Session |
67 | -----------------------------------------------------------------------} | 70 | -----------------------------------------------------------------------} |
68 | 71 | ||
72 | data Cached a = Cached | ||
73 | { cachedValue :: !a | ||
74 | , cachedData :: BL.ByteString -- keep lazy | ||
75 | } | ||
76 | |||
77 | cache :: BEncode a => a -> Cached a | ||
78 | cache s = Cached s (BE.encode s) | ||
79 | |||
69 | data ConnectionEntry = ConnectionEntry | 80 | data ConnectionEntry = ConnectionEntry |
70 | { initiatedBy :: !ChannelSide | 81 | { initiatedBy :: !ChannelSide |
71 | , connection :: !(Connection Session) | 82 | , connection :: !(Connection Session) |
@@ -74,12 +85,14 @@ data ConnectionEntry = ConnectionEntry | |||
74 | data Session = Session | 85 | data Session = Session |
75 | { tpeerId :: PeerId | 86 | { tpeerId :: PeerId |
76 | , infohash :: InfoHash | 87 | , infohash :: InfoHash |
88 | , metadata :: MVar Metadata.Status | ||
77 | , storage :: Storage | 89 | , storage :: Storage |
78 | , status :: MVar SessionStatus | 90 | , status :: MVar SessionStatus |
79 | , unchoked :: [PeerAddr IP] | 91 | , unchoked :: [PeerAddr IP] |
80 | , connections :: MVar (Map (PeerAddr IP) ConnectionEntry) | 92 | , connections :: MVar (Map (PeerAddr IP) ConnectionEntry) |
81 | , broadcast :: Chan Message | 93 | , broadcast :: Chan Message |
82 | , logger :: LogFun | 94 | , logger :: LogFun |
95 | , infodict :: MVar (Cached InfoDict) | ||
83 | } | 96 | } |
84 | 97 | ||
85 | -- | Logger function. | 98 | -- | Logger function. |
@@ -108,7 +121,13 @@ newSession logFun addr rootPath dict = do | |||
108 | } | 121 | } |
109 | 122 | ||
110 | closeSession :: Session -> IO () | 123 | closeSession :: Session -> IO () |
111 | closeSession = undefined | 124 | closeSession ses = do |
125 | deleteAll ses | ||
126 | undefined | ||
127 | |||
128 | {----------------------------------------------------------------------- | ||
129 | -- Logging | ||
130 | -----------------------------------------------------------------------} | ||
112 | 131 | ||
113 | instance MonadLogger (Connected Session) where | 132 | instance MonadLogger (Connected Session) where |
114 | monadLoggerLog loc src lvl msg = do | 133 | monadLoggerLog loc src lvl msg = do |
@@ -118,11 +137,11 @@ instance MonadLogger (Connected Session) where | |||
118 | let addrSrc = src <> " @ " <> T.pack (render (pretty addr)) | 137 | let addrSrc = src <> " @ " <> T.pack (render (pretty addr)) |
119 | liftIO $ logger ses loc addrSrc lvl (toLogStr msg) | 138 | liftIO $ logger ses loc addrSrc lvl (toLogStr msg) |
120 | 139 | ||
121 | logMessage :: Message -> Wire Session () | 140 | logMessage :: MonadLogger m => Message -> m () |
122 | logMessage msg = logDebugN $ T.pack (render (pretty msg)) | 141 | logMessage msg = logDebugN $ T.pack (render (pretty msg)) |
123 | 142 | ||
124 | logEvent :: Text -> Wire Session () | 143 | logEvent :: MonadLogger m => Text -> m () |
125 | logEvent = logInfoN | 144 | logEvent = logInfoN |
126 | 145 | ||
127 | {----------------------------------------------------------------------- | 146 | {----------------------------------------------------------------------- |
128 | -- Connections | 147 | -- Connections |
@@ -132,11 +151,12 @@ logEvent = logInfoN | |||
132 | insert :: PeerAddr IP | 151 | insert :: PeerAddr IP |
133 | -> {- Maybe Socket | 152 | -> {- Maybe Socket |
134 | -> -} Session -> IO () | 153 | -> -} Session -> IO () |
135 | insert addr ses @ Session {..} = do | 154 | insert addr ses @ Session {..} = void $ forkIO (action `finally` cleanup) |
136 | forkIO $ do | ||
137 | action `finally` runStatusUpdates status (resetPending addr) | ||
138 | return () | ||
139 | where | 155 | where |
156 | cleanup = do | ||
157 | runStatusUpdates status (SS.resetPending addr) | ||
158 | -- TODO Metata.resetPending addr | ||
159 | |||
140 | action = do | 160 | action = do |
141 | let caps = def | 161 | let caps = def |
142 | let ecaps = def | 162 | let ecaps = def |
@@ -147,7 +167,7 @@ insert addr ses @ Session {..} = do | |||
147 | -- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn | 167 | -- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn |
148 | lift $ resizeBitfield (totalPieces storage) | 168 | lift $ resizeBitfield (totalPieces storage) |
149 | logEvent "Connection established" | 169 | logEvent "Connection established" |
150 | exchange | 170 | iterM logMessage =$= exchange =$= iterM logMessage |
151 | -- liftIO $ modifyMVar_ connections $ pure . M.delete addr | 171 | -- liftIO $ modifyMVar_ connections $ pure . M.delete addr |
152 | 172 | ||
153 | delete :: PeerAddr IP -> Session -> IO () | 173 | delete :: PeerAddr IP -> Session -> IO () |
@@ -160,11 +180,26 @@ deleteAll = undefined | |||
160 | -- Helpers | 180 | -- Helpers |
161 | -----------------------------------------------------------------------} | 181 | -----------------------------------------------------------------------} |
162 | 182 | ||
183 | waitMVar :: MVar a -> IO () | ||
184 | waitMVar m = withMVar m (const (return ())) | ||
185 | |||
186 | -- This function appear in new GHC "out of box". (moreover it is atomic) | ||
187 | tryReadMVar :: MVar a -> IO (Maybe a) | ||
188 | tryReadMVar m = do | ||
189 | ma <- tryTakeMVar m | ||
190 | maybe (return ()) (putMVar m) ma | ||
191 | return ma | ||
192 | |||
163 | withStatusUpdates :: StatusUpdates a -> Wire Session a | 193 | withStatusUpdates :: StatusUpdates a -> Wire Session a |
164 | withStatusUpdates m = do | 194 | withStatusUpdates m = do |
165 | Session {..} <- asks connSession | 195 | Session {..} <- asks connSession |
166 | liftIO $ runStatusUpdates status m | 196 | liftIO $ runStatusUpdates status m |
167 | 197 | ||
198 | withMetadataUpdates :: Updates a -> Connected Session a | ||
199 | withMetadataUpdates m = do | ||
200 | Session {..} <- asks connSession | ||
201 | liftIO $ runUpdates metadata m | ||
202 | |||
168 | getThisBitfield :: Wire Session Bitfield | 203 | getThisBitfield :: Wire Session Bitfield |
169 | getThisBitfield = do | 204 | getThisBitfield = do |
170 | ses <- asks connSession | 205 | ses <- asks connSession |
@@ -179,6 +214,16 @@ readBlock bix @ BlockIx {..} s = do | |||
179 | then return $ Block ixPiece ixOffset chunk | 214 | then return $ Block ixPiece ixOffset chunk |
180 | else throwIO $ InvalidRequest bix (InvalidSize ixLength) | 215 | else throwIO $ InvalidRequest bix (InvalidSize ixLength) |
181 | 216 | ||
217 | -- | | ||
218 | tryReadMetadataBlock :: PieceIx | ||
219 | -> Connected Session (Maybe (Torrent.Piece BS.ByteString, Int)) | ||
220 | tryReadMetadataBlock pix = do | ||
221 | Session {..} <- asks connSession | ||
222 | mcached <- liftIO (tryReadMVar infodict) | ||
223 | case mcached of | ||
224 | Nothing -> undefined | ||
225 | Just (Cached {..}) -> undefined | ||
226 | |||
182 | sendBroadcast :: PeerMessage msg => msg -> Wire Session () | 227 | sendBroadcast :: PeerMessage msg => msg -> Wire Session () |
183 | sendBroadcast msg = do | 228 | sendBroadcast msg = do |
184 | Session {..} <- asks connSession | 229 | Session {..} <- asks connSession |
@@ -208,9 +253,7 @@ tryFillRequestQueue = do | |||
208 | interesting :: Wire Session () | 253 | interesting :: Wire Session () |
209 | interesting = do | 254 | interesting = do |
210 | addr <- asks connRemoteAddr | 255 | addr <- asks connRemoteAddr |
211 | logMessage (Status (Interested True)) | ||
212 | sendMessage (Interested True) | 256 | sendMessage (Interested True) |
213 | logMessage (Status (Choking False)) | ||
214 | sendMessage (Choking False) | 257 | sendMessage (Choking False) |
215 | tryFillRequestQueue | 258 | tryFillRequestQueue |
216 | 259 | ||
@@ -218,17 +261,19 @@ interesting = do | |||
218 | -- Incoming message handling | 261 | -- Incoming message handling |
219 | -----------------------------------------------------------------------} | 262 | -----------------------------------------------------------------------} |
220 | 263 | ||
221 | handleStatus :: StatusUpdate -> Wire Session () | 264 | type Handler msg = msg -> Wire Session () |
265 | |||
266 | handleStatus :: Handler StatusUpdate | ||
222 | handleStatus s = do | 267 | handleStatus s = do |
223 | connStatus %= over remoteStatus (updateStatus s) | 268 | connStatus %= over remoteStatus (updateStatus s) |
224 | case s of | 269 | case s of |
225 | Interested _ -> return () | 270 | Interested _ -> return () |
226 | Choking True -> do | 271 | Choking True -> do |
227 | addr <- asks connRemoteAddr | 272 | addr <- asks connRemoteAddr |
228 | withStatusUpdates (resetPending addr) | 273 | withStatusUpdates (SS.resetPending addr) |
229 | Choking False -> tryFillRequestQueue | 274 | Choking False -> tryFillRequestQueue |
230 | 275 | ||
231 | handleAvailable :: Available -> Wire Session () | 276 | handleAvailable :: Handler Available |
232 | handleAvailable msg = do | 277 | handleAvailable msg = do |
233 | connBitfield %= case msg of | 278 | connBitfield %= case msg of |
234 | Have ix -> BF.insert ix | 279 | Have ix -> BF.insert ix |
@@ -243,18 +288,18 @@ handleAvailable msg = do | |||
243 | | bf `BF.isSubsetOf` thisBf -> return () | 288 | | bf `BF.isSubsetOf` thisBf -> return () |
244 | | otherwise -> interesting | 289 | | otherwise -> interesting |
245 | 290 | ||
246 | handleTransfer :: Transfer -> Wire Session () | 291 | handleTransfer :: Handler Transfer |
247 | handleTransfer (Request bix) = do | 292 | handleTransfer (Request bix) = do |
248 | Session {..} <- asks connSession | 293 | Session {..} <- asks connSession |
249 | bitfield <- getThisBitfield | 294 | bitfield <- getThisBitfield |
250 | upload <- canUpload <$> use connStatus | 295 | upload <- canUpload <$> use connStatus |
251 | when (upload && ixPiece bix `BF.member` bitfield) $ do | 296 | when (upload && ixPiece bix `BF.member` bitfield) $ do |
252 | blk <- liftIO $ readBlock bix storage | 297 | blk <- liftIO $ readBlock bix storage |
253 | sendMessage (Piece blk) | 298 | sendMessage (Message.Piece blk) |
254 | 299 | ||
255 | handleTransfer (Piece blk) = do | 300 | handleTransfer (Message.Piece blk) = do |
256 | Session {..} <- asks connSession | 301 | Session {..} <- asks connSession |
257 | isSuccess <- withStatusUpdates (pushBlock blk storage) | 302 | isSuccess <- withStatusUpdates (SS.pushBlock blk storage) |
258 | case isSuccess of | 303 | case isSuccess of |
259 | Nothing -> liftIO $ throwIO $ userError "block is not requested" | 304 | Nothing -> liftIO $ throwIO $ userError "block is not requested" |
260 | Just isCompleted -> do | 305 | Just isCompleted -> do |
@@ -265,29 +310,78 @@ handleTransfer (Piece blk) = do | |||
265 | 310 | ||
266 | handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix)) | 311 | handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix)) |
267 | where | 312 | where |
268 | transferResponse bix (Transfer (Piece blk)) = blockIx blk == bix | 313 | transferResponse bix (Transfer (Message.Piece blk)) = blockIx blk == bix |
269 | transferResponse _ _ = False | 314 | transferResponse _ _ = False |
315 | |||
316 | {----------------------------------------------------------------------- | ||
317 | -- Metadata exchange | ||
318 | -----------------------------------------------------------------------} | ||
319 | -- TODO introduce new metadata exchange specific exceptions | ||
320 | |||
321 | tryRequestMetadataBlock :: Wire Session () | ||
322 | tryRequestMetadataBlock = do | ||
323 | addr <- asks connRemoteAddr | ||
324 | mpix <- lift $ withMetadataUpdates (Metadata.scheduleBlock addr) | ||
325 | case mpix of | ||
326 | Nothing -> undefined | ||
327 | Just pix -> sendMessage (MetadataRequest pix) | ||
328 | |||
329 | handleMetadata :: Handler ExtendedMetadata | ||
330 | handleMetadata (MetadataRequest pix) = | ||
331 | lift (tryReadMetadataBlock pix) >>= sendMessage . mkResponse | ||
332 | where | ||
333 | mkResponse Nothing = MetadataReject pix | ||
334 | mkResponse (Just (piece, total)) = MetadataData piece total | ||
335 | |||
336 | handleMetadata (MetadataData {..}) = do | ||
337 | addr <- asks connRemoteAddr | ||
338 | ih <- asks connTopic | ||
339 | lift $ withMetadataUpdates (Metadata.pushBlock addr piece ih) | ||
340 | tryRequestMetadataBlock | ||
341 | |||
342 | handleMetadata (MetadataReject pix) = do | ||
343 | lift $ withMetadataUpdates (Metadata.cancelPending pix) | ||
344 | |||
345 | handleMetadata (MetadataUnknown _ ) = do | ||
346 | logInfoN "Unknown metadata message" | ||
347 | |||
348 | waitForMetadata :: Wire Session () | ||
349 | waitForMetadata = do | ||
350 | Session {..} <- asks connSession | ||
351 | needFetch <- liftIO (isEmptyMVar infodict) | ||
352 | when needFetch $ do | ||
353 | canFetch <- allowed ExtMetadata <$> use connExtCaps | ||
354 | if canFetch | ||
355 | then tryRequestMetadataBlock | ||
356 | else liftIO (waitMVar infodict) | ||
270 | 357 | ||
271 | {----------------------------------------------------------------------- | 358 | {----------------------------------------------------------------------- |
272 | -- Event loop | 359 | -- Event loop |
273 | -----------------------------------------------------------------------} | 360 | -----------------------------------------------------------------------} |
274 | 361 | ||
275 | handleMessage :: Message -> Wire Session () | 362 | acceptRehandshake :: ExtendedHandshake -> Wire s () |
363 | acceptRehandshake ehs = undefined | ||
364 | |||
365 | handleExtended :: Handler ExtendedMessage | ||
366 | handleExtended (EHandshake ehs) = acceptRehandshake ehs | ||
367 | handleExtended (EMetadata _ msg) = handleMetadata msg | ||
368 | handleExtended (EUnknown _ _ ) = logWarnN "Unknown extension message" | ||
369 | |||
370 | handleMessage :: Handler Message | ||
276 | handleMessage KeepAlive = return () | 371 | handleMessage KeepAlive = return () |
277 | handleMessage (Status s) = handleStatus s | 372 | handleMessage (Status s) = handleStatus s |
278 | handleMessage (Available msg) = handleAvailable msg | 373 | handleMessage (Available msg) = handleAvailable msg |
279 | handleMessage (Transfer msg) = handleTransfer msg | 374 | handleMessage (Transfer msg) = handleTransfer msg |
280 | handleMessage (Port n) = undefined | 375 | handleMessage (Port n) = undefined |
281 | handleMessage (Fast _) = undefined | 376 | handleMessage (Fast _) = undefined |
282 | handleMessage (Extended _) = undefined | 377 | handleMessage (Extended msg) = handleExtended msg |
283 | 378 | ||
284 | exchange :: Wire Session () | 379 | exchange :: Wire Session () |
285 | exchange = do | 380 | exchange = do |
381 | waitForMetadata | ||
286 | bf <- getThisBitfield | 382 | bf <- getThisBitfield |
287 | sendMessage (Bitfield bf) | 383 | sendMessage (Bitfield bf) |
288 | awaitForever $ \ msg -> do | 384 | awaitForever handleMessage |
289 | logMessage msg | ||
290 | handleMessage msg | ||
291 | 385 | ||
292 | data Event = NewMessage (PeerAddr IP) Message | 386 | data Event = NewMessage (PeerAddr IP) Message |
293 | | Timeout -- for scheduling | 387 | | Timeout -- for scheduling |