From 9913a611431efddd9f641b0ec3d85c7f7497ce72 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Sun, 8 Dec 2013 09:02:03 +0400 Subject: Add more stats bookkeeping --- src/Network/BitTorrent/Exchange/Message.hs | 2 +- src/Network/BitTorrent/Exchange/Wire.hs | 112 +++++++++++++++++++++++------ 2 files changed, 90 insertions(+), 24 deletions(-) (limited to 'src') diff --git a/src/Network/BitTorrent/Exchange/Message.hs b/src/Network/BitTorrent/Exchange/Message.hs index e0a7dad7..6fcf22f7 100644 --- a/src/Network/BitTorrent/Exchange/Message.hs +++ b/src/Network/BitTorrent/Exchange/Message.hs @@ -243,7 +243,7 @@ instance Serialize ProtocolString where -- to establish connection between peers. -- data Handshake = Handshake { - -- | Identifier of the protocol. This is usually equal to defaultProtocol + -- | Identifier of the protocol. This is usually equal to 'def'. hsProtocol :: ProtocolString -- | Reserved bytes used to specify supported BEP's. diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs index 3ec01ca1..a6ee35d8 100644 --- a/src/Network/BitTorrent/Exchange/Wire.hs +++ b/src/Network/BitTorrent/Exchange/Wire.hs @@ -11,7 +11,7 @@ module Network.BitTorrent.Exchange.Wire Wire -- ** Exceptions - , ChannelSide + , ChannelSide (..) , ProtocolError (..) , WireFailure (..) , isWireFailure @@ -32,11 +32,23 @@ module Network.BitTorrent.Exchange.Wire -- ** Query , getExtCaps + -- ** Messaging + , validate + , validateBoth + , keepStats + -- ** Stats , ConnectionStats (..) , getStats + , askStats + + , recvBytes + , sentBytes + , wastedBytes + , payloadBytes ) where +import Control.Applicative import Control.Exception import Control.Monad.Reader import Data.ByteString as BS @@ -71,7 +83,10 @@ import Network.BitTorrent.Exchange.Message data ChannelSide = ThisPeer | RemotePeer - deriving (Show, Eq, Enum) + deriving (Show, Eq, Enum, Bounded) + +instance Default ChannelSide where + def = ThisPeer instance Pretty ChannelSide where pretty = PP.text . show @@ -115,33 +130,53 @@ isWireFailure _ = return () -- Stats -----------------------------------------------------------------------} +type ByteCount = Int + data MessageStats = MessageStats - { overhead :: {-# UNPACK #-} !Int - , payload :: {-# UNPACK #-} !Int + { overhead :: {-# UNPACK #-} !ByteCount + , payload :: {-# UNPACK #-} !ByteCount } deriving Show +instance Default MessageStats where + def = MessageStats 0 0 + +instance Monoid MessageStats where + mempty = mempty + mappend a b = MessageStats + { overhead = overhead a + overhead b + , payload = payload a + payload b + } + + messageSize :: MessageStats -> Int -messageSize = undefined +messageSize MessageStats {..} = overhead + payload + +messageStats :: Message -> MessageStats +messageStats = undefined data ConnectionStats = ConnectionStats - { a :: !MessageStats - , b :: !MessageStats - } + { incomingFlow :: !MessageStats + , outcomingFlow :: !MessageStats + } deriving Show -sentBytes :: ConnectionStats -> Int -sentBytes = undefined +instance Default ConnectionStats where + def = ConnectionStats def def + +addStats :: ChannelSide -> MessageStats -> ConnectionStats -> ConnectionStats +addStats ThisPeer x s = s { outcomingFlow = outcomingFlow s <> x } +addStats RemotePeer x s = s { incomingFlow = incomingFlow s <> x } recvBytes :: ConnectionStats -> Int -recvBytes = undefined +recvBytes = messageSize . incomingFlow + +sentBytes :: ConnectionStats -> Int +sentBytes = messageSize . outcomingFlow wastedBytes :: ConnectionStats -> Int -wastedBytes = undefined +wastedBytes (ConnectionStats _in out) = overhead _in + overhead out payloadBytes :: ConnectionStats -> Int -payloadBytes = undefined - -getStats :: Wire ConnectionStats -getStats = undefined +payloadBytes (ConnectionStats _in out) = payload _in + payload out {----------------------------------------------------------------------- -- Connection @@ -153,6 +188,7 @@ data Connection = Connection , connTopic :: !InfoHash , connRemotePeerId :: !PeerId , connThisPeerId :: !PeerId + , connStats :: !(IORef ConnectionStats) } instance Pretty Connection where @@ -214,15 +250,36 @@ protocolError = monadThrow . ProtocolError disconnectPeer :: Wire a disconnectPeer = monadThrow DisconnectPeer +readRef :: (Connection -> IORef a) -> Wire a +readRef f = do + ref <- lift (asks f) + liftIO (readIORef ref) + +writeRef :: (Connection -> IORef a) -> a -> Wire () +writeRef f v = do + ref <- lift (asks f) + liftIO (writeIORef ref v) + +modifyRef :: (Connection -> IORef a) -> (a -> a) -> Wire () +modifyRef f m = do + ref <- lift (asks f) + liftIO (atomicModifyIORef' ref (\x -> (m x, ()))) + getExtCaps :: Wire ExtendedCaps -getExtCaps = do - capsRef <- lift $ asks connExtCaps - liftIO $ readIORef capsRef +getExtCaps = readRef connExtCaps setExtCaps :: ExtendedCaps -> Wire () -setExtCaps caps = do - capsRef <- lift $ asks connExtCaps - liftIO $ writeIORef capsRef caps +setExtCaps = writeRef connExtCaps + +getStats :: Wire ConnectionStats +getStats = readRef connStats + +askStats :: (ConnectionStats -> a) -> Wire a +askStats f = f <$> getStats + +putStats :: ChannelSide -> Message -> Wire () +putStats side msg = modifyRef connStats (addStats side (messageStats msg)) + getConnection :: Wire Connection getConnection = lift ask @@ -244,6 +301,13 @@ validateBoth action = do action validate ThisPeer +keepStats :: Wire () +keepStats = do + mmsg <- await + case mmsg of + Nothing -> return () + Just msg -> putStats ThisPeer msg -- FIXME not really ThisPeer + runWire :: Wire () -> Socket -> Connection -> IO () runWire action sock = runReaderT $ sourceSocket sock $= @@ -258,7 +322,7 @@ sendMessage msg = do yield $ envelop ecaps msg recvMessage :: Wire Message -recvMessage = undefined +recvMessage = await >>= maybe (monadThrow PeerDisconnected) return extendedHandshake :: ExtendedCaps -> Wire () extendedHandshake caps = do @@ -289,12 +353,14 @@ connectWire hs addr extCaps wire = else wire extCapsRef <- newIORef def + statsRef <- newIORef def runWire wire' sock $ Connection { connCaps = caps , connExtCaps = extCapsRef , connTopic = hsInfoHash hs , connRemotePeerId = hsPeerId hs' , connThisPeerId = hsPeerId hs + , connStats = statsRef } acceptWire :: Wire () -> Socket -> IO () -- cgit v1.2.3