From 4471bd71343e1e259de4c67131e152ac45bcd33d Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Tue, 10 Dec 2013 04:18:07 +0400 Subject: Document Wire module --- src/Network/BitTorrent/Exchange/Wire.hs | 280 ++++++++++++++++++++++++-------- 1 file changed, 210 insertions(+), 70 deletions(-) diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs index e0e652ec..a0f683c8 100644 --- a/src/Network/BitTorrent/Exchange/Wire.hs +++ b/src/Network/BitTorrent/Exchange/Wire.hs @@ -1,10 +1,14 @@ -- | +-- Module : Network.BitTorrent.Exchange.Wire +-- Copyright : (c) Sam Truzjan 2013 +-- (c) Daniel Gröber 2013 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable -- --- Message flow --- Duplex channell -- This module control /integrity/ of data send and received. -- --- {-# LANGUAGE DeriveDataTypeable #-} module Network.BitTorrent.Exchange.Wire ( -- * Wire @@ -17,38 +21,37 @@ module Network.BitTorrent.Exchange.Wire , isWireFailure , disconnectPeer + -- ** Stats + , ByteStats (..) + , FlowStats (..) + , ConnectionStats (..) + -- ** Connection , Connection - ( connCaps, connTopic - , connRemotePeerId, connThisPeerId - ) - , getConnection + , connProtocol + , connCaps + , connTopic + , connRemotePeerId + , connThisPeerId -- ** Setup , runWire , connectWire , acceptWire - -- ** Query - , getExtCaps - -- ** Messaging , recvMessage , sendMessage + -- ** Query + , getConnection + , getExtCaps + , getStats + + -- ** Conduits , validate , validateBoth , trackStats - - -- ** Stats - , ConnectionStats (..) - , getStats - , askStats - - , recvBytes - , sentBytes - , wastedBytes - , payloadBytes ) where import Control.Applicative @@ -87,6 +90,7 @@ import Data.BEncode as BE -- Exceptions -----------------------------------------------------------------------} +-- | Used to specify initiator of 'ProtocolError'. data ChannelSide = ThisPeer | RemotePeer @@ -98,19 +102,52 @@ instance Default ChannelSide where instance Pretty ChannelSide where pretty = PP.text . show --- | Errors occur when a remote peer violates protocol specification. +-- | A protocol errors occur when a peer violates protocol +-- specification. data ProtocolError -- | Protocol string should be 'BitTorrent Protocol' but remote - -- peer send a different string. + -- peer have sent a different string. = InvalidProtocol ProtocolName - | UnexpectedTopic InfoHash -- ^ peer replied with unexpected infohash. - | UnexpectedPeerId PeerId -- ^ peer replied with unexpected peer id. - | UnknownTopic InfoHash -- ^ peer requested unknown torrent. - | HandshakeRefused -- ^ peer do not send an extended handshake back. - | BitfieldAlreadSend ChannelSide - | InvalidMessage -- TODO caps violation - { violentSender :: ChannelSide -- ^ endpoint sent invalid message - , extensionRequired :: Extension -- ^ + + -- | Sent and received protocol strings do not match. Can occur + -- in 'connectWire' only. + | UnexpectedProtocol ProtocolName + + -- | /Remote/ peer replied with invalid 'hsInfoHash' which do not + -- match with 'hsInfoHash' /this/ peer have sent. Can occur in + -- 'connectWire' only. + | UnexpectedTopic InfoHash + + -- | Some trackers or DHT can return 'PeerId' of a peer. If a + -- remote peer handshaked with different 'hsPeerId' then this + -- exception is raised. Can occur in 'connectWire' only. + | UnexpectedPeerId PeerId + + -- | Accepted peer have sent unknown torrent infohash in + -- 'hsInfoHash' field. This situation usually happen when /this/ + -- peer have deleted the requested torrent. The error can occur in + -- 'acceptWire' function only. + | UnknownTopic InfoHash + + -- | A remote peer have 'ExtExtended' enabled but did not send an + -- 'ExtendedHandshake' back. + | HandshakeRefused + + -- | 'Network.BitTorrent.Exchange.Message.Bitfield' message MUST + -- be send either once or zero times, but either this peer or + -- remote peer send a bitfield message the second time. + | BitfieldAlreadySend ChannelSide + + -- | Capabilities violation. For example this exception can occur + -- when a peer have sent 'Port' message but 'ExtDHT' is not + -- allowed in 'connCaps'. + | DisallowedMessage + { -- | Who sent invalid message. + violentSender :: ChannelSide + + -- | If the 'violentSender' reconnect with this extension + -- enabled then he can try to send this message. + , extensionRequired :: Extension } deriving Show @@ -119,9 +156,24 @@ instance Pretty ProtocolError where -- | Exceptions used to interrupt the current P2P session. data WireFailure - = PeerDisconnected -- ^ A peer not responding. - | DisconnectPeer -- ^ - | ProtocolError ProtocolError + -- | Force termination of wire connection. + -- + -- Normally you should throw only this exception from event loop + -- using 'disconnectPeer', other exceptions are thrown + -- automatically by functions from this module. + -- + = DisconnectPeer + + -- | A peer not responding and did not send a 'KeepAlive' message + -- for a specified period of time. + | PeerDisconnected + + -- | A remote peer have sent some unknown message we unable to + -- parse. + | DecodingError GetException + + -- | See 'ProtocolError' for more details. + | ProtocolError ProtocolError deriving (Show, Typeable) instance Exception WireFailure @@ -137,54 +189,105 @@ isWireFailure _ = return () -- Stats -----------------------------------------------------------------------} +-- | Message stats in one direction. data FlowStats = FlowStats - { messageBytes :: {-# UNPACK #-} !ByteStats + { -- | Sum of byte sequences of all messages. + messageBytes :: {-# UNPACK #-} !ByteStats + -- | Number of the messages sent or received. , messageCount :: {-# UNPACK #-} !Int - -- msgTypes :: Map MessageType Int } deriving Show --- | Note that this is stats is completely different from Progress: --- TODO explain why. +-- | Zeroed stats. +instance Default FlowStats where + def = FlowStats def 0 + +-- | Monoid under addition. +instance Monoid FlowStats where + mempty = def + mappend a b = FlowStats + { messageBytes = messageBytes a <> messageBytes b + , messageCount = messageCount a + messageCount b + } + +-- | Aggregate one more message stats in this direction. +addFlowStats :: ByteStats -> FlowStats -> FlowStats +addFlowStats x FlowStats {..} = FlowStats + { messageBytes = messageBytes <> x + , messageCount = succ messageCount + } + +-- | Message stats in both directions. This data can be retrieved +-- using 'getStats' function. +-- +-- Note that this stats is completely different from +-- 'Data.Torrent.Progress.Progress': payload bytes not necessary +-- equal to downloaded\/uploaded bytes since a peer can send a +-- broken block. +-- data ConnectionStats = ConnectionStats - { incomingFlow :: !ByteStats - , outcomingFlow :: !ByteStats + { -- | Received messages stats. + incomingFlow :: !FlowStats + -- | Sent messages stats. + , outcomingFlow :: !FlowStats } deriving Show +-- | Zeroed stats. instance Default ConnectionStats where def = ConnectionStats def def -addStats :: ChannelSide -> ByteStats -> ConnectionStats -> ConnectionStats -addStats ThisPeer x s = s { outcomingFlow = outcomingFlow s <> x } -addStats RemotePeer x s = s { incomingFlow = incomingFlow s <> x } - -recvBytes :: ConnectionStats -> Int -recvBytes = byteLength . incomingFlow - -sentBytes :: ConnectionStats -> Int -sentBytes = byteLength . outcomingFlow - -wastedBytes :: ConnectionStats -> Int -wastedBytes (ConnectionStats _in out) = overhead _in + overhead out +-- | Monoid under addition. +instance Monoid ConnectionStats where + mempty = def + mappend a b = ConnectionStats + { incomingFlow = incomingFlow a <> incomingFlow b + , outcomingFlow = outcomingFlow a <> outcomingFlow b + } -payloadBytes :: ConnectionStats -> Int -payloadBytes (ConnectionStats _in out) = payload _in + payload out +-- | Aggregate one more message stats in the /specified/ direction. +addStats :: ChannelSide -> ByteStats -> ConnectionStats -> ConnectionStats +addStats ThisPeer x s = s { outcomingFlow = addFlowStats x (outcomingFlow s) } +addStats RemotePeer x s = s { incomingFlow = addFlowStats x (incomingFlow s) } {----------------------------------------------------------------------- -- Connection -----------------------------------------------------------------------} +-- | Connection keep various info about both peers. data Connection = Connection - { connCaps :: !Caps + { -- | /Both/ peers handshaked with this protocol string. The only + -- value is \"Bittorrent Protocol\" but this can be changed in + -- future. + connProtocol :: !ProtocolName + + -- | A set of enabled extensions. This value used to check if a + -- message is allowed to be sent or received. + , connCaps :: !Caps + + -- | /Both/ peers handshaked with this infohash. A connection can + -- handle only one topic, use 'reconnect' to change the current + -- topic. , connTopic :: !InfoHash + + -- | Typically extracted from handshake. , connRemotePeerId :: !PeerId + + -- | Typically extracted from handshake. , connThisPeerId :: !PeerId - , connStats :: !(IORef ConnectionStats) + + -- | If @not (allowed ExtExtended connCaps)@ then this set is + -- always empty. Otherwise it has extension protocol 'MessageId' + -- map. , connExtCaps :: !(IORef ExtendedCaps) + + -- | Various stats about messages sent and received. Stats can be + -- used to protect /this/ peer against flood attacks. + , connStats :: !(IORef ConnectionStats) } instance Pretty Connection where pretty Connection {..} = "Connection" +-- TODO check extended messages too isAllowed :: Connection -> Message -> Bool isAllowed Connection {..} msg | Just ext <- requires msg = ext `allowed` connCaps @@ -233,11 +336,17 @@ connectToPeer p = do -- Wire -----------------------------------------------------------------------} -type Wire = ConduitM Message Message (ReaderT Connection IO) +-- | do not expose this so we can change it without breaking api +type Connectivity = ReaderT Connection + +-- | A duplex channel connected to a remote peer which keep tracks +-- connection parameters. +type Wire a = ConduitM Message Message (Connectivity IO) a protocolError :: ProtocolError -> Wire a protocolError = monadThrow . ProtocolError +-- | Forcefully terminate wire session and close socket. disconnectPeer :: Wire a disconnectPeer = monadThrow DisconnectPeer @@ -256,22 +365,24 @@ modifyRef f m = do ref <- lift (asks f) liftIO (atomicModifyIORef' ref (\x -> (m x, ()))) -getExtCaps :: Wire ExtendedCaps -getExtCaps = readRef connExtCaps - setExtCaps :: ExtendedCaps -> Wire () setExtCaps = writeRef connExtCaps +-- | Get current extended capabilities. Note that this value can +-- change in current session if either this or remote peer will +-- initiate rehandshaking. +getExtCaps :: Wire ExtendedCaps +getExtCaps = readRef connExtCaps + +-- | Get current stats. Note that this value will change with the next +-- sent or received message. getStats :: Wire ConnectionStats getStats = readRef connStats -askStats :: (ConnectionStats -> a) -> Wire a -askStats f = f <$> getStats - putStats :: ChannelSide -> Message -> Wire () putStats side msg = modifyRef connStats (addStats side (stats msg)) - +-- | See the 'Connection' section for more info. getConnection :: Wire Connection getConnection = lift ask @@ -284,7 +395,7 @@ validate side = await >>= maybe (return ()) yieldCheck Nothing -> return () Just ext | ext `allowed` caps -> yield msg - | otherwise -> protocolError $ InvalidMessage side ext + | otherwise -> protocolError $ DisallowedMessage side ext validateBoth :: Wire () -> Wire () validateBoth action = do @@ -299,6 +410,7 @@ trackStats = do Nothing -> return () Just msg -> putStats ThisPeer msg -- FIXME not really ThisPeer +-- | Normally you should use 'connectWire' or 'acceptWire'. runWire :: Wire () -> Socket -> Connection -> IO () runWire action sock = runReaderT $ sourceSocket sock $= @@ -307,16 +419,20 @@ runWire action sock = runReaderT $ S.conduitPut S.put $$ sinkSocket sock +-- | This function will block until a peer send new message. You can +-- also use 'await'. +recvMessage :: Wire Message +recvMessage = await >>= maybe (monadThrow PeerDisconnected) return + +-- | You can also use 'yield'. sendMessage :: PeerMessage msg => msg -> Wire () sendMessage msg = do ecaps <- getExtCaps yield $ envelop ecaps msg -recvMessage :: Wire Message -recvMessage = await >>= maybe (monadThrow PeerDisconnected) return - extendedHandshake :: ExtendedCaps -> Wire () extendedHandshake caps = do + -- TODO add other params to the handshake sendMessage $ nullExtendedHandshake caps msg <- recvMessage case msg of @@ -324,6 +440,18 @@ extendedHandshake caps = do setExtCaps $ ehsCaps <> caps _ -> protocolError HandshakeRefused +rehandshake :: ExtendedCaps -> Wire () +rehandshake caps = undefined + +reconnect :: Wire () +reconnect = undefined + +-- | Initiate 'Wire' connection and handshake with a peer. This +-- function will also do extension protocol handshake if 'ExtExtended' +-- is enabled on both sides. +-- +-- This function can throw 'WireFailure' exception. +-- connectWire :: Handshake -> PeerAddr -> ExtendedCaps -> Wire () -> IO () connectWire hs addr extCaps wire = bracket (connectToPeer addr) close $ \ sock -> do @@ -332,6 +460,9 @@ connectWire hs addr extCaps wire = unless (def == hsProtocol hs') $ do throwIO $ ProtocolError $ InvalidProtocol (hsProtocol hs') + unless (hsProtocol hs == hsProtocol hs') $ do + throwIO $ ProtocolError $ UnexpectedProtocol (hsProtocol hs') + unless (hsInfoHash hs == hsInfoHash hs') $ do throwIO $ ProtocolError $ UnexpectedTopic (hsInfoHash hs') @@ -346,7 +477,8 @@ connectWire hs addr extCaps wire = extCapsRef <- newIORef def statsRef <- newIORef def runWire wire' sock $ Connection - { connCaps = caps + { connProtocol = hsProtocol hs + , connCaps = caps , connTopic = hsInfoHash hs , connRemotePeerId = hsPeerId hs' , connThisPeerId = hsPeerId hs @@ -354,5 +486,13 @@ connectWire hs addr extCaps wire = , connStats = statsRef } -acceptWire :: Wire () -> Socket -> IO () -acceptWire = undefined +-- | Accept 'Wire' connection using already 'Network.Socket.accept'ed +-- socket. For peer listener loop the 'acceptSafe' should be +-- prefered against 'accept'. The socket will be close at exit. +-- +-- This function can throw 'WireFailure' exception. +-- +acceptWire :: Socket -> Wire () -> IO () +acceptWire sock wire = do + bracket (return sock) close $ \ _ -> do + error "acceptWire: not implemented" -- cgit v1.2.3