summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/Exchange/Connection.hs
diff options
context:
space:
mode:
authorSam Truzjan <pxqr.sta@gmail.com>2014-03-03 03:09:34 +0400
committerSam Truzjan <pxqr.sta@gmail.com>2014-03-03 03:09:34 +0400
commit863f7a236b86f562309748273f3087035a999ee7 (patch)
treeb0ccfad4cc73ec0033674094a12e53203b05c0c9 /src/Network/BitTorrent/Exchange/Connection.hs
parent9bd27cd068591446ff9026420654de14d58e0841 (diff)
Rename Wire.hs to Connection.hs
Diffstat (limited to 'src/Network/BitTorrent/Exchange/Connection.hs')
-rw-r--r--src/Network/BitTorrent/Exchange/Connection.hs912
1 files changed, 912 insertions, 0 deletions
diff --git a/src/Network/BitTorrent/Exchange/Connection.hs b/src/Network/BitTorrent/Exchange/Connection.hs
new file mode 100644
index 00000000..b23eb08b
--- /dev/null
+++ b/src/Network/BitTorrent/Exchange/Connection.hs
@@ -0,0 +1,912 @@
1-- |
2-- Module : Network.BitTorrent.Exchange.Wire
3-- Copyright : (c) Sam Truzjan 2013
4-- (c) Daniel Gröber 2013
5-- License : BSD3
6-- Maintainer : pxqr.sta@gmail.com
7-- Stability : experimental
8-- Portability : portable
9--
10-- Each peer wire connection is identified by triple @(topic,
11-- remote_addr, this_addr)@. This means that connections are the
12-- same if and only if their 'ConnectionId' are the same. Of course,
13-- you /must/ avoid duplicated connections.
14--
15-- This module control /integrity/ of data send and received.
16--
17{-# LANGUAGE DeriveDataTypeable #-}
18{-# LANGUAGE TemplateHaskell #-}
19{-# LANGUAGE MultiParamTypeClasses #-}
20{-# LANGUAGE GeneralizedNewtypeDeriving #-}
21module Network.BitTorrent.Exchange.Connection
22 ( -- * Wire
23 Connected
24 , Wire
25 , ChannelSide (..)
26
27 -- * Connection
28 , Connection
29 , connInitiatedBy
30
31 -- ** Identity
32 , connRemoteAddr
33 , connTopic
34 , connRemotePeerId
35 , connThisPeerId
36
37 -- ** Capabilities
38 , connProtocol
39 , connCaps
40 , connExtCaps
41 , connRemoteEhs
42
43 -- ** State
44 , connStatus
45 , connBitfield
46
47 -- ** Env
48 , connOptions
49 , connSession
50 , connStats
51
52 -- * Setup
53 , ConnectionPrefs (..)
54 , SessionLink (..)
55 , ConnectionConfig (..)
56
57 -- ** Initiate
58 , connectWire
59
60 -- ** Accept
61 , PendingConnection
62 , newPendingConnection
63 , pendingPeer
64 , pendingCaps
65 , pendingTopic
66 , closePending
67 , acceptWire
68
69 -- ** Post setup actions
70 , resizeBitfield
71
72 -- * Messaging
73 , recvMessage
74 , sendMessage
75 , filterQueue
76 , getMaxQueueLength
77
78 -- * Exceptions
79 , ProtocolError (..)
80 , WireFailure (..)
81 , peerPenalty
82 , isWireFailure
83 , disconnectPeer
84
85 -- * Stats
86 , ByteStats (..)
87 , FlowStats (..)
88 , ConnectionStats (..)
89
90 -- * Flood detection
91 , FloodDetector (..)
92
93 -- * Options
94 , Options (..)
95 ) where
96
97import Control.Applicative
98import Control.Concurrent hiding (yield)
99import Control.Exception
100import Control.Monad.Reader
101import Control.Monad.State
102import Control.Lens
103import Data.ByteString as BS
104import Data.ByteString.Lazy as BSL
105import Data.Conduit
106import Data.Conduit.Cereal
107import Data.Conduit.List
108import Data.Conduit.Network
109import Data.Default
110import Data.IORef
111import Data.List as L
112import Data.Maybe
113import Data.Monoid
114import Data.Serialize as S
115import Data.Typeable
116import Network
117import Network.Socket hiding (Connected)
118import Network.Socket.ByteString as BS
119import Text.PrettyPrint as PP hiding (($$), (<>))
120import Text.PrettyPrint.Class
121import Text.Show.Functions ()
122import System.Log.FastLogger (ToLogStr(..))
123import System.Timeout
124
125import Data.Torrent.Bitfield as BF
126import Data.Torrent.InfoHash
127import Network.BitTorrent.Core
128import Network.BitTorrent.Exchange.Connection.Status
129import Network.BitTorrent.Exchange.Message as Msg
130
131-- TODO handle port message?
132-- TODO handle limits?
133-- TODO filter not requested PIECE messages
134-- TODO metadata piece request flood protection
135-- TODO piece request flood protection
136-- TODO protect against flood attacks
137{-----------------------------------------------------------------------
138-- Exceptions
139-----------------------------------------------------------------------}
140
141-- | Used to specify initiator of 'ProtocolError'.
142data ChannelSide
143 = ThisPeer
144 | RemotePeer
145 deriving (Show, Eq, Enum, Bounded)
146
147instance Default ChannelSide where
148 def = ThisPeer
149
150instance Pretty ChannelSide where
151 pretty = PP.text . show
152
153-- | A protocol errors occur when a peer violates protocol
154-- specification.
155data ProtocolError
156 -- | Protocol string should be 'BitTorrent Protocol' but remote
157 -- peer have sent a different string.
158 = InvalidProtocol ProtocolName
159
160 -- | Sent and received protocol strings do not match. Can occur
161 -- in 'connectWire' only.
162 | UnexpectedProtocol ProtocolName
163
164 -- | /Remote/ peer replied with invalid 'hsInfoHash' which do not
165 -- match with 'hsInfoHash' /this/ peer have sent. Can occur in
166 -- 'connectWire' or 'acceptWire' only.
167 | UnexpectedTopic InfoHash
168
169 -- | Some trackers or DHT can return 'PeerId' of a peer. If a
170 -- remote peer handshaked with different 'hsPeerId' then this
171 -- exception is raised. Can occur in 'connectWire' only.
172 | UnexpectedPeerId PeerId
173
174 -- | Accepted peer have sent unknown torrent infohash in
175 -- 'hsInfoHash' field. This situation usually happen when /this/
176 -- peer have deleted the requested torrent. The error can occur in
177 -- 'acceptWire' function only.
178 | UnknownTopic InfoHash
179
180 -- | A remote peer have 'ExtExtended' enabled but did not send an
181 -- 'ExtendedHandshake' back.
182 | HandshakeRefused
183
184 -- | 'Network.BitTorrent.Exchange.Message.Bitfield' message MUST
185 -- be send either once or zero times, but either this peer or
186 -- remote peer send a bitfield message the second time.
187 | BitfieldAlreadySent ChannelSide
188
189 -- | Capabilities violation. For example this exception can occur
190 -- when a peer have sent 'Port' message but 'ExtDHT' is not
191 -- allowed in 'connCaps'.
192 | DisallowedMessage
193 { -- | Who sent invalid message.
194 violentSender :: ChannelSide
195
196 -- | If the 'violentSender' reconnect with this extension
197 -- enabled then he can try to send this message.
198 , extensionRequired :: Extension
199 }
200 deriving Show
201
202instance Pretty ProtocolError where
203 pretty = PP.text . show
204
205errorPenalty :: ProtocolError -> Int
206errorPenalty (InvalidProtocol _) = 1
207errorPenalty (UnexpectedProtocol _) = 1
208errorPenalty (UnexpectedTopic _) = 1
209errorPenalty (UnexpectedPeerId _) = 1
210errorPenalty (UnknownTopic _) = 0
211errorPenalty (HandshakeRefused ) = 1
212errorPenalty (BitfieldAlreadySent _) = 1
213errorPenalty (DisallowedMessage _ _) = 1
214
215-- | Exceptions used to interrupt the current P2P session.
216data WireFailure
217 = ConnectionRefused IOError
218
219 -- | Force termination of wire connection.
220 --
221 -- Normally you should throw only this exception from event loop
222 -- using 'disconnectPeer', other exceptions are thrown
223 -- automatically by functions from this module.
224 --
225 | DisconnectPeer
226
227 -- | A peer not responding and did not send a 'KeepAlive' message
228 -- for a specified period of time.
229 | PeerDisconnected
230
231 -- | A remote peer have sent some unknown message we unable to
232 -- parse.
233 | DecodingError GetException
234
235 -- | See 'ProtocolError' for more details.
236 | ProtocolError ProtocolError
237
238 -- | A possible malicious peer have sent too many control messages
239 -- without making any progress.
240 | FloodDetected ConnectionStats
241 deriving (Show, Typeable)
242
243instance Exception WireFailure
244
245instance Pretty WireFailure where
246 pretty = PP.text . show
247
248-- TODO
249-- data Penalty = Ban | Penalty Int
250
251peerPenalty :: WireFailure -> Int
252peerPenalty DisconnectPeer = 0
253peerPenalty PeerDisconnected = 0
254peerPenalty (DecodingError _) = 1
255peerPenalty (ProtocolError e) = errorPenalty e
256peerPenalty (FloodDetected _) = 1
257
258-- | Do nothing with exception, used with 'handle' or 'try'.
259isWireFailure :: Monad m => WireFailure -> m ()
260isWireFailure _ = return ()
261
262protocolError :: MonadThrow m => ProtocolError -> m a
263protocolError = monadThrow . ProtocolError
264
265{-----------------------------------------------------------------------
266-- Stats
267-----------------------------------------------------------------------}
268
269-- | Message stats in one direction.
270data FlowStats = FlowStats
271 { -- | Number of the messages sent or received.
272 messageCount :: {-# UNPACK #-} !Int
273 -- | Sum of byte sequences of all messages.
274 , messageBytes :: {-# UNPACK #-} !ByteStats
275 } deriving Show
276
277instance Pretty FlowStats where
278 pretty FlowStats {..} =
279 PP.int messageCount <+> "messages" $+$
280 pretty messageBytes
281
282-- | Zeroed stats.
283instance Default FlowStats where
284 def = FlowStats 0 def
285
286-- | Monoid under addition.
287instance Monoid FlowStats where
288 mempty = def
289 mappend a b = FlowStats
290 { messageBytes = messageBytes a <> messageBytes b
291 , messageCount = messageCount a + messageCount b
292 }
293
294-- | Find average length of byte sequences per message.
295avgByteStats :: FlowStats -> ByteStats
296avgByteStats (FlowStats n ByteStats {..}) = ByteStats
297 { overhead = overhead `quot` n
298 , control = control `quot` n
299 , payload = payload `quot` n
300 }
301
302-- | Message stats in both directions. This data can be retrieved
303-- using 'getStats' function.
304--
305-- Note that this stats is completely different from
306-- 'Data.Torrent.Progress.Progress': payload bytes not necessary
307-- equal to downloaded\/uploaded bytes since a peer can send a
308-- broken block.
309--
310data ConnectionStats = ConnectionStats
311 { -- | Received messages stats.
312 incomingFlow :: !FlowStats
313 -- | Sent messages stats.
314 , outcomingFlow :: !FlowStats
315 } deriving Show
316
317instance Pretty ConnectionStats where
318 pretty ConnectionStats {..} = vcat
319 [ "Recv:" <+> pretty incomingFlow
320 , "Sent:" <+> pretty outcomingFlow
321 , "Both:" <+> pretty (incomingFlow <> outcomingFlow)
322 ]
323
324-- | Zeroed stats.
325instance Default ConnectionStats where
326 def = ConnectionStats def def
327
328-- | Monoid under addition.
329instance Monoid ConnectionStats where
330 mempty = def
331 mappend a b = ConnectionStats
332 { incomingFlow = incomingFlow a <> incomingFlow b
333 , outcomingFlow = outcomingFlow a <> outcomingFlow b
334 }
335
336-- | Aggregate one more message stats in the /specified/ direction.
337addStats :: ChannelSide -> ByteStats -> ConnectionStats -> ConnectionStats
338addStats ThisPeer x s = s { outcomingFlow = (FlowStats 1 x) <> (outcomingFlow s) }
339addStats RemotePeer x s = s { incomingFlow = (FlowStats 1 x) <> (incomingFlow s) }
340
341-- | Sum of overhead and control bytes in both directions.
342wastedBytes :: ConnectionStats -> Int
343wastedBytes ConnectionStats {..} = overhead + control
344 where
345 FlowStats _ ByteStats {..} = incomingFlow <> outcomingFlow
346
347-- | Sum of payload bytes in both directions.
348payloadBytes :: ConnectionStats -> Int
349payloadBytes ConnectionStats {..} =
350 payload (messageBytes (incomingFlow <> outcomingFlow))
351
352-- | Sum of any bytes in both directions.
353transmittedBytes :: ConnectionStats -> Int
354transmittedBytes ConnectionStats {..} =
355 byteLength (messageBytes (incomingFlow <> outcomingFlow))
356
357{-----------------------------------------------------------------------
358-- Flood protection
359-----------------------------------------------------------------------}
360
361defaultFloodFactor :: Int
362defaultFloodFactor = 1
363
364-- | This is a very permissive value, connection setup usually takes
365-- around 10-100KB, including both directions.
366defaultFloodThreshold :: Int
367defaultFloodThreshold = 2 * 1024 * 1024
368
369-- | A flood detection function.
370type Detector stats = Int -- ^ Factor;
371 -> Int -- ^ Threshold;
372 -> stats -- ^ Stats to analyse;
373 -> Bool -- ^ Is this a flooded connection?
374
375defaultDetector :: Detector ConnectionStats
376defaultDetector factor threshold s =
377 transmittedBytes s > threshold &&
378 factor * wastedBytes s > payloadBytes s
379
380-- | Flood detection is used to protect /this/ peer against a /remote/
381-- malicious peer sending meaningless control messages.
382data FloodDetector = FloodDetector
383 { -- | Max ratio of payload bytes to control bytes.
384 floodFactor :: {-# UNPACK #-} !Int
385
386 -- | Max count of bytes connection /setup/ can take including
387 -- 'Handshake', 'ExtendedHandshake', 'Bitfield', 'Have' and 'Port'
388 -- messages. This value is used to avoid false positives at the
389 -- connection initialization.
390 , floodThreshold :: {-# UNPACK #-} !Int
391
392 -- | Flood predicate on the /current/ 'ConnectionStats'.
393 , floodPredicate :: Detector ConnectionStats
394 } deriving Show
395
396instance Eq FloodDetector where
397 a == b = floodFactor a == floodFactor b
398 && floodThreshold a == floodThreshold b
399
400-- | Flood detector with very permissive options.
401instance Default FloodDetector where
402 def = FloodDetector
403 { floodFactor = defaultFloodFactor
404 , floodThreshold = defaultFloodThreshold
405 , floodPredicate = defaultDetector
406 }
407
408-- | This peer might drop connection if the detector gives positive answer.
409runDetector :: FloodDetector -> ConnectionStats -> Bool
410runDetector FloodDetector {..} = floodPredicate floodFactor floodThreshold
411
412{-----------------------------------------------------------------------
413-- Options
414-----------------------------------------------------------------------}
415
416-- | Various connection settings and limits.
417data Options = Options
418 { -- | How often /this/ peer should send 'KeepAlive' messages.
419 keepaliveInterval :: {-# UNPACK #-} !Int
420
421 -- | /This/ peer will drop connection if a /remote/ peer did not
422 -- send any message for this period of time.
423 , keepaliveTimeout :: {-# UNPACK #-} !Int
424
425 , requestQueueLength :: {-# UNPACK #-} !Int
426
427 -- | Used to protect against flood attacks.
428 , floodDetector :: FloodDetector
429
430 -- | Used to protect against flood attacks in /metadata
431 -- exchange/. Normally, a requesting peer should request each
432 -- 'InfoDict' piece only one time, but a malicious peer can
433 -- saturate wire with 'MetadataRequest' messages thus flooding
434 -- responding peer.
435 --
436 -- This value set upper bound for number of 'MetadataRequests'
437 -- for each piece.
438 --
439 , metadataFactor :: {-# UNPACK #-} !Int
440
441 -- | Used to protect against out-of-memory attacks: malicious peer
442 -- can claim that 'totalSize' is, say, 100TB and send some random
443 -- data instead of infodict pieces. Since requesting peer unable
444 -- to check not completed infodict via the infohash, the
445 -- accumulated pieces will allocate the all available memory.
446 --
447 -- This limit set upper bound for 'InfoDict' size. See
448 -- 'ExtendedMetadata' for more info.
449 --
450 , maxInfoDictSize :: {-# UNPACK #-} !Int
451 } deriving (Show, Eq)
452
453-- | Permissive default parameters, most likely you don't need to
454-- change them.
455instance Default Options where
456 def = Options
457 { keepaliveInterval = defaultKeepAliveInterval
458 , keepaliveTimeout = defaultKeepAliveTimeout
459 , requestQueueLength = defaultRequestQueueLength
460 , floodDetector = def
461 , metadataFactor = defaultMetadataFactor
462 , maxInfoDictSize = defaultMaxInfoDictSize
463 }
464
465{-----------------------------------------------------------------------
466-- Connection
467-----------------------------------------------------------------------}
468
469data ConnectionState = ConnectionState {
470 -- | If @not (allowed ExtExtended connCaps)@ then this set is always
471 -- empty. Otherwise it has the BEP10 extension protocol mandated mapping of
472 -- 'MessageId' to the message type for the remote peer.
473 --
474 -- Note that this value can change in current session if either
475 -- this or remote peer will initiate rehandshaking.
476 --
477 _connExtCaps :: !ExtendedCaps
478
479 -- | Current extended handshake information from the remote peer
480 , _connRemoteEhs :: !ExtendedHandshake
481
482 -- | Various stats about messages sent and received. Stats can be
483 -- used to protect /this/ peer against flood attacks.
484 --
485 -- Note that this value will change with the next sent or received
486 -- message.
487 , _connStats :: !ConnectionStats
488
489 , _connStatus :: !ConnectionStatus
490
491 -- | Bitfield of remote endpoint.
492 , _connBitfield :: !Bitfield
493 }
494
495makeLenses ''ConnectionState
496
497instance Default ConnectionState where
498 def = ConnectionState
499 { _connExtCaps = def
500 , _connRemoteEhs = def
501 , _connStats = def
502 , _connStatus = def
503 , _connBitfield = BF.haveNone 0
504 }
505
506-- | Connection keep various info about both peers.
507data Connection s = Connection
508 { connInitiatedBy :: !ChannelSide
509
510 , connRemoteAddr :: !(PeerAddr IP)
511
512 -- | /Both/ peers handshaked with this protocol string. The only
513 -- value is \"Bittorrent Protocol\" but this can be changed in
514 -- future.
515 , connProtocol :: !ProtocolName
516
517 -- | Set of enabled core extensions, i.e. the pre BEP10 extension
518 -- mechanism. This value is used to check if a message is allowed
519 -- to be sent or received.
520 , connCaps :: !Caps
521
522 -- | /Both/ peers handshaked with this infohash. A connection can
523 -- handle only one topic, use 'reconnect' to change the current
524 -- topic.
525 , connTopic :: !InfoHash
526
527 -- | Typically extracted from handshake.
528 , connRemotePeerId :: !PeerId
529
530 -- | Typically extracted from handshake.
531 , connThisPeerId :: !PeerId
532
533 -- |
534 , connOptions :: !Options
535
536 -- | Mutable connection state, see 'ConnectionState'
537 , connState :: !(IORef ConnectionState)
538
539-- -- | Max request queue length.
540-- , connMaxQueueLen :: !Int
541
542 -- | Environment data.
543 , connSession :: !s
544
545 , connChan :: !(Chan Message)
546 }
547
548instance Pretty (Connection s) where
549 pretty Connection {..} = "Connection"
550
551instance ToLogStr (Connection s) where
552 toLogStr Connection {..} = mconcat
553 [ toLogStr (show connRemoteAddr)
554 , toLogStr (show connProtocol)
555 , toLogStr (show connCaps)
556 , toLogStr (show connTopic)
557 , toLogStr (show connRemotePeerId)
558 , toLogStr (show connThisPeerId)
559 , toLogStr (show connOptions)
560 ]
561
562-- TODO check extended messages too
563isAllowed :: Connection s -> Message -> Bool
564isAllowed Connection {..} msg
565 | Just ext <- requires msg = ext `allowed` connCaps
566 | otherwise = True
567
568{-----------------------------------------------------------------------
569-- Hanshaking
570-----------------------------------------------------------------------}
571
572sendHandshake :: Socket -> Handshake -> IO ()
573sendHandshake sock hs = sendAll sock (S.encode hs)
574
575recvHandshake :: Socket -> IO Handshake
576recvHandshake sock = do
577 header <- BS.recv sock 1
578 unless (BS.length header == 1) $
579 throw $ userError "Unable to receive handshake header."
580
581 let protocolLen = BS.head header
582 let restLen = handshakeSize protocolLen - 1
583
584 body <- BS.recv sock restLen
585 let resp = BS.cons protocolLen body
586 either (throwIO . userError) return $ S.decode resp
587
588-- | Handshaking with a peer specified by the second argument.
589--
590-- It's important to send handshake first because /accepting/ peer
591-- do not know handshake topic and will wait until /connecting/ peer
592-- will send handshake.
593--
594initiateHandshake :: Socket -> Handshake -> IO Handshake
595initiateHandshake sock hs = do
596 sendHandshake sock hs
597 recvHandshake sock
598
599data HandshakePair = HandshakePair
600 { handshakeSent :: !Handshake
601 , handshakeRecv :: !Handshake
602 } deriving (Show, Eq)
603
604validatePair :: HandshakePair -> PeerAddr IP -> IO ()
605validatePair (HandshakePair hs hs') addr = Prelude.mapM_ checkProp
606 [ (def == hsProtocol hs', InvalidProtocol $ hsProtocol hs')
607 , (hsProtocol hs == hsProtocol hs', UnexpectedProtocol $ hsProtocol hs')
608 , (hsInfoHash hs == hsInfoHash hs', UnexpectedTopic $ hsInfoHash hs')
609 , (hsPeerId hs' == fromMaybe (hsPeerId hs') (peerId addr)
610 , UnexpectedPeerId $ hsPeerId hs')
611 ]
612 where
613 checkProp (t, e) = unless t $ throwIO $ ProtocolError e
614
615-- | Connection state /right/ after handshaking.
616establishedStats :: HandshakePair -> ConnectionStats
617establishedStats HandshakePair {..} = ConnectionStats
618 { outcomingFlow = FlowStats 1 $ handshakeStats handshakeSent
619 , incomingFlow = FlowStats 1 $ handshakeStats handshakeRecv
620 }
621
622{-----------------------------------------------------------------------
623-- Wire
624-----------------------------------------------------------------------}
625
626-- | do not expose this so we can change it without breaking api
627newtype Connected s a = Connected { runConnected :: (ReaderT (Connection s) IO a) }
628 deriving (Functor, Applicative, Monad
629 , MonadIO, MonadReader (Connection s), MonadThrow
630 )
631
632instance MonadState ConnectionState (Connected s) where
633 get = Connected (asks connState) >>= liftIO . readIORef
634 put x = Connected (asks connState) >>= liftIO . flip writeIORef x
635
636-- | A duplex channel connected to a remote peer which keep tracks
637-- connection parameters.
638type Wire s a = ConduitM Message Message (Connected s) a
639
640{-----------------------------------------------------------------------
641-- Wrapper
642-----------------------------------------------------------------------}
643
644putStats :: ChannelSide -> Message -> Connected s ()
645putStats side msg = connStats %= addStats side (stats msg)
646
647validate :: ChannelSide -> Message -> Connected s ()
648validate side msg = do
649 caps <- asks connCaps
650 case requires msg of
651 Nothing -> return ()
652 Just ext
653 | ext `allowed` caps -> return ()
654 | otherwise -> protocolError $ DisallowedMessage side ext
655
656trackFlow :: ChannelSide -> Wire s ()
657trackFlow side = iterM $ do
658 validate side
659 putStats side
660
661{-----------------------------------------------------------------------
662-- Setup
663-----------------------------------------------------------------------}
664
665-- System.Timeout.timeout multiplier
666seconds :: Int
667seconds = 1000000
668
669sinkChan :: MonadIO m => Chan Message -> Sink Message m ()
670sinkChan chan = await >>= maybe (return ()) (liftIO . writeChan chan)
671
672sourceChan :: MonadIO m => Int -> Chan Message -> Source m Message
673sourceChan interval chan = do
674 mmsg <- liftIO $ timeout (interval * seconds) $ readChan chan
675 yield $ fromMaybe Msg.KeepAlive mmsg
676
677-- | Normally you should use 'connectWire' or 'acceptWire'.
678runWire :: Wire s () -> Socket -> Chan Message -> Connection s -> IO ()
679runWire action sock chan conn = flip runReaderT conn $ runConnected $
680 sourceSocket sock $=
681 conduitGet S.get $=
682 trackFlow RemotePeer $=
683 action $=
684 trackFlow ThisPeer $$
685 sinkChan chan
686
687-- | This function will block until a peer send new message. You can
688-- also use 'await'.
689recvMessage :: Wire s Message
690recvMessage = await >>= maybe (monadThrow PeerDisconnected) return
691
692-- | You can also use 'yield'.
693sendMessage :: PeerMessage msg => msg -> Wire s ()
694sendMessage msg = do
695 ecaps <- use connExtCaps
696 yield $ envelop ecaps msg
697
698getMaxQueueLength :: Connected s Int
699getMaxQueueLength = do
700 advertisedLen <- ehsQueueLength <$> use connRemoteEhs
701 defaultLen <- asks (requestQueueLength . connOptions)
702 return $ fromMaybe defaultLen advertisedLen
703
704-- | Filter pending messages from send buffer.
705filterQueue :: (Message -> Bool) -> Wire s ()
706filterQueue p = lift $ do
707 chan <- asks connChan
708 liftIO $ getChanContents chan >>= writeList2Chan chan . L.filter p
709
710-- | Forcefully terminate wire session and close socket.
711disconnectPeer :: Wire s a
712disconnectPeer = monadThrow DisconnectPeer
713
714extendedHandshake :: ExtendedCaps -> Wire s ()
715extendedHandshake caps = do
716 -- TODO add other params to the handshake
717 sendMessage $ nullExtendedHandshake caps
718 msg <- recvMessage
719 case msg of
720 Extended (EHandshake remoteEhs@(ExtendedHandshake {..})) -> do
721 connExtCaps .= (ehsCaps <> caps)
722 connRemoteEhs .= remoteEhs
723 _ -> protocolError HandshakeRefused
724
725rehandshake :: ExtendedCaps -> Wire s ()
726rehandshake caps = undefined
727
728reconnect :: Wire s ()
729reconnect = undefined
730
731data ConnectionId = ConnectionId
732 { topic :: !InfoHash
733 , remoteAddr :: !(PeerAddr IP)
734 , thisAddr :: !(PeerAddr (Maybe IP)) -- ^ foreign address of this node.
735 }
736
737-- | /Preffered/ settings of wire. To get the real use 'ask'.
738data ConnectionPrefs = ConnectionPrefs
739 { prefOptions :: !Options
740 , prefProtocol :: !ProtocolName
741 , prefCaps :: !Caps
742 , prefExtCaps :: !ExtendedCaps
743 } deriving (Show, Eq)
744
745instance Default ConnectionPrefs where
746 def = ConnectionPrefs
747 { prefOptions = def
748 , prefProtocol = def
749 , prefCaps = def
750 , prefExtCaps = def
751 }
752
753normalize :: ConnectionPrefs -> ConnectionPrefs
754normalize = undefined
755
756-- | Bridge between 'Connection' and 'Network.BitTorrent.Exchange.Session'.
757data SessionLink s = SessionLink
758 { linkTopic :: !(InfoHash)
759 , linkPeerId :: !(PeerId)
760 , linkMetadataSize :: !(Maybe Int)
761 , linkOutputChan :: !(Maybe (Chan Message))
762 , linkSession :: !(s)
763 }
764
765data ConnectionConfig s = ConnectionConfig
766 { cfgPrefs :: !(ConnectionPrefs)
767 , cfgSession :: !(SessionLink s)
768 , cfgWire :: !(Wire s ())
769 }
770
771configHandshake :: ConnectionConfig s -> Handshake
772configHandshake ConnectionConfig {..} = Handshake
773 { hsProtocol = prefProtocol cfgPrefs
774 , hsReserved = prefCaps cfgPrefs
775 , hsInfoHash = linkTopic cfgSession
776 , hsPeerId = linkPeerId cfgSession
777 }
778
779{-----------------------------------------------------------------------
780-- Pending connections
781-----------------------------------------------------------------------}
782
783-- | Connection in half opened state. A normal usage scenario:
784--
785-- * Opened using 'newPendingConnection', usually in the listener
786-- loop;
787--
788-- * Closed using 'closePending' if 'pendingPeer' is banned,
789-- 'pendingCaps' is prohibited or pendingTopic is unknown;
790--
791-- * Accepted using 'acceptWire' otherwise.
792--
793data PendingConnection = PendingConnection
794 { pendingSock :: Socket
795 , pendingPeer :: PeerAddr IP -- ^ 'peerId' is always non empty;
796 , pendingCaps :: Caps -- ^ advertised by the peer;
797 , pendingTopic :: InfoHash -- ^ possible non-existent topic.
798 }
799
800-- | Reconstruct handshake sent by the remote peer.
801pendingHandshake :: PendingConnection -> Handshake
802pendingHandshake PendingConnection {..} = Handshake
803 { hsProtocol = def
804 , hsReserved = pendingCaps
805 , hsInfoHash = pendingTopic
806 , hsPeerId = fromMaybe (error "pendingHandshake: impossible")
807 (peerId pendingPeer)
808 }
809
810-- |
811--
812-- This function can throw 'WireFailure' exception.
813--
814newPendingConnection :: Socket -> PeerAddr IP -> IO PendingConnection
815newPendingConnection sock addr = do
816 Handshake {..} <- recvHandshake sock
817 unless (hsProtocol == def) $ do
818 throwIO $ ProtocolError $ InvalidProtocol hsProtocol
819 return PendingConnection
820 { pendingSock = sock
821 , pendingPeer = addr { peerId = Just hsPeerId }
822 , pendingCaps = hsReserved
823 , pendingTopic = hsInfoHash
824 }
825
826-- | Release all resources associated with the given connection. Note
827-- that you /must not/ 'closePending' if you 'acceptWire'.
828closePending :: PendingConnection -> IO ()
829closePending PendingConnection {..} = do
830 close pendingSock
831
832{-----------------------------------------------------------------------
833-- Connection setup
834-----------------------------------------------------------------------}
835
836chanToSock :: Int -> Chan Message -> Socket -> IO ()
837chanToSock ka chan sock =
838 sourceChan ka chan $= conduitPut S.put $$ sinkSocket sock
839
840afterHandshaking :: ChannelSide -> PeerAddr IP -> Socket -> HandshakePair
841 -> ConnectionConfig s -> IO ()
842afterHandshaking initiator addr sock
843 hpair @ (HandshakePair hs hs')
844 (ConnectionConfig
845 { cfgPrefs = ConnectionPrefs {..}
846 , cfgSession = SessionLink {..}
847 , cfgWire = wire
848 }) = do
849 let caps = hsReserved hs <> hsReserved hs'
850 cstate <- newIORef def { _connStats = establishedStats hpair }
851 chan <- maybe newChan return linkOutputChan
852 let conn = Connection {
853 connInitiatedBy = initiator
854 , connRemoteAddr = addr
855 , connProtocol = hsProtocol hs
856 , connCaps = caps
857 , connTopic = hsInfoHash hs
858 , connRemotePeerId = hsPeerId hs'
859 , connThisPeerId = hsPeerId hs
860 , connOptions = def
861 , connState = cstate
862 , connSession = linkSession
863 , connChan = chan
864 }
865
866 -- TODO make KA interval configurable
867 let kaInterval = defaultKeepAliveInterval
868 wire' = if ExtExtended `allowed` caps
869 then extendedHandshake prefExtCaps >> wire
870 else wire
871
872 bracket (forkIO (chanToSock kaInterval chan sock))
873 (killThread)
874 (\ _ -> runWire wire' sock chan conn)
875
876-- | Initiate 'Wire' connection and handshake with a peer. This function will
877-- also do the BEP10 extension protocol handshake if 'ExtExtended' is enabled on
878-- both sides.
879--
880-- This function can throw 'WireFailure' exception.
881--
882connectWire :: PeerAddr IP -> ConnectionConfig s -> IO ()
883connectWire addr cfg = do
884 let catchRefusal m = try m >>= either (throwIO . ConnectionRefused) return
885 bracket (catchRefusal (peerSocket Stream addr)) close $ \ sock -> do
886 let hs = configHandshake cfg
887 hs' <- initiateHandshake sock hs
888 let hpair = HandshakePair hs hs'
889 validatePair hpair addr
890 afterHandshaking ThisPeer addr sock hpair cfg
891
892-- | Accept 'Wire' connection using already 'Network.Socket.accept'ed
893-- socket. For peer listener loop the 'acceptSafe' should be
894-- prefered against 'accept'. The socket will be closed at exit.
895--
896-- This function can throw 'WireFailure' exception.
897--
898acceptWire :: PendingConnection -> ConnectionConfig s -> IO ()
899acceptWire pc @ PendingConnection {..} cfg = do
900 bracket (return pendingSock) close $ \ _ -> do
901 unless (linkTopic (cfgSession cfg) == pendingTopic) $ do
902 throwIO (ProtocolError (UnexpectedTopic pendingTopic))
903
904 let hs = configHandshake cfg
905 sendHandshake pendingSock hs
906 let hpair = HandshakePair hs (pendingHandshake pc)
907
908 afterHandshaking RemotePeer pendingPeer pendingSock hpair cfg
909
910-- | Used when size of bitfield becomes known.
911resizeBitfield :: Int -> Connected s ()
912resizeBitfield n = connBitfield %= adjustSize n