summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/Exchange/Session.hs
diff options
context:
space:
mode:
authorSam Truzjan <pxqr.sta@gmail.com>2014-02-15 04:18:05 +0400
committerSam Truzjan <pxqr.sta@gmail.com>2014-02-15 04:18:05 +0400
commit0fa6a0ee5eb1fbf648d3864626430efcbdb4aaae (patch)
tree7d2c6b8db43943974772069efb22480db8186bb1 /src/Network/BitTorrent/Exchange/Session.hs
parentdaf978ddd1f0a07ce4711fa97f51d0ec02478f73 (diff)
Move metadata exchange from Wire to Session
Diffstat (limited to 'src/Network/BitTorrent/Exchange/Session.hs')
-rw-r--r--src/Network/BitTorrent/Exchange/Session.hs154
1 files changed, 124 insertions, 30 deletions
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs
index 1e72ba96..f10f601e 100644
--- a/src/Network/BitTorrent/Exchange/Session.hs
+++ b/src/Network/BitTorrent/Exchange/Session.hs
@@ -12,7 +12,7 @@ module Network.BitTorrent.Exchange.Session
12 12
13import Control.Applicative 13import Control.Applicative
14import Control.Concurrent 14import Control.Concurrent
15import Control.Exception 15import Control.Exception hiding (Handler)
16import Control.Lens 16import Control.Lens
17import Control.Monad.Logger 17import Control.Monad.Logger
18import Control.Monad.Reader 18import Control.Monad.Reader
@@ -20,6 +20,7 @@ import Control.Monad.State
20import Data.ByteString as BS 20import Data.ByteString as BS
21import Data.ByteString.Lazy as BL 21import Data.ByteString.Lazy as BL
22import Data.Conduit 22import Data.Conduit
23import Data.Conduit.List as CL (iterM)
23import Data.Function 24import Data.Function
24import Data.IORef 25import Data.IORef
25import Data.List as L 26import Data.List as L
@@ -34,16 +35,18 @@ import Text.PrettyPrint hiding ((<>))
34import Text.PrettyPrint.Class 35import Text.PrettyPrint.Class
35import System.Log.FastLogger (LogStr, ToLogStr (..)) 36import System.Log.FastLogger (LogStr, ToLogStr (..))
36 37
38import Data.BEncode as BE
37import Data.Torrent (InfoDict (..)) 39import Data.Torrent (InfoDict (..))
38import Data.Torrent.Bitfield as BF 40import Data.Torrent.Bitfield as BF
39import Data.Torrent.InfoHash 41import Data.Torrent.InfoHash
40import Data.Torrent.Piece (pieceData, piPieceLength) 42import Data.Torrent.Piece
41import qualified Data.Torrent.Piece as Torrent (Piece (Piece)) 43import qualified Data.Torrent.Piece as Torrent (Piece (Piece))
42import Network.BitTorrent.Core 44import Network.BitTorrent.Core
43import Network.BitTorrent.Exchange.Assembler 45import Network.BitTorrent.Exchange.Assembler
44import Network.BitTorrent.Exchange.Block as Block 46import Network.BitTorrent.Exchange.Block as Block
45import Network.BitTorrent.Exchange.Message 47import Network.BitTorrent.Exchange.Message as Message
46import Network.BitTorrent.Exchange.Session.Status as SS 48import Network.BitTorrent.Exchange.Session.Metadata as Metadata
49import Network.BitTorrent.Exchange.Session.Status as SS
47import Network.BitTorrent.Exchange.Status 50import Network.BitTorrent.Exchange.Status
48import Network.BitTorrent.Exchange.Wire 51import Network.BitTorrent.Exchange.Wire
49import System.Torrent.Storage 52import System.Torrent.Storage
@@ -66,6 +69,14 @@ packException f m = try m >>= either (throwIO . f) return
66-- Session 69-- Session
67-----------------------------------------------------------------------} 70-----------------------------------------------------------------------}
68 71
72data Cached a = Cached
73 { cachedValue :: !a
74 , cachedData :: BL.ByteString -- keep lazy
75 }
76
77cache :: BEncode a => a -> Cached a
78cache s = Cached s (BE.encode s)
79
69data ConnectionEntry = ConnectionEntry 80data ConnectionEntry = ConnectionEntry
70 { initiatedBy :: !ChannelSide 81 { initiatedBy :: !ChannelSide
71 , connection :: !(Connection Session) 82 , connection :: !(Connection Session)
@@ -74,12 +85,14 @@ data ConnectionEntry = ConnectionEntry
74data Session = Session 85data Session = Session
75 { tpeerId :: PeerId 86 { tpeerId :: PeerId
76 , infohash :: InfoHash 87 , infohash :: InfoHash
88 , metadata :: MVar Metadata.Status
77 , storage :: Storage 89 , storage :: Storage
78 , status :: MVar SessionStatus 90 , status :: MVar SessionStatus
79 , unchoked :: [PeerAddr IP] 91 , unchoked :: [PeerAddr IP]
80 , connections :: MVar (Map (PeerAddr IP) ConnectionEntry) 92 , connections :: MVar (Map (PeerAddr IP) ConnectionEntry)
81 , broadcast :: Chan Message 93 , broadcast :: Chan Message
82 , logger :: LogFun 94 , logger :: LogFun
95 , infodict :: MVar (Cached InfoDict)
83 } 96 }
84 97
85-- | Logger function. 98-- | Logger function.
@@ -108,7 +121,13 @@ newSession logFun addr rootPath dict = do
108 } 121 }
109 122
110closeSession :: Session -> IO () 123closeSession :: Session -> IO ()
111closeSession = undefined 124closeSession ses = do
125 deleteAll ses
126 undefined
127
128{-----------------------------------------------------------------------
129-- Logging
130-----------------------------------------------------------------------}
112 131
113instance MonadLogger (Connected Session) where 132instance MonadLogger (Connected Session) where
114 monadLoggerLog loc src lvl msg = do 133 monadLoggerLog loc src lvl msg = do
@@ -118,11 +137,11 @@ instance MonadLogger (Connected Session) where
118 let addrSrc = src <> " @ " <> T.pack (render (pretty addr)) 137 let addrSrc = src <> " @ " <> T.pack (render (pretty addr))
119 liftIO $ logger ses loc addrSrc lvl (toLogStr msg) 138 liftIO $ logger ses loc addrSrc lvl (toLogStr msg)
120 139
121logMessage :: Message -> Wire Session () 140logMessage :: MonadLogger m => Message -> m ()
122logMessage msg = logDebugN $ T.pack (render (pretty msg)) 141logMessage msg = logDebugN $ T.pack (render (pretty msg))
123 142
124logEvent :: Text -> Wire Session () 143logEvent :: MonadLogger m => Text -> m ()
125logEvent = logInfoN 144logEvent = logInfoN
126 145
127{----------------------------------------------------------------------- 146{-----------------------------------------------------------------------
128-- Connections 147-- Connections
@@ -132,11 +151,12 @@ logEvent = logInfoN
132insert :: PeerAddr IP 151insert :: PeerAddr IP
133 -> {- Maybe Socket 152 -> {- Maybe Socket
134 -> -} Session -> IO () 153 -> -} Session -> IO ()
135insert addr ses @ Session {..} = do 154insert addr ses @ Session {..} = void $ forkIO (action `finally` cleanup)
136 forkIO $ do
137 action `finally` runStatusUpdates status (resetPending addr)
138 return ()
139 where 155 where
156 cleanup = do
157 runStatusUpdates status (SS.resetPending addr)
158 -- TODO Metata.resetPending addr
159
140 action = do 160 action = do
141 let caps = def 161 let caps = def
142 let ecaps = def 162 let ecaps = def
@@ -147,7 +167,7 @@ insert addr ses @ Session {..} = do
147-- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn 167-- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn
148 lift $ resizeBitfield (totalPieces storage) 168 lift $ resizeBitfield (totalPieces storage)
149 logEvent "Connection established" 169 logEvent "Connection established"
150 exchange 170 iterM logMessage =$= exchange =$= iterM logMessage
151-- liftIO $ modifyMVar_ connections $ pure . M.delete addr 171-- liftIO $ modifyMVar_ connections $ pure . M.delete addr
152 172
153delete :: PeerAddr IP -> Session -> IO () 173delete :: PeerAddr IP -> Session -> IO ()
@@ -160,11 +180,26 @@ deleteAll = undefined
160-- Helpers 180-- Helpers
161-----------------------------------------------------------------------} 181-----------------------------------------------------------------------}
162 182
183waitMVar :: MVar a -> IO ()
184waitMVar m = withMVar m (const (return ()))
185
186-- This function appear in new GHC "out of box". (moreover it is atomic)
187tryReadMVar :: MVar a -> IO (Maybe a)
188tryReadMVar m = do
189 ma <- tryTakeMVar m
190 maybe (return ()) (putMVar m) ma
191 return ma
192
163withStatusUpdates :: StatusUpdates a -> Wire Session a 193withStatusUpdates :: StatusUpdates a -> Wire Session a
164withStatusUpdates m = do 194withStatusUpdates m = do
165 Session {..} <- asks connSession 195 Session {..} <- asks connSession
166 liftIO $ runStatusUpdates status m 196 liftIO $ runStatusUpdates status m
167 197
198withMetadataUpdates :: Updates a -> Connected Session a
199withMetadataUpdates m = do
200 Session {..} <- asks connSession
201 liftIO $ runUpdates metadata m
202
168getThisBitfield :: Wire Session Bitfield 203getThisBitfield :: Wire Session Bitfield
169getThisBitfield = do 204getThisBitfield = do
170 ses <- asks connSession 205 ses <- asks connSession
@@ -179,6 +214,16 @@ readBlock bix @ BlockIx {..} s = do
179 then return $ Block ixPiece ixOffset chunk 214 then return $ Block ixPiece ixOffset chunk
180 else throwIO $ InvalidRequest bix (InvalidSize ixLength) 215 else throwIO $ InvalidRequest bix (InvalidSize ixLength)
181 216
217-- |
218tryReadMetadataBlock :: PieceIx
219 -> Connected Session (Maybe (Torrent.Piece BS.ByteString, Int))
220tryReadMetadataBlock pix = do
221 Session {..} <- asks connSession
222 mcached <- liftIO (tryReadMVar infodict)
223 case mcached of
224 Nothing -> undefined
225 Just (Cached {..}) -> undefined
226
182sendBroadcast :: PeerMessage msg => msg -> Wire Session () 227sendBroadcast :: PeerMessage msg => msg -> Wire Session ()
183sendBroadcast msg = do 228sendBroadcast msg = do
184 Session {..} <- asks connSession 229 Session {..} <- asks connSession
@@ -208,9 +253,7 @@ tryFillRequestQueue = do
208interesting :: Wire Session () 253interesting :: Wire Session ()
209interesting = do 254interesting = do
210 addr <- asks connRemoteAddr 255 addr <- asks connRemoteAddr
211 logMessage (Status (Interested True))
212 sendMessage (Interested True) 256 sendMessage (Interested True)
213 logMessage (Status (Choking False))
214 sendMessage (Choking False) 257 sendMessage (Choking False)
215 tryFillRequestQueue 258 tryFillRequestQueue
216 259
@@ -218,17 +261,19 @@ interesting = do
218-- Incoming message handling 261-- Incoming message handling
219-----------------------------------------------------------------------} 262-----------------------------------------------------------------------}
220 263
221handleStatus :: StatusUpdate -> Wire Session () 264type Handler msg = msg -> Wire Session ()
265
266handleStatus :: Handler StatusUpdate
222handleStatus s = do 267handleStatus s = do
223 connStatus %= over remoteStatus (updateStatus s) 268 connStatus %= over remoteStatus (updateStatus s)
224 case s of 269 case s of
225 Interested _ -> return () 270 Interested _ -> return ()
226 Choking True -> do 271 Choking True -> do
227 addr <- asks connRemoteAddr 272 addr <- asks connRemoteAddr
228 withStatusUpdates (resetPending addr) 273 withStatusUpdates (SS.resetPending addr)
229 Choking False -> tryFillRequestQueue 274 Choking False -> tryFillRequestQueue
230 275
231handleAvailable :: Available -> Wire Session () 276handleAvailable :: Handler Available
232handleAvailable msg = do 277handleAvailable msg = do
233 connBitfield %= case msg of 278 connBitfield %= case msg of
234 Have ix -> BF.insert ix 279 Have ix -> BF.insert ix
@@ -243,18 +288,18 @@ handleAvailable msg = do
243 | bf `BF.isSubsetOf` thisBf -> return () 288 | bf `BF.isSubsetOf` thisBf -> return ()
244 | otherwise -> interesting 289 | otherwise -> interesting
245 290
246handleTransfer :: Transfer -> Wire Session () 291handleTransfer :: Handler Transfer
247handleTransfer (Request bix) = do 292handleTransfer (Request bix) = do
248 Session {..} <- asks connSession 293 Session {..} <- asks connSession
249 bitfield <- getThisBitfield 294 bitfield <- getThisBitfield
250 upload <- canUpload <$> use connStatus 295 upload <- canUpload <$> use connStatus
251 when (upload && ixPiece bix `BF.member` bitfield) $ do 296 when (upload && ixPiece bix `BF.member` bitfield) $ do
252 blk <- liftIO $ readBlock bix storage 297 blk <- liftIO $ readBlock bix storage
253 sendMessage (Piece blk) 298 sendMessage (Message.Piece blk)
254 299
255handleTransfer (Piece blk) = do 300handleTransfer (Message.Piece blk) = do
256 Session {..} <- asks connSession 301 Session {..} <- asks connSession
257 isSuccess <- withStatusUpdates (pushBlock blk storage) 302 isSuccess <- withStatusUpdates (SS.pushBlock blk storage)
258 case isSuccess of 303 case isSuccess of
259 Nothing -> liftIO $ throwIO $ userError "block is not requested" 304 Nothing -> liftIO $ throwIO $ userError "block is not requested"
260 Just isCompleted -> do 305 Just isCompleted -> do
@@ -265,29 +310,78 @@ handleTransfer (Piece blk) = do
265 310
266handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix)) 311handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix))
267 where 312 where
268 transferResponse bix (Transfer (Piece blk)) = blockIx blk == bix 313 transferResponse bix (Transfer (Message.Piece blk)) = blockIx blk == bix
269 transferResponse _ _ = False 314 transferResponse _ _ = False
315
316{-----------------------------------------------------------------------
317-- Metadata exchange
318-----------------------------------------------------------------------}
319-- TODO introduce new metadata exchange specific exceptions
320
321tryRequestMetadataBlock :: Wire Session ()
322tryRequestMetadataBlock = do
323 addr <- asks connRemoteAddr
324 mpix <- lift $ withMetadataUpdates (Metadata.scheduleBlock addr)
325 case mpix of
326 Nothing -> undefined
327 Just pix -> sendMessage (MetadataRequest pix)
328
329handleMetadata :: Handler ExtendedMetadata
330handleMetadata (MetadataRequest pix) =
331 lift (tryReadMetadataBlock pix) >>= sendMessage . mkResponse
332 where
333 mkResponse Nothing = MetadataReject pix
334 mkResponse (Just (piece, total)) = MetadataData piece total
335
336handleMetadata (MetadataData {..}) = do
337 addr <- asks connRemoteAddr
338 ih <- asks connTopic
339 lift $ withMetadataUpdates (Metadata.pushBlock addr piece ih)
340 tryRequestMetadataBlock
341
342handleMetadata (MetadataReject pix) = do
343 lift $ withMetadataUpdates (Metadata.cancelPending pix)
344
345handleMetadata (MetadataUnknown _ ) = do
346 logInfoN "Unknown metadata message"
347
348waitForMetadata :: Wire Session ()
349waitForMetadata = do
350 Session {..} <- asks connSession
351 needFetch <- liftIO (isEmptyMVar infodict)
352 when needFetch $ do
353 canFetch <- allowed ExtMetadata <$> use connExtCaps
354 if canFetch
355 then tryRequestMetadataBlock
356 else liftIO (waitMVar infodict)
270 357
271{----------------------------------------------------------------------- 358{-----------------------------------------------------------------------
272-- Event loop 359-- Event loop
273-----------------------------------------------------------------------} 360-----------------------------------------------------------------------}
274 361
275handleMessage :: Message -> Wire Session () 362acceptRehandshake :: ExtendedHandshake -> Wire s ()
363acceptRehandshake ehs = undefined
364
365handleExtended :: Handler ExtendedMessage
366handleExtended (EHandshake ehs) = acceptRehandshake ehs
367handleExtended (EMetadata _ msg) = handleMetadata msg
368handleExtended (EUnknown _ _ ) = logWarnN "Unknown extension message"
369
370handleMessage :: Handler Message
276handleMessage KeepAlive = return () 371handleMessage KeepAlive = return ()
277handleMessage (Status s) = handleStatus s 372handleMessage (Status s) = handleStatus s
278handleMessage (Available msg) = handleAvailable msg 373handleMessage (Available msg) = handleAvailable msg
279handleMessage (Transfer msg) = handleTransfer msg 374handleMessage (Transfer msg) = handleTransfer msg
280handleMessage (Port n) = undefined 375handleMessage (Port n) = undefined
281handleMessage (Fast _) = undefined 376handleMessage (Fast _) = undefined
282handleMessage (Extended _) = undefined 377handleMessage (Extended msg) = handleExtended msg
283 378
284exchange :: Wire Session () 379exchange :: Wire Session ()
285exchange = do 380exchange = do
381 waitForMetadata
286 bf <- getThisBitfield 382 bf <- getThisBitfield
287 sendMessage (Bitfield bf) 383 sendMessage (Bitfield bf)
288 awaitForever $ \ msg -> do 384 awaitForever handleMessage
289 logMessage msg
290 handleMessage msg
291 385
292data Event = NewMessage (PeerAddr IP) Message 386data Event = NewMessage (PeerAddr IP) Message
293 | Timeout -- for scheduling 387 | Timeout -- for scheduling