summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Network/BitTorrent/Client/Handle.hs10
-rw-r--r--src/Network/BitTorrent/Exchange/Session.hs192
-rw-r--r--src/Network/BitTorrent/Exchange/Session/Metadata.hs4
-rw-r--r--tests/Network/BitTorrent/Exchange/SessionSpec.hs2
4 files changed, 112 insertions, 96 deletions
diff --git a/src/Network/BitTorrent/Client/Handle.hs b/src/Network/BitTorrent/Client/Handle.hs
index 9601e691..5bcb2d33 100644
--- a/src/Network/BitTorrent/Client/Handle.hs
+++ b/src/Network/BitTorrent/Client/Handle.hs
@@ -71,10 +71,10 @@ lookupHandle ih = do
71-- Initialization 71-- Initialization
72-----------------------------------------------------------------------} 72-----------------------------------------------------------------------}
73 73
74newExchangeSession :: FilePath -> InfoDict -> BitTorrent Exchange.Session 74newExchangeSession :: FilePath -> Either InfoHash InfoDict -> BitTorrent Exchange.Session
75newExchangeSession rootPath dict = do 75newExchangeSession rootPath source = do
76 c @ Client {..} <- getClient 76 c @ Client {..} <- getClient
77 liftIO $ Exchange.newSession clientLogger (externalAddr c) rootPath dict 77 liftIO $ Exchange.newSession clientLogger (externalAddr c) rootPath source
78 78
79-- | Open a torrent in 'stop'ed state. Use 'nullTorrent' to open 79-- | Open a torrent in 'stop'ed state. Use 'nullTorrent' to open
80-- handle from 'InfoDict'. This operation do not block. 80-- handle from 'InfoDict'. This operation do not block.
@@ -83,7 +83,7 @@ openTorrent rootPath t @ Torrent {..} = do
83 let ih = idInfoHash tInfoDict 83 let ih = idInfoHash tInfoDict
84 allocHandle ih $ do 84 allocHandle ih $ do
85 tses <- liftIO $ Tracker.newSession ih (trackerList t) 85 tses <- liftIO $ Tracker.newSession ih (trackerList t)
86 eses <- newExchangeSession rootPath tInfoDict 86 eses <- newExchangeSession rootPath (Right tInfoDict)
87 return $ Handle 87 return $ Handle
88 { handleTopic = ih 88 { handleTopic = ih
89 , handlePrivate = idPrivate tInfoDict 89 , handlePrivate = idPrivate tInfoDict
@@ -96,7 +96,7 @@ openMagnet :: FilePath -> Magnet -> BitTorrent Handle
96openMagnet rootPath uri @ Magnet {..} = do 96openMagnet rootPath uri @ Magnet {..} = do
97 allocHandle exactTopic $ do 97 allocHandle exactTopic $ do
98 tses <- liftIO $ Tracker.newSession exactTopic def 98 tses <- liftIO $ Tracker.newSession exactTopic def
99 eses <- newExchangeSession rootPath (error "openMagnet" exactTopic) 99 eses <- newExchangeSession rootPath (Left exactTopic)
100 return $ Handle 100 return $ Handle
101 { handleTopic = exactTopic 101 { handleTopic = exactTopic
102 , handlePrivate = False 102 , handlePrivate = False
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs
index 57d21579..b6d7f810 100644
--- a/src/Network/BitTorrent/Exchange/Session.hs
+++ b/src/Network/BitTorrent/Exchange/Session.hs
@@ -76,7 +76,7 @@ packException :: Exception e => (e -> ExchangeError) -> IO a -> IO a
76packException f m = try m >>= either (throwIO . f) return 76packException f m = try m >>= either (throwIO . f) return
77 77
78{----------------------------------------------------------------------- 78{-----------------------------------------------------------------------
79-- Session 79-- Session state
80-----------------------------------------------------------------------} 80-----------------------------------------------------------------------}
81-- TODO unmap storage on zero connections 81-- TODO unmap storage on zero connections
82 82
@@ -91,18 +91,56 @@ cache s = Cached s (BE.encode s)
91-- | Logger function. 91-- | Logger function.
92type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () 92type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO ()
93 93
94--data SessionStatus = Seeder | Leecher
95
96data SessionState
97 = WaitingMetadata
98 { metadataDownload :: MVar Metadata.Status
99 , metadataCompleted :: MVar InfoDict -- ^ used to unblock waiters
100 , contentRootPath :: FilePath
101 }
102 | HavingMetadata
103 { metadataCache :: Cached InfoDict
104 , contentDownload :: MVar SessionStatus
105 , contentStorage :: Storage
106 }
107
108newSessionState :: FilePath -> Either InfoHash InfoDict -> IO SessionState
109newSessionState rootPath (Left ih ) = do
110 WaitingMetadata <$> newMVar def <*> newEmptyMVar <*> pure rootPath
111newSessionState rootPath (Right dict) = do
112 storage <- openInfoDict ReadWriteEx rootPath dict
113 download <- newMVar $ sessionStatus (BF.haveNone (totalPieces storage))
114 (piPieceLength (idPieceInfo dict))
115 return $ HavingMetadata (cache dict) download storage
116
117closeSessionState :: SessionState -> IO ()
118closeSessionState WaitingMetadata {..} = return ()
119closeSessionState HavingMetadata {..} = close contentStorage
120
121haveMetadata :: InfoDict -> SessionState -> IO SessionState
122haveMetadata dict WaitingMetadata {..} = do
123 storage <- openInfoDict ReadWriteEx contentRootPath dict
124 download <- newMVar $ sessionStatus (BF.haveNone (totalPieces storage))
125 (piPieceLength (idPieceInfo dict))
126 return HavingMetadata
127 { metadataCache = cache dict
128 , contentDownload = download
129 , contentStorage = storage
130 }
131haveMetadata _ s = return s
132
133{-----------------------------------------------------------------------
134-- Session
135-----------------------------------------------------------------------}
136
94data Session = Session 137data Session = Session
95 { sessionPeerId :: !(PeerId) 138 { sessionPeerId :: !(PeerId)
96 , sessionTopic :: !(InfoHash) 139 , sessionTopic :: !(InfoHash)
97 , sessionLogger :: !(LogFun) 140 , sessionLogger :: !(LogFun)
98 , sessionEvents :: !(SendPort SessionEvent) 141 , sessionEvents :: !(SendPort SessionEvent)
99 142
100------------------------------------------------------------------------ 143 , sessionState :: !(MVar SessionState)
101 , metadata :: !(MVar Metadata.Status)
102 , infodict :: !(MVar (Cached InfoDict))
103
104 , status :: !(MVar SessionStatus)
105 , sessionStorage :: !(MVar Storage)
106 144
107------------------------------------------------------------------------ 145------------------------------------------------------------------------
108 , connectionsPrefs :: !ConnectionPrefs 146 , connectionsPrefs :: !ConnectionPrefs
@@ -124,44 +162,26 @@ data Session = Session
124 , connectionsBroadcast :: !(Chan Message) 162 , connectionsBroadcast :: !(Chan Message)
125 } 163 }
126 164
127{-----------------------------------------------------------------------
128-- Session construction
129-----------------------------------------------------------------------}
130
131newSession :: LogFun 165newSession :: LogFun
132 -> PeerAddr (Maybe IP) -- ^ /external/ address of this peer; 166 -> PeerAddr (Maybe IP) -- ^ /external/ address of this peer;
133 -> FilePath -- ^ root directory for content files; 167 -> FilePath -- ^ root directory for content files;
134 -> InfoDict -- ^ torrent info dictionary; 168 -> Either InfoHash InfoDict -- ^ torrent info dictionary;
135 -> IO Session -- ^ 169 -> IO Session
136newSession logFun addr rootPath dict = do 170newSession logFun addr rootPath source = do
171 let ih = either id idInfoHash source
137 pid <- maybe genPeerId return (peerId addr) 172 pid <- maybe genPeerId return (peerId addr)
138 let ih = idInfoHash dict
139 eventStream <- newSendPort 173 eventStream <- newSendPort
140 174 sState <- newSessionState rootPath source
141 storage <- openInfoDict ReadWriteEx rootPath dict 175 sStateVar <- newMVar sState
142 storageVar <- newMVar storage
143
144 statusVar <- newMVar $ sessionStatus (BF.haveNone (totalPieces storage))
145 (piPieceLength (idPieceInfo dict))
146
147 metadataVar <- newMVar (error "sessionMetadata")
148 infodictVar <- newMVar (cache dict)
149
150 pSetVar <- newTVarIO S.empty 176 pSetVar <- newTVarIO S.empty
151 eSetVar <- newTVarIO M.empty 177 eSetVar <- newTVarIO M.empty
152 chan <- newChan 178 chan <- newChan
153
154 return Session 179 return Session
155 { sessionPeerId = pid 180 { sessionPeerId = pid
156 , sessionTopic = ih 181 , sessionTopic = ih
157 , sessionLogger = logFun 182 , sessionLogger = logFun
158 , sessionEvents = eventStream 183 , sessionEvents = eventStream
159 184 , sessionState = sStateVar
160 , metadata = metadataVar
161 , infodict = infodictVar
162 , status = statusVar
163 , sessionStorage = storageVar
164
165 , connectionsPrefs = def 185 , connectionsPrefs = def
166 , connectionsPending = pSetVar 186 , connectionsPending = pSetVar
167 , connectionsEstablished = eSetVar 187 , connectionsEstablished = eSetVar
@@ -171,8 +191,8 @@ newSession logFun addr rootPath dict = do
171 191
172closeSession :: Session -> IO () 192closeSession :: Session -> IO ()
173closeSession Session {..} = do 193closeSession Session {..} = do
174 mstorage <- tryReadMVar sessionStorage 194 s <- readMVar sessionState
175 maybe (return ()) close mstorage 195 closeSessionState s
176{- 196{-
177 hSet <- atomically $ do 197 hSet <- atomically $ do
178 pSet <- swapTVar connectionsPending S.empty 198 pSet <- swapTVar connectionsPending S.empty
@@ -341,22 +361,6 @@ tryReadMVar m = do
341 maybe (return ()) (putMVar m) ma 361 maybe (return ()) (putMVar m) ma
342 return ma 362 return ma
343 363
344withStatusUpdates :: StatusUpdates a -> Wire Session a
345withStatusUpdates m = do
346 Session {..} <- asks connSession
347 liftIO $ runStatusUpdates status m
348
349withMetadataUpdates :: Updates a -> Connected Session a
350withMetadataUpdates m = do
351 Session {..} <- asks connSession
352 addr <- asks connRemoteAddr
353 liftIO $ runUpdates metadata addr m
354
355getThisBitfield :: Wire Session Bitfield
356getThisBitfield = do
357 ses <- asks connSession
358 liftIO $ SS.getBitfield (status ses)
359
360readBlock :: BlockIx -> Storage -> IO (Block BL.ByteString) 364readBlock :: BlockIx -> Storage -> IO (Block BL.ByteString)
361readBlock bix @ BlockIx {..} s = do 365readBlock bix @ BlockIx {..} s = do
362 p <- packException (InvalidRequest bix) $ do readPiece ixPiece s 366 p <- packException (InvalidRequest bix) $ do readPiece ixPiece s
@@ -371,10 +375,10 @@ tryReadMetadataBlock :: PieceIx
371 -> Connected Session (Maybe (Torrent.Piece BS.ByteString, Int)) 375 -> Connected Session (Maybe (Torrent.Piece BS.ByteString, Int))
372tryReadMetadataBlock pix = do 376tryReadMetadataBlock pix = do
373 Session {..} <- asks connSession 377 Session {..} <- asks connSession
374 mcached <- liftIO (tryReadMVar infodict) 378 s <- liftIO (readMVar sessionState)
375 case mcached of 379 case s of
376 Nothing -> error "tryReadMetadataBlock" 380 WaitingMetadata {..} -> error "tryReadMetadataBlock"
377 Just (Cached {..}) -> error "tryReadMetadataBlock" 381 HavingMetadata {..} -> error "tryReadMetadataBlock"
378 382
379sendBroadcast :: PeerMessage msg => msg -> Wire Session () 383sendBroadcast :: PeerMessage msg => msg -> Wire Session ()
380sendBroadcast msg = do 384sendBroadcast msg = do
@@ -383,10 +387,18 @@ sendBroadcast msg = do
383-- liftIO $ msg `broadcast` sessionConnections 387-- liftIO $ msg `broadcast` sessionConnections
384 388
385waitMetadata :: Session -> IO InfoDict 389waitMetadata :: Session -> IO InfoDict
386waitMetadata Session {..} = cachedValue <$> readMVar infodict 390waitMetadata Session {..} = do
391 s <- readMVar sessionState
392 case s of
393 WaitingMetadata {..} -> readMVar metadataCompleted
394 HavingMetadata {..} -> return (cachedValue metadataCache)
387 395
388takeMetadata :: Session -> IO (Maybe InfoDict) 396takeMetadata :: Session -> IO (Maybe InfoDict)
389takeMetadata Session {..} = fmap cachedValue <$> tryReadMVar infodict 397takeMetadata Session {..} = do
398 s <- readMVar sessionState
399 case s of
400 WaitingMetadata {..} -> return Nothing
401 HavingMetadata {..} -> return (Just (cachedValue metadataCache))
390 402
391{----------------------------------------------------------------------- 403{-----------------------------------------------------------------------
392-- Triggers 404-- Triggers
@@ -405,12 +417,13 @@ interesting = do
405fillRequestQueue :: Trigger 417fillRequestQueue :: Trigger
406fillRequestQueue = do 418fillRequestQueue = do
407 maxN <- lift getMaxQueueLength 419 maxN <- lift getMaxQueueLength
408 rbf <- use connBitfield 420 rbf <- use connBitfield
409 addr <- asks connRemoteAddr 421 addr <- asks connRemoteAddr
410 blks <- withStatusUpdates $ do 422-- blks <- withStatusUpdates $ do
411 n <- getRequestQueueLength addr 423-- n <- getRequestQueueLength addr
412 scheduleBlocks addr rbf (maxN - n) 424-- scheduleBlocks addr rbf (maxN - n)
413 mapM_ (sendMessage . Request) blks 425-- mapM_ (sendMessage . Request) blks
426 return ()
414 427
415tryFillRequestQueue :: Trigger 428tryFillRequestQueue :: Trigger
416tryFillRequestQueue = do 429tryFillRequestQueue = do
@@ -431,7 +444,8 @@ handleStatus s = do
431 Interested _ -> return () 444 Interested _ -> return ()
432 Choking True -> do 445 Choking True -> do
433 addr <- asks connRemoteAddr 446 addr <- asks connRemoteAddr
434 withStatusUpdates (SS.resetPending addr) 447-- withStatusUpdates (SS.resetPending addr)
448 return ()
435 Choking False -> tryFillRequestQueue 449 Choking False -> tryFillRequestQueue
436 450
437handleAvailable :: Handler Available 451handleAvailable :: Handler Available
@@ -440,7 +454,8 @@ handleAvailable msg = do
440 Have ix -> BF.insert ix 454 Have ix -> BF.insert ix
441 Bitfield bf -> const bf 455 Bitfield bf -> const bf
442 456
443 thisBf <- getThisBitfield 457 --thisBf <- getThisBitfield
458 thisBf <- undefined
444 case msg of 459 case msg of
445 Have ix 460 Have ix
446 | ix `BF.member` thisBf -> return () 461 | ix `BF.member` thisBf -> return ()
@@ -452,23 +467,23 @@ handleAvailable msg = do
452handleTransfer :: Handler Transfer 467handleTransfer :: Handler Transfer
453handleTransfer (Request bix) = do 468handleTransfer (Request bix) = do
454 Session {..} <- asks connSession 469 Session {..} <- asks connSession
455 bitfield <- getThisBitfield 470 s <- liftIO $ readMVar sessionState
456 upload <- canUpload <$> use connStatus 471 case s of
457 when (upload && ixPiece bix `BF.member` bitfield) $ do 472 WaitingMetadata {..} -> return ()
458 mstorage <- liftIO $ tryReadMVar sessionStorage 473 HavingMetadata {..} -> do
459 case mstorage of 474 bitfield <- undefined -- getThisBitfield
460 Nothing -> return () 475 upload <- canUpload <$> use connStatus
461 Just storage -> do 476 when (upload && ixPiece bix `BF.member` bitfield) $ do
462 blk <- liftIO $ readBlock bix storage 477 blk <- liftIO $ readBlock bix contentStorage
463 sendMessage (Message.Piece blk) 478 sendMessage (Message.Piece blk)
464 479
465handleTransfer (Message.Piece blk) = do 480handleTransfer (Message.Piece blk) = do
466 Session {..} <- asks connSession 481 Session {..} <- asks connSession
467 mstorage <- liftIO $ tryReadMVar sessionStorage 482 s <- liftIO $ readMVar sessionState
468 case mstorage of 483 case s of
469 Nothing -> return () -- TODO (?) break connection 484 WaitingMetadata {..} -> return () -- TODO (?) break connection
470 Just storage -> do 485 HavingMetadata {..} -> do
471 isSuccess <- withStatusUpdates (SS.pushBlock blk storage) 486 isSuccess <- undefined -- withStatusUpdates (SS.pushBlock blk storage)
472 case isSuccess of 487 case isSuccess of
473 Nothing -> liftIO $ throwIO $ userError "block is not requested" 488 Nothing -> liftIO $ throwIO $ userError "block is not requested"
474 Just isCompleted -> do 489 Just isCompleted -> do
@@ -490,25 +505,20 @@ handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix))
490waitForMetadata :: Trigger 505waitForMetadata :: Trigger
491waitForMetadata = do 506waitForMetadata = do
492 Session {..} <- asks connSession 507 Session {..} <- asks connSession
493 needFetch <- liftIO (isEmptyMVar infodict) 508 needFetch <- undefined --liftIO (isEmptyMVar infodict)
494 when needFetch $ do 509 when needFetch $ do
495 canFetch <- allowed ExtMetadata <$> use connExtCaps 510 canFetch <- allowed ExtMetadata <$> use connExtCaps
496 if canFetch 511 if canFetch
497 then tryRequestMetadataBlock 512 then tryRequestMetadataBlock
498 else liftIO (waitMVar infodict) 513 else undefined -- liftIO (waitMVar infodict)
499 514
500tryRequestMetadataBlock :: Trigger 515tryRequestMetadataBlock :: Trigger
501tryRequestMetadataBlock = do 516tryRequestMetadataBlock = do
502 mpix <- lift $ withMetadataUpdates Metadata.scheduleBlock 517 mpix <- lift $ undefined --withMetadataUpdates Metadata.scheduleBlock
503 case mpix of 518 case mpix of
504 Nothing -> error "tryRequestMetadataBlock" 519 Nothing -> error "tryRequestMetadataBlock"
505 Just pix -> sendMessage (MetadataRequest pix) 520 Just pix -> sendMessage (MetadataRequest pix)
506 521
507metadataCompleted :: InfoDict -> Trigger
508metadataCompleted dict = do
509 Session {..} <- asks connSession
510 liftIO $ putMVar infodict (cache dict)
511
512handleMetadata :: Handler ExtendedMetadata 522handleMetadata :: Handler ExtendedMetadata
513handleMetadata (MetadataRequest pix) = 523handleMetadata (MetadataRequest pix) =
514 lift (tryReadMetadataBlock pix) >>= sendMessage . mkResponse 524 lift (tryReadMetadataBlock pix) >>= sendMessage . mkResponse
@@ -518,13 +528,15 @@ handleMetadata (MetadataRequest pix) =
518 528
519handleMetadata (MetadataData {..}) = do 529handleMetadata (MetadataData {..}) = do
520 ih <- asks connTopic 530 ih <- asks connTopic
521 mdict <- lift $ withMetadataUpdates (Metadata.pushBlock piece ih) 531 mdict <- lift $ undefined --withMetadataUpdates (Metadata.pushBlock piece ih)
522 case mdict of 532 case mdict of
523 Nothing -> tryRequestMetadataBlock -- not completed, need all blocks 533 Nothing -> tryRequestMetadataBlock -- not completed, need all blocks
524 Just dict -> metadataCompleted dict -- complete, wake up payload fetch 534 Just dict -> do -- complete, wake up payload fetch
535 Session {..} <- asks connSession
536 liftIO $ modifyMVar_ sessionState (haveMetadata dict)
525 537
526handleMetadata (MetadataReject pix) = do 538handleMetadata (MetadataReject pix) = do
527 lift $ withMetadataUpdates (Metadata.cancelPending pix) 539 lift $ undefined -- withMetadataUpdates (Metadata.cancelPending pix)
528 540
529handleMetadata (MetadataUnknown _ ) = do 541handleMetadata (MetadataUnknown _ ) = do
530 logInfoN "Unknown metadata message" 542 logInfoN "Unknown metadata message"
@@ -553,7 +565,7 @@ handleMessage (Extended msg) = handleExtended msg
553exchange :: Wire Session () 565exchange :: Wire Session ()
554exchange = do 566exchange = do
555 waitForMetadata 567 waitForMetadata
556 bf <- getThisBitfield 568 bf <- undefined --getThisBitfield
557 sendMessage (Bitfield bf) 569 sendMessage (Bitfield bf)
558 awaitForever handleMessage 570 awaitForever handleMessage
559 571
diff --git a/src/Network/BitTorrent/Exchange/Session/Metadata.hs b/src/Network/BitTorrent/Exchange/Session/Metadata.hs
index 79fd03b0..79156e2e 100644
--- a/src/Network/BitTorrent/Exchange/Session/Metadata.hs
+++ b/src/Network/BitTorrent/Exchange/Session/Metadata.hs
@@ -21,6 +21,7 @@ import Control.Monad.Reader
21import Control.Monad.State 21import Control.Monad.State
22import Data.ByteString as BS 22import Data.ByteString as BS
23import Data.ByteString.Lazy as BL 23import Data.ByteString.Lazy as BL
24import Data.Default
24import Data.List as L 25import Data.List as L
25import Data.Tuple 26import Data.Tuple
26 27
@@ -41,6 +42,9 @@ data Status = Status
41 42
42makeLenses ''Status 43makeLenses ''Status
43 44
45instance Default Status where
46 def = error "default status"
47
44-- | Create a new scheduler for infodict of the given size. 48-- | Create a new scheduler for infodict of the given size.
45nullStatus :: Int -> Status 49nullStatus :: Int -> Status
46nullStatus ps = Status [] (Block.empty ps) 50nullStatus ps = Status [] (Block.empty ps)
diff --git a/tests/Network/BitTorrent/Exchange/SessionSpec.hs b/tests/Network/BitTorrent/Exchange/SessionSpec.hs
index 0eec8f41..c2c76644 100644
--- a/tests/Network/BitTorrent/Exchange/SessionSpec.hs
+++ b/tests/Network/BitTorrent/Exchange/SessionSpec.hs
@@ -16,7 +16,7 @@ simpleSession :: InfoDict -> (Session -> IO ()) -> IO ()
16simpleSession dict action = do 16simpleSession dict action = do
17 withRemoteAddr $ \ addr -> do 17 withRemoteAddr $ \ addr -> do
18 myAddr <- getMyAddr 18 myAddr <- getMyAddr
19 ses <- newSession nullLogger myAddr "" dict 19 ses <- newSession nullLogger myAddr "" (Right dict)
20 connect addr ses 20 connect addr ses
21 action ses 21 action ses
22 closeSession ses 22 closeSession ses