From 32b0f3570237e4d4742fc8874980f2b479c1ae75 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Thu, 5 Dec 2013 03:26:56 +0400 Subject: Add Wire module --- src/Network/BitTorrent/Exchange/Wire.hs | 205 ++++++++++++++++++++++++++++++++ 1 file changed, 205 insertions(+) create mode 100644 src/Network/BitTorrent/Exchange/Wire.hs (limited to 'src/Network/BitTorrent/Exchange/Wire.hs') diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs new file mode 100644 index 00000000..dd77a915 --- /dev/null +++ b/src/Network/BitTorrent/Exchange/Wire.hs @@ -0,0 +1,205 @@ +-- | +-- +-- Message flow +-- Duplex channell +-- This module control /integrity/ of data send and received. +-- + +{-# LANGUAGE DeriveDataTypeable #-} +module Network.BitTorrent.Exchange.Wire + ( -- * Exception + ProtocolError (..) + , WireFailure (..) + , isWireFailure + + -- * Wire + , Connection (..) + , Wire + , runWire + , connectWire + , acceptWire + ) where + +import Control.Exception +import Control.Monad.Reader +import Data.ByteString as BS +import Data.Conduit +import Data.Conduit.Cereal as S +import Data.Conduit.Network +import Data.Default +import Data.Maybe +import Data.Monoid +import Data.Serialize as S +import Data.Typeable +import Network +import Network.Socket +import Network.Socket.ByteString as BS +import Text.PrettyPrint as PP hiding (($$), (<>)) +import Text.PrettyPrint.Class + +import Data.Torrent.InfoHash +import Network.BitTorrent.Core.PeerId +import Network.BitTorrent.Core.PeerAddr +import Network.BitTorrent.Exchange.Message + + +{----------------------------------------------------------------------- +-- Exceptions +-----------------------------------------------------------------------} + +data ChannelSide + = ThisPeer + | RemotePeer + deriving (Show, Eq, Enum) + +-- | Errors occur when a remote peer violates protocol specification. +data ProtocolError + = UnexpectedTopic InfoHash -- ^ peer replied with unexpected infohash. + | UnexpectedPeerId PeerId -- ^ peer replied with unexpected peer id. + | UnknownTopic InfoHash -- ^ peer requested unknown torrent. + | InvalidMessage + { violentSender :: ChannelSide -- ^ endpoint sent invalid message + , extensionRequired :: Extension -- ^ + } + deriving Show + +instance Pretty ProtocolError where + pretty = PP.text . show + +-- | Exceptions used to interrupt the current P2P session. +data WireFailure + = PeerDisconnected -- ^ A peer not responding. + | DisconnectPeer -- ^ + | ProtocolError ProtocolError + deriving (Show, Typeable) + +instance Exception WireFailure + +instance Pretty WireFailure where + pretty = PP.text . show + +-- | Do nothing with exception, used with 'handle' or 'try'. +isWireFailure :: Monad m => WireFailure -> m () +isWireFailure _ = return () + +{----------------------------------------------------------------------- +-- Connection +-----------------------------------------------------------------------} + +data Connection = Connection + { connCaps :: !Caps + , connExtCaps :: !ExtendedCaps -- TODO caps can be enabled during communication + , connTopic :: !InfoHash + , connRemotePeerId :: !PeerId + , connThisPeerId :: !PeerId + } deriving Show + +instance Pretty Connection where + pretty Connection {..} = "Connection" + +isAllowed :: Connection -> Message -> Bool +isAllowed Connection {..} msg + | Just ext <- requires msg = allowed connCaps ext + | otherwise = True + +{----------------------------------------------------------------------- +-- Hanshaking +-----------------------------------------------------------------------} + +-- | TODO remove socket stuff to corresponding module +sendHandshake :: Socket -> Handshake -> IO () +sendHandshake sock hs = sendAll sock (S.encode hs) + +recvHandshake :: Socket -> IO Handshake +recvHandshake sock = do + header <- BS.recv sock 1 + unless (BS.length header == 1) $ + throw $ userError "Unable to receive handshake header." + + let protocolLen = BS.head header + let restLen = handshakeSize protocolLen - 1 + + body <- BS.recv sock restLen + let resp = BS.cons protocolLen body + either (throwIO . userError) return $ S.decode resp + +-- | Handshaking with a peer specified by the second argument. +-- +-- It's important to send handshake first because /accepting/ peer +-- do not know handshake topic and will wait until /connecting/ peer +-- will send handshake. +-- +initiateHandshake :: Socket -> Handshake -> IO Handshake +initiateHandshake sock hs = do + sendHandshake sock hs + recvHandshake sock + +-- | Tries to connect to peer using reasonable default parameters. +connectToPeer :: PeerAddr -> IO Socket +connectToPeer p = do + sock <- socket AF_INET Stream Network.Socket.defaultProtocol + connect sock (peerSockAddr p) + return sock + +{----------------------------------------------------------------------- +-- Wire +-----------------------------------------------------------------------} + +type Wire = ConduitM Message Message (ReaderT Connection IO) + +validate :: Wire () +validate = do + mmsg <- await + case mmsg of + Nothing -> return () + Just msg -> do + valid <- lift $ asks (`isAllowed` msg) + if valid then yield msg else error "TODO" + + +runWire :: Wire () -> Socket -> Connection -> IO () +runWire action sock = runReaderT $ + sourceSocket sock $= + S.conduitGet S.get $= + action $= + S.conduitPut S.put $$ + sinkSocket sock + +sendMessage :: PeerMessage msg => msg -> Wire () +sendMessage msg = do + ecaps <- lift $ asks connExtCaps + yield $ envelop ecaps msg + +recvMessage :: Wire Message +recvMessage = undefined + + + +extendedHandshake :: Wire () +extendedHandshake = undefined + +connectWire :: Handshake -> PeerAddr -> ExtendedCaps -> Wire () -> IO () +connectWire hs addr caps wire = + bracket (connectToPeer addr) close $ \ sock -> do + hs' <- initiateHandshake sock hs + + unless (hsInfoHash hs == hsInfoHash hs') $ do + throwIO $ ProtocolError $ UnexpectedTopic (hsInfoHash hs') + + unless (hsPeerId hs' == fromMaybe (hsPeerId hs') (peerID addr)) $ do + throwIO $ ProtocolError $ UnexpectedPeerId (hsPeerId hs') + + let caps = hsReserved hs <> hsReserved hs' + if allowed caps ExtExtended + then return () else return () + + runWire wire sock $ Connection + { connCaps = caps + , connExtCaps = def + , connTopic = hsInfoHash hs + , connRemotePeerId = hsPeerId hs' + , connThisPeerId = hsPeerId hs + } + +acceptWire :: Wire () -> Socket -> IO () +acceptWire = undefined -- cgit v1.2.3