summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/Exchange/Wire.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/BitTorrent/Exchange/Wire.hs')
-rw-r--r--src/Network/BitTorrent/Exchange/Wire.hs263
1 files changed, 213 insertions, 50 deletions
diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs
index 27915724..d1bed146 100644
--- a/src/Network/BitTorrent/Exchange/Wire.hs
+++ b/src/Network/BitTorrent/Exchange/Wire.hs
@@ -7,6 +7,11 @@
7-- Stability : experimental 7-- Stability : experimental
8-- Portability : portable 8-- Portability : portable
9-- 9--
10-- Each peer wire connection is identified by triple @(topic,
11-- remote_addr, this_addr)@. This means that connections are the
12-- same if and only if their 'ConnectionId' are the same. Of course,
13-- you /must/ avoid duplicated connections.
14--
10-- This module control /integrity/ of data send and received. 15-- This module control /integrity/ of data send and received.
11-- 16--
12{-# LANGUAGE DeriveDataTypeable #-} 17{-# LANGUAGE DeriveDataTypeable #-}
@@ -21,6 +26,7 @@ module Network.BitTorrent.Exchange.Wire
21 26
22 -- * Connection 27 -- * Connection
23 , Connection 28 , Connection
29 , connInitiatedBy
24 30
25 -- ** Identity 31 -- ** Identity
26 , connRemoteAddr 32 , connRemoteAddr
@@ -44,9 +50,23 @@ module Network.BitTorrent.Exchange.Wire
44 , connStats 50 , connStats
45 51
46 -- * Setup 52 -- * Setup
47 , runWire 53 , ConnectionPrefs (..)
54 , ConnectionSession (..)
55 , ConnectionConfig (..)
56
57 -- ** Initiate
48 , connectWire 58 , connectWire
59
60 -- ** Accept
61 , PendingConnection
62 , newPendingConnection
63 , pendingPeer
64 , pendingCaps
65 , pendingTopic
66 , closePending
49 , acceptWire 67 , acceptWire
68
69 -- ** Post setup actions
50 , resizeBitfield 70 , resizeBitfield
51 71
52 -- * Messaging 72 -- * Messaging
@@ -143,7 +163,7 @@ data ProtocolError
143 163
144 -- | /Remote/ peer replied with invalid 'hsInfoHash' which do not 164 -- | /Remote/ peer replied with invalid 'hsInfoHash' which do not
145 -- match with 'hsInfoHash' /this/ peer have sent. Can occur in 165 -- match with 'hsInfoHash' /this/ peer have sent. Can occur in
146 -- 'connectWire' only. 166 -- 'connectWire' or 'acceptWire' only.
147 | UnexpectedTopic InfoHash 167 | UnexpectedTopic InfoHash
148 168
149 -- | Some trackers or DHT can return 'PeerId' of a peer. If a 169 -- | Some trackers or DHT can return 'PeerId' of a peer. If a
@@ -470,9 +490,20 @@ data ConnectionState = ConnectionState {
470 490
471makeLenses ''ConnectionState 491makeLenses ''ConnectionState
472 492
493instance Default ConnectionState where
494 def = ConnectionState
495 { _connExtCaps = def
496 , _connRemoteEhs = def
497 , _connStats = def
498 , _connStatus = def
499 , _connBitfield = BF.haveNone 0
500 }
501
473-- | Connection keep various info about both peers. 502-- | Connection keep various info about both peers.
474data Connection s = Connection 503data Connection s = Connection
475 { connRemoteAddr :: !(PeerAddr IP) 504 { connInitiatedBy :: !ChannelSide
505
506 , connRemoteAddr :: !(PeerAddr IP)
476 507
477 -- | /Both/ peers handshaked with this protocol string. The only 508 -- | /Both/ peers handshaked with this protocol string. The only
478 -- value is \"Bittorrent Protocol\" but this can be changed in 509 -- value is \"Bittorrent Protocol\" but this can be changed in
@@ -561,6 +592,29 @@ initiateHandshake sock hs = do
561 sendHandshake sock hs 592 sendHandshake sock hs
562 recvHandshake sock 593 recvHandshake sock
563 594
595data HandshakePair = HandshakePair
596 { handshakeSent :: !Handshake
597 , handshakeRecv :: !Handshake
598 } deriving (Show, Eq)
599
600validatePair :: HandshakePair -> PeerAddr IP -> IO ()
601validatePair (HandshakePair hs hs') addr = Prelude.mapM_ checkProp
602 [ (def == hsProtocol hs', InvalidProtocol $ hsProtocol hs')
603 , (hsProtocol hs == hsProtocol hs', UnexpectedProtocol $ hsProtocol hs')
604 , (hsInfoHash hs == hsInfoHash hs', UnexpectedTopic $ hsInfoHash hs')
605 , (hsPeerId hs' == fromMaybe (hsPeerId hs') (peerId addr)
606 , UnexpectedPeerId $ hsPeerId hs')
607 ]
608 where
609 checkProp (t, e) = unless t $ throwIO $ ProtocolError e
610
611-- | Connection state /right/ after handshaking.
612establishedStats :: HandshakePair -> ConnectionStats
613establishedStats HandshakePair {..} = ConnectionStats
614 { outcomingFlow = FlowStats 1 $ handshakeStats handshakeSent
615 , incomingFlow = FlowStats 1 $ handshakeStats handshakeRecv
616 }
617
564{----------------------------------------------------------------------- 618{-----------------------------------------------------------------------
565-- Wire 619-- Wire
566-----------------------------------------------------------------------} 620-----------------------------------------------------------------------}
@@ -670,53 +724,129 @@ rehandshake caps = undefined
670reconnect :: Wire s () 724reconnect :: Wire s ()
671reconnect = undefined 725reconnect = undefined
672 726
673-- | Initiate 'Wire' connection and handshake with a peer. This function will 727data ConnectionId = ConnectionId
674-- also do the BEP10 extension protocol handshake if 'ExtExtended' is enabled on 728 { topic :: !InfoHash
675-- both sides. 729 , remoteAddr :: !(PeerAddr IP)
730 , thisAddr :: !(PeerAddr (Maybe IP)) -- ^ foreign address of this node.
731 }
732
733-- | /Preffered/ settings of wire. To get the real use 'ask'.
734data ConnectionPrefs = ConnectionPrefs
735 { prefOptions :: !Options
736 , prefProtocol :: !ProtocolName
737 , prefCaps :: !Caps
738 , prefExtCaps :: !ExtendedCaps
739 }
740
741instance Default ConnectionPrefs where
742 def = ConnectionPrefs
743 { prefOptions = def
744 , prefProtocol = def
745 , prefCaps = def
746 , prefExtCaps = def
747 }
748
749normalize :: ConnectionPrefs -> ConnectionPrefs
750normalize = undefined
751
752data ConnectionSession s = ConnectionSession
753 { sessionTopic :: !(InfoHash)
754 , sessionPeerId :: !(PeerId)
755 , metadataSize :: !(Maybe Int)
756 , outputChan :: !(Maybe (Chan Message))
757 , connectionSession :: !(s)
758 }
759
760data ConnectionConfig s = ConnectionConfig
761 { cfgPrefs :: !(ConnectionPrefs)
762 , cfgSession :: !(ConnectionSession s)
763 , cfgWire :: !(Wire s ())
764 }
765
766configHandshake :: ConnectionConfig s -> Handshake
767configHandshake ConnectionConfig {..} = Handshake
768 { hsProtocol = prefProtocol cfgPrefs
769 , hsReserved = prefCaps cfgPrefs
770 , hsInfoHash = sessionTopic cfgSession
771 , hsPeerId = sessionPeerId cfgSession
772 }
773
774{-----------------------------------------------------------------------
775-- Pending connections
776-----------------------------------------------------------------------}
777
778-- | Connection in half opened state. A normal usage scenario:
676-- 779--
677-- This function can throw 'WireFailure' exception. 780-- * Opened using 'newPendingConnection', usually in the listener
781-- loop;
678-- 782--
679connectWire :: s -> Handshake -> PeerAddr IP -> ExtendedCaps -> Chan Message 783-- * Closed using 'closePending' if 'pendingPeer' is banned,
680 -> Wire s () -> IO () 784-- 'pendingCaps' is prohibited or pendingTopic is unknown;
681connectWire session hs addr extCaps chan wire = do 785--
682 let catchRefusal m = try m >>= either (throwIO . ConnectionRefused) return 786-- * Accepted using 'acceptWire' otherwise.
683 bracket (catchRefusal (peerSocket Stream addr)) close $ \ sock -> do 787--
684 hs' <- initiateHandshake sock hs 788data PendingConnection = PendingConnection
789 { pendingSock :: Socket
790 , pendingPeer :: PeerAddr IP -- ^ 'peerId' is always non empty;
791 , pendingCaps :: Caps -- ^ advertised by the peer;
792 , pendingTopic :: InfoHash -- ^ possible non-existent topic.
793 }
685 794
686 Prelude.mapM_ (\(t,e) -> unless t $ throwIO $ ProtocolError e) [ 795-- | Reconstruct handshake sent by the remote peer.
687 (def == hsProtocol hs' 796pendingHandshake :: PendingConnection -> Handshake
688 , InvalidProtocol $ hsProtocol hs'), 797pendingHandshake PendingConnection {..} = Handshake
689 (hsProtocol hs == hsProtocol hs' 798 { hsProtocol = def
690 , UnexpectedProtocol $ hsProtocol hs'), 799 , hsReserved = pendingCaps
691 (hsInfoHash hs == hsInfoHash hs' 800 , hsInfoHash = pendingTopic
692 , UnexpectedTopic $ hsInfoHash hs'), 801 , hsPeerId = fromMaybe (error "pendingHandshake: impossible")
693 (hsPeerId hs' == fromMaybe (hsPeerId hs') (peerId addr) 802 (peerId pendingPeer)
694 , UnexpectedPeerId $ hsPeerId hs') 803 }
695 ]
696
697 let caps = hsReserved hs <> hsReserved hs'
698 wire' = if ExtExtended `allowed` caps
699 then extendedHandshake extCaps >> wire
700 else wire
701 804
702 cstate <- newIORef $ ConnectionState { 805-- |
703 _connExtCaps = def 806--
704 , _connRemoteEhs = def 807-- This function can throw 'WireFailure' exception.
705 , _connStats = ConnectionStats { 808--
706 outcomingFlow = FlowStats 1 $ handshakeStats hs 809newPendingConnection :: Socket -> PeerAddr IP -> IO PendingConnection
707 , incomingFlow = FlowStats 1 $ handshakeStats hs' 810newPendingConnection sock addr = do
708 } 811 Handshake {..} <- recvHandshake sock
709 , _connStatus = def 812 unless (hsProtocol == def) $ do
710 , _connBitfield = BF.haveNone 0 813 throwIO $ ProtocolError $ InvalidProtocol hsProtocol
711 } 814 return PendingConnection
815 { pendingSock = sock
816 , pendingPeer = addr { peerId = Just hsPeerId }
817 , pendingCaps = hsReserved
818 , pendingTopic = hsInfoHash
819 }
712 820
713 -- TODO make KA interval configurable 821-- | Release all resources associated with the given connection. Note
714 let kaInterval = defaultKeepAliveInterval 822-- that you /must not/ 'closePending' if you 'acceptWire'.
715 bracket 823closePending :: PendingConnection -> IO ()
716 (forkIO $ sourceChan kaInterval chan $= conduitPut S.put $$ sinkSocket sock) 824closePending PendingConnection {..} = do
717 (killThread) $ \ _ -> 825 close pendingSock
718 runWire wire' sock chan $ Connection 826
719 { connRemoteAddr = addr 827{-----------------------------------------------------------------------
828-- Connection setup
829-----------------------------------------------------------------------}
830
831chanToSock :: Int -> Chan Message -> Socket -> IO ()
832chanToSock ka chan sock =
833 sourceChan ka chan $= conduitPut S.put $$ sinkSocket sock
834
835afterHandshaking :: ChannelSide -> PeerAddr IP -> Socket -> HandshakePair
836 -> ConnectionConfig s -> IO ()
837afterHandshaking initiator addr sock
838 hpair @ (HandshakePair hs hs')
839 (ConnectionConfig
840 { cfgPrefs = ConnectionPrefs {..}
841 , cfgSession = ConnectionSession {..}
842 , cfgWire = wire
843 }) = do
844 let caps = hsReserved hs <> hsReserved hs'
845 cstate <- newIORef def { _connStats = establishedStats hpair }
846 chan <- maybe newChan return outputChan
847 let conn = Connection {
848 connInitiatedBy = initiator
849 , connRemoteAddr = addr
720 , connProtocol = hsProtocol hs 850 , connProtocol = hsProtocol hs
721 , connCaps = caps 851 , connCaps = caps
722 , connTopic = hsInfoHash hs 852 , connTopic = hsInfoHash hs
@@ -724,20 +854,53 @@ connectWire session hs addr extCaps chan wire = do
724 , connThisPeerId = hsPeerId hs 854 , connThisPeerId = hsPeerId hs
725 , connOptions = def 855 , connOptions = def
726 , connState = cstate 856 , connState = cstate
727 , connSession = session 857 , connSession = connectionSession
728 , connChan = chan 858 , connChan = chan
729 } 859 }
730 860
861 -- TODO make KA interval configurable
862 let kaInterval = defaultKeepAliveInterval
863 wire' = if ExtExtended `allowed` caps
864 then extendedHandshake prefExtCaps >> wire
865 else wire
866
867 bracket (forkIO (chanToSock kaInterval chan sock))
868 (killThread)
869 (\ _ -> runWire wire' sock chan conn)
870
871-- | Initiate 'Wire' connection and handshake with a peer. This function will
872-- also do the BEP10 extension protocol handshake if 'ExtExtended' is enabled on
873-- both sides.
874--
875-- This function can throw 'WireFailure' exception.
876--
877connectWire :: PeerAddr IP -> ConnectionConfig s -> IO ()
878connectWire addr cfg = do
879 let catchRefusal m = try m >>= either (throwIO . ConnectionRefused) return
880 bracket (catchRefusal (peerSocket Stream addr)) close $ \ sock -> do
881 let hs = configHandshake cfg
882 hs' <- initiateHandshake sock hs
883 let hpair = HandshakePair hs hs'
884 validatePair hpair addr
885 afterHandshaking ThisPeer addr sock hpair cfg
886
731-- | Accept 'Wire' connection using already 'Network.Socket.accept'ed 887-- | Accept 'Wire' connection using already 'Network.Socket.accept'ed
732-- socket. For peer listener loop the 'acceptSafe' should be 888-- socket. For peer listener loop the 'acceptSafe' should be
733-- prefered against 'accept'. The socket will be closed at exit. 889-- prefered against 'accept'. The socket will be closed at exit.
734-- 890--
735-- This function can throw 'WireFailure' exception. 891-- This function can throw 'WireFailure' exception.
736-- 892--
737acceptWire :: Socket -> PeerAddr IP -> Wire s () -> IO () 893acceptWire :: PendingConnection -> ConnectionConfig s -> IO ()
738acceptWire sock peerAddr wire = do 894acceptWire pc @ PendingConnection {..} cfg = do
739 bracket (return sock) close $ \ _ -> do 895 bracket (return pendingSock) close $ \ _ -> do
740 error "acceptWire: not implemented" 896 unless (sessionTopic (cfgSession cfg) == pendingTopic) $ do
897 throwIO (ProtocolError (UnexpectedTopic pendingTopic))
898
899 let hs = configHandshake cfg
900 sendHandshake pendingSock hs
901 let hpair = HandshakePair hs (pendingHandshake pc)
902
903 afterHandshaking RemotePeer pendingPeer pendingSock hpair cfg
741 904
742-- | Used when size of bitfield becomes known. 905-- | Used when size of bitfield becomes known.
743resizeBitfield :: Int -> Connected s () 906resizeBitfield :: Int -> Connected s ()