From 0fa6a0ee5eb1fbf648d3864626430efcbdb4aaae Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Sat, 15 Feb 2014 04:18:05 +0400 Subject: Move metadata exchange from Wire to Session --- src/Network/BitTorrent/Exchange/Session.hs | 154 +++++++++++++++++---- .../BitTorrent/Exchange/Session/Metadata.hs | 93 +++++++++++++ src/Network/BitTorrent/Exchange/Wire.hs | 54 +------- 3 files changed, 218 insertions(+), 83 deletions(-) create mode 100644 src/Network/BitTorrent/Exchange/Session/Metadata.hs (limited to 'src') diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs index 1e72ba96..f10f601e 100644 --- a/src/Network/BitTorrent/Exchange/Session.hs +++ b/src/Network/BitTorrent/Exchange/Session.hs @@ -12,7 +12,7 @@ module Network.BitTorrent.Exchange.Session import Control.Applicative import Control.Concurrent -import Control.Exception +import Control.Exception hiding (Handler) import Control.Lens import Control.Monad.Logger import Control.Monad.Reader @@ -20,6 +20,7 @@ import Control.Monad.State import Data.ByteString as BS import Data.ByteString.Lazy as BL import Data.Conduit +import Data.Conduit.List as CL (iterM) import Data.Function import Data.IORef import Data.List as L @@ -34,16 +35,18 @@ import Text.PrettyPrint hiding ((<>)) import Text.PrettyPrint.Class import System.Log.FastLogger (LogStr, ToLogStr (..)) +import Data.BEncode as BE import Data.Torrent (InfoDict (..)) import Data.Torrent.Bitfield as BF import Data.Torrent.InfoHash -import Data.Torrent.Piece (pieceData, piPieceLength) +import Data.Torrent.Piece import qualified Data.Torrent.Piece as Torrent (Piece (Piece)) import Network.BitTorrent.Core import Network.BitTorrent.Exchange.Assembler -import Network.BitTorrent.Exchange.Block as Block -import Network.BitTorrent.Exchange.Message -import Network.BitTorrent.Exchange.Session.Status as SS +import Network.BitTorrent.Exchange.Block as Block +import Network.BitTorrent.Exchange.Message as Message +import Network.BitTorrent.Exchange.Session.Metadata as Metadata +import Network.BitTorrent.Exchange.Session.Status as SS import Network.BitTorrent.Exchange.Status import Network.BitTorrent.Exchange.Wire import System.Torrent.Storage @@ -66,6 +69,14 @@ packException f m = try m >>= either (throwIO . f) return -- Session -----------------------------------------------------------------------} +data Cached a = Cached + { cachedValue :: !a + , cachedData :: BL.ByteString -- keep lazy + } + +cache :: BEncode a => a -> Cached a +cache s = Cached s (BE.encode s) + data ConnectionEntry = ConnectionEntry { initiatedBy :: !ChannelSide , connection :: !(Connection Session) @@ -74,12 +85,14 @@ data ConnectionEntry = ConnectionEntry data Session = Session { tpeerId :: PeerId , infohash :: InfoHash + , metadata :: MVar Metadata.Status , storage :: Storage , status :: MVar SessionStatus , unchoked :: [PeerAddr IP] , connections :: MVar (Map (PeerAddr IP) ConnectionEntry) , broadcast :: Chan Message , logger :: LogFun + , infodict :: MVar (Cached InfoDict) } -- | Logger function. @@ -108,7 +121,13 @@ newSession logFun addr rootPath dict = do } closeSession :: Session -> IO () -closeSession = undefined +closeSession ses = do + deleteAll ses + undefined + +{----------------------------------------------------------------------- +-- Logging +-----------------------------------------------------------------------} instance MonadLogger (Connected Session) where monadLoggerLog loc src lvl msg = do @@ -118,11 +137,11 @@ instance MonadLogger (Connected Session) where let addrSrc = src <> " @ " <> T.pack (render (pretty addr)) liftIO $ logger ses loc addrSrc lvl (toLogStr msg) -logMessage :: Message -> Wire Session () +logMessage :: MonadLogger m => Message -> m () logMessage msg = logDebugN $ T.pack (render (pretty msg)) -logEvent :: Text -> Wire Session () -logEvent = logInfoN +logEvent :: MonadLogger m => Text -> m () +logEvent = logInfoN {----------------------------------------------------------------------- -- Connections @@ -132,11 +151,12 @@ logEvent = logInfoN insert :: PeerAddr IP -> {- Maybe Socket -> -} Session -> IO () -insert addr ses @ Session {..} = do - forkIO $ do - action `finally` runStatusUpdates status (resetPending addr) - return () +insert addr ses @ Session {..} = void $ forkIO (action `finally` cleanup) where + cleanup = do + runStatusUpdates status (SS.resetPending addr) + -- TODO Metata.resetPending addr + action = do let caps = def let ecaps = def @@ -147,7 +167,7 @@ insert addr ses @ Session {..} = do -- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn lift $ resizeBitfield (totalPieces storage) logEvent "Connection established" - exchange + iterM logMessage =$= exchange =$= iterM logMessage -- liftIO $ modifyMVar_ connections $ pure . M.delete addr delete :: PeerAddr IP -> Session -> IO () @@ -160,11 +180,26 @@ deleteAll = undefined -- Helpers -----------------------------------------------------------------------} +waitMVar :: MVar a -> IO () +waitMVar m = withMVar m (const (return ())) + +-- This function appear in new GHC "out of box". (moreover it is atomic) +tryReadMVar :: MVar a -> IO (Maybe a) +tryReadMVar m = do + ma <- tryTakeMVar m + maybe (return ()) (putMVar m) ma + return ma + withStatusUpdates :: StatusUpdates a -> Wire Session a withStatusUpdates m = do Session {..} <- asks connSession liftIO $ runStatusUpdates status m +withMetadataUpdates :: Updates a -> Connected Session a +withMetadataUpdates m = do + Session {..} <- asks connSession + liftIO $ runUpdates metadata m + getThisBitfield :: Wire Session Bitfield getThisBitfield = do ses <- asks connSession @@ -179,6 +214,16 @@ readBlock bix @ BlockIx {..} s = do then return $ Block ixPiece ixOffset chunk else throwIO $ InvalidRequest bix (InvalidSize ixLength) +-- | +tryReadMetadataBlock :: PieceIx + -> Connected Session (Maybe (Torrent.Piece BS.ByteString, Int)) +tryReadMetadataBlock pix = do + Session {..} <- asks connSession + mcached <- liftIO (tryReadMVar infodict) + case mcached of + Nothing -> undefined + Just (Cached {..}) -> undefined + sendBroadcast :: PeerMessage msg => msg -> Wire Session () sendBroadcast msg = do Session {..} <- asks connSession @@ -208,9 +253,7 @@ tryFillRequestQueue = do interesting :: Wire Session () interesting = do addr <- asks connRemoteAddr - logMessage (Status (Interested True)) sendMessage (Interested True) - logMessage (Status (Choking False)) sendMessage (Choking False) tryFillRequestQueue @@ -218,17 +261,19 @@ interesting = do -- Incoming message handling -----------------------------------------------------------------------} -handleStatus :: StatusUpdate -> Wire Session () +type Handler msg = msg -> Wire Session () + +handleStatus :: Handler StatusUpdate handleStatus s = do connStatus %= over remoteStatus (updateStatus s) case s of Interested _ -> return () Choking True -> do addr <- asks connRemoteAddr - withStatusUpdates (resetPending addr) + withStatusUpdates (SS.resetPending addr) Choking False -> tryFillRequestQueue -handleAvailable :: Available -> Wire Session () +handleAvailable :: Handler Available handleAvailable msg = do connBitfield %= case msg of Have ix -> BF.insert ix @@ -243,18 +288,18 @@ handleAvailable msg = do | bf `BF.isSubsetOf` thisBf -> return () | otherwise -> interesting -handleTransfer :: Transfer -> Wire Session () +handleTransfer :: Handler Transfer handleTransfer (Request bix) = do Session {..} <- asks connSession bitfield <- getThisBitfield upload <- canUpload <$> use connStatus when (upload && ixPiece bix `BF.member` bitfield) $ do blk <- liftIO $ readBlock bix storage - sendMessage (Piece blk) + sendMessage (Message.Piece blk) -handleTransfer (Piece blk) = do +handleTransfer (Message.Piece blk) = do Session {..} <- asks connSession - isSuccess <- withStatusUpdates (pushBlock blk storage) + isSuccess <- withStatusUpdates (SS.pushBlock blk storage) case isSuccess of Nothing -> liftIO $ throwIO $ userError "block is not requested" Just isCompleted -> do @@ -265,29 +310,78 @@ handleTransfer (Piece blk) = do handleTransfer (Cancel bix) = filterQueue (not . (transferResponse bix)) where - transferResponse bix (Transfer (Piece blk)) = blockIx blk == bix - transferResponse _ _ = False + transferResponse bix (Transfer (Message.Piece blk)) = blockIx blk == bix + transferResponse _ _ = False + +{----------------------------------------------------------------------- +-- Metadata exchange +-----------------------------------------------------------------------} +-- TODO introduce new metadata exchange specific exceptions + +tryRequestMetadataBlock :: Wire Session () +tryRequestMetadataBlock = do + addr <- asks connRemoteAddr + mpix <- lift $ withMetadataUpdates (Metadata.scheduleBlock addr) + case mpix of + Nothing -> undefined + Just pix -> sendMessage (MetadataRequest pix) + +handleMetadata :: Handler ExtendedMetadata +handleMetadata (MetadataRequest pix) = + lift (tryReadMetadataBlock pix) >>= sendMessage . mkResponse + where + mkResponse Nothing = MetadataReject pix + mkResponse (Just (piece, total)) = MetadataData piece total + +handleMetadata (MetadataData {..}) = do + addr <- asks connRemoteAddr + ih <- asks connTopic + lift $ withMetadataUpdates (Metadata.pushBlock addr piece ih) + tryRequestMetadataBlock + +handleMetadata (MetadataReject pix) = do + lift $ withMetadataUpdates (Metadata.cancelPending pix) + +handleMetadata (MetadataUnknown _ ) = do + logInfoN "Unknown metadata message" + +waitForMetadata :: Wire Session () +waitForMetadata = do + Session {..} <- asks connSession + needFetch <- liftIO (isEmptyMVar infodict) + when needFetch $ do + canFetch <- allowed ExtMetadata <$> use connExtCaps + if canFetch + then tryRequestMetadataBlock + else liftIO (waitMVar infodict) {----------------------------------------------------------------------- -- Event loop -----------------------------------------------------------------------} -handleMessage :: Message -> Wire Session () +acceptRehandshake :: ExtendedHandshake -> Wire s () +acceptRehandshake ehs = undefined + +handleExtended :: Handler ExtendedMessage +handleExtended (EHandshake ehs) = acceptRehandshake ehs +handleExtended (EMetadata _ msg) = handleMetadata msg +handleExtended (EUnknown _ _ ) = logWarnN "Unknown extension message" + +handleMessage :: Handler Message handleMessage KeepAlive = return () handleMessage (Status s) = handleStatus s handleMessage (Available msg) = handleAvailable msg handleMessage (Transfer msg) = handleTransfer msg handleMessage (Port n) = undefined handleMessage (Fast _) = undefined -handleMessage (Extended _) = undefined +handleMessage (Extended msg) = handleExtended msg exchange :: Wire Session () exchange = do + waitForMetadata bf <- getThisBitfield sendMessage (Bitfield bf) - awaitForever $ \ msg -> do - logMessage msg - handleMessage msg + awaitForever handleMessage data Event = NewMessage (PeerAddr IP) Message | Timeout -- for scheduling diff --git a/src/Network/BitTorrent/Exchange/Session/Metadata.hs b/src/Network/BitTorrent/Exchange/Session/Metadata.hs new file mode 100644 index 00000000..7e14f493 --- /dev/null +++ b/src/Network/BitTorrent/Exchange/Session/Metadata.hs @@ -0,0 +1,93 @@ +{-# LANGUAGE TemplateHaskell #-} +module Network.BitTorrent.Exchange.Session.Metadata + ( -- * Metadata transfer state + Status + , nullStatus + + -- * Metadata updates + , Updates + , runUpdates + + -- * Metadata piece control + , scheduleBlock + , resetPending + , cancelPending + , pushBlock + ) where + +import Control.Concurrent +import Control.Lens +import Control.Monad.State +import Data.ByteString as BS +import Data.ByteString.Lazy as BL +import Data.List as L + +import Data.BEncode as BE +import Data.Torrent +import Data.Torrent.InfoHash +import Data.Torrent.Piece as Torrent +import Network.BitTorrent.Core +import Network.BitTorrent.Exchange.Block as Block +import Network.BitTorrent.Exchange.Message as Message hiding (Status) + + +data Status = Status + { _pending :: [(PeerAddr IP, PieceIx)] + , _bucket :: Bucket + } + +makeLenses ''Status + +nullStatus :: PieceSize -> Status +nullStatus ps = Status [] (Block.empty ps) + +type Updates a = State Status a + +runUpdates :: MVar Status -> Updates a -> IO a +runUpdates v m = undefined + +scheduleBlock :: PeerAddr IP -> Updates (Maybe PieceIx) +scheduleBlock addr = do + bkt <- use bucket + case spans metadataPieceSize bkt of + [] -> return Nothing + ((off, _ ) : _) -> do + let pix = undefined + pending %= ((addr, pix) :) + return (Just pix) + +cancelPending :: PieceIx -> Updates () +cancelPending pix = pending %= L.filter ((pix ==) . snd) + +resetPending :: PeerAddr IP -> Updates () +resetPending addr = pending %= L.filter ((addr ==) . fst) + +parseInfoDict :: BS.ByteString -> InfoHash -> Result InfoDict +parseInfoDict chunk topic = + case BE.decode chunk of + Right (infodict @ InfoDict {..}) + | topic == idInfoHash -> return infodict + | otherwise -> Left "broken infodict" + Left err -> Left $ "unable to parse infodict " ++ err + +-- todo use incremental parsing to avoid BS.concat call +pushBlock :: PeerAddr IP -> Torrent.Piece BS.ByteString -> InfoHash + -> Updates (Maybe InfoDict) +pushBlock addr Torrent.Piece {..} topic = do + p <- use pending + when ((addr, pieceIndex) `L.notElem` p) $ error "not requested" + cancelPending pieceIndex + + bucket %= Block.insert (metadataPieceSize * pieceIndex) pieceData + b <- use bucket + case toPiece b of + Nothing -> return Nothing + Just chunks -> + case parseInfoDict (BL.toStrict chunks) topic of + Right x -> do + pending .= [] + return (Just x) + Left e -> do + pending .= [] + bucket .= Block.empty (Block.size b) + return Nothing diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs index 4aebdd24..4224a25d 100644 --- a/src/Network/BitTorrent/Exchange/Wire.hs +++ b/src/Network/BitTorrent/Exchange/Wire.hs @@ -17,6 +17,7 @@ module Network.BitTorrent.Exchange.Wire ( -- * Wire Connected , Wire + , ChannelSide (..) -- * Connection , Connection @@ -54,11 +55,7 @@ module Network.BitTorrent.Exchange.Wire , filterQueue , getMaxQueueLength - -- * Query - , getMetadata - -- * Exceptions - , ChannelSide (..) , ProtocolError (..) , WireFailure (..) , peerPenalty @@ -448,11 +445,6 @@ instance Default Options where -- Connection -----------------------------------------------------------------------} -data Cached a = Cached { unCache :: a, cached :: BS.ByteString } - -cache :: (BEncode a) => a -> Cached a -cache s = Cached s (BSL.toStrict $ BE.encode s) - data ConnectionState = ConnectionState { -- | If @not (allowed ExtExtended connCaps)@ then this set is always -- empty. Otherwise it has the BEP10 extension protocol mandated mapping of @@ -477,9 +469,6 @@ data ConnectionState = ConnectionState { -- | Bitfield of remote endpoint. , _connBitfield :: !Bitfield - - -- | Infodict associated with this Connection's connTopic. - , _connMetadata :: Maybe (Cached InfoDict) } makeLenses ''ConnectionState @@ -722,7 +711,6 @@ connectWire session hs addr extCaps chan wire = do } , _connStatus = def , _connBitfield = BF.haveNone 0 - , _connMetadata = Nothing } -- TODO make KA interval configurable @@ -757,43 +745,3 @@ acceptWire sock peerAddr wire = do -- | Used when size of bitfield becomes known. resizeBitfield :: Int -> Connected s () resizeBitfield n = connBitfield %= adjustSize n - -{----------------------------------------------------------------------- --- Metadata exchange ------------------------------------------------------------------------} --- TODO introduce new metadata exchange specific exceptions - -fetchMetadata :: Wire s [BS.ByteString] -fetchMetadata = loop 0 - where - recvData = recvMessage >>= inspect - where - inspect (Extended (EMetadata _ meta)) = - case meta of - MetadataRequest pix -> do - sendMessage (MetadataReject pix) - recvData - MetadataData {..} -> return (piece, totalSize) - MetadataReject _ -> disconnectPeer - MetadataUnknown _ -> recvData - inspect _ = recvData - - loop i = do - sendMessage (MetadataRequest i) - (piece, totalSize) <- recvData - unless (pieceIndex piece == i) $ do - disconnectPeer - - if piece `isLastPiece` totalSize - then pure [pieceData piece] - else (pieceData piece :) <$> loop (succ i) - -getMetadata :: Wire s InfoDict -getMetadata = do - chunks <- fetchMetadata - Connection {..} <- ask - case BE.decode (BS.concat chunks) of - Right (infodict @ InfoDict {..}) - | connTopic == idInfoHash -> return infodict - | otherwise -> error "broken infodict" - Left err -> error $ "unable to parse infodict" ++ err -- cgit v1.2.3