From cdb75165ee0e4f2c36f5766fba4c7bc4bd31db2b Mon Sep 17 00:00:00 2001 From: Sam T Date: Tue, 11 Jun 2013 04:27:12 +0400 Subject: ~ Add keepalive timeouts. --- src/Network/BitTorrent.hs | 7 +- src/Network/BitTorrent/Exchange.hs | 182 +++++++++++++++++--------------- src/Network/BitTorrent/Internal.hs | 210 +++++++++++++++++++++++++++++++++++++ 3 files changed, 312 insertions(+), 87 deletions(-) create mode 100644 src/Network/BitTorrent/Internal.hs diff --git a/src/Network/BitTorrent.hs b/src/Network/BitTorrent.hs index 5d6034f6..5fbc5ff6 100644 --- a/src/Network/BitTorrent.hs +++ b/src/Network/BitTorrent.hs @@ -7,6 +7,7 @@ -- module Network.BitTorrent ( module BT + , module Data.Torrent -- * Tracker @@ -16,12 +17,12 @@ module Network.BitTorrent , PeerSession ) where +import Data.Torrent import Network.BitTorrent.Internal - import Network.BitTorrent.Extension as BT import Network.BitTorrent.Peer as BT import Network.BitTorrent.Exchange as BT import Network.BitTorrent.Tracker as BT ---discover :: SwarmSession -> (Chan PeerAddr -> IO a) -> IO a ---discover = undefined +--discover :: SwarmSession -> ([PeerAddr] -> IO a) -> IO a +--discover = withTracker diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs index 4fe90cda..b23ca667 100644 --- a/src/Network/BitTorrent/Exchange.hs +++ b/src/Network/BitTorrent/Exchange.hs @@ -11,6 +11,7 @@ {-# LANGUAGE RecordWildCards #-} module Network.BitTorrent.Exchange ( P2P, withPeer + , awaitEvent, signalEvent ) where import Control.Applicative @@ -41,117 +42,130 @@ import Network.BitTorrent.Peer import Data.Bitfield as BF import Data.Torrent -{----------------------------------------------------------------------- - P2P monad ------------------------------------------------------------------------} - -type PeerWire = ConduitM Message Message IO - -waitMessage :: PeerWire Message -waitMessage = await >>= maybe waitMessage return - -signalMessage :: Message -> PeerWire () -signalMessage = C.yield - -newtype P2P a = P2P { - runP2P :: ReaderT PeerSession PeerWire a - } deriving (Monad, MonadReader PeerSession, MonadIO) - -instance MonadState Bitfield P2P where - -runConduit :: Socket -> Conduit Message IO Message -> IO () -runConduit sock p2p = - sourceSocket sock $= - conduitGet S.get $= - forever p2p $= - conduitPut S.put $$ - sinkSocket sock - -withPeer :: SwarmSession -> PeerAddr -> P2P () -> IO () -withPeer se addr p2p = - withPeerSession se addr $ \(sock, pses) -> do - runConduit sock (runReaderT (runP2P p2p) pses) data Event = Available Bitfield | Want | Block +{----------------------------------------------------------------------- + Peer wire +-----------------------------------------------------------------------} +type PeerWire = ConduitM Message Message IO -waitForEvent :: P2P Event -waitForEvent = P2P (ReaderT nextEvent) - where - nextEvent se @ PeerSession {..} = waitMessage >>= diff - where - -- diff finds difference between - diff KeepAlive = do - signalMessage KeepAlive - nextEvent se +waitMessage :: PeerSession -> PeerWire Message +waitMessage se = do + mmsg <- await + case mmsg of + Nothing -> waitMessage se + Just msg -> do + liftIO $ updateIncoming se + return msg - handleMessage Choke = do - SessionStatus {..} <- liftIO $ readIORef peerSessionStatus - if psChoking sePeerStatus - then nextEvent se - else undefined +signalMessage :: Message -> PeerSession -> PeerWire () +signalMessage msg se = do + C.yield msg + liftIO $ updateOutcoming se - handleMessage Unchoke = return $ Available BF.difference - handleMessage Interested = return undefined - handleMessage NotInterested = return undefined +getPieceCount :: PeerSession -> IO PieceCount +getPieceCount = undefined - handleMessage (Have ix) = do +nextEvent :: PeerSession -> PeerWire Event +nextEvent se @ PeerSession {..} = waitMessage se >>= diff + where + -- diff finds difference between +-- diff KeepAlive = nextEvent se + diff msg = do + liftIO $ print (ppMessage msg) + nextEvent se + + handleMessage Choke = do + SessionStatus {..} <- liftIO $ readIORef peerSessionStatus + if psChoking sePeerStatus + then nextEvent se + else undefined + + handleMessage Unchoke = undefined + --return $ Available BF.difference + + handleMessage Interested = return undefined + handleMessage NotInterested = return undefined + handleMessage (Have ix) = do + pc <- liftIO $ getPieceCount se + haveMessage $ have ix (haveNone pc) -- TODO singleton + + handleMessage (Bitfield bf) = undefined + handleMessage (Request bix) = do + undefined + + handleMessage msg @ (Piece blk) = undefined + handleMessage msg @ (Port _) + = checkExtension msg ExtDHT $ do + undefined + + handleMessage msg @ HaveAll + = checkExtension msg ExtFast $ do pc <- liftIO $ getPieceCount se - haveMessage $ have ix (haveNone pc) -- TODO singleton - - handleMessage (Bitfield bf) = undefined - handleMessage (Request bix) = do - undefined - - handleMessage (Piece blk) = undefined - handleMessage (Port _) - = checkExtension msg ExtDHT $ do - undefined + haveMessage (haveAll pc) - handleMessage msg @ HaveAll - = checkExtension msg ExtFast $ do - pc <- liftIO $ getPieceCount se - haveMessage (haveAll pc) - - handleMessage msg @ HaveNone - = checkExtension msg ExtFast $ do - pc <- liftIO $ getPieceCount se - haveMessage (haveNone pc) + handleMessage msg @ HaveNone + = checkExtension msg ExtFast $ do + pc <- liftIO $ getPieceCount se + haveMessage (haveNone pc) - handleMessage msg @ (SuggestPiece ix) + handleMessage msg @ (SuggestPiece ix) = checkExtension msg ExtFast $ do undefined - handleMessage msg @ (RejectRequest ix) + handleMessage msg @ (RejectRequest ix) = checkExtension msg ExtFast $ do undefined - handleMessage msg @ (AllowedFast pix) + handleMessage msg @ (AllowedFast pix) = checkExtension msg ExtFast $ do undefined - haveMessage bf = do - cbf <- liftIO $ readIORef $ clientBitfield swarmSession - if undefined -- ix `member` bf - then nextEvent se - else return $ Available diff + haveMessage bf = do + cbf <- liftIO $ readIORef $ clientBitfield swarmSession + if undefined -- ix `member` bf + then nextEvent se + else undefined -- return $ Available diff - checkExtension msg requredExtension action - | requredExtension `elem` enabledExtensions = action - | otherwise = liftIO $ throwIO $ userError errorMsg - where - errorMsg = show (ppExtension requredExtension) - ++ "not enabled, but peer sent" - ++ show (ppMessage msg) + checkExtension msg requredExtension action + | requredExtension `elem` enabledExtensions = action + | otherwise = liftIO $ throwIO $ userError errorMsg + where + errorMsg = show (ppExtension requredExtension) + ++ "not enabled, but peer sent" + ++ show (ppMessage msg) +{----------------------------------------------------------------------- + P2P monad +-----------------------------------------------------------------------} +newtype P2P a = P2P { + runP2P :: ReaderT PeerSession PeerWire a + } deriving (Monad, MonadReader PeerSession, MonadIO) -getPieceCount :: PeerSession -> IO PieceCount -getPieceCount = undefined +instance MonadState Bitfield P2P where + +runConduit :: Socket -> Conduit Message IO Message -> IO () +runConduit sock p2p = + sourceSocket sock $= + conduitGet S.get $= + forever p2p $= + conduitPut S.put $$ + sinkSocket sock + +withPeer :: SwarmSession -> PeerAddr -> P2P () -> IO () +withPeer se addr p2p = + withPeerSession se addr $ \(sock, pses) -> do + runConduit sock (runReaderT (runP2P p2p) pses) + + +awaitEvent :: P2P Event +awaitEvent = P2P (ReaderT nextEvent) signalEvent :: Event -> P2P () signalEvent = undefined diff --git a/src/Network/BitTorrent/Internal.hs b/src/Network/BitTorrent/Internal.hs new file mode 100644 index 00000000..d34c6236 --- /dev/null +++ b/src/Network/BitTorrent/Internal.hs @@ -0,0 +1,210 @@ +-- | +-- Copyright : (c) Sam T. 2013 +-- License : MIT +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- This module implement opaque broadcast message passing. It +-- provides sessions needed by Network.BitTorrent and +-- Network.BitTorrent.Exchange and modules. To hide some internals +-- of this module we detach it from Exchange. +-- +{-# LANGUAGE RecordWildCards #-} +module Network.BitTorrent.Internal + ( ClientSession(..), newClient + , SwarmSession(..), newLeacher, newSeeder + , PeerSession(..), withPeerSession + + -- * Timeouts + , updateIncoming, updateOutcoming + ) where + +import Control.Applicative +import Control.Concurrent +import Control.Concurrent.STM +import Control.Exception + +import Data.IORef +import Data.Function +import Data.Ord +import Data.Set as S + +import Data.Conduit +import Data.Conduit.Cereal +import Data.Conduit.Network +import Data.Serialize + +import Network +import Network.Socket +import Network.Socket.ByteString + +import GHC.Event as Ev + +import Data.Bitfield as BF +import Data.Torrent +import Network.BitTorrent.Extension +import Network.BitTorrent.Peer +import Network.BitTorrent.Exchange.Protocol as BT + + + +{----------------------------------------------------------------------- + Client session +-----------------------------------------------------------------------} + +-- | In one application you could have many clients. +data ClientSession = ClientSession { + clientPeerID :: PeerID -- ^ + , allowedExtensions :: [Extension] -- ^ + , swarmSessions :: TVar (Set SwarmSession) + , eventManager :: EventManager + } + +instance Eq ClientSession where + (==) = (==) `on` clientPeerID + +instance Ord ClientSession where + compare = comparing clientPeerID + +newClient :: [Extension] -> IO ClientSession +newClient exts = ClientSession <$> newPeerID + <*> pure exts + <*> newTVarIO S.empty + <*> Ev.new + +{----------------------------------------------------------------------- + Swarm session +-----------------------------------------------------------------------} + +-- | Extensions are set globally by +-- Swarm session are un +data SwarmSession = SwarmSession { + torrentInfoHash :: InfoHash + , clientSession :: ClientSession + , clientBitfield :: IORef Bitfield + , connectedPeers :: TVar (Set PeerSession) + } + +instance Eq SwarmSession where + (==) = (==) `on` torrentInfoHash + +instance Ord SwarmSession where + compare = comparing torrentInfoHash + +newSwarmSession :: Bitfield -> ClientSession -> Torrent -> IO SwarmSession +newSwarmSession bf cs @ ClientSession {..} Torrent {..} + = SwarmSession <$> pure tInfoHash + <*> pure cs + <*> newIORef bf + <*> newTVarIO S.empty + +newSeeder :: ClientSession -> Torrent -> IO SwarmSession +newSeeder cs t @ Torrent {..} + = newSwarmSession (haveAll (pieceCount tInfo)) cs t + +newLeacher :: ClientSession -> Torrent -> IO SwarmSession +newLeacher cs t @ Torrent {..} + = newSwarmSession (haveNone (pieceCount tInfo)) cs t + +isLeacher :: SwarmSession -> IO Bool +isLeacher = undefined + +{----------------------------------------------------------------------- + Peer session +-----------------------------------------------------------------------} + +data PeerSession = PeerSession { + connectedPeerAddr :: PeerAddr + , swarmSession :: SwarmSession + , enabledExtensions :: [Extension] + + -- | To dissconnect from died peers appropriately we should check + -- if a peer do not sent the KA message within given interval. If + -- yes, we should throw an exception in 'TimeoutCallback' and + -- close session between peers. + -- + -- We should update timeout if we /receive/ any message within + -- timeout interval to keep connection up. + , incomingTimeout :: TimeoutKey + + -- | To send KA message appropriately we should know when was last + -- time we sent a message to a peer. To do that we keep registered + -- timeout in event manager and if we do not sent any message to + -- the peer within given interval then we send KA message in + -- 'TimeoutCallback'. + -- + -- We should update timeout if we /send/ any message within timeout + -- to avoid reduntant KA messages. + , outcomingTimeout :: TimeoutKey + + , broadcastMessages :: Chan [Message] + , peerBitfield :: IORef Bitfield + , peerSessionStatus :: IORef SessionStatus + } + +instance Eq PeerSession where + (==) = (==) `on` connectedPeerAddr + +instance Ord PeerSession where + compare = comparing connectedPeerAddr + +-- TODO check if it connected yet peer +withPeerSession :: SwarmSession -> PeerAddr + -> ((Socket, PeerSession) -> IO a) + -> IO a + +withPeerSession ss @ SwarmSession {..} addr + = bracket openSession closeSession + where + openSession = do + let caps = encodeExts $ allowedExtensions $ clientSession + let pid = clientPeerID $ clientSession + let chs = Handshake defaultBTProtocol caps torrentInfoHash pid + + sock <- connectToPeer addr + phs <- handshake sock chs `onException` close sock + + let enabled = decodeExts (enabledCaps caps (handshakeCaps phs)) + ps <- PeerSession addr ss enabled + <$> registerTimeout (eventManager clientSession) + maxIncomingTime abortSession + <*> registerTimeout (eventManager clientSession) + maxOutcomingTime (sendKA sock) + <*> newChan + <*> pure clientBitfield + <*> newIORef initSessionStatus + return (sock, ps) + + closeSession = close . fst + +{----------------------------------------------------------------------- + Timeouts +-----------------------------------------------------------------------} + +sec :: Int +sec = 1000 * 1000 + +maxIncomingTime :: Int +maxIncomingTime = 120 * sec + +maxOutcomingTime :: Int +maxOutcomingTime = 60 * sec + +-- | Should be called after we have received any message from a peer. +updateIncoming :: PeerSession -> IO () +updateIncoming PeerSession {..} = do + updateTimeout (eventManager (clientSession swarmSession)) + incomingTimeout maxIncomingTime + +-- | Should be called before we have send any message to a peer. +updateOutcoming :: PeerSession -> IO () +updateOutcoming PeerSession {..} = + updateTimeout (eventManager (clientSession swarmSession)) + outcomingTimeout maxOutcomingTime + +sendKA :: Socket -> IO () +sendKA sock = sendAll sock (encode BT.KeepAlive) + +abortSession :: IO () +abortSession = error "abortSession: not implemented" -- cgit v1.2.3