From de1fee157b87b62161ac68f32bd6e72ba9a11275 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Fri, 4 Apr 2014 01:03:55 +0400 Subject: [Exchange] newSession now can take infohash only --- src/Network/BitTorrent/Client/Handle.hs | 10 +- src/Network/BitTorrent/Exchange/Session.hs | 192 +++++++++++---------- .../BitTorrent/Exchange/Session/Metadata.hs | 4 + 3 files changed, 111 insertions(+), 95 deletions(-) (limited to 'src/Network/BitTorrent') diff --git a/src/Network/BitTorrent/Client/Handle.hs b/src/Network/BitTorrent/Client/Handle.hs index 9601e691..5bcb2d33 100644 --- a/src/Network/BitTorrent/Client/Handle.hs +++ b/src/Network/BitTorrent/Client/Handle.hs @@ -71,10 +71,10 @@ lookupHandle ih = do -- Initialization -----------------------------------------------------------------------} -newExchangeSession :: FilePath -> InfoDict -> BitTorrent Exchange.Session -newExchangeSession rootPath dict = do +newExchangeSession :: FilePath -> Either InfoHash InfoDict -> BitTorrent Exchange.Session +newExchangeSession rootPath source = do c @ Client {..} <- getClient - liftIO $ Exchange.newSession clientLogger (externalAddr c) rootPath dict + liftIO $ Exchange.newSession clientLogger (externalAddr c) rootPath source -- | Open a torrent in 'stop'ed state. Use 'nullTorrent' to open -- handle from 'InfoDict'. This operation do not block. @@ -83,7 +83,7 @@ openTorrent rootPath t @ Torrent {..} = do let ih = idInfoHash tInfoDict allocHandle ih $ do tses <- liftIO $ Tracker.newSession ih (trackerList t) - eses <- newExchangeSession rootPath tInfoDict + eses <- newExchangeSession rootPath (Right tInfoDict) return $ Handle { handleTopic = ih , handlePrivate = idPrivate tInfoDict @@ -96,7 +96,7 @@ openMagnet :: FilePath -> Magnet -> BitTorrent Handle openMagnet rootPath uri @ Magnet {..} = do allocHandle exactTopic $ do tses <- liftIO $ Tracker.newSession exactTopic def - eses <- newExchangeSession rootPath (error "openMagnet" exactTopic) + eses <- newExchangeSession rootPath (Left exactTopic) return $ Handle { handleTopic = exactTopic , handlePrivate = False diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs index 57d21579..b6d7f810 100644 --- a/src/Network/BitTorrent/Exchange/Session.hs +++ b/src/Network/BitTorrent/Exchange/Session.hs @@ -76,7 +76,7 @@ packException :: Exception e => (e -> ExchangeError) -> IO a -> IO a packException f m = try m >>= either (throwIO . f) return {----------------------------------------------------------------------- --- Session +-- Session state -----------------------------------------------------------------------} -- TODO unmap storage on zero connections @@ -91,18 +91,56 @@ cache s = Cached s (BE.encode s) -- | Logger function. type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () +--data SessionStatus = Seeder | Leecher + +data SessionState + = WaitingMetadata + { metadataDownload :: MVar Metadata.Status + , metadataCompleted :: MVar InfoDict -- ^ used to unblock waiters + , contentRootPath :: FilePath + } + | HavingMetadata + { metadataCache :: Cached InfoDict + , contentDownload :: MVar SessionStatus + , contentStorage :: Storage + } + +newSessionState :: FilePath -> Either InfoHash InfoDict -> IO SessionState +newSessionState rootPath (Left ih ) = do + WaitingMetadata <$> newMVar def <*> newEmptyMVar <*> pure rootPath +newSessionState rootPath (Right dict) = do + storage <- openInfoDict ReadWriteEx rootPath dict + download <- newMVar $ sessionStatus (BF.haveNone (totalPieces storage)) + (piPieceLength (idPieceInfo dict)) + return $ HavingMetadata (cache dict) download storage + +closeSessionState :: SessionState -> IO () +closeSessionState WaitingMetadata {..} = return () +closeSessionState HavingMetadata {..} = close contentStorage + +haveMetadata :: InfoDict -> SessionState -> IO SessionState +haveMetadata dict WaitingMetadata {..} = do + storage <- openInfoDict ReadWriteEx contentRootPath dict + download <- newMVar $ sessionStatus (BF.haveNone (totalPieces storage)) + (piPieceLength (idPieceInfo dict)) + return HavingMetadata + { metadataCache = cache dict + , contentDownload = download + , contentStorage = storage + } +haveMetadata _ s = return s + +{----------------------------------------------------------------------- +-- Session +-----------------------------------------------------------------------} + data Session = Session { sessionPeerId :: !(PeerId) , sessionTopic :: !(InfoHash) , sessionLogger :: !(LogFun) , sessionEvents :: !(SendPort SessionEvent) ------------------------------------------------------------------------- - , metadata :: !(MVar Metadata.Status) - , infodict :: !(MVar (Cached InfoDict)) - - , status :: !(MVar SessionStatus) - , sessionStorage :: !(MVar Storage) + , sessionState :: !(MVar SessionState) ------------------------------------------------------------------------ , connectionsPrefs :: !ConnectionPrefs @@ -124,44 +162,26 @@ data Session = Session , connectionsBroadcast :: !(Chan Message) } -{----------------------------------------------------------------------- --- Session construction ------------------------------------------------------------------------} - newSession :: LogFun - -> PeerAddr (Maybe IP) -- ^ /external/ address of this peer; - -> FilePath -- ^ root directory for content files; - -> InfoDict -- ^ torrent info dictionary; - -> IO Session -- ^ -newSession logFun addr rootPath dict = do + -> PeerAddr (Maybe IP) -- ^ /external/ address of this peer; + -> FilePath -- ^ root directory for content files; + -> Either InfoHash InfoDict -- ^ torrent info dictionary; + -> IO Session +newSession logFun addr rootPath source = do + let ih = either id idInfoHash source pid <- maybe genPeerId return (peerId addr) - let ih = idInfoHash dict eventStream <- newSendPort - - storage <- openInfoDict ReadWriteEx rootPath dict - storageVar <- newMVar storage - - statusVar <- newMVar $ sessionStatus (BF.haveNone (totalPieces storage)) - (piPieceLength (idPieceInfo dict)) - - metadataVar <- newMVar (error "sessionMetadata") - infodictVar <- newMVar (cache dict) - + sState <- newSessionState rootPath source + sStateVar <- newMVar sState pSetVar <- newTVarIO S.empty eSetVar <- newTVarIO M.empty chan <- newChan - return Session { sessionPeerId = pid , sessionTopic = ih , sessionLogger = logFun , sessionEvents = eventStream - - , metadata = metadataVar - , infodict = infodictVar - , status = statusVar - , sessionStorage = storageVar - + , sessionState = sStateVar , connectionsPrefs = def , connectionsPending = pSetVar , connectionsEstablished = eSetVar @@ -171,8 +191,8 @@ newSession logFun addr rootPath dict = do closeSession :: Session -> IO () closeSession Session {..} = do - mstorage <- tryReadMVar sessionStorage - maybe (return ()) close mstorage + s <- readMVar sessionState + closeSessionState s {- hSet <- atomically $ do pSet <- swapTVar connectionsPending S.empty @@ -341,22 +361,6 @@ tryReadMVar m = do maybe (return ()) (putMVar m) ma return ma -withStatusUpdates :: StatusUpdates a -> Wire Session a -withStatusUpdates m = do - Session {..} <- asks connSession - liftIO $ runStatusUpdates status m - -withMetadataUpdates :: Updates a -> Connected Session a -withMetadataUpdates m = do - Session {..} <- asks connSession - addr <- asks connRemoteAddr - liftIO $ runUpdates metadata addr m - -getThisBitfield :: Wire Session Bitfield -getThisBitfield = do - ses <- asks connSession - liftIO $ SS.getBitfield (status ses) - readBlock :: BlockIx -> Storage -> IO (Block BL.ByteString) readBlock bix @ BlockIx {..} s = do p <- packException (InvalidRequest bix) $ do readPiece ixPiece s @@ -371,10 +375,10 @@ tryReadMetadataBlock :: PieceIx -> Connected Session (Maybe (Torrent.Piece BS.ByteString, Int)) tryReadMetadataBlock pix = do Session {..} <- asks connSession - mcached <- liftIO (tryReadMVar infodict) - case mcached of - Nothing -> error "tryReadMetadataBlock" - Just (Cached {..}) -> error "tryReadMetadataBlock" + s <- liftIO (readMVar sessionState) + case s of + WaitingMetadata {..} -> error "tryReadMetadataBlock" + HavingMetadata {..} -> error "tryReadMetadataBlock" sendBroadcast :: PeerMessage msg => msg -> Wire Session () sendBroadcast msg = do @@ -383,10 +387,18 @@ sendBroadcast msg = do -- liftIO $ msg `broadcast` sessionConnections waitMetadata :: Session -> IO InfoDict -waitMetadata Session {..} = cachedValue <$> readMVar infodict +waitMetadata Session {..} = do + s <- readMVar sessionState + case s of + WaitingMetadata {..} -> readMVar metadataCompleted + HavingMetadata {..} -> return (cachedValue metadataCache) takeMetadata :: Session -> IO (Maybe InfoDict) -takeMetadata Session {..} = fmap cachedValue <$> tryReadMVar infodict +takeMetadata Session {..} = do + s <- readMVar sessionState + case s of + WaitingMetadata {..} -> return Nothing + HavingMetadata {..} -> return (Just (cachedValue metadataCache)) {----------------------------------------------------------------------- -- Triggers @@ -405,12 +417,13 @@ interesting = do fillRequestQueue :: Trigger fillRequestQueue = do maxN <- lift getMaxQueueLength - rbf <- use connBitfield + rbf <- use connBitfield addr <- asks connRemoteAddr - blks <- withStatusUpdates $ do - n <- getRequestQueueLength addr - scheduleBlocks addr rbf (maxN - n) - mapM_ (sendMessage . Request) blks +-- blks <- withStatusUpdates $ do +-- n <- getRequestQueueLength addr +-- scheduleBlocks addr rbf (maxN - n) +-- mapM_ (sendMessage . Request) blks + return () tryFillRequestQueue :: Trigger tryFillRequestQueue = do @@ -431,7 +444,8 @@ handleStatus s = do Interested _ -> return () Choking True -> do addr <- asks connRemoteAddr - withStatusUpdates (SS.resetPending addr) +-- withStatusUpdates (SS.resetPending addr) + return () Choking False -> tryFillRequestQueue handleAvailable :: Handler Available @@ -440,7 +454,8 @@ handleAvailable msg = do Have ix -> BF.insert ix Bitfield bf -> const bf - thisBf <- getThisBitfield + --thisBf <- getThisBitfield + thisBf <- undefined case msg of Have ix | ix `BF.member` thisBf -> return () @@ -452,23 +467,23 @@ handleAvailable msg = do handleTransfer :: Handler Transfer handleTransfer (Request bix) = do Session {..} <- asks connSession - bitfield <- getThisBitfield - upload <- canUpload <$> use connStatus - when (upload && ixPiece bix `BF.member` bitfield) $ do - mstorage <- liftIO $ tryReadMVar sessionStorage - case mstorage of - Nothing -> return () - Just storage -> do - blk <- liftIO $ readBlock bix storage + s <- liftIO $ readMVar sessionState + case s of + WaitingMetadata {..} -> return () + HavingMetadata {..} -> do + bitfield <- undefined -- getThisBitfield + upload <- canUpload <$> use connStatus + when (upload && ixPiece bix `BF.member` bitfield) $ do + blk <- liftIO $ readBlock bix contentStorage sendMessage (Message.Piece blk) handleTransfer (Message.Piece blk) = do Session {..} <- asks connSession - mstorage <- liftIO $ tryReadMVar sessionStorage - case mstorage of - Nothing -> return () -- TODO (?) break connection - Just storage -> do - isSuccess <- withStatusUpdates (SS.pushBlock blk storage) + s <- liftIO $ readMVar sessionState + case s of + WaitingMetadata {..} -> return () -- TODO (?) break connection + HavingMetadata {..} -> do + isSuccess <- undefined -- withStatusUpdates (SS.pushBlock blk storage) case isSuccess of Nothing -> liftIO $ throwIO $ userError "block is not requested" Just isCompleted -> do @@ -490,25 +505,20 @@ handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix)) waitForMetadata :: Trigger waitForMetadata = do Session {..} <- asks connSession - needFetch <- liftIO (isEmptyMVar infodict) + needFetch <- undefined --liftIO (isEmptyMVar infodict) when needFetch $ do canFetch <- allowed ExtMetadata <$> use connExtCaps if canFetch then tryRequestMetadataBlock - else liftIO (waitMVar infodict) + else undefined -- liftIO (waitMVar infodict) tryRequestMetadataBlock :: Trigger tryRequestMetadataBlock = do - mpix <- lift $ withMetadataUpdates Metadata.scheduleBlock + mpix <- lift $ undefined --withMetadataUpdates Metadata.scheduleBlock case mpix of Nothing -> error "tryRequestMetadataBlock" Just pix -> sendMessage (MetadataRequest pix) -metadataCompleted :: InfoDict -> Trigger -metadataCompleted dict = do - Session {..} <- asks connSession - liftIO $ putMVar infodict (cache dict) - handleMetadata :: Handler ExtendedMetadata handleMetadata (MetadataRequest pix) = lift (tryReadMetadataBlock pix) >>= sendMessage . mkResponse @@ -518,13 +528,15 @@ handleMetadata (MetadataRequest pix) = handleMetadata (MetadataData {..}) = do ih <- asks connTopic - mdict <- lift $ withMetadataUpdates (Metadata.pushBlock piece ih) + mdict <- lift $ undefined --withMetadataUpdates (Metadata.pushBlock piece ih) case mdict of Nothing -> tryRequestMetadataBlock -- not completed, need all blocks - Just dict -> metadataCompleted dict -- complete, wake up payload fetch + Just dict -> do -- complete, wake up payload fetch + Session {..} <- asks connSession + liftIO $ modifyMVar_ sessionState (haveMetadata dict) handleMetadata (MetadataReject pix) = do - lift $ withMetadataUpdates (Metadata.cancelPending pix) + lift $ undefined -- withMetadataUpdates (Metadata.cancelPending pix) handleMetadata (MetadataUnknown _ ) = do logInfoN "Unknown metadata message" @@ -553,7 +565,7 @@ handleMessage (Extended msg) = handleExtended msg exchange :: Wire Session () exchange = do waitForMetadata - bf <- getThisBitfield + bf <- undefined --getThisBitfield sendMessage (Bitfield bf) awaitForever handleMessage diff --git a/src/Network/BitTorrent/Exchange/Session/Metadata.hs b/src/Network/BitTorrent/Exchange/Session/Metadata.hs index 79fd03b0..79156e2e 100644 --- a/src/Network/BitTorrent/Exchange/Session/Metadata.hs +++ b/src/Network/BitTorrent/Exchange/Session/Metadata.hs @@ -21,6 +21,7 @@ import Control.Monad.Reader import Control.Monad.State import Data.ByteString as BS import Data.ByteString.Lazy as BL +import Data.Default import Data.List as L import Data.Tuple @@ -41,6 +42,9 @@ data Status = Status makeLenses ''Status +instance Default Status where + def = error "default status" + -- | Create a new scheduler for infodict of the given size. nullStatus :: Int -> Status nullStatus ps = Status [] (Block.empty ps) -- cgit v1.2.3