summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Network/BitTorrent/Exchange/Message.hs2
-rw-r--r--src/Network/BitTorrent/Exchange/Wire.hs112
2 files changed, 90 insertions, 24 deletions
diff --git a/src/Network/BitTorrent/Exchange/Message.hs b/src/Network/BitTorrent/Exchange/Message.hs
index e0a7dad7..6fcf22f7 100644
--- a/src/Network/BitTorrent/Exchange/Message.hs
+++ b/src/Network/BitTorrent/Exchange/Message.hs
@@ -243,7 +243,7 @@ instance Serialize ProtocolString where
243-- to establish connection between peers. 243-- to establish connection between peers.
244-- 244--
245data Handshake = Handshake { 245data Handshake = Handshake {
246 -- | Identifier of the protocol. This is usually equal to defaultProtocol 246 -- | Identifier of the protocol. This is usually equal to 'def'.
247 hsProtocol :: ProtocolString 247 hsProtocol :: ProtocolString
248 248
249 -- | Reserved bytes used to specify supported BEP's. 249 -- | Reserved bytes used to specify supported BEP's.
diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs
index 3ec01ca1..a6ee35d8 100644
--- a/src/Network/BitTorrent/Exchange/Wire.hs
+++ b/src/Network/BitTorrent/Exchange/Wire.hs
@@ -11,7 +11,7 @@ module Network.BitTorrent.Exchange.Wire
11 Wire 11 Wire
12 12
13 -- ** Exceptions 13 -- ** Exceptions
14 , ChannelSide 14 , ChannelSide (..)
15 , ProtocolError (..) 15 , ProtocolError (..)
16 , WireFailure (..) 16 , WireFailure (..)
17 , isWireFailure 17 , isWireFailure
@@ -32,11 +32,23 @@ module Network.BitTorrent.Exchange.Wire
32 -- ** Query 32 -- ** Query
33 , getExtCaps 33 , getExtCaps
34 34
35 -- ** Messaging
36 , validate
37 , validateBoth
38 , keepStats
39
35 -- ** Stats 40 -- ** Stats
36 , ConnectionStats (..) 41 , ConnectionStats (..)
37 , getStats 42 , getStats
43 , askStats
44
45 , recvBytes
46 , sentBytes
47 , wastedBytes
48 , payloadBytes
38 ) where 49 ) where
39 50
51import Control.Applicative
40import Control.Exception 52import Control.Exception
41import Control.Monad.Reader 53import Control.Monad.Reader
42import Data.ByteString as BS 54import Data.ByteString as BS
@@ -71,7 +83,10 @@ import Network.BitTorrent.Exchange.Message
71data ChannelSide 83data ChannelSide
72 = ThisPeer 84 = ThisPeer
73 | RemotePeer 85 | RemotePeer
74 deriving (Show, Eq, Enum) 86 deriving (Show, Eq, Enum, Bounded)
87
88instance Default ChannelSide where
89 def = ThisPeer
75 90
76instance Pretty ChannelSide where 91instance Pretty ChannelSide where
77 pretty = PP.text . show 92 pretty = PP.text . show
@@ -115,33 +130,53 @@ isWireFailure _ = return ()
115-- Stats 130-- Stats
116-----------------------------------------------------------------------} 131-----------------------------------------------------------------------}
117 132
133type ByteCount = Int
134
118data MessageStats = MessageStats 135data MessageStats = MessageStats
119 { overhead :: {-# UNPACK #-} !Int 136 { overhead :: {-# UNPACK #-} !ByteCount
120 , payload :: {-# UNPACK #-} !Int 137 , payload :: {-# UNPACK #-} !ByteCount
121 } deriving Show 138 } deriving Show
122 139
140instance Default MessageStats where
141 def = MessageStats 0 0
142
143instance Monoid MessageStats where
144 mempty = mempty
145 mappend a b = MessageStats
146 { overhead = overhead a + overhead b
147 , payload = payload a + payload b
148 }
149
150
123messageSize :: MessageStats -> Int 151messageSize :: MessageStats -> Int
124messageSize = undefined 152messageSize MessageStats {..} = overhead + payload
153
154messageStats :: Message -> MessageStats
155messageStats = undefined
125 156
126data ConnectionStats = ConnectionStats 157data ConnectionStats = ConnectionStats
127 { a :: !MessageStats 158 { incomingFlow :: !MessageStats
128 , b :: !MessageStats 159 , outcomingFlow :: !MessageStats
129 } 160 } deriving Show
130 161
131sentBytes :: ConnectionStats -> Int 162instance Default ConnectionStats where
132sentBytes = undefined 163 def = ConnectionStats def def
164
165addStats :: ChannelSide -> MessageStats -> ConnectionStats -> ConnectionStats
166addStats ThisPeer x s = s { outcomingFlow = outcomingFlow s <> x }
167addStats RemotePeer x s = s { incomingFlow = incomingFlow s <> x }
133 168
134recvBytes :: ConnectionStats -> Int 169recvBytes :: ConnectionStats -> Int
135recvBytes = undefined 170recvBytes = messageSize . incomingFlow
171
172sentBytes :: ConnectionStats -> Int
173sentBytes = messageSize . outcomingFlow
136 174
137wastedBytes :: ConnectionStats -> Int 175wastedBytes :: ConnectionStats -> Int
138wastedBytes = undefined 176wastedBytes (ConnectionStats _in out) = overhead _in + overhead out
139 177
140payloadBytes :: ConnectionStats -> Int 178payloadBytes :: ConnectionStats -> Int
141payloadBytes = undefined 179payloadBytes (ConnectionStats _in out) = payload _in + payload out
142
143getStats :: Wire ConnectionStats
144getStats = undefined
145 180
146{----------------------------------------------------------------------- 181{-----------------------------------------------------------------------
147-- Connection 182-- Connection
@@ -153,6 +188,7 @@ data Connection = Connection
153 , connTopic :: !InfoHash 188 , connTopic :: !InfoHash
154 , connRemotePeerId :: !PeerId 189 , connRemotePeerId :: !PeerId
155 , connThisPeerId :: !PeerId 190 , connThisPeerId :: !PeerId
191 , connStats :: !(IORef ConnectionStats)
156 } 192 }
157 193
158instance Pretty Connection where 194instance Pretty Connection where
@@ -214,15 +250,36 @@ protocolError = monadThrow . ProtocolError
214disconnectPeer :: Wire a 250disconnectPeer :: Wire a
215disconnectPeer = monadThrow DisconnectPeer 251disconnectPeer = monadThrow DisconnectPeer
216 252
253readRef :: (Connection -> IORef a) -> Wire a
254readRef f = do
255 ref <- lift (asks f)
256 liftIO (readIORef ref)
257
258writeRef :: (Connection -> IORef a) -> a -> Wire ()
259writeRef f v = do
260 ref <- lift (asks f)
261 liftIO (writeIORef ref v)
262
263modifyRef :: (Connection -> IORef a) -> (a -> a) -> Wire ()
264modifyRef f m = do
265 ref <- lift (asks f)
266 liftIO (atomicModifyIORef' ref (\x -> (m x, ())))
267
217getExtCaps :: Wire ExtendedCaps 268getExtCaps :: Wire ExtendedCaps
218getExtCaps = do 269getExtCaps = readRef connExtCaps
219 capsRef <- lift $ asks connExtCaps
220 liftIO $ readIORef capsRef
221 270
222setExtCaps :: ExtendedCaps -> Wire () 271setExtCaps :: ExtendedCaps -> Wire ()
223setExtCaps caps = do 272setExtCaps = writeRef connExtCaps
224 capsRef <- lift $ asks connExtCaps 273
225 liftIO $ writeIORef capsRef caps 274getStats :: Wire ConnectionStats
275getStats = readRef connStats
276
277askStats :: (ConnectionStats -> a) -> Wire a
278askStats f = f <$> getStats
279
280putStats :: ChannelSide -> Message -> Wire ()
281putStats side msg = modifyRef connStats (addStats side (messageStats msg))
282
226 283
227getConnection :: Wire Connection 284getConnection :: Wire Connection
228getConnection = lift ask 285getConnection = lift ask
@@ -244,6 +301,13 @@ validateBoth action = do
244 action 301 action
245 validate ThisPeer 302 validate ThisPeer
246 303
304keepStats :: Wire ()
305keepStats = do
306 mmsg <- await
307 case mmsg of
308 Nothing -> return ()
309 Just msg -> putStats ThisPeer msg -- FIXME not really ThisPeer
310
247runWire :: Wire () -> Socket -> Connection -> IO () 311runWire :: Wire () -> Socket -> Connection -> IO ()
248runWire action sock = runReaderT $ 312runWire action sock = runReaderT $
249 sourceSocket sock $= 313 sourceSocket sock $=
@@ -258,7 +322,7 @@ sendMessage msg = do
258 yield $ envelop ecaps msg 322 yield $ envelop ecaps msg
259 323
260recvMessage :: Wire Message 324recvMessage :: Wire Message
261recvMessage = undefined 325recvMessage = await >>= maybe (monadThrow PeerDisconnected) return
262 326
263extendedHandshake :: ExtendedCaps -> Wire () 327extendedHandshake :: ExtendedCaps -> Wire ()
264extendedHandshake caps = do 328extendedHandshake caps = do
@@ -289,12 +353,14 @@ connectWire hs addr extCaps wire =
289 else wire 353 else wire
290 354
291 extCapsRef <- newIORef def 355 extCapsRef <- newIORef def
356 statsRef <- newIORef def
292 runWire wire' sock $ Connection 357 runWire wire' sock $ Connection
293 { connCaps = caps 358 { connCaps = caps
294 , connExtCaps = extCapsRef 359 , connExtCaps = extCapsRef
295 , connTopic = hsInfoHash hs 360 , connTopic = hsInfoHash hs
296 , connRemotePeerId = hsPeerId hs' 361 , connRemotePeerId = hsPeerId hs'
297 , connThisPeerId = hsPeerId hs 362 , connThisPeerId = hsPeerId hs
363 , connStats = statsRef
298 } 364 }
299 365
300acceptWire :: Wire () -> Socket -> IO () 366acceptWire :: Wire () -> Socket -> IO ()