summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/Exchange/Session.hs
diff options
context:
space:
mode:
authorSam Truzjan <pxqr.sta@gmail.com>2014-04-04 01:03:55 +0400
committerSam Truzjan <pxqr.sta@gmail.com>2014-04-04 01:03:55 +0400
commitde1fee157b87b62161ac68f32bd6e72ba9a11275 (patch)
tree75b90c02937e49e1cb712f48d5823da3c90a002c /src/Network/BitTorrent/Exchange/Session.hs
parent937342955301e9820a9bcbafcf8922cc5dd1798d (diff)
[Exchange] newSession now can take infohash only
Diffstat (limited to 'src/Network/BitTorrent/Exchange/Session.hs')
-rw-r--r--src/Network/BitTorrent/Exchange/Session.hs192
1 files changed, 102 insertions, 90 deletions
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs
index 57d21579..b6d7f810 100644
--- a/src/Network/BitTorrent/Exchange/Session.hs
+++ b/src/Network/BitTorrent/Exchange/Session.hs
@@ -76,7 +76,7 @@ packException :: Exception e => (e -> ExchangeError) -> IO a -> IO a
76packException f m = try m >>= either (throwIO . f) return 76packException f m = try m >>= either (throwIO . f) return
77 77
78{----------------------------------------------------------------------- 78{-----------------------------------------------------------------------
79-- Session 79-- Session state
80-----------------------------------------------------------------------} 80-----------------------------------------------------------------------}
81-- TODO unmap storage on zero connections 81-- TODO unmap storage on zero connections
82 82
@@ -91,18 +91,56 @@ cache s = Cached s (BE.encode s)
91-- | Logger function. 91-- | Logger function.
92type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () 92type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO ()
93 93
94--data SessionStatus = Seeder | Leecher
95
96data SessionState
97 = WaitingMetadata
98 { metadataDownload :: MVar Metadata.Status
99 , metadataCompleted :: MVar InfoDict -- ^ used to unblock waiters
100 , contentRootPath :: FilePath
101 }
102 | HavingMetadata
103 { metadataCache :: Cached InfoDict
104 , contentDownload :: MVar SessionStatus
105 , contentStorage :: Storage
106 }
107
108newSessionState :: FilePath -> Either InfoHash InfoDict -> IO SessionState
109newSessionState rootPath (Left ih ) = do
110 WaitingMetadata <$> newMVar def <*> newEmptyMVar <*> pure rootPath
111newSessionState rootPath (Right dict) = do
112 storage <- openInfoDict ReadWriteEx rootPath dict
113 download <- newMVar $ sessionStatus (BF.haveNone (totalPieces storage))
114 (piPieceLength (idPieceInfo dict))
115 return $ HavingMetadata (cache dict) download storage
116
117closeSessionState :: SessionState -> IO ()
118closeSessionState WaitingMetadata {..} = return ()
119closeSessionState HavingMetadata {..} = close contentStorage
120
121haveMetadata :: InfoDict -> SessionState -> IO SessionState
122haveMetadata dict WaitingMetadata {..} = do
123 storage <- openInfoDict ReadWriteEx contentRootPath dict
124 download <- newMVar $ sessionStatus (BF.haveNone (totalPieces storage))
125 (piPieceLength (idPieceInfo dict))
126 return HavingMetadata
127 { metadataCache = cache dict
128 , contentDownload = download
129 , contentStorage = storage
130 }
131haveMetadata _ s = return s
132
133{-----------------------------------------------------------------------
134-- Session
135-----------------------------------------------------------------------}
136
94data Session = Session 137data Session = Session
95 { sessionPeerId :: !(PeerId) 138 { sessionPeerId :: !(PeerId)
96 , sessionTopic :: !(InfoHash) 139 , sessionTopic :: !(InfoHash)
97 , sessionLogger :: !(LogFun) 140 , sessionLogger :: !(LogFun)
98 , sessionEvents :: !(SendPort SessionEvent) 141 , sessionEvents :: !(SendPort SessionEvent)
99 142
100------------------------------------------------------------------------ 143 , sessionState :: !(MVar SessionState)
101 , metadata :: !(MVar Metadata.Status)
102 , infodict :: !(MVar (Cached InfoDict))
103
104 , status :: !(MVar SessionStatus)
105 , sessionStorage :: !(MVar Storage)
106 144
107------------------------------------------------------------------------ 145------------------------------------------------------------------------
108 , connectionsPrefs :: !ConnectionPrefs 146 , connectionsPrefs :: !ConnectionPrefs
@@ -124,44 +162,26 @@ data Session = Session
124 , connectionsBroadcast :: !(Chan Message) 162 , connectionsBroadcast :: !(Chan Message)
125 } 163 }
126 164
127{-----------------------------------------------------------------------
128-- Session construction
129-----------------------------------------------------------------------}
130
131newSession :: LogFun 165newSession :: LogFun
132 -> PeerAddr (Maybe IP) -- ^ /external/ address of this peer; 166 -> PeerAddr (Maybe IP) -- ^ /external/ address of this peer;
133 -> FilePath -- ^ root directory for content files; 167 -> FilePath -- ^ root directory for content files;
134 -> InfoDict -- ^ torrent info dictionary; 168 -> Either InfoHash InfoDict -- ^ torrent info dictionary;
135 -> IO Session -- ^ 169 -> IO Session
136newSession logFun addr rootPath dict = do 170newSession logFun addr rootPath source = do
171 let ih = either id idInfoHash source
137 pid <- maybe genPeerId return (peerId addr) 172 pid <- maybe genPeerId return (peerId addr)
138 let ih = idInfoHash dict
139 eventStream <- newSendPort 173 eventStream <- newSendPort
140 174 sState <- newSessionState rootPath source
141 storage <- openInfoDict ReadWriteEx rootPath dict 175 sStateVar <- newMVar sState
142 storageVar <- newMVar storage
143
144 statusVar <- newMVar $ sessionStatus (BF.haveNone (totalPieces storage))
145 (piPieceLength (idPieceInfo dict))
146
147 metadataVar <- newMVar (error "sessionMetadata")
148 infodictVar <- newMVar (cache dict)
149
150 pSetVar <- newTVarIO S.empty 176 pSetVar <- newTVarIO S.empty
151 eSetVar <- newTVarIO M.empty 177 eSetVar <- newTVarIO M.empty
152 chan <- newChan 178 chan <- newChan
153
154 return Session 179 return Session
155 { sessionPeerId = pid 180 { sessionPeerId = pid
156 , sessionTopic = ih 181 , sessionTopic = ih
157 , sessionLogger = logFun 182 , sessionLogger = logFun
158 , sessionEvents = eventStream 183 , sessionEvents = eventStream
159 184 , sessionState = sStateVar
160 , metadata = metadataVar
161 , infodict = infodictVar
162 , status = statusVar
163 , sessionStorage = storageVar
164
165 , connectionsPrefs = def 185 , connectionsPrefs = def
166 , connectionsPending = pSetVar 186 , connectionsPending = pSetVar
167 , connectionsEstablished = eSetVar 187 , connectionsEstablished = eSetVar
@@ -171,8 +191,8 @@ newSession logFun addr rootPath dict = do
171 191
172closeSession :: Session -> IO () 192closeSession :: Session -> IO ()
173closeSession Session {..} = do 193closeSession Session {..} = do
174 mstorage <- tryReadMVar sessionStorage 194 s <- readMVar sessionState
175 maybe (return ()) close mstorage 195 closeSessionState s
176{- 196{-
177 hSet <- atomically $ do 197 hSet <- atomically $ do
178 pSet <- swapTVar connectionsPending S.empty 198 pSet <- swapTVar connectionsPending S.empty
@@ -341,22 +361,6 @@ tryReadMVar m = do
341 maybe (return ()) (putMVar m) ma 361 maybe (return ()) (putMVar m) ma
342 return ma 362 return ma
343 363
344withStatusUpdates :: StatusUpdates a -> Wire Session a
345withStatusUpdates m = do
346 Session {..} <- asks connSession
347 liftIO $ runStatusUpdates status m
348
349withMetadataUpdates :: Updates a -> Connected Session a
350withMetadataUpdates m = do
351 Session {..} <- asks connSession
352 addr <- asks connRemoteAddr
353 liftIO $ runUpdates metadata addr m
354
355getThisBitfield :: Wire Session Bitfield
356getThisBitfield = do
357 ses <- asks connSession
358 liftIO $ SS.getBitfield (status ses)
359
360readBlock :: BlockIx -> Storage -> IO (Block BL.ByteString) 364readBlock :: BlockIx -> Storage -> IO (Block BL.ByteString)
361readBlock bix @ BlockIx {..} s = do 365readBlock bix @ BlockIx {..} s = do
362 p <- packException (InvalidRequest bix) $ do readPiece ixPiece s 366 p <- packException (InvalidRequest bix) $ do readPiece ixPiece s
@@ -371,10 +375,10 @@ tryReadMetadataBlock :: PieceIx
371 -> Connected Session (Maybe (Torrent.Piece BS.ByteString, Int)) 375 -> Connected Session (Maybe (Torrent.Piece BS.ByteString, Int))
372tryReadMetadataBlock pix = do 376tryReadMetadataBlock pix = do
373 Session {..} <- asks connSession 377 Session {..} <- asks connSession
374 mcached <- liftIO (tryReadMVar infodict) 378 s <- liftIO (readMVar sessionState)
375 case mcached of 379 case s of
376 Nothing -> error "tryReadMetadataBlock" 380 WaitingMetadata {..} -> error "tryReadMetadataBlock"
377 Just (Cached {..}) -> error "tryReadMetadataBlock" 381 HavingMetadata {..} -> error "tryReadMetadataBlock"
378 382
379sendBroadcast :: PeerMessage msg => msg -> Wire Session () 383sendBroadcast :: PeerMessage msg => msg -> Wire Session ()
380sendBroadcast msg = do 384sendBroadcast msg = do
@@ -383,10 +387,18 @@ sendBroadcast msg = do
383-- liftIO $ msg `broadcast` sessionConnections 387-- liftIO $ msg `broadcast` sessionConnections
384 388
385waitMetadata :: Session -> IO InfoDict 389waitMetadata :: Session -> IO InfoDict
386waitMetadata Session {..} = cachedValue <$> readMVar infodict 390waitMetadata Session {..} = do
391 s <- readMVar sessionState
392 case s of
393 WaitingMetadata {..} -> readMVar metadataCompleted
394 HavingMetadata {..} -> return (cachedValue metadataCache)
387 395
388takeMetadata :: Session -> IO (Maybe InfoDict) 396takeMetadata :: Session -> IO (Maybe InfoDict)
389takeMetadata Session {..} = fmap cachedValue <$> tryReadMVar infodict 397takeMetadata Session {..} = do
398 s <- readMVar sessionState
399 case s of
400 WaitingMetadata {..} -> return Nothing
401 HavingMetadata {..} -> return (Just (cachedValue metadataCache))
390 402
391{----------------------------------------------------------------------- 403{-----------------------------------------------------------------------
392-- Triggers 404-- Triggers
@@ -405,12 +417,13 @@ interesting = do
405fillRequestQueue :: Trigger 417fillRequestQueue :: Trigger
406fillRequestQueue = do 418fillRequestQueue = do
407 maxN <- lift getMaxQueueLength 419 maxN <- lift getMaxQueueLength
408 rbf <- use connBitfield 420 rbf <- use connBitfield
409 addr <- asks connRemoteAddr 421 addr <- asks connRemoteAddr
410 blks <- withStatusUpdates $ do 422-- blks <- withStatusUpdates $ do
411 n <- getRequestQueueLength addr 423-- n <- getRequestQueueLength addr
412 scheduleBlocks addr rbf (maxN - n) 424-- scheduleBlocks addr rbf (maxN - n)
413 mapM_ (sendMessage . Request) blks 425-- mapM_ (sendMessage . Request) blks
426 return ()
414 427
415tryFillRequestQueue :: Trigger 428tryFillRequestQueue :: Trigger
416tryFillRequestQueue = do 429tryFillRequestQueue = do
@@ -431,7 +444,8 @@ handleStatus s = do
431 Interested _ -> return () 444 Interested _ -> return ()
432 Choking True -> do 445 Choking True -> do
433 addr <- asks connRemoteAddr 446 addr <- asks connRemoteAddr
434 withStatusUpdates (SS.resetPending addr) 447-- withStatusUpdates (SS.resetPending addr)
448 return ()
435 Choking False -> tryFillRequestQueue 449 Choking False -> tryFillRequestQueue
436 450
437handleAvailable :: Handler Available 451handleAvailable :: Handler Available
@@ -440,7 +454,8 @@ handleAvailable msg = do
440 Have ix -> BF.insert ix 454 Have ix -> BF.insert ix
441 Bitfield bf -> const bf 455 Bitfield bf -> const bf
442 456
443 thisBf <- getThisBitfield 457 --thisBf <- getThisBitfield
458 thisBf <- undefined
444 case msg of 459 case msg of
445 Have ix 460 Have ix
446 | ix `BF.member` thisBf -> return () 461 | ix `BF.member` thisBf -> return ()
@@ -452,23 +467,23 @@ handleAvailable msg = do
452handleTransfer :: Handler Transfer 467handleTransfer :: Handler Transfer
453handleTransfer (Request bix) = do 468handleTransfer (Request bix) = do
454 Session {..} <- asks connSession 469 Session {..} <- asks connSession
455 bitfield <- getThisBitfield 470 s <- liftIO $ readMVar sessionState
456 upload <- canUpload <$> use connStatus 471 case s of
457 when (upload && ixPiece bix `BF.member` bitfield) $ do 472 WaitingMetadata {..} -> return ()
458 mstorage <- liftIO $ tryReadMVar sessionStorage 473 HavingMetadata {..} -> do
459 case mstorage of 474 bitfield <- undefined -- getThisBitfield
460 Nothing -> return () 475 upload <- canUpload <$> use connStatus
461 Just storage -> do 476 when (upload && ixPiece bix `BF.member` bitfield) $ do
462 blk <- liftIO $ readBlock bix storage 477 blk <- liftIO $ readBlock bix contentStorage
463 sendMessage (Message.Piece blk) 478 sendMessage (Message.Piece blk)
464 479
465handleTransfer (Message.Piece blk) = do 480handleTransfer (Message.Piece blk) = do
466 Session {..} <- asks connSession 481 Session {..} <- asks connSession
467 mstorage <- liftIO $ tryReadMVar sessionStorage 482 s <- liftIO $ readMVar sessionState
468 case mstorage of 483 case s of
469 Nothing -> return () -- TODO (?) break connection 484 WaitingMetadata {..} -> return () -- TODO (?) break connection
470 Just storage -> do 485 HavingMetadata {..} -> do
471 isSuccess <- withStatusUpdates (SS.pushBlock blk storage) 486 isSuccess <- undefined -- withStatusUpdates (SS.pushBlock blk storage)
472 case isSuccess of 487 case isSuccess of
473 Nothing -> liftIO $ throwIO $ userError "block is not requested" 488 Nothing -> liftIO $ throwIO $ userError "block is not requested"
474 Just isCompleted -> do 489 Just isCompleted -> do
@@ -490,25 +505,20 @@ handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix))
490waitForMetadata :: Trigger 505waitForMetadata :: Trigger
491waitForMetadata = do 506waitForMetadata = do
492 Session {..} <- asks connSession 507 Session {..} <- asks connSession
493 needFetch <- liftIO (isEmptyMVar infodict) 508 needFetch <- undefined --liftIO (isEmptyMVar infodict)
494 when needFetch $ do 509 when needFetch $ do
495 canFetch <- allowed ExtMetadata <$> use connExtCaps 510 canFetch <- allowed ExtMetadata <$> use connExtCaps
496 if canFetch 511 if canFetch
497 then tryRequestMetadataBlock 512 then tryRequestMetadataBlock
498 else liftIO (waitMVar infodict) 513 else undefined -- liftIO (waitMVar infodict)
499 514
500tryRequestMetadataBlock :: Trigger 515tryRequestMetadataBlock :: Trigger
501tryRequestMetadataBlock = do 516tryRequestMetadataBlock = do
502 mpix <- lift $ withMetadataUpdates Metadata.scheduleBlock 517 mpix <- lift $ undefined --withMetadataUpdates Metadata.scheduleBlock
503 case mpix of 518 case mpix of
504 Nothing -> error "tryRequestMetadataBlock" 519 Nothing -> error "tryRequestMetadataBlock"
505 Just pix -> sendMessage (MetadataRequest pix) 520 Just pix -> sendMessage (MetadataRequest pix)
506 521
507metadataCompleted :: InfoDict -> Trigger
508metadataCompleted dict = do
509 Session {..} <- asks connSession
510 liftIO $ putMVar infodict (cache dict)
511
512handleMetadata :: Handler ExtendedMetadata 522handleMetadata :: Handler ExtendedMetadata
513handleMetadata (MetadataRequest pix) = 523handleMetadata (MetadataRequest pix) =
514 lift (tryReadMetadataBlock pix) >>= sendMessage . mkResponse 524 lift (tryReadMetadataBlock pix) >>= sendMessage . mkResponse
@@ -518,13 +528,15 @@ handleMetadata (MetadataRequest pix) =
518 528
519handleMetadata (MetadataData {..}) = do 529handleMetadata (MetadataData {..}) = do
520 ih <- asks connTopic 530 ih <- asks connTopic
521 mdict <- lift $ withMetadataUpdates (Metadata.pushBlock piece ih) 531 mdict <- lift $ undefined --withMetadataUpdates (Metadata.pushBlock piece ih)
522 case mdict of 532 case mdict of
523 Nothing -> tryRequestMetadataBlock -- not completed, need all blocks 533 Nothing -> tryRequestMetadataBlock -- not completed, need all blocks
524 Just dict -> metadataCompleted dict -- complete, wake up payload fetch 534 Just dict -> do -- complete, wake up payload fetch
535 Session {..} <- asks connSession
536 liftIO $ modifyMVar_ sessionState (haveMetadata dict)
525 537
526handleMetadata (MetadataReject pix) = do 538handleMetadata (MetadataReject pix) = do
527 lift $ withMetadataUpdates (Metadata.cancelPending pix) 539 lift $ undefined -- withMetadataUpdates (Metadata.cancelPending pix)
528 540
529handleMetadata (MetadataUnknown _ ) = do 541handleMetadata (MetadataUnknown _ ) = do
530 logInfoN "Unknown metadata message" 542 logInfoN "Unknown metadata message"
@@ -553,7 +565,7 @@ handleMessage (Extended msg) = handleExtended msg
553exchange :: Wire Session () 565exchange :: Wire Session ()
554exchange = do 566exchange = do
555 waitForMetadata 567 waitForMetadata
556 bf <- getThisBitfield 568 bf <- undefined --getThisBitfield
557 sendMessage (Bitfield bf) 569 sendMessage (Bitfield bf)
558 awaitForever handleMessage 570 awaitForever handleMessage
559 571