summaryrefslogtreecommitdiff
path: root/dht/bittorrent/src/Network/BitTorrent/Exchange/Connection.hs
diff options
context:
space:
mode:
authorJames Crayne <jim.crayne@gmail.com>2019-09-28 13:43:29 -0400
committerJoe Crayne <joe@jerkface.net>2020-01-01 19:27:53 -0500
commit11987749fc6e6d3e53ea737d46d5ab13a16faeb8 (patch)
tree5716463275c2d3e902889db619908ded2a73971c /dht/bittorrent/src/Network/BitTorrent/Exchange/Connection.hs
parentadd2c76bced51fde5e9917e7449ef52be70faf87 (diff)
Factor out some new libraries
word64-map: Data.Word64Map network-addr: Network.Address tox-crypto: Crypto.Tox lifted-concurrent: Control.Concurrent.Lifted.Instrument Control.Concurrent.Async.Lifted.Instrument psq-wrap: Data.Wrapper.PSQInt Data.Wrapper.PSQ minmax-psq: Data.MinMaxPSQ tasks: Control.Concurrent.Tasks kad: Network.Kademlia Network.Kademlia.Bootstrap Network.Kademlia.Routing Network.Kademlia.CommonAPI Network.Kademlia.Persistence Network.Kademlia.Search
Diffstat (limited to 'dht/bittorrent/src/Network/BitTorrent/Exchange/Connection.hs')
-rw-r--r--dht/bittorrent/src/Network/BitTorrent/Exchange/Connection.hs1012
1 files changed, 1012 insertions, 0 deletions
diff --git a/dht/bittorrent/src/Network/BitTorrent/Exchange/Connection.hs b/dht/bittorrent/src/Network/BitTorrent/Exchange/Connection.hs
new file mode 100644
index 00000000..6804d0a2
--- /dev/null
+++ b/dht/bittorrent/src/Network/BitTorrent/Exchange/Connection.hs
@@ -0,0 +1,1012 @@
1-- |
2-- Module : Network.BitTorrent.Exchange.Wire
3-- Copyright : (c) Sam Truzjan 2013
4-- (c) Daniel Gröber 2013
5-- License : BSD3
6-- Maintainer : pxqr.sta@gmail.com
7-- Stability : experimental
8-- Portability : portable
9--
10-- Each peer wire connection is identified by triple @(topic,
11-- remote_addr, this_addr)@. This means that connections are the
12-- same if and only if their 'ConnectionId' are the same. Of course,
13-- you /must/ avoid duplicated connections.
14--
15-- This module control /integrity/ of data send and received.
16--
17{-# LANGUAGE DeriveDataTypeable #-}
18{-# LANGUAGE TemplateHaskell #-}
19{-# LANGUAGE MultiParamTypeClasses #-}
20{-# LANGUAGE GeneralizedNewtypeDeriving #-}
21module Network.BitTorrent.Exchange.Connection
22 ( -- * Wire
23 Connected
24 , Wire
25 , ChannelSide (..)
26
27 -- * Connection
28 , Connection
29 , connInitiatedBy
30
31 -- ** Identity
32 , connRemoteAddr
33 , connTopic
34 , connRemotePeerId
35 , connThisPeerId
36
37 -- ** Capabilities
38 , connProtocol
39 , connCaps
40 , connExtCaps
41 , connRemoteEhs
42
43 -- ** State
44 , connStatus
45 , connBitfield
46
47 -- ** Env
48 , connOptions
49 , connSession
50 , connStats
51
52 -- ** Status
53 , PeerStatus (..)
54 , ConnectionStatus (..)
55 , updateStatus
56 , statusUpdates
57 , clientStatus
58 , remoteStatus
59 , canUpload
60 , canDownload
61 , defaultUnchokeSlots
62 , defaultRechokeInterval
63
64
65 -- * Setup
66 , ConnectionPrefs (..)
67 , SessionLink (..)
68 , ConnectionConfig (..)
69
70 -- ** Initiate
71 , connectWire
72
73 -- ** Accept
74 , PendingConnection
75 , newPendingConnection
76 , pendingPeer
77 , pendingCaps
78 , pendingTopic
79 , closePending
80 , acceptWire
81
82 -- ** Post setup actions
83 , resizeBitfield
84
85 -- * Messaging
86 , recvMessage
87 , sendMessage
88 , filterQueue
89 , getMaxQueueLength
90
91 -- * Exceptions
92 , ProtocolError (..)
93 , WireFailure (..)
94 , peerPenalty
95 , isWireFailure
96 , disconnectPeer
97
98 -- * Stats
99 , ByteStats (..)
100 , FlowStats (..)
101 , ConnectionStats (..)
102
103 -- * Flood detection
104 , FloodDetector (..)
105
106 -- * Options
107 , Options (..)
108 ) where
109
110import Control.Applicative
111import Control.Concurrent hiding (yield)
112import Control.Exception
113import Control.Monad.Reader
114import Control.Monad.State
115import Control.Monad.Trans.Resource
116import Control.Lens
117import Data.ByteString as BS
118import Data.ByteString.Lazy as BSL
119import Data.Conduit as C
120import Data.Conduit.Cereal
121import Data.Conduit.List
122import Data.Conduit.Network
123import Data.Default
124import Data.IORef
125import Data.List as L
126import Data.Maybe as M
127import Data.Monoid
128import Data.Serialize as S
129import Data.Typeable
130import Network
131import Network.Socket hiding (Connected)
132import Network.Socket.ByteString as BS
133import Text.PrettyPrint as PP hiding ((<>))
134import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
135import Text.Show.Functions ()
136import System.Log.FastLogger (ToLogStr(..))
137import System.Timeout
138
139import Data.Torrent
140import Network.Address
141import Network.BitTorrent.Exchange.Bitfield as BF
142import Network.BitTorrent.Exchange.Message as Msg
143
144-- TODO handle port message?
145-- TODO handle limits?
146-- TODO filter not requested PIECE messages
147-- TODO metadata piece request flood protection
148-- TODO piece request flood protection
149-- TODO protect against flood attacks
150{-----------------------------------------------------------------------
151-- Exceptions
152-----------------------------------------------------------------------}
153
154-- | Used to specify initiator of 'ProtocolError'.
155data ChannelSide
156 = ThisPeer
157 | RemotePeer
158 deriving (Show, Eq, Enum, Bounded)
159
160instance Default ChannelSide where
161 def = ThisPeer
162
163instance Pretty ChannelSide where
164 pPrint = PP.text . show
165
166-- | A protocol errors occur when a peer violates protocol
167-- specification.
168data ProtocolError
169 -- | Protocol string should be 'BitTorrent Protocol' but remote
170 -- peer have sent a different string.
171 = InvalidProtocol ProtocolName
172
173 -- | Sent and received protocol strings do not match. Can occur
174 -- in 'connectWire' only.
175 | UnexpectedProtocol ProtocolName
176
177 -- | /Remote/ peer replied with invalid 'hsInfoHash' which do not
178 -- match with 'hsInfoHash' /this/ peer have sent. Can occur in
179 -- 'connectWire' or 'acceptWire' only.
180 | UnexpectedTopic InfoHash
181
182 -- | Some trackers or DHT can return 'PeerId' of a peer. If a
183 -- remote peer handshaked with different 'hsPeerId' then this
184 -- exception is raised. Can occur in 'connectWire' only.
185 | UnexpectedPeerId PeerId
186
187 -- | Accepted peer have sent unknown torrent infohash in
188 -- 'hsInfoHash' field. This situation usually happen when /this/
189 -- peer have deleted the requested torrent. The error can occur in
190 -- 'acceptWire' function only.
191 | UnknownTopic InfoHash
192
193 -- | A remote peer have 'ExtExtended' enabled but did not send an
194 -- 'ExtendedHandshake' back.
195 | HandshakeRefused
196
197 -- | 'Network.BitTorrent.Exchange.Message.Bitfield' message MUST
198 -- be send either once or zero times, but either this peer or
199 -- remote peer send a bitfield message the second time.
200 | BitfieldAlreadySent ChannelSide
201
202 -- | Capabilities violation. For example this exception can occur
203 -- when a peer have sent 'Port' message but 'ExtDHT' is not
204 -- allowed in 'connCaps'.
205 | DisallowedMessage
206 { -- | Who sent invalid message.
207 violentSender :: ChannelSide
208
209 -- | If the 'violentSender' reconnect with this extension
210 -- enabled then he can try to send this message.
211 , extensionRequired :: Extension
212 }
213 deriving Show
214
215instance Pretty ProtocolError where
216 pPrint = PP.text . show
217
218errorPenalty :: ProtocolError -> Int
219errorPenalty (InvalidProtocol _) = 1
220errorPenalty (UnexpectedProtocol _) = 1
221errorPenalty (UnexpectedTopic _) = 1
222errorPenalty (UnexpectedPeerId _) = 1
223errorPenalty (UnknownTopic _) = 0
224errorPenalty (HandshakeRefused ) = 1
225errorPenalty (BitfieldAlreadySent _) = 1
226errorPenalty (DisallowedMessage _ _) = 1
227
228-- | Exceptions used to interrupt the current P2P session.
229data WireFailure
230 = ConnectionRefused IOError
231
232 -- | Force termination of wire connection.
233 --
234 -- Normally you should throw only this exception from event loop
235 -- using 'disconnectPeer', other exceptions are thrown
236 -- automatically by functions from this module.
237 --
238 | DisconnectPeer
239
240 -- | A peer not responding and did not send a 'KeepAlive' message
241 -- for a specified period of time.
242 | PeerDisconnected
243
244 -- | A remote peer have sent some unknown message we unable to
245 -- parse.
246 | DecodingError GetException
247
248 -- | See 'ProtocolError' for more details.
249 | ProtocolError ProtocolError
250
251 -- | A possible malicious peer have sent too many control messages
252 -- without making any progress.
253 | FloodDetected ConnectionStats
254 deriving (Show, Typeable)
255
256instance Exception WireFailure
257
258instance Pretty WireFailure where
259 pPrint = PP.text . show
260
261-- TODO
262-- data Penalty = Ban | Penalty Int
263
264peerPenalty :: WireFailure -> Int
265peerPenalty DisconnectPeer = 0
266peerPenalty PeerDisconnected = 0
267peerPenalty (DecodingError _) = 1
268peerPenalty (ProtocolError e) = errorPenalty e
269peerPenalty (FloodDetected _) = 1
270
271-- | Do nothing with exception, used with 'handle' or 'try'.
272isWireFailure :: Monad m => WireFailure -> m ()
273isWireFailure _ = return ()
274
275protocolError :: MonadThrow m => ProtocolError -> m a
276protocolError = monadThrow . ProtocolError
277
278{-----------------------------------------------------------------------
279-- Stats
280-----------------------------------------------------------------------}
281
282-- | Message stats in one direction.
283data FlowStats = FlowStats
284 { -- | Number of the messages sent or received.
285 messageCount :: {-# UNPACK #-} !Int
286 -- | Sum of byte sequences of all messages.
287 , messageBytes :: {-# UNPACK #-} !ByteStats
288 } deriving Show
289
290instance Pretty FlowStats where
291 pPrint FlowStats {..} =
292 PP.int messageCount <+> "messages" $+$
293 pPrint messageBytes
294
295-- | Zeroed stats.
296instance Default FlowStats where
297 def = FlowStats 0 def
298
299-- | Monoid under addition.
300instance Monoid FlowStats where
301 mempty = def
302 mappend a b = FlowStats
303 { messageBytes = messageBytes a <> messageBytes b
304 , messageCount = messageCount a + messageCount b
305 }
306
307-- | Find average length of byte sequences per message.
308avgByteStats :: FlowStats -> ByteStats
309avgByteStats (FlowStats n ByteStats {..}) = ByteStats
310 { overhead = overhead `quot` n
311 , control = control `quot` n
312 , payload = payload `quot` n
313 }
314
315-- | Message stats in both directions. This data can be retrieved
316-- using 'getStats' function.
317--
318-- Note that this stats is completely different from
319-- 'Data.Torrent.Progress.Progress': payload bytes not necessary
320-- equal to downloaded\/uploaded bytes since a peer can send a
321-- broken block.
322--
323data ConnectionStats = ConnectionStats
324 { -- | Received messages stats.
325 incomingFlow :: !FlowStats
326 -- | Sent messages stats.
327 , outcomingFlow :: !FlowStats
328 } deriving Show
329
330instance Pretty ConnectionStats where
331 pPrint ConnectionStats {..} = vcat
332 [ "Recv:" <+> pPrint incomingFlow
333 , "Sent:" <+> pPrint outcomingFlow
334 , "Both:" <+> pPrint (incomingFlow <> outcomingFlow)
335 ]
336
337-- | Zeroed stats.
338instance Default ConnectionStats where
339 def = ConnectionStats def def
340
341-- | Monoid under addition.
342instance Monoid ConnectionStats where
343 mempty = def
344 mappend a b = ConnectionStats
345 { incomingFlow = incomingFlow a <> incomingFlow b
346 , outcomingFlow = outcomingFlow a <> outcomingFlow b
347 }
348
349-- | Aggregate one more message stats in the /specified/ direction.
350addStats :: ChannelSide -> ByteStats -> ConnectionStats -> ConnectionStats
351addStats ThisPeer x s = s { outcomingFlow = (FlowStats 1 x) <> (outcomingFlow s) }
352addStats RemotePeer x s = s { incomingFlow = (FlowStats 1 x) <> (incomingFlow s) }
353
354-- | Sum of overhead and control bytes in both directions.
355wastedBytes :: ConnectionStats -> Int
356wastedBytes ConnectionStats {..} = overhead + control
357 where
358 FlowStats _ ByteStats {..} = incomingFlow <> outcomingFlow
359
360-- | Sum of payload bytes in both directions.
361payloadBytes :: ConnectionStats -> Int
362payloadBytes ConnectionStats {..} =
363 payload (messageBytes (incomingFlow <> outcomingFlow))
364
365-- | Sum of any bytes in both directions.
366transmittedBytes :: ConnectionStats -> Int
367transmittedBytes ConnectionStats {..} =
368 byteLength (messageBytes (incomingFlow <> outcomingFlow))
369
370{-----------------------------------------------------------------------
371-- Flood protection
372-----------------------------------------------------------------------}
373
374defaultFloodFactor :: Int
375defaultFloodFactor = 1
376
377-- | This is a very permissive value, connection setup usually takes
378-- around 10-100KB, including both directions.
379defaultFloodThreshold :: Int
380defaultFloodThreshold = 2 * 1024 * 1024
381
382-- | A flood detection function.
383type Detector stats = Int -- ^ Factor;
384 -> Int -- ^ Threshold;
385 -> stats -- ^ Stats to analyse;
386 -> Bool -- ^ Is this a flooded connection?
387
388defaultDetector :: Detector ConnectionStats
389defaultDetector factor threshold s =
390 transmittedBytes s > threshold &&
391 factor * wastedBytes s > payloadBytes s
392
393-- | Flood detection is used to protect /this/ peer against a /remote/
394-- malicious peer sending meaningless control messages.
395data FloodDetector = FloodDetector
396 { -- | Max ratio of payload bytes to control bytes.
397 floodFactor :: {-# UNPACK #-} !Int
398
399 -- | Max count of bytes connection /setup/ can take including
400 -- 'Handshake', 'ExtendedHandshake', 'Bitfield', 'Have' and 'Port'
401 -- messages. This value is used to avoid false positives at the
402 -- connection initialization.
403 , floodThreshold :: {-# UNPACK #-} !Int
404
405 -- | Flood predicate on the /current/ 'ConnectionStats'.
406 , floodPredicate :: Detector ConnectionStats
407 } deriving Show
408
409instance Eq FloodDetector where
410 a == b = floodFactor a == floodFactor b
411 && floodThreshold a == floodThreshold b
412
413-- | Flood detector with very permissive options.
414instance Default FloodDetector where
415 def = FloodDetector
416 { floodFactor = defaultFloodFactor
417 , floodThreshold = defaultFloodThreshold
418 , floodPredicate = defaultDetector
419 }
420
421-- | This peer might drop connection if the detector gives positive answer.
422runDetector :: FloodDetector -> ConnectionStats -> Bool
423runDetector FloodDetector {..} = floodPredicate floodFactor floodThreshold
424
425{-----------------------------------------------------------------------
426-- Options
427-----------------------------------------------------------------------}
428
429-- | Various connection settings and limits.
430data Options = Options
431 { -- | How often /this/ peer should send 'KeepAlive' messages.
432 keepaliveInterval :: {-# UNPACK #-} !Int
433
434 -- | /This/ peer will drop connection if a /remote/ peer did not
435 -- send any message for this period of time.
436 , keepaliveTimeout :: {-# UNPACK #-} !Int
437
438 , requestQueueLength :: {-# UNPACK #-} !Int
439
440 -- | Used to protect against flood attacks.
441 , floodDetector :: FloodDetector
442
443 -- | Used to protect against flood attacks in /metadata
444 -- exchange/. Normally, a requesting peer should request each
445 -- 'InfoDict' piece only one time, but a malicious peer can
446 -- saturate wire with 'MetadataRequest' messages thus flooding
447 -- responding peer.
448 --
449 -- This value set upper bound for number of 'MetadataRequests'
450 -- for each piece.
451 --
452 , metadataFactor :: {-# UNPACK #-} !Int
453
454 -- | Used to protect against out-of-memory attacks: malicious peer
455 -- can claim that 'totalSize' is, say, 100TB and send some random
456 -- data instead of infodict pieces. Since requesting peer unable
457 -- to check not completed infodict via the infohash, the
458 -- accumulated pieces will allocate the all available memory.
459 --
460 -- This limit set upper bound for 'InfoDict' size. See
461 -- 'ExtendedMetadata' for more info.
462 --
463 , maxInfoDictSize :: {-# UNPACK #-} !Int
464 } deriving (Show, Eq)
465
466-- | Permissive default parameters, most likely you don't need to
467-- change them.
468instance Default Options where
469 def = Options
470 { keepaliveInterval = defaultKeepAliveInterval
471 , keepaliveTimeout = defaultKeepAliveTimeout
472 , requestQueueLength = defaultRequestQueueLength
473 , floodDetector = def
474 , metadataFactor = defaultMetadataFactor
475 , maxInfoDictSize = defaultMaxInfoDictSize
476 }
477
478{-----------------------------------------------------------------------
479-- Peer status
480-----------------------------------------------------------------------}
481
482-- | Connections contain two bits of state on either end: choked or
483-- not, and interested or not.
484data PeerStatus = PeerStatus
485 { -- | Choking is a notification that no data will be sent until
486 -- unchoking happens.
487 _choking :: !Bool
488
489 -- |
490 , _interested :: !Bool
491 } deriving (Show, Eq, Ord)
492
493$(makeLenses ''PeerStatus)
494
495instance Pretty PeerStatus where
496 pPrint PeerStatus {..} =
497 pPrint (Choking _choking) <+> "and" <+> pPrint (Interested _interested)
498
499-- | Connections start out choked and not interested.
500instance Default PeerStatus where
501 def = PeerStatus True False
502
503instance Monoid PeerStatus where
504 mempty = def
505 mappend a b = PeerStatus
506 { _choking = _choking a && _choking b
507 , _interested = _interested a || _interested b
508 }
509
510-- | Can be used to update remote peer status using incoming 'Status'
511-- message.
512updateStatus :: StatusUpdate -> PeerStatus -> PeerStatus
513updateStatus (Choking b) = choking .~ b
514updateStatus (Interested b) = interested .~ b
515
516-- | Can be used to generate outcoming messages.
517statusUpdates :: PeerStatus -> PeerStatus -> [StatusUpdate]
518statusUpdates a b = M.catMaybes $
519 [ if _choking a == _choking b then Nothing
520 else Just $ Choking $ _choking b
521 , if _interested a == _interested b then Nothing
522 else Just $ Interested $ _interested b
523 ]
524
525{-----------------------------------------------------------------------
526-- Connection status
527-----------------------------------------------------------------------}
528
529-- | Status of the both endpoints.
530data ConnectionStatus = ConnectionStatus
531 { _clientStatus :: !PeerStatus
532 , _remoteStatus :: !PeerStatus
533 } deriving (Show, Eq)
534
535$(makeLenses ''ConnectionStatus)
536
537instance Pretty ConnectionStatus where
538 pPrint ConnectionStatus {..} =
539 "this " PP.<+> pPrint _clientStatus PP.$$
540 "remote" PP.<+> pPrint _remoteStatus
541
542-- | Connections start out choked and not interested.
543instance Default ConnectionStatus where
544 def = ConnectionStatus def def
545
546-- | Can the client transfer to the remote peer?
547canUpload :: ConnectionStatus -> Bool
548canUpload ConnectionStatus {..}
549 = _interested _remoteStatus && not (_choking _clientStatus)
550
551-- | Can the client transfer from the remote peer?
552canDownload :: ConnectionStatus -> Bool
553canDownload ConnectionStatus {..}
554 = _interested _clientStatus && not (_choking _remoteStatus)
555
556-- | Indicates how many peers are allowed to download from the client
557-- by default.
558defaultUnchokeSlots :: Int
559defaultUnchokeSlots = 4
560
561-- |
562defaultRechokeInterval :: Int
563defaultRechokeInterval = 10 * 1000 * 1000
564
565{-----------------------------------------------------------------------
566-- Connection
567-----------------------------------------------------------------------}
568
569data ConnectionState = ConnectionState {
570 -- | If @not (allowed ExtExtended connCaps)@ then this set is always
571 -- empty. Otherwise it has the BEP10 extension protocol mandated mapping of
572 -- 'MessageId' to the message type for the remote peer.
573 --
574 -- Note that this value can change in current session if either
575 -- this or remote peer will initiate rehandshaking.
576 --
577 _connExtCaps :: !ExtendedCaps
578
579 -- | Current extended handshake information from the remote peer
580 , _connRemoteEhs :: !ExtendedHandshake
581
582 -- | Various stats about messages sent and received. Stats can be
583 -- used to protect /this/ peer against flood attacks.
584 --
585 -- Note that this value will change with the next sent or received
586 -- message.
587 , _connStats :: !ConnectionStats
588
589 , _connStatus :: !ConnectionStatus
590
591 -- | Bitfield of remote endpoint.
592 , _connBitfield :: !Bitfield
593 }
594
595makeLenses ''ConnectionState
596
597instance Default ConnectionState where
598 def = ConnectionState
599 { _connExtCaps = def
600 , _connRemoteEhs = def
601 , _connStats = def
602 , _connStatus = def
603 , _connBitfield = BF.haveNone 0
604 }
605
606-- | Connection keep various info about both peers.
607data Connection s = Connection
608 { connInitiatedBy :: !ChannelSide
609
610 , connRemoteAddr :: !(PeerAddr IP)
611
612 -- | /Both/ peers handshaked with this protocol string. The only
613 -- value is \"Bittorrent Protocol\" but this can be changed in
614 -- future.
615 , connProtocol :: !ProtocolName
616
617 -- | Set of enabled core extensions, i.e. the pre BEP10 extension
618 -- mechanism. This value is used to check if a message is allowed
619 -- to be sent or received.
620 , connCaps :: !Caps
621
622 -- | /Both/ peers handshaked with this infohash. A connection can
623 -- handle only one topic, use 'reconnect' to change the current
624 -- topic.
625 , connTopic :: !InfoHash
626
627 -- | Typically extracted from handshake.
628 , connRemotePeerId :: !PeerId
629
630 -- | Typically extracted from handshake.
631 , connThisPeerId :: !PeerId
632
633 -- |
634 , connOptions :: !Options
635
636 -- | Mutable connection state, see 'ConnectionState'
637 , connState :: !(IORef ConnectionState)
638
639-- -- | Max request queue length.
640-- , connMaxQueueLen :: !Int
641
642 -- | Environment data.
643 , connSession :: !s
644
645 , connChan :: !(Chan Message)
646 }
647
648instance Pretty (Connection s) where
649 pPrint Connection {..} = "Connection"
650
651instance ToLogStr (Connection s) where
652 toLogStr Connection {..} = mconcat
653 [ toLogStr (show connRemoteAddr)
654 , toLogStr (show connProtocol)
655 , toLogStr (show connCaps)
656 , toLogStr (show connTopic)
657 , toLogStr (show connRemotePeerId)
658 , toLogStr (show connThisPeerId)
659 , toLogStr (show connOptions)
660 ]
661
662-- TODO check extended messages too
663isAllowed :: Connection s -> Message -> Bool
664isAllowed Connection {..} msg
665 | Just ext <- requires msg = ext `allowed` connCaps
666 | otherwise = True
667
668{-----------------------------------------------------------------------
669-- Hanshaking
670-----------------------------------------------------------------------}
671
672sendHandshake :: Socket -> Handshake -> IO ()
673sendHandshake sock hs = sendAll sock (S.encode hs)
674
675recvHandshake :: Socket -> IO Handshake
676recvHandshake sock = do
677 header <- BS.recv sock 1
678 unless (BS.length header == 1) $
679 throw $ userError "Unable to receive handshake header."
680
681 let protocolLen = BS.head header
682 let restLen = handshakeSize protocolLen - 1
683
684 body <- BS.recv sock restLen
685 let resp = BS.cons protocolLen body
686 either (throwIO . userError) return $ S.decode resp
687
688-- | Handshaking with a peer specified by the second argument.
689--
690-- It's important to send handshake first because /accepting/ peer
691-- do not know handshake topic and will wait until /connecting/ peer
692-- will send handshake.
693--
694initiateHandshake :: Socket -> Handshake -> IO Handshake
695initiateHandshake sock hs = do
696 sendHandshake sock hs
697 recvHandshake sock
698
699data HandshakePair = HandshakePair
700 { handshakeSent :: !Handshake
701 , handshakeRecv :: !Handshake
702 } deriving (Show, Eq)
703
704validatePair :: HandshakePair -> PeerAddr IP -> IO ()
705validatePair (HandshakePair hs hs') addr = Prelude.mapM_ checkProp
706 [ (def == hsProtocol hs', InvalidProtocol $ hsProtocol hs')
707 , (hsProtocol hs == hsProtocol hs', UnexpectedProtocol $ hsProtocol hs')
708 , (hsInfoHash hs == hsInfoHash hs', UnexpectedTopic $ hsInfoHash hs')
709 , (hsPeerId hs' == fromMaybe (hsPeerId hs') (peerId addr)
710 , UnexpectedPeerId $ hsPeerId hs')
711 ]
712 where
713 checkProp (t, e) = unless t $ throwIO $ ProtocolError e
714
715-- | Connection state /right/ after handshaking.
716establishedStats :: HandshakePair -> ConnectionStats
717establishedStats HandshakePair {..} = ConnectionStats
718 { outcomingFlow = FlowStats 1 $ handshakeStats handshakeSent
719 , incomingFlow = FlowStats 1 $ handshakeStats handshakeRecv
720 }
721
722{-----------------------------------------------------------------------
723-- Wire
724-----------------------------------------------------------------------}
725
726-- | do not expose this so we can change it without breaking api
727newtype Connected s a = Connected { runConnected :: (ReaderT (Connection s) IO a) }
728 deriving (Functor, Applicative, Monad
729 , MonadIO, MonadReader (Connection s), MonadThrow
730 )
731
732instance MonadState ConnectionState (Connected s) where
733 get = Connected (asks connState) >>= liftIO . readIORef
734 put x = Connected (asks connState) >>= liftIO . flip writeIORef x
735
736-- | A duplex channel connected to a remote peer which keep tracks
737-- connection parameters.
738type Wire s a = ConduitM Message Message (Connected s) a
739
740{-----------------------------------------------------------------------
741-- Wrapper
742-----------------------------------------------------------------------}
743
744putStats :: ChannelSide -> Message -> Connected s ()
745putStats side msg = connStats %= addStats side (stats msg)
746
747validate :: ChannelSide -> Message -> Connected s ()
748validate side msg = do
749 caps <- asks connCaps
750 case requires msg of
751 Nothing -> return ()
752 Just ext
753 | ext `allowed` caps -> return ()
754 | otherwise -> protocolError $ DisallowedMessage side ext
755
756trackFlow :: ChannelSide -> Wire s ()
757trackFlow side = iterM $ do
758 validate side
759 putStats side
760
761{-----------------------------------------------------------------------
762-- Setup
763-----------------------------------------------------------------------}
764
765-- System.Timeout.timeout multiplier
766seconds :: Int
767seconds = 1000000
768
769sinkChan :: MonadIO m => Chan Message -> Sink Message m ()
770sinkChan chan = await >>= maybe (return ()) (liftIO . writeChan chan)
771
772sourceChan :: MonadIO m => Int -> Chan Message -> Source m Message
773sourceChan interval chan = do
774 mmsg <- liftIO $ timeout (interval * seconds) $ readChan chan
775 yield $ fromMaybe Msg.KeepAlive mmsg
776
777-- | Normally you should use 'connectWire' or 'acceptWire'.
778runWire :: Wire s () -> Socket -> Chan Message -> Connection s -> IO ()
779runWire action sock chan conn = flip runReaderT conn $ runConnected $
780 sourceSocket sock $=
781 conduitGet S.get $=
782 trackFlow RemotePeer $=
783 action $=
784 trackFlow ThisPeer C.$$
785 sinkChan chan
786
787-- | This function will block until a peer send new message. You can
788-- also use 'await'.
789recvMessage :: Wire s Message
790recvMessage = await >>= maybe (monadThrow PeerDisconnected) return
791
792-- | You can also use 'yield'.
793sendMessage :: PeerMessage msg => msg -> Wire s ()
794sendMessage msg = do
795 ecaps <- use connExtCaps
796 yield $ envelop ecaps msg
797
798getMaxQueueLength :: Connected s Int
799getMaxQueueLength = do
800 advertisedLen <- ehsQueueLength <$> use connRemoteEhs
801 defaultLen <- asks (requestQueueLength . connOptions)
802 return $ fromMaybe defaultLen advertisedLen
803
804-- | Filter pending messages from send buffer.
805filterQueue :: (Message -> Bool) -> Wire s ()
806filterQueue p = lift $ do
807 chan <- asks connChan
808 liftIO $ getChanContents chan >>= writeList2Chan chan . L.filter p
809
810-- | Forcefully terminate wire session and close socket.
811disconnectPeer :: Wire s a
812disconnectPeer = monadThrow DisconnectPeer
813
814extendedHandshake :: ExtendedCaps -> Wire s ()
815extendedHandshake caps = do
816 -- TODO add other params to the handshake
817 sendMessage $ nullExtendedHandshake caps
818 msg <- recvMessage
819 case msg of
820 Extended (EHandshake remoteEhs@(ExtendedHandshake {..})) -> do
821 connExtCaps .= (ehsCaps <> caps)
822 connRemoteEhs .= remoteEhs
823 _ -> protocolError HandshakeRefused
824
825rehandshake :: ExtendedCaps -> Wire s ()
826rehandshake caps = error "rehandshake"
827
828reconnect :: Wire s ()
829reconnect = error "reconnect"
830
831data ConnectionId = ConnectionId
832 { topic :: !InfoHash
833 , remoteAddr :: !(PeerAddr IP)
834 , thisAddr :: !(PeerAddr (Maybe IP)) -- ^ foreign address of this node.
835 }
836
837-- | /Preffered/ settings of wire. To get the real use 'ask'.
838data ConnectionPrefs = ConnectionPrefs
839 { prefOptions :: !Options
840 , prefProtocol :: !ProtocolName
841 , prefCaps :: !Caps
842 , prefExtCaps :: !ExtendedCaps
843 } deriving (Show, Eq)
844
845instance Default ConnectionPrefs where
846 def = ConnectionPrefs
847 { prefOptions = def
848 , prefProtocol = def
849 , prefCaps = def
850 , prefExtCaps = def
851 }
852
853normalize :: ConnectionPrefs -> ConnectionPrefs
854normalize = error "normalize"
855
856-- | Bridge between 'Connection' and 'Network.BitTorrent.Exchange.Session'.
857data SessionLink s = SessionLink
858 { linkTopic :: !(InfoHash)
859 , linkPeerId :: !(PeerId)
860 , linkMetadataSize :: !(Maybe Int)
861 , linkOutputChan :: !(Maybe (Chan Message))
862 , linkSession :: !(s)
863 }
864
865data ConnectionConfig s = ConnectionConfig
866 { cfgPrefs :: !(ConnectionPrefs)
867 , cfgSession :: !(SessionLink s)
868 , cfgWire :: !(Wire s ())
869 }
870
871configHandshake :: ConnectionConfig s -> Handshake
872configHandshake ConnectionConfig {..} = Handshake
873 { hsProtocol = prefProtocol cfgPrefs
874 , hsReserved = prefCaps cfgPrefs
875 , hsInfoHash = linkTopic cfgSession
876 , hsPeerId = linkPeerId cfgSession
877 }
878
879{-----------------------------------------------------------------------
880-- Pending connections
881-----------------------------------------------------------------------}
882
883-- | Connection in half opened state. A normal usage scenario:
884--
885-- * Opened using 'newPendingConnection', usually in the listener
886-- loop;
887--
888-- * Closed using 'closePending' if 'pendingPeer' is banned,
889-- 'pendingCaps' is prohibited or pendingTopic is unknown;
890--
891-- * Accepted using 'acceptWire' otherwise.
892--
893data PendingConnection = PendingConnection
894 { pendingSock :: Socket
895 , pendingPeer :: PeerAddr IP -- ^ 'peerId' is always non empty;
896 , pendingCaps :: Caps -- ^ advertised by the peer;
897 , pendingTopic :: InfoHash -- ^ possible non-existent topic.
898 }
899
900-- | Reconstruct handshake sent by the remote peer.
901pendingHandshake :: PendingConnection -> Handshake
902pendingHandshake PendingConnection {..} = Handshake
903 { hsProtocol = def
904 , hsReserved = pendingCaps
905 , hsInfoHash = pendingTopic
906 , hsPeerId = fromMaybe (error "pendingHandshake: impossible")
907 (peerId pendingPeer)
908 }
909
910-- |
911--
912-- This function can throw 'WireFailure' exception.
913--
914newPendingConnection :: Socket -> PeerAddr IP -> IO PendingConnection
915newPendingConnection sock addr = do
916 Handshake {..} <- recvHandshake sock
917 unless (hsProtocol == def) $ do
918 throwIO $ ProtocolError $ InvalidProtocol hsProtocol
919 return PendingConnection
920 { pendingSock = sock
921 , pendingPeer = addr { peerId = Just hsPeerId }
922 , pendingCaps = hsReserved
923 , pendingTopic = hsInfoHash
924 }
925
926-- | Release all resources associated with the given connection. Note
927-- that you /must not/ 'closePending' if you 'acceptWire'.
928closePending :: PendingConnection -> IO ()
929closePending PendingConnection {..} = do
930 close pendingSock
931
932{-----------------------------------------------------------------------
933-- Connection setup
934-----------------------------------------------------------------------}
935
936chanToSock :: Int -> Chan Message -> Socket -> IO ()
937chanToSock ka chan sock =
938 sourceChan ka chan $= conduitPut S.put C.$$ sinkSocket sock
939
940afterHandshaking :: ChannelSide -> PeerAddr IP -> Socket -> HandshakePair
941 -> ConnectionConfig s -> IO ()
942afterHandshaking initiator addr sock
943 hpair @ (HandshakePair hs hs')
944 (ConnectionConfig
945 { cfgPrefs = ConnectionPrefs {..}
946 , cfgSession = SessionLink {..}
947 , cfgWire = wire
948 }) = do
949 let caps = hsReserved hs <> hsReserved hs'
950 cstate <- newIORef def { _connStats = establishedStats hpair }
951 chan <- maybe newChan return linkOutputChan
952 let conn = Connection {
953 connInitiatedBy = initiator
954 , connRemoteAddr = addr
955 , connProtocol = hsProtocol hs
956 , connCaps = caps
957 , connTopic = hsInfoHash hs
958 , connRemotePeerId = hsPeerId hs'
959 , connThisPeerId = hsPeerId hs
960 , connOptions = def
961 , connState = cstate
962 , connSession = linkSession
963 , connChan = chan
964 }
965
966 -- TODO make KA interval configurable
967 let kaInterval = defaultKeepAliveInterval
968 wire' = if ExtExtended `allowed` caps
969 then extendedHandshake prefExtCaps >> wire
970 else wire
971
972 bracket (forkIO (chanToSock kaInterval chan sock))
973 (killThread)
974 (\ _ -> runWire wire' sock chan conn)
975
976-- | Initiate 'Wire' connection and handshake with a peer. This function will
977-- also do the BEP10 extension protocol handshake if 'ExtExtended' is enabled on
978-- both sides.
979--
980-- This function can throw 'WireFailure' exception.
981--
982connectWire :: PeerAddr IP -> ConnectionConfig s -> IO ()
983connectWire addr cfg = do
984 let catchRefusal m = try m >>= either (throwIO . ConnectionRefused) return
985 bracket (catchRefusal (peerSocket Stream addr)) close $ \ sock -> do
986 let hs = configHandshake cfg
987 hs' <- initiateHandshake sock hs
988 let hpair = HandshakePair hs hs'
989 validatePair hpair addr
990 afterHandshaking ThisPeer addr sock hpair cfg
991
992-- | Accept 'Wire' connection using already 'Network.Socket.accept'ed
993-- socket. For peer listener loop the 'acceptSafe' should be
994-- prefered against 'accept'. The socket will be closed at exit.
995--
996-- This function can throw 'WireFailure' exception.
997--
998acceptWire :: PendingConnection -> ConnectionConfig s -> IO ()
999acceptWire pc @ PendingConnection {..} cfg = do
1000 bracket (return pendingSock) close $ \ _ -> do
1001 unless (linkTopic (cfgSession cfg) == pendingTopic) $ do
1002 throwIO (ProtocolError (UnexpectedTopic pendingTopic))
1003
1004 let hs = configHandshake cfg
1005 sendHandshake pendingSock hs
1006 let hpair = HandshakePair hs (pendingHandshake pc)
1007
1008 afterHandshaking RemotePeer pendingPeer pendingSock hpair cfg
1009
1010-- | Used when size of bitfield becomes known.
1011resizeBitfield :: Int -> Connected s ()
1012resizeBitfield n = connBitfield %= adjustSize n