summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSam Truzjan <pxqr.sta@gmail.com>2013-12-10 04:18:07 +0400
committerSam Truzjan <pxqr.sta@gmail.com>2013-12-10 04:18:07 +0400
commit4471bd71343e1e259de4c67131e152ac45bcd33d (patch)
tree5e724c12596a834673aa7a602d778e95b624f76d
parent5da773256d10f244e3e38b8da57ad8e78e340709 (diff)
Document Wire module
-rw-r--r--src/Network/BitTorrent/Exchange/Wire.hs280
1 files changed, 210 insertions, 70 deletions
diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs
index e0e652ec..a0f683c8 100644
--- a/src/Network/BitTorrent/Exchange/Wire.hs
+++ b/src/Network/BitTorrent/Exchange/Wire.hs
@@ -1,10 +1,14 @@
1-- | 1-- |
2-- Module : Network.BitTorrent.Exchange.Wire
3-- Copyright : (c) Sam Truzjan 2013
4-- (c) Daniel Gröber 2013
5-- License : BSD3
6-- Maintainer : pxqr.sta@gmail.com
7-- Stability : experimental
8-- Portability : portable
2-- 9--
3-- Message flow
4-- Duplex channell
5-- This module control /integrity/ of data send and received. 10-- This module control /integrity/ of data send and received.
6-- 11--
7--
8{-# LANGUAGE DeriveDataTypeable #-} 12{-# LANGUAGE DeriveDataTypeable #-}
9module Network.BitTorrent.Exchange.Wire 13module Network.BitTorrent.Exchange.Wire
10 ( -- * Wire 14 ( -- * Wire
@@ -17,38 +21,37 @@ module Network.BitTorrent.Exchange.Wire
17 , isWireFailure 21 , isWireFailure
18 , disconnectPeer 22 , disconnectPeer
19 23
24 -- ** Stats
25 , ByteStats (..)
26 , FlowStats (..)
27 , ConnectionStats (..)
28
20 -- ** Connection 29 -- ** Connection
21 , Connection 30 , Connection
22 ( connCaps, connTopic 31 , connProtocol
23 , connRemotePeerId, connThisPeerId 32 , connCaps
24 ) 33 , connTopic
25 , getConnection 34 , connRemotePeerId
35 , connThisPeerId
26 36
27 -- ** Setup 37 -- ** Setup
28 , runWire 38 , runWire
29 , connectWire 39 , connectWire
30 , acceptWire 40 , acceptWire
31 41
32 -- ** Query
33 , getExtCaps
34
35 -- ** Messaging 42 -- ** Messaging
36 , recvMessage 43 , recvMessage
37 , sendMessage 44 , sendMessage
38 45
46 -- ** Query
47 , getConnection
48 , getExtCaps
49 , getStats
50
51 -- ** Conduits
39 , validate 52 , validate
40 , validateBoth 53 , validateBoth
41 , trackStats 54 , trackStats
42
43 -- ** Stats
44 , ConnectionStats (..)
45 , getStats
46 , askStats
47
48 , recvBytes
49 , sentBytes
50 , wastedBytes
51 , payloadBytes
52 ) where 55 ) where
53 56
54import Control.Applicative 57import Control.Applicative
@@ -87,6 +90,7 @@ import Data.BEncode as BE
87-- Exceptions 90-- Exceptions
88-----------------------------------------------------------------------} 91-----------------------------------------------------------------------}
89 92
93-- | Used to specify initiator of 'ProtocolError'.
90data ChannelSide 94data ChannelSide
91 = ThisPeer 95 = ThisPeer
92 | RemotePeer 96 | RemotePeer
@@ -98,19 +102,52 @@ instance Default ChannelSide where
98instance Pretty ChannelSide where 102instance Pretty ChannelSide where
99 pretty = PP.text . show 103 pretty = PP.text . show
100 104
101-- | Errors occur when a remote peer violates protocol specification. 105-- | A protocol errors occur when a peer violates protocol
106-- specification.
102data ProtocolError 107data ProtocolError
103 -- | Protocol string should be 'BitTorrent Protocol' but remote 108 -- | Protocol string should be 'BitTorrent Protocol' but remote
104 -- peer send a different string. 109 -- peer have sent a different string.
105 = InvalidProtocol ProtocolName 110 = InvalidProtocol ProtocolName
106 | UnexpectedTopic InfoHash -- ^ peer replied with unexpected infohash. 111
107 | UnexpectedPeerId PeerId -- ^ peer replied with unexpected peer id. 112 -- | Sent and received protocol strings do not match. Can occur
108 | UnknownTopic InfoHash -- ^ peer requested unknown torrent. 113 -- in 'connectWire' only.
109 | HandshakeRefused -- ^ peer do not send an extended handshake back. 114 | UnexpectedProtocol ProtocolName
110 | BitfieldAlreadSend ChannelSide 115
111 | InvalidMessage -- TODO caps violation 116 -- | /Remote/ peer replied with invalid 'hsInfoHash' which do not
112 { violentSender :: ChannelSide -- ^ endpoint sent invalid message 117 -- match with 'hsInfoHash' /this/ peer have sent. Can occur in
113 , extensionRequired :: Extension -- ^ 118 -- 'connectWire' only.
119 | UnexpectedTopic InfoHash
120
121 -- | Some trackers or DHT can return 'PeerId' of a peer. If a
122 -- remote peer handshaked with different 'hsPeerId' then this
123 -- exception is raised. Can occur in 'connectWire' only.
124 | UnexpectedPeerId PeerId
125
126 -- | Accepted peer have sent unknown torrent infohash in
127 -- 'hsInfoHash' field. This situation usually happen when /this/
128 -- peer have deleted the requested torrent. The error can occur in
129 -- 'acceptWire' function only.
130 | UnknownTopic InfoHash
131
132 -- | A remote peer have 'ExtExtended' enabled but did not send an
133 -- 'ExtendedHandshake' back.
134 | HandshakeRefused
135
136 -- | 'Network.BitTorrent.Exchange.Message.Bitfield' message MUST
137 -- be send either once or zero times, but either this peer or
138 -- remote peer send a bitfield message the second time.
139 | BitfieldAlreadySend ChannelSide
140
141 -- | Capabilities violation. For example this exception can occur
142 -- when a peer have sent 'Port' message but 'ExtDHT' is not
143 -- allowed in 'connCaps'.
144 | DisallowedMessage
145 { -- | Who sent invalid message.
146 violentSender :: ChannelSide
147
148 -- | If the 'violentSender' reconnect with this extension
149 -- enabled then he can try to send this message.
150 , extensionRequired :: Extension
114 } 151 }
115 deriving Show 152 deriving Show
116 153
@@ -119,9 +156,24 @@ instance Pretty ProtocolError where
119 156
120-- | Exceptions used to interrupt the current P2P session. 157-- | Exceptions used to interrupt the current P2P session.
121data WireFailure 158data WireFailure
122 = PeerDisconnected -- ^ A peer not responding. 159 -- | Force termination of wire connection.
123 | DisconnectPeer -- ^ 160 --
124 | ProtocolError ProtocolError 161 -- Normally you should throw only this exception from event loop
162 -- using 'disconnectPeer', other exceptions are thrown
163 -- automatically by functions from this module.
164 --
165 = DisconnectPeer
166
167 -- | A peer not responding and did not send a 'KeepAlive' message
168 -- for a specified period of time.
169 | PeerDisconnected
170
171 -- | A remote peer have sent some unknown message we unable to
172 -- parse.
173 | DecodingError GetException
174
175 -- | See 'ProtocolError' for more details.
176 | ProtocolError ProtocolError
125 deriving (Show, Typeable) 177 deriving (Show, Typeable)
126 178
127instance Exception WireFailure 179instance Exception WireFailure
@@ -137,54 +189,105 @@ isWireFailure _ = return ()
137-- Stats 189-- Stats
138-----------------------------------------------------------------------} 190-----------------------------------------------------------------------}
139 191
192-- | Message stats in one direction.
140data FlowStats = FlowStats 193data FlowStats = FlowStats
141 { messageBytes :: {-# UNPACK #-} !ByteStats 194 { -- | Sum of byte sequences of all messages.
195 messageBytes :: {-# UNPACK #-} !ByteStats
196 -- | Number of the messages sent or received.
142 , messageCount :: {-# UNPACK #-} !Int 197 , messageCount :: {-# UNPACK #-} !Int
143 -- msgTypes :: Map MessageType Int
144 } deriving Show 198 } deriving Show
145 199
146-- | Note that this is stats is completely different from Progress: 200-- | Zeroed stats.
147-- TODO explain why. 201instance Default FlowStats where
202 def = FlowStats def 0
203
204-- | Monoid under addition.
205instance Monoid FlowStats where
206 mempty = def
207 mappend a b = FlowStats
208 { messageBytes = messageBytes a <> messageBytes b
209 , messageCount = messageCount a + messageCount b
210 }
211
212-- | Aggregate one more message stats in this direction.
213addFlowStats :: ByteStats -> FlowStats -> FlowStats
214addFlowStats x FlowStats {..} = FlowStats
215 { messageBytes = messageBytes <> x
216 , messageCount = succ messageCount
217 }
218
219-- | Message stats in both directions. This data can be retrieved
220-- using 'getStats' function.
221--
222-- Note that this stats is completely different from
223-- 'Data.Torrent.Progress.Progress': payload bytes not necessary
224-- equal to downloaded\/uploaded bytes since a peer can send a
225-- broken block.
226--
148data ConnectionStats = ConnectionStats 227data ConnectionStats = ConnectionStats
149 { incomingFlow :: !ByteStats 228 { -- | Received messages stats.
150 , outcomingFlow :: !ByteStats 229 incomingFlow :: !FlowStats
230 -- | Sent messages stats.
231 , outcomingFlow :: !FlowStats
151 } deriving Show 232 } deriving Show
152 233
234-- | Zeroed stats.
153instance Default ConnectionStats where 235instance Default ConnectionStats where
154 def = ConnectionStats def def 236 def = ConnectionStats def def
155 237
156addStats :: ChannelSide -> ByteStats -> ConnectionStats -> ConnectionStats 238-- | Monoid under addition.
157addStats ThisPeer x s = s { outcomingFlow = outcomingFlow s <> x } 239instance Monoid ConnectionStats where
158addStats RemotePeer x s = s { incomingFlow = incomingFlow s <> x } 240 mempty = def
159 241 mappend a b = ConnectionStats
160recvBytes :: ConnectionStats -> Int 242 { incomingFlow = incomingFlow a <> incomingFlow b
161recvBytes = byteLength . incomingFlow 243 , outcomingFlow = outcomingFlow a <> outcomingFlow b
162 244 }
163sentBytes :: ConnectionStats -> Int
164sentBytes = byteLength . outcomingFlow
165
166wastedBytes :: ConnectionStats -> Int
167wastedBytes (ConnectionStats _in out) = overhead _in + overhead out
168 245
169payloadBytes :: ConnectionStats -> Int 246-- | Aggregate one more message stats in the /specified/ direction.
170payloadBytes (ConnectionStats _in out) = payload _in + payload out 247addStats :: ChannelSide -> ByteStats -> ConnectionStats -> ConnectionStats
248addStats ThisPeer x s = s { outcomingFlow = addFlowStats x (outcomingFlow s) }
249addStats RemotePeer x s = s { incomingFlow = addFlowStats x (incomingFlow s) }
171 250
172{----------------------------------------------------------------------- 251{-----------------------------------------------------------------------
173-- Connection 252-- Connection
174-----------------------------------------------------------------------} 253-----------------------------------------------------------------------}
175 254
255-- | Connection keep various info about both peers.
176data Connection = Connection 256data Connection = Connection
177 { connCaps :: !Caps 257 { -- | /Both/ peers handshaked with this protocol string. The only
258 -- value is \"Bittorrent Protocol\" but this can be changed in
259 -- future.
260 connProtocol :: !ProtocolName
261
262 -- | A set of enabled extensions. This value used to check if a
263 -- message is allowed to be sent or received.
264 , connCaps :: !Caps
265
266 -- | /Both/ peers handshaked with this infohash. A connection can
267 -- handle only one topic, use 'reconnect' to change the current
268 -- topic.
178 , connTopic :: !InfoHash 269 , connTopic :: !InfoHash
270
271 -- | Typically extracted from handshake.
179 , connRemotePeerId :: !PeerId 272 , connRemotePeerId :: !PeerId
273
274 -- | Typically extracted from handshake.
180 , connThisPeerId :: !PeerId 275 , connThisPeerId :: !PeerId
181 , connStats :: !(IORef ConnectionStats) 276
277 -- | If @not (allowed ExtExtended connCaps)@ then this set is
278 -- always empty. Otherwise it has extension protocol 'MessageId'
279 -- map.
182 , connExtCaps :: !(IORef ExtendedCaps) 280 , connExtCaps :: !(IORef ExtendedCaps)
281
282 -- | Various stats about messages sent and received. Stats can be
283 -- used to protect /this/ peer against flood attacks.
284 , connStats :: !(IORef ConnectionStats)
183 } 285 }
184 286
185instance Pretty Connection where 287instance Pretty Connection where
186 pretty Connection {..} = "Connection" 288 pretty Connection {..} = "Connection"
187 289
290-- TODO check extended messages too
188isAllowed :: Connection -> Message -> Bool 291isAllowed :: Connection -> Message -> Bool
189isAllowed Connection {..} msg 292isAllowed Connection {..} msg
190 | Just ext <- requires msg = ext `allowed` connCaps 293 | Just ext <- requires msg = ext `allowed` connCaps
@@ -233,11 +336,17 @@ connectToPeer p = do
233-- Wire 336-- Wire
234-----------------------------------------------------------------------} 337-----------------------------------------------------------------------}
235 338
236type Wire = ConduitM Message Message (ReaderT Connection IO) 339-- | do not expose this so we can change it without breaking api
340type Connectivity = ReaderT Connection
341
342-- | A duplex channel connected to a remote peer which keep tracks
343-- connection parameters.
344type Wire a = ConduitM Message Message (Connectivity IO) a
237 345
238protocolError :: ProtocolError -> Wire a 346protocolError :: ProtocolError -> Wire a
239protocolError = monadThrow . ProtocolError 347protocolError = monadThrow . ProtocolError
240 348
349-- | Forcefully terminate wire session and close socket.
241disconnectPeer :: Wire a 350disconnectPeer :: Wire a
242disconnectPeer = monadThrow DisconnectPeer 351disconnectPeer = monadThrow DisconnectPeer
243 352
@@ -256,22 +365,24 @@ modifyRef f m = do
256 ref <- lift (asks f) 365 ref <- lift (asks f)
257 liftIO (atomicModifyIORef' ref (\x -> (m x, ()))) 366 liftIO (atomicModifyIORef' ref (\x -> (m x, ())))
258 367
259getExtCaps :: Wire ExtendedCaps
260getExtCaps = readRef connExtCaps
261
262setExtCaps :: ExtendedCaps -> Wire () 368setExtCaps :: ExtendedCaps -> Wire ()
263setExtCaps = writeRef connExtCaps 369setExtCaps = writeRef connExtCaps
264 370
371-- | Get current extended capabilities. Note that this value can
372-- change in current session if either this or remote peer will
373-- initiate rehandshaking.
374getExtCaps :: Wire ExtendedCaps
375getExtCaps = readRef connExtCaps
376
377-- | Get current stats. Note that this value will change with the next
378-- sent or received message.
265getStats :: Wire ConnectionStats 379getStats :: Wire ConnectionStats
266getStats = readRef connStats 380getStats = readRef connStats
267 381
268askStats :: (ConnectionStats -> a) -> Wire a
269askStats f = f <$> getStats
270
271putStats :: ChannelSide -> Message -> Wire () 382putStats :: ChannelSide -> Message -> Wire ()
272putStats side msg = modifyRef connStats (addStats side (stats msg)) 383putStats side msg = modifyRef connStats (addStats side (stats msg))
273 384
274 385-- | See the 'Connection' section for more info.
275getConnection :: Wire Connection 386getConnection :: Wire Connection
276getConnection = lift ask 387getConnection = lift ask
277 388
@@ -284,7 +395,7 @@ validate side = await >>= maybe (return ()) yieldCheck
284 Nothing -> return () 395 Nothing -> return ()
285 Just ext 396 Just ext
286 | ext `allowed` caps -> yield msg 397 | ext `allowed` caps -> yield msg
287 | otherwise -> protocolError $ InvalidMessage side ext 398 | otherwise -> protocolError $ DisallowedMessage side ext
288 399
289validateBoth :: Wire () -> Wire () 400validateBoth :: Wire () -> Wire ()
290validateBoth action = do 401validateBoth action = do
@@ -299,6 +410,7 @@ trackStats = do
299 Nothing -> return () 410 Nothing -> return ()
300 Just msg -> putStats ThisPeer msg -- FIXME not really ThisPeer 411 Just msg -> putStats ThisPeer msg -- FIXME not really ThisPeer
301 412
413-- | Normally you should use 'connectWire' or 'acceptWire'.
302runWire :: Wire () -> Socket -> Connection -> IO () 414runWire :: Wire () -> Socket -> Connection -> IO ()
303runWire action sock = runReaderT $ 415runWire action sock = runReaderT $
304 sourceSocket sock $= 416 sourceSocket sock $=
@@ -307,16 +419,20 @@ runWire action sock = runReaderT $
307 S.conduitPut S.put $$ 419 S.conduitPut S.put $$
308 sinkSocket sock 420 sinkSocket sock
309 421
422-- | This function will block until a peer send new message. You can
423-- also use 'await'.
424recvMessage :: Wire Message
425recvMessage = await >>= maybe (monadThrow PeerDisconnected) return
426
427-- | You can also use 'yield'.
310sendMessage :: PeerMessage msg => msg -> Wire () 428sendMessage :: PeerMessage msg => msg -> Wire ()
311sendMessage msg = do 429sendMessage msg = do
312 ecaps <- getExtCaps 430 ecaps <- getExtCaps
313 yield $ envelop ecaps msg 431 yield $ envelop ecaps msg
314 432
315recvMessage :: Wire Message
316recvMessage = await >>= maybe (monadThrow PeerDisconnected) return
317
318extendedHandshake :: ExtendedCaps -> Wire () 433extendedHandshake :: ExtendedCaps -> Wire ()
319extendedHandshake caps = do 434extendedHandshake caps = do
435 -- TODO add other params to the handshake
320 sendMessage $ nullExtendedHandshake caps 436 sendMessage $ nullExtendedHandshake caps
321 msg <- recvMessage 437 msg <- recvMessage
322 case msg of 438 case msg of
@@ -324,6 +440,18 @@ extendedHandshake caps = do
324 setExtCaps $ ehsCaps <> caps 440 setExtCaps $ ehsCaps <> caps
325 _ -> protocolError HandshakeRefused 441 _ -> protocolError HandshakeRefused
326 442
443rehandshake :: ExtendedCaps -> Wire ()
444rehandshake caps = undefined
445
446reconnect :: Wire ()
447reconnect = undefined
448
449-- | Initiate 'Wire' connection and handshake with a peer. This
450-- function will also do extension protocol handshake if 'ExtExtended'
451-- is enabled on both sides.
452--
453-- This function can throw 'WireFailure' exception.
454--
327connectWire :: Handshake -> PeerAddr -> ExtendedCaps -> Wire () -> IO () 455connectWire :: Handshake -> PeerAddr -> ExtendedCaps -> Wire () -> IO ()
328connectWire hs addr extCaps wire = 456connectWire hs addr extCaps wire =
329 bracket (connectToPeer addr) close $ \ sock -> do 457 bracket (connectToPeer addr) close $ \ sock -> do
@@ -332,6 +460,9 @@ connectWire hs addr extCaps wire =
332 unless (def == hsProtocol hs') $ do 460 unless (def == hsProtocol hs') $ do
333 throwIO $ ProtocolError $ InvalidProtocol (hsProtocol hs') 461 throwIO $ ProtocolError $ InvalidProtocol (hsProtocol hs')
334 462
463 unless (hsProtocol hs == hsProtocol hs') $ do
464 throwIO $ ProtocolError $ UnexpectedProtocol (hsProtocol hs')
465
335 unless (hsInfoHash hs == hsInfoHash hs') $ do 466 unless (hsInfoHash hs == hsInfoHash hs') $ do
336 throwIO $ ProtocolError $ UnexpectedTopic (hsInfoHash hs') 467 throwIO $ ProtocolError $ UnexpectedTopic (hsInfoHash hs')
337 468
@@ -346,7 +477,8 @@ connectWire hs addr extCaps wire =
346 extCapsRef <- newIORef def 477 extCapsRef <- newIORef def
347 statsRef <- newIORef def 478 statsRef <- newIORef def
348 runWire wire' sock $ Connection 479 runWire wire' sock $ Connection
349 { connCaps = caps 480 { connProtocol = hsProtocol hs
481 , connCaps = caps
350 , connTopic = hsInfoHash hs 482 , connTopic = hsInfoHash hs
351 , connRemotePeerId = hsPeerId hs' 483 , connRemotePeerId = hsPeerId hs'
352 , connThisPeerId = hsPeerId hs 484 , connThisPeerId = hsPeerId hs
@@ -354,5 +486,13 @@ connectWire hs addr extCaps wire =
354 , connStats = statsRef 486 , connStats = statsRef
355 } 487 }
356 488
357acceptWire :: Wire () -> Socket -> IO () 489-- | Accept 'Wire' connection using already 'Network.Socket.accept'ed
358acceptWire = undefined 490-- socket. For peer listener loop the 'acceptSafe' should be
491-- prefered against 'accept'. The socket will be close at exit.
492--
493-- This function can throw 'WireFailure' exception.
494--
495acceptWire :: Socket -> Wire () -> IO ()
496acceptWire sock wire = do
497 bracket (return sock) close $ \ _ -> do
498 error "acceptWire: not implemented"