From 32b0f3570237e4d4742fc8874980f2b479c1ae75 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Thu, 5 Dec 2013 03:26:56 +0400 Subject: Add Wire module --- src/Network/BitTorrent/Core/PeerAddr.hs | 8 -- src/Network/BitTorrent/Exchange.hs | 1 - src/Network/BitTorrent/Exchange/Bus.hs | 66 ---------- src/Network/BitTorrent/Exchange/Session.hs | 29 +--- src/Network/BitTorrent/Exchange/Wire.hs | 205 +++++++++++++++++++++++++++++ 5 files changed, 210 insertions(+), 99 deletions(-) delete mode 100644 src/Network/BitTorrent/Exchange/Bus.hs create mode 100644 src/Network/BitTorrent/Exchange/Wire.hs (limited to 'src/Network/BitTorrent') diff --git a/src/Network/BitTorrent/Core/PeerAddr.hs b/src/Network/BitTorrent/Core/PeerAddr.hs index ed2dc672..81754e5e 100644 --- a/src/Network/BitTorrent/Core/PeerAddr.hs +++ b/src/Network/BitTorrent/Core/PeerAddr.hs @@ -18,7 +18,6 @@ module Network.BitTorrent.Core.PeerAddr PeerAddr(..) , defaultPorts , peerSockAddr - , connectToPeer ) where import Control.Applicative @@ -116,10 +115,3 @@ peerSockAddr = SockAddrInet <$> (g . peerPort) <*> (htonl . peerIP) g :: PortNumber -> PortNumber g = id - --- | Tries to connect to peer using reasonable default parameters. -connectToPeer :: PeerAddr -> IO Socket -connectToPeer p = do - sock <- socket AF_INET Stream Network.Socket.defaultProtocol - connect sock (peerSockAddr p) - return sock diff --git a/src/Network/BitTorrent/Exchange.hs b/src/Network/BitTorrent/Exchange.hs index 57b2c81f..c1377449 100644 --- a/src/Network/BitTorrent/Exchange.hs +++ b/src/Network/BitTorrent/Exchange.hs @@ -84,7 +84,6 @@ import Control.Monad.Trans.Resource import Data.IORef import Data.Conduit as C import Data.Conduit.Cereal as S ---import Data.Conduit.Serialization.Binary as B import Data.Conduit.Network import Data.Serialize as S import Text.PrettyPrint as PP hiding (($$)) diff --git a/src/Network/BitTorrent/Exchange/Bus.hs b/src/Network/BitTorrent/Exchange/Bus.hs deleted file mode 100644 index 7de91180..00000000 --- a/src/Network/BitTorrent/Exchange/Bus.hs +++ /dev/null @@ -1,66 +0,0 @@ -module Network.BitTorrent.Exchange.Bus ( ) where - -type PeerWire = ConduitM Message Message IO - -runPeerWire :: Socket -> PeerWire () -> IO () -runPeerWire sock action = - sourceSocket sock $= - S.conduitGet S.get $= --- B.conduitDecode $= - action $= - S.conduitPut S.put $$ --- B.conduitEncode $$ - sinkSocket sock - -awaitMessage :: P2P Message -awaitMessage = P2P $ ReaderT $ const $ {-# SCC awaitMessage #-} go - where - go = await >>= maybe (monadThrow PeerDisconnected) return -{-# INLINE awaitMessage #-} - -yieldMessage :: Message -> P2P () -yieldMessage msg = P2P $ ReaderT $ const $ {-# SCC yieldMessage #-} do - C.yield msg -{-# INLINE yieldMessage #-} - --- TODO send vectored -flushPending :: P2P () -flushPending = {-# SCC flushPending #-} do - session <- ask - queue <- liftIO (getPending session) - mapM_ yieldMessage queue - -{----------------------------------------------------------------------- - P2P monad ------------------------------------------------------------------------} - -filterMeaninless :: P2P Message Message -filterMeaninless = undefined - --- | --- Exceptions: --- --- * SessionException: is visible only within one peer --- session. Use this exception to terminate P2P session, but not --- the swarm session. --- -newtype P2P a = P2P { - unP2P :: ReaderT PeerSession PeerWire a - } deriving ( Functor, Applicative, Monad - , MonadIO, MonadThrow, MonadActive - , MonadReader PeerSession - ) - -instance MonadState SessionState P2P where - get = asks sessionState >>= liftIO . readIORef - {-# INLINE get #-} - put !s = asks sessionState >>= \ref -> liftIO $ writeIORef ref s - {-# INLINE put #-} - -runP2P :: (Socket, PeerSession) -> P2P () -> IO () -runP2P (sock, ses) action = - handle isIOException $ - runPeerWire sock (runReaderT (unP2P action) ses) - where - isIOException :: IOException -> IO () - isIOException _ = return () diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs index ffc7816e..d2a9aaaf 100644 --- a/src/Network/BitTorrent/Exchange/Session.hs +++ b/src/Network/BitTorrent/Exchange/Session.hs @@ -22,6 +22,11 @@ import Network.BitTorrent.Exchange.Status type Extension = () +data ExchangeError + = InvalidPieceIx PieceIx + | InvalidBlock BlockIx + | CorruptedPiece PieceIx + -- | Peer session contain all data necessary for peer to peer -- communication. data ExchangeSession = ExchangeSession @@ -69,30 +74,6 @@ $(makeLenses ''SessionState) --getSessionState :: PeerSession -> IO SessionState --getSessionState PeerSession {..} = readIORef sessionState -{----------------------------------------------------------------------- --- Exceptions ------------------------------------------------------------------------} - --- | Exceptions used to interrupt the current P2P session. This --- exceptions will NOT affect other P2P sessions, DHT, peer <-> --- tracker, or any other session. --- -data ExchangeFailure - = PeerDisconnected - | ProtocolError Doc - | UnknownTorrent InfoHash - deriving (Show, Typeable) - -instance Exception ExchangeFailure - --- | Do nothing with exception, used with 'handle' or 'try'. -isSessionException :: Monad m => ExchangeFailure -> m () -isSessionException _ = return () - --- | The same as 'isSessionException' but output to stdout the catched --- exception, for debugging purposes only. -putSessionException :: ExchangeFailure -> IO () -putSessionException = print {- {----------------------------------------------------------------------- -- Broadcasting: Have, Cancel, Bitfield, SuggestPiece diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs new file mode 100644 index 00000000..dd77a915 --- /dev/null +++ b/src/Network/BitTorrent/Exchange/Wire.hs @@ -0,0 +1,205 @@ +-- | +-- +-- Message flow +-- Duplex channell +-- This module control /integrity/ of data send and received. +-- + +{-# LANGUAGE DeriveDataTypeable #-} +module Network.BitTorrent.Exchange.Wire + ( -- * Exception + ProtocolError (..) + , WireFailure (..) + , isWireFailure + + -- * Wire + , Connection (..) + , Wire + , runWire + , connectWire + , acceptWire + ) where + +import Control.Exception +import Control.Monad.Reader +import Data.ByteString as BS +import Data.Conduit +import Data.Conduit.Cereal as S +import Data.Conduit.Network +import Data.Default +import Data.Maybe +import Data.Monoid +import Data.Serialize as S +import Data.Typeable +import Network +import Network.Socket +import Network.Socket.ByteString as BS +import Text.PrettyPrint as PP hiding (($$), (<>)) +import Text.PrettyPrint.Class + +import Data.Torrent.InfoHash +import Network.BitTorrent.Core.PeerId +import Network.BitTorrent.Core.PeerAddr +import Network.BitTorrent.Exchange.Message + + +{----------------------------------------------------------------------- +-- Exceptions +-----------------------------------------------------------------------} + +data ChannelSide + = ThisPeer + | RemotePeer + deriving (Show, Eq, Enum) + +-- | Errors occur when a remote peer violates protocol specification. +data ProtocolError + = UnexpectedTopic InfoHash -- ^ peer replied with unexpected infohash. + | UnexpectedPeerId PeerId -- ^ peer replied with unexpected peer id. + | UnknownTopic InfoHash -- ^ peer requested unknown torrent. + | InvalidMessage + { violentSender :: ChannelSide -- ^ endpoint sent invalid message + , extensionRequired :: Extension -- ^ + } + deriving Show + +instance Pretty ProtocolError where + pretty = PP.text . show + +-- | Exceptions used to interrupt the current P2P session. +data WireFailure + = PeerDisconnected -- ^ A peer not responding. + | DisconnectPeer -- ^ + | ProtocolError ProtocolError + deriving (Show, Typeable) + +instance Exception WireFailure + +instance Pretty WireFailure where + pretty = PP.text . show + +-- | Do nothing with exception, used with 'handle' or 'try'. +isWireFailure :: Monad m => WireFailure -> m () +isWireFailure _ = return () + +{----------------------------------------------------------------------- +-- Connection +-----------------------------------------------------------------------} + +data Connection = Connection + { connCaps :: !Caps + , connExtCaps :: !ExtendedCaps -- TODO caps can be enabled during communication + , connTopic :: !InfoHash + , connRemotePeerId :: !PeerId + , connThisPeerId :: !PeerId + } deriving Show + +instance Pretty Connection where + pretty Connection {..} = "Connection" + +isAllowed :: Connection -> Message -> Bool +isAllowed Connection {..} msg + | Just ext <- requires msg = allowed connCaps ext + | otherwise = True + +{----------------------------------------------------------------------- +-- Hanshaking +-----------------------------------------------------------------------} + +-- | TODO remove socket stuff to corresponding module +sendHandshake :: Socket -> Handshake -> IO () +sendHandshake sock hs = sendAll sock (S.encode hs) + +recvHandshake :: Socket -> IO Handshake +recvHandshake sock = do + header <- BS.recv sock 1 + unless (BS.length header == 1) $ + throw $ userError "Unable to receive handshake header." + + let protocolLen = BS.head header + let restLen = handshakeSize protocolLen - 1 + + body <- BS.recv sock restLen + let resp = BS.cons protocolLen body + either (throwIO . userError) return $ S.decode resp + +-- | Handshaking with a peer specified by the second argument. +-- +-- It's important to send handshake first because /accepting/ peer +-- do not know handshake topic and will wait until /connecting/ peer +-- will send handshake. +-- +initiateHandshake :: Socket -> Handshake -> IO Handshake +initiateHandshake sock hs = do + sendHandshake sock hs + recvHandshake sock + +-- | Tries to connect to peer using reasonable default parameters. +connectToPeer :: PeerAddr -> IO Socket +connectToPeer p = do + sock <- socket AF_INET Stream Network.Socket.defaultProtocol + connect sock (peerSockAddr p) + return sock + +{----------------------------------------------------------------------- +-- Wire +-----------------------------------------------------------------------} + +type Wire = ConduitM Message Message (ReaderT Connection IO) + +validate :: Wire () +validate = do + mmsg <- await + case mmsg of + Nothing -> return () + Just msg -> do + valid <- lift $ asks (`isAllowed` msg) + if valid then yield msg else error "TODO" + + +runWire :: Wire () -> Socket -> Connection -> IO () +runWire action sock = runReaderT $ + sourceSocket sock $= + S.conduitGet S.get $= + action $= + S.conduitPut S.put $$ + sinkSocket sock + +sendMessage :: PeerMessage msg => msg -> Wire () +sendMessage msg = do + ecaps <- lift $ asks connExtCaps + yield $ envelop ecaps msg + +recvMessage :: Wire Message +recvMessage = undefined + + + +extendedHandshake :: Wire () +extendedHandshake = undefined + +connectWire :: Handshake -> PeerAddr -> ExtendedCaps -> Wire () -> IO () +connectWire hs addr caps wire = + bracket (connectToPeer addr) close $ \ sock -> do + hs' <- initiateHandshake sock hs + + unless (hsInfoHash hs == hsInfoHash hs') $ do + throwIO $ ProtocolError $ UnexpectedTopic (hsInfoHash hs') + + unless (hsPeerId hs' == fromMaybe (hsPeerId hs') (peerID addr)) $ do + throwIO $ ProtocolError $ UnexpectedPeerId (hsPeerId hs') + + let caps = hsReserved hs <> hsReserved hs' + if allowed caps ExtExtended + then return () else return () + + runWire wire sock $ Connection + { connCaps = caps + , connExtCaps = def + , connTopic = hsInfoHash hs + , connRemotePeerId = hsPeerId hs' + , connThisPeerId = hsPeerId hs + } + +acceptWire :: Wire () -> Socket -> IO () +acceptWire = undefined -- cgit v1.2.3