diff options
-rw-r--r-- | src/Network/BitTorrent/Exchange/Message.hs | 2 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange/Wire.hs | 112 |
2 files changed, 90 insertions, 24 deletions
diff --git a/src/Network/BitTorrent/Exchange/Message.hs b/src/Network/BitTorrent/Exchange/Message.hs index e0a7dad7..6fcf22f7 100644 --- a/src/Network/BitTorrent/Exchange/Message.hs +++ b/src/Network/BitTorrent/Exchange/Message.hs | |||
@@ -243,7 +243,7 @@ instance Serialize ProtocolString where | |||
243 | -- to establish connection between peers. | 243 | -- to establish connection between peers. |
244 | -- | 244 | -- |
245 | data Handshake = Handshake { | 245 | data Handshake = Handshake { |
246 | -- | Identifier of the protocol. This is usually equal to defaultProtocol | 246 | -- | Identifier of the protocol. This is usually equal to 'def'. |
247 | hsProtocol :: ProtocolString | 247 | hsProtocol :: ProtocolString |
248 | 248 | ||
249 | -- | Reserved bytes used to specify supported BEP's. | 249 | -- | Reserved bytes used to specify supported BEP's. |
diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs index 3ec01ca1..a6ee35d8 100644 --- a/src/Network/BitTorrent/Exchange/Wire.hs +++ b/src/Network/BitTorrent/Exchange/Wire.hs | |||
@@ -11,7 +11,7 @@ module Network.BitTorrent.Exchange.Wire | |||
11 | Wire | 11 | Wire |
12 | 12 | ||
13 | -- ** Exceptions | 13 | -- ** Exceptions |
14 | , ChannelSide | 14 | , ChannelSide (..) |
15 | , ProtocolError (..) | 15 | , ProtocolError (..) |
16 | , WireFailure (..) | 16 | , WireFailure (..) |
17 | , isWireFailure | 17 | , isWireFailure |
@@ -32,11 +32,23 @@ module Network.BitTorrent.Exchange.Wire | |||
32 | -- ** Query | 32 | -- ** Query |
33 | , getExtCaps | 33 | , getExtCaps |
34 | 34 | ||
35 | -- ** Messaging | ||
36 | , validate | ||
37 | , validateBoth | ||
38 | , keepStats | ||
39 | |||
35 | -- ** Stats | 40 | -- ** Stats |
36 | , ConnectionStats (..) | 41 | , ConnectionStats (..) |
37 | , getStats | 42 | , getStats |
43 | , askStats | ||
44 | |||
45 | , recvBytes | ||
46 | , sentBytes | ||
47 | , wastedBytes | ||
48 | , payloadBytes | ||
38 | ) where | 49 | ) where |
39 | 50 | ||
51 | import Control.Applicative | ||
40 | import Control.Exception | 52 | import Control.Exception |
41 | import Control.Monad.Reader | 53 | import Control.Monad.Reader |
42 | import Data.ByteString as BS | 54 | import Data.ByteString as BS |
@@ -71,7 +83,10 @@ import Network.BitTorrent.Exchange.Message | |||
71 | data ChannelSide | 83 | data ChannelSide |
72 | = ThisPeer | 84 | = ThisPeer |
73 | | RemotePeer | 85 | | RemotePeer |
74 | deriving (Show, Eq, Enum) | 86 | deriving (Show, Eq, Enum, Bounded) |
87 | |||
88 | instance Default ChannelSide where | ||
89 | def = ThisPeer | ||
75 | 90 | ||
76 | instance Pretty ChannelSide where | 91 | instance Pretty ChannelSide where |
77 | pretty = PP.text . show | 92 | pretty = PP.text . show |
@@ -115,33 +130,53 @@ isWireFailure _ = return () | |||
115 | -- Stats | 130 | -- Stats |
116 | -----------------------------------------------------------------------} | 131 | -----------------------------------------------------------------------} |
117 | 132 | ||
133 | type ByteCount = Int | ||
134 | |||
118 | data MessageStats = MessageStats | 135 | data MessageStats = MessageStats |
119 | { overhead :: {-# UNPACK #-} !Int | 136 | { overhead :: {-# UNPACK #-} !ByteCount |
120 | , payload :: {-# UNPACK #-} !Int | 137 | , payload :: {-# UNPACK #-} !ByteCount |
121 | } deriving Show | 138 | } deriving Show |
122 | 139 | ||
140 | instance Default MessageStats where | ||
141 | def = MessageStats 0 0 | ||
142 | |||
143 | instance Monoid MessageStats where | ||
144 | mempty = mempty | ||
145 | mappend a b = MessageStats | ||
146 | { overhead = overhead a + overhead b | ||
147 | , payload = payload a + payload b | ||
148 | } | ||
149 | |||
150 | |||
123 | messageSize :: MessageStats -> Int | 151 | messageSize :: MessageStats -> Int |
124 | messageSize = undefined | 152 | messageSize MessageStats {..} = overhead + payload |
153 | |||
154 | messageStats :: Message -> MessageStats | ||
155 | messageStats = undefined | ||
125 | 156 | ||
126 | data ConnectionStats = ConnectionStats | 157 | data ConnectionStats = ConnectionStats |
127 | { a :: !MessageStats | 158 | { incomingFlow :: !MessageStats |
128 | , b :: !MessageStats | 159 | , outcomingFlow :: !MessageStats |
129 | } | 160 | } deriving Show |
130 | 161 | ||
131 | sentBytes :: ConnectionStats -> Int | 162 | instance Default ConnectionStats where |
132 | sentBytes = undefined | 163 | def = ConnectionStats def def |
164 | |||
165 | addStats :: ChannelSide -> MessageStats -> ConnectionStats -> ConnectionStats | ||
166 | addStats ThisPeer x s = s { outcomingFlow = outcomingFlow s <> x } | ||
167 | addStats RemotePeer x s = s { incomingFlow = incomingFlow s <> x } | ||
133 | 168 | ||
134 | recvBytes :: ConnectionStats -> Int | 169 | recvBytes :: ConnectionStats -> Int |
135 | recvBytes = undefined | 170 | recvBytes = messageSize . incomingFlow |
171 | |||
172 | sentBytes :: ConnectionStats -> Int | ||
173 | sentBytes = messageSize . outcomingFlow | ||
136 | 174 | ||
137 | wastedBytes :: ConnectionStats -> Int | 175 | wastedBytes :: ConnectionStats -> Int |
138 | wastedBytes = undefined | 176 | wastedBytes (ConnectionStats _in out) = overhead _in + overhead out |
139 | 177 | ||
140 | payloadBytes :: ConnectionStats -> Int | 178 | payloadBytes :: ConnectionStats -> Int |
141 | payloadBytes = undefined | 179 | payloadBytes (ConnectionStats _in out) = payload _in + payload out |
142 | |||
143 | getStats :: Wire ConnectionStats | ||
144 | getStats = undefined | ||
145 | 180 | ||
146 | {----------------------------------------------------------------------- | 181 | {----------------------------------------------------------------------- |
147 | -- Connection | 182 | -- Connection |
@@ -153,6 +188,7 @@ data Connection = Connection | |||
153 | , connTopic :: !InfoHash | 188 | , connTopic :: !InfoHash |
154 | , connRemotePeerId :: !PeerId | 189 | , connRemotePeerId :: !PeerId |
155 | , connThisPeerId :: !PeerId | 190 | , connThisPeerId :: !PeerId |
191 | , connStats :: !(IORef ConnectionStats) | ||
156 | } | 192 | } |
157 | 193 | ||
158 | instance Pretty Connection where | 194 | instance Pretty Connection where |
@@ -214,15 +250,36 @@ protocolError = monadThrow . ProtocolError | |||
214 | disconnectPeer :: Wire a | 250 | disconnectPeer :: Wire a |
215 | disconnectPeer = monadThrow DisconnectPeer | 251 | disconnectPeer = monadThrow DisconnectPeer |
216 | 252 | ||
253 | readRef :: (Connection -> IORef a) -> Wire a | ||
254 | readRef f = do | ||
255 | ref <- lift (asks f) | ||
256 | liftIO (readIORef ref) | ||
257 | |||
258 | writeRef :: (Connection -> IORef a) -> a -> Wire () | ||
259 | writeRef f v = do | ||
260 | ref <- lift (asks f) | ||
261 | liftIO (writeIORef ref v) | ||
262 | |||
263 | modifyRef :: (Connection -> IORef a) -> (a -> a) -> Wire () | ||
264 | modifyRef f m = do | ||
265 | ref <- lift (asks f) | ||
266 | liftIO (atomicModifyIORef' ref (\x -> (m x, ()))) | ||
267 | |||
217 | getExtCaps :: Wire ExtendedCaps | 268 | getExtCaps :: Wire ExtendedCaps |
218 | getExtCaps = do | 269 | getExtCaps = readRef connExtCaps |
219 | capsRef <- lift $ asks connExtCaps | ||
220 | liftIO $ readIORef capsRef | ||
221 | 270 | ||
222 | setExtCaps :: ExtendedCaps -> Wire () | 271 | setExtCaps :: ExtendedCaps -> Wire () |
223 | setExtCaps caps = do | 272 | setExtCaps = writeRef connExtCaps |
224 | capsRef <- lift $ asks connExtCaps | 273 | |
225 | liftIO $ writeIORef capsRef caps | 274 | getStats :: Wire ConnectionStats |
275 | getStats = readRef connStats | ||
276 | |||
277 | askStats :: (ConnectionStats -> a) -> Wire a | ||
278 | askStats f = f <$> getStats | ||
279 | |||
280 | putStats :: ChannelSide -> Message -> Wire () | ||
281 | putStats side msg = modifyRef connStats (addStats side (messageStats msg)) | ||
282 | |||
226 | 283 | ||
227 | getConnection :: Wire Connection | 284 | getConnection :: Wire Connection |
228 | getConnection = lift ask | 285 | getConnection = lift ask |
@@ -244,6 +301,13 @@ validateBoth action = do | |||
244 | action | 301 | action |
245 | validate ThisPeer | 302 | validate ThisPeer |
246 | 303 | ||
304 | keepStats :: Wire () | ||
305 | keepStats = do | ||
306 | mmsg <- await | ||
307 | case mmsg of | ||
308 | Nothing -> return () | ||
309 | Just msg -> putStats ThisPeer msg -- FIXME not really ThisPeer | ||
310 | |||
247 | runWire :: Wire () -> Socket -> Connection -> IO () | 311 | runWire :: Wire () -> Socket -> Connection -> IO () |
248 | runWire action sock = runReaderT $ | 312 | runWire action sock = runReaderT $ |
249 | sourceSocket sock $= | 313 | sourceSocket sock $= |
@@ -258,7 +322,7 @@ sendMessage msg = do | |||
258 | yield $ envelop ecaps msg | 322 | yield $ envelop ecaps msg |
259 | 323 | ||
260 | recvMessage :: Wire Message | 324 | recvMessage :: Wire Message |
261 | recvMessage = undefined | 325 | recvMessage = await >>= maybe (monadThrow PeerDisconnected) return |
262 | 326 | ||
263 | extendedHandshake :: ExtendedCaps -> Wire () | 327 | extendedHandshake :: ExtendedCaps -> Wire () |
264 | extendedHandshake caps = do | 328 | extendedHandshake caps = do |
@@ -289,12 +353,14 @@ connectWire hs addr extCaps wire = | |||
289 | else wire | 353 | else wire |
290 | 354 | ||
291 | extCapsRef <- newIORef def | 355 | extCapsRef <- newIORef def |
356 | statsRef <- newIORef def | ||
292 | runWire wire' sock $ Connection | 357 | runWire wire' sock $ Connection |
293 | { connCaps = caps | 358 | { connCaps = caps |
294 | , connExtCaps = extCapsRef | 359 | , connExtCaps = extCapsRef |
295 | , connTopic = hsInfoHash hs | 360 | , connTopic = hsInfoHash hs |
296 | , connRemotePeerId = hsPeerId hs' | 361 | , connRemotePeerId = hsPeerId hs' |
297 | , connThisPeerId = hsPeerId hs | 362 | , connThisPeerId = hsPeerId hs |
363 | , connStats = statsRef | ||
298 | } | 364 | } |
299 | 365 | ||
300 | acceptWire :: Wire () -> Socket -> IO () | 366 | acceptWire :: Wire () -> Socket -> IO () |