From 937342955301e9820a9bcbafcf8922cc5dd1798d Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Thu, 3 Apr 2014 01:35:32 +0400 Subject: [Exchange] Storage is not available during metadata exchange --- src/Network/BitTorrent/Exchange/Session.hs | 53 +++++++++++++++++++----------- 1 file changed, 34 insertions(+), 19 deletions(-) diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs index 4b9b0fdb..57d21579 100644 --- a/src/Network/BitTorrent/Exchange/Session.hs +++ b/src/Network/BitTorrent/Exchange/Session.hs @@ -97,12 +97,14 @@ data Session = Session , sessionLogger :: !(LogFun) , sessionEvents :: !(SendPort SessionEvent) +------------------------------------------------------------------------ , metadata :: !(MVar Metadata.Status) , infodict :: !(MVar (Cached InfoDict)) , status :: !(MVar SessionStatus) - , storage :: !(Storage) + , sessionStorage :: !(MVar Storage) +------------------------------------------------------------------------ , connectionsPrefs :: !ConnectionPrefs -- | Connections either waiting for TCP/uTP 'connect' or waiting @@ -133,28 +135,32 @@ newSession :: LogFun -> IO Session -- ^ newSession logFun addr rootPath dict = do pid <- maybe genPeerId return (peerId addr) - store <- openInfoDict ReadWriteEx rootPath dict - statusVar <- newMVar $ sessionStatus (BF.haveNone (totalPieces store)) + let ih = idInfoHash dict + eventStream <- newSendPort + + storage <- openInfoDict ReadWriteEx rootPath dict + storageVar <- newMVar storage + + statusVar <- newMVar $ sessionStatus (BF.haveNone (totalPieces storage)) (piPieceLength (idPieceInfo dict)) + metadataVar <- newMVar (error "sessionMetadata") infodictVar <- newMVar (cache dict) pSetVar <- newTVarIO S.empty eSetVar <- newTVarIO M.empty chan <- newChan - eventStream <- newSendPort return Session { sessionPeerId = pid - , sessionTopic = idInfoHash dict + , sessionTopic = ih , sessionLogger = logFun , sessionEvents = eventStream , metadata = metadataVar , infodict = infodictVar - , status = statusVar - , storage = store + , sessionStorage = storageVar , connectionsPrefs = def , connectionsPending = pSetVar @@ -165,7 +171,8 @@ newSession logFun addr rootPath dict = do closeSession :: Session -> IO () closeSession Session {..} = do - close storage + mstorage <- tryReadMVar sessionStorage + maybe (return ()) close mstorage {- hSet <- atomically $ do pSet <- swapTVar connectionsPending S.empty @@ -448,19 +455,27 @@ handleTransfer (Request bix) = do bitfield <- getThisBitfield upload <- canUpload <$> use connStatus when (upload && ixPiece bix `BF.member` bitfield) $ do - blk <- liftIO $ readBlock bix storage - sendMessage (Message.Piece blk) + mstorage <- liftIO $ tryReadMVar sessionStorage + case mstorage of + Nothing -> return () + Just storage -> do + blk <- liftIO $ readBlock bix storage + sendMessage (Message.Piece blk) handleTransfer (Message.Piece blk) = do Session {..} <- asks connSession - isSuccess <- withStatusUpdates (SS.pushBlock blk storage) - case isSuccess of - Nothing -> liftIO $ throwIO $ userError "block is not requested" - Just isCompleted -> do - when isCompleted $ do - sendBroadcast (Have (blkPiece blk)) --- maybe send not interested - tryFillRequestQueue + mstorage <- liftIO $ tryReadMVar sessionStorage + case mstorage of + Nothing -> return () -- TODO (?) break connection + Just storage -> do + isSuccess <- withStatusUpdates (SS.pushBlock blk storage) + case isSuccess of + Nothing -> liftIO $ throwIO $ userError "block is not requested" + Just isCompleted -> do + when isCompleted $ do + sendBroadcast (Have (blkPiece blk)) +-- maybe send not interested + tryFillRequestQueue handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix)) where @@ -546,7 +561,7 @@ mainWire :: Wire Session () mainWire = do lift establishedConnection Session {..} <- asks connSession - lift $ resizeBitfield (totalPieces storage) +-- lift $ resizeBitfield (totalPieces storage) logEvent "Connection established" iterM logMessage =$= exchange =$= iterM logMessage lift finishedConnection -- cgit v1.2.3