diff options
Diffstat (limited to 'src/Network/BitTorrent/Exchange/Wire.hs')
-rw-r--r-- | src/Network/BitTorrent/Exchange/Wire.hs | 263 |
1 files changed, 213 insertions, 50 deletions
diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs index 27915724..d1bed146 100644 --- a/src/Network/BitTorrent/Exchange/Wire.hs +++ b/src/Network/BitTorrent/Exchange/Wire.hs | |||
@@ -7,6 +7,11 @@ | |||
7 | -- Stability : experimental | 7 | -- Stability : experimental |
8 | -- Portability : portable | 8 | -- Portability : portable |
9 | -- | 9 | -- |
10 | -- Each peer wire connection is identified by triple @(topic, | ||
11 | -- remote_addr, this_addr)@. This means that connections are the | ||
12 | -- same if and only if their 'ConnectionId' are the same. Of course, | ||
13 | -- you /must/ avoid duplicated connections. | ||
14 | -- | ||
10 | -- This module control /integrity/ of data send and received. | 15 | -- This module control /integrity/ of data send and received. |
11 | -- | 16 | -- |
12 | {-# LANGUAGE DeriveDataTypeable #-} | 17 | {-# LANGUAGE DeriveDataTypeable #-} |
@@ -21,6 +26,7 @@ module Network.BitTorrent.Exchange.Wire | |||
21 | 26 | ||
22 | -- * Connection | 27 | -- * Connection |
23 | , Connection | 28 | , Connection |
29 | , connInitiatedBy | ||
24 | 30 | ||
25 | -- ** Identity | 31 | -- ** Identity |
26 | , connRemoteAddr | 32 | , connRemoteAddr |
@@ -44,9 +50,23 @@ module Network.BitTorrent.Exchange.Wire | |||
44 | , connStats | 50 | , connStats |
45 | 51 | ||
46 | -- * Setup | 52 | -- * Setup |
47 | , runWire | 53 | , ConnectionPrefs (..) |
54 | , ConnectionSession (..) | ||
55 | , ConnectionConfig (..) | ||
56 | |||
57 | -- ** Initiate | ||
48 | , connectWire | 58 | , connectWire |
59 | |||
60 | -- ** Accept | ||
61 | , PendingConnection | ||
62 | , newPendingConnection | ||
63 | , pendingPeer | ||
64 | , pendingCaps | ||
65 | , pendingTopic | ||
66 | , closePending | ||
49 | , acceptWire | 67 | , acceptWire |
68 | |||
69 | -- ** Post setup actions | ||
50 | , resizeBitfield | 70 | , resizeBitfield |
51 | 71 | ||
52 | -- * Messaging | 72 | -- * Messaging |
@@ -143,7 +163,7 @@ data ProtocolError | |||
143 | 163 | ||
144 | -- | /Remote/ peer replied with invalid 'hsInfoHash' which do not | 164 | -- | /Remote/ peer replied with invalid 'hsInfoHash' which do not |
145 | -- match with 'hsInfoHash' /this/ peer have sent. Can occur in | 165 | -- match with 'hsInfoHash' /this/ peer have sent. Can occur in |
146 | -- 'connectWire' only. | 166 | -- 'connectWire' or 'acceptWire' only. |
147 | | UnexpectedTopic InfoHash | 167 | | UnexpectedTopic InfoHash |
148 | 168 | ||
149 | -- | Some trackers or DHT can return 'PeerId' of a peer. If a | 169 | -- | Some trackers or DHT can return 'PeerId' of a peer. If a |
@@ -470,9 +490,20 @@ data ConnectionState = ConnectionState { | |||
470 | 490 | ||
471 | makeLenses ''ConnectionState | 491 | makeLenses ''ConnectionState |
472 | 492 | ||
493 | instance Default ConnectionState where | ||
494 | def = ConnectionState | ||
495 | { _connExtCaps = def | ||
496 | , _connRemoteEhs = def | ||
497 | , _connStats = def | ||
498 | , _connStatus = def | ||
499 | , _connBitfield = BF.haveNone 0 | ||
500 | } | ||
501 | |||
473 | -- | Connection keep various info about both peers. | 502 | -- | Connection keep various info about both peers. |
474 | data Connection s = Connection | 503 | data Connection s = Connection |
475 | { connRemoteAddr :: !(PeerAddr IP) | 504 | { connInitiatedBy :: !ChannelSide |
505 | |||
506 | , connRemoteAddr :: !(PeerAddr IP) | ||
476 | 507 | ||
477 | -- | /Both/ peers handshaked with this protocol string. The only | 508 | -- | /Both/ peers handshaked with this protocol string. The only |
478 | -- value is \"Bittorrent Protocol\" but this can be changed in | 509 | -- value is \"Bittorrent Protocol\" but this can be changed in |
@@ -561,6 +592,29 @@ initiateHandshake sock hs = do | |||
561 | sendHandshake sock hs | 592 | sendHandshake sock hs |
562 | recvHandshake sock | 593 | recvHandshake sock |
563 | 594 | ||
595 | data HandshakePair = HandshakePair | ||
596 | { handshakeSent :: !Handshake | ||
597 | , handshakeRecv :: !Handshake | ||
598 | } deriving (Show, Eq) | ||
599 | |||
600 | validatePair :: HandshakePair -> PeerAddr IP -> IO () | ||
601 | validatePair (HandshakePair hs hs') addr = Prelude.mapM_ checkProp | ||
602 | [ (def == hsProtocol hs', InvalidProtocol $ hsProtocol hs') | ||
603 | , (hsProtocol hs == hsProtocol hs', UnexpectedProtocol $ hsProtocol hs') | ||
604 | , (hsInfoHash hs == hsInfoHash hs', UnexpectedTopic $ hsInfoHash hs') | ||
605 | , (hsPeerId hs' == fromMaybe (hsPeerId hs') (peerId addr) | ||
606 | , UnexpectedPeerId $ hsPeerId hs') | ||
607 | ] | ||
608 | where | ||
609 | checkProp (t, e) = unless t $ throwIO $ ProtocolError e | ||
610 | |||
611 | -- | Connection state /right/ after handshaking. | ||
612 | establishedStats :: HandshakePair -> ConnectionStats | ||
613 | establishedStats HandshakePair {..} = ConnectionStats | ||
614 | { outcomingFlow = FlowStats 1 $ handshakeStats handshakeSent | ||
615 | , incomingFlow = FlowStats 1 $ handshakeStats handshakeRecv | ||
616 | } | ||
617 | |||
564 | {----------------------------------------------------------------------- | 618 | {----------------------------------------------------------------------- |
565 | -- Wire | 619 | -- Wire |
566 | -----------------------------------------------------------------------} | 620 | -----------------------------------------------------------------------} |
@@ -670,53 +724,129 @@ rehandshake caps = undefined | |||
670 | reconnect :: Wire s () | 724 | reconnect :: Wire s () |
671 | reconnect = undefined | 725 | reconnect = undefined |
672 | 726 | ||
673 | -- | Initiate 'Wire' connection and handshake with a peer. This function will | 727 | data ConnectionId = ConnectionId |
674 | -- also do the BEP10 extension protocol handshake if 'ExtExtended' is enabled on | 728 | { topic :: !InfoHash |
675 | -- both sides. | 729 | , remoteAddr :: !(PeerAddr IP) |
730 | , thisAddr :: !(PeerAddr (Maybe IP)) -- ^ foreign address of this node. | ||
731 | } | ||
732 | |||
733 | -- | /Preffered/ settings of wire. To get the real use 'ask'. | ||
734 | data ConnectionPrefs = ConnectionPrefs | ||
735 | { prefOptions :: !Options | ||
736 | , prefProtocol :: !ProtocolName | ||
737 | , prefCaps :: !Caps | ||
738 | , prefExtCaps :: !ExtendedCaps | ||
739 | } | ||
740 | |||
741 | instance Default ConnectionPrefs where | ||
742 | def = ConnectionPrefs | ||
743 | { prefOptions = def | ||
744 | , prefProtocol = def | ||
745 | , prefCaps = def | ||
746 | , prefExtCaps = def | ||
747 | } | ||
748 | |||
749 | normalize :: ConnectionPrefs -> ConnectionPrefs | ||
750 | normalize = undefined | ||
751 | |||
752 | data ConnectionSession s = ConnectionSession | ||
753 | { sessionTopic :: !(InfoHash) | ||
754 | , sessionPeerId :: !(PeerId) | ||
755 | , metadataSize :: !(Maybe Int) | ||
756 | , outputChan :: !(Maybe (Chan Message)) | ||
757 | , connectionSession :: !(s) | ||
758 | } | ||
759 | |||
760 | data ConnectionConfig s = ConnectionConfig | ||
761 | { cfgPrefs :: !(ConnectionPrefs) | ||
762 | , cfgSession :: !(ConnectionSession s) | ||
763 | , cfgWire :: !(Wire s ()) | ||
764 | } | ||
765 | |||
766 | configHandshake :: ConnectionConfig s -> Handshake | ||
767 | configHandshake ConnectionConfig {..} = Handshake | ||
768 | { hsProtocol = prefProtocol cfgPrefs | ||
769 | , hsReserved = prefCaps cfgPrefs | ||
770 | , hsInfoHash = sessionTopic cfgSession | ||
771 | , hsPeerId = sessionPeerId cfgSession | ||
772 | } | ||
773 | |||
774 | {----------------------------------------------------------------------- | ||
775 | -- Pending connections | ||
776 | -----------------------------------------------------------------------} | ||
777 | |||
778 | -- | Connection in half opened state. A normal usage scenario: | ||
676 | -- | 779 | -- |
677 | -- This function can throw 'WireFailure' exception. | 780 | -- * Opened using 'newPendingConnection', usually in the listener |
781 | -- loop; | ||
678 | -- | 782 | -- |
679 | connectWire :: s -> Handshake -> PeerAddr IP -> ExtendedCaps -> Chan Message | 783 | -- * Closed using 'closePending' if 'pendingPeer' is banned, |
680 | -> Wire s () -> IO () | 784 | -- 'pendingCaps' is prohibited or pendingTopic is unknown; |
681 | connectWire session hs addr extCaps chan wire = do | 785 | -- |
682 | let catchRefusal m = try m >>= either (throwIO . ConnectionRefused) return | 786 | -- * Accepted using 'acceptWire' otherwise. |
683 | bracket (catchRefusal (peerSocket Stream addr)) close $ \ sock -> do | 787 | -- |
684 | hs' <- initiateHandshake sock hs | 788 | data PendingConnection = PendingConnection |
789 | { pendingSock :: Socket | ||
790 | , pendingPeer :: PeerAddr IP -- ^ 'peerId' is always non empty; | ||
791 | , pendingCaps :: Caps -- ^ advertised by the peer; | ||
792 | , pendingTopic :: InfoHash -- ^ possible non-existent topic. | ||
793 | } | ||
685 | 794 | ||
686 | Prelude.mapM_ (\(t,e) -> unless t $ throwIO $ ProtocolError e) [ | 795 | -- | Reconstruct handshake sent by the remote peer. |
687 | (def == hsProtocol hs' | 796 | pendingHandshake :: PendingConnection -> Handshake |
688 | , InvalidProtocol $ hsProtocol hs'), | 797 | pendingHandshake PendingConnection {..} = Handshake |
689 | (hsProtocol hs == hsProtocol hs' | 798 | { hsProtocol = def |
690 | , UnexpectedProtocol $ hsProtocol hs'), | 799 | , hsReserved = pendingCaps |
691 | (hsInfoHash hs == hsInfoHash hs' | 800 | , hsInfoHash = pendingTopic |
692 | , UnexpectedTopic $ hsInfoHash hs'), | 801 | , hsPeerId = fromMaybe (error "pendingHandshake: impossible") |
693 | (hsPeerId hs' == fromMaybe (hsPeerId hs') (peerId addr) | 802 | (peerId pendingPeer) |
694 | , UnexpectedPeerId $ hsPeerId hs') | 803 | } |
695 | ] | ||
696 | |||
697 | let caps = hsReserved hs <> hsReserved hs' | ||
698 | wire' = if ExtExtended `allowed` caps | ||
699 | then extendedHandshake extCaps >> wire | ||
700 | else wire | ||
701 | 804 | ||
702 | cstate <- newIORef $ ConnectionState { | 805 | -- | |
703 | _connExtCaps = def | 806 | -- |
704 | , _connRemoteEhs = def | 807 | -- This function can throw 'WireFailure' exception. |
705 | , _connStats = ConnectionStats { | 808 | -- |
706 | outcomingFlow = FlowStats 1 $ handshakeStats hs | 809 | newPendingConnection :: Socket -> PeerAddr IP -> IO PendingConnection |
707 | , incomingFlow = FlowStats 1 $ handshakeStats hs' | 810 | newPendingConnection sock addr = do |
708 | } | 811 | Handshake {..} <- recvHandshake sock |
709 | , _connStatus = def | 812 | unless (hsProtocol == def) $ do |
710 | , _connBitfield = BF.haveNone 0 | 813 | throwIO $ ProtocolError $ InvalidProtocol hsProtocol |
711 | } | 814 | return PendingConnection |
815 | { pendingSock = sock | ||
816 | , pendingPeer = addr { peerId = Just hsPeerId } | ||
817 | , pendingCaps = hsReserved | ||
818 | , pendingTopic = hsInfoHash | ||
819 | } | ||
712 | 820 | ||
713 | -- TODO make KA interval configurable | 821 | -- | Release all resources associated with the given connection. Note |
714 | let kaInterval = defaultKeepAliveInterval | 822 | -- that you /must not/ 'closePending' if you 'acceptWire'. |
715 | bracket | 823 | closePending :: PendingConnection -> IO () |
716 | (forkIO $ sourceChan kaInterval chan $= conduitPut S.put $$ sinkSocket sock) | 824 | closePending PendingConnection {..} = do |
717 | (killThread) $ \ _ -> | 825 | close pendingSock |
718 | runWire wire' sock chan $ Connection | 826 | |
719 | { connRemoteAddr = addr | 827 | {----------------------------------------------------------------------- |
828 | -- Connection setup | ||
829 | -----------------------------------------------------------------------} | ||
830 | |||
831 | chanToSock :: Int -> Chan Message -> Socket -> IO () | ||
832 | chanToSock ka chan sock = | ||
833 | sourceChan ka chan $= conduitPut S.put $$ sinkSocket sock | ||
834 | |||
835 | afterHandshaking :: ChannelSide -> PeerAddr IP -> Socket -> HandshakePair | ||
836 | -> ConnectionConfig s -> IO () | ||
837 | afterHandshaking initiator addr sock | ||
838 | hpair @ (HandshakePair hs hs') | ||
839 | (ConnectionConfig | ||
840 | { cfgPrefs = ConnectionPrefs {..} | ||
841 | , cfgSession = ConnectionSession {..} | ||
842 | , cfgWire = wire | ||
843 | }) = do | ||
844 | let caps = hsReserved hs <> hsReserved hs' | ||
845 | cstate <- newIORef def { _connStats = establishedStats hpair } | ||
846 | chan <- maybe newChan return outputChan | ||
847 | let conn = Connection { | ||
848 | connInitiatedBy = initiator | ||
849 | , connRemoteAddr = addr | ||
720 | , connProtocol = hsProtocol hs | 850 | , connProtocol = hsProtocol hs |
721 | , connCaps = caps | 851 | , connCaps = caps |
722 | , connTopic = hsInfoHash hs | 852 | , connTopic = hsInfoHash hs |
@@ -724,20 +854,53 @@ connectWire session hs addr extCaps chan wire = do | |||
724 | , connThisPeerId = hsPeerId hs | 854 | , connThisPeerId = hsPeerId hs |
725 | , connOptions = def | 855 | , connOptions = def |
726 | , connState = cstate | 856 | , connState = cstate |
727 | , connSession = session | 857 | , connSession = connectionSession |
728 | , connChan = chan | 858 | , connChan = chan |
729 | } | 859 | } |
730 | 860 | ||
861 | -- TODO make KA interval configurable | ||
862 | let kaInterval = defaultKeepAliveInterval | ||
863 | wire' = if ExtExtended `allowed` caps | ||
864 | then extendedHandshake prefExtCaps >> wire | ||
865 | else wire | ||
866 | |||
867 | bracket (forkIO (chanToSock kaInterval chan sock)) | ||
868 | (killThread) | ||
869 | (\ _ -> runWire wire' sock chan conn) | ||
870 | |||
871 | -- | Initiate 'Wire' connection and handshake with a peer. This function will | ||
872 | -- also do the BEP10 extension protocol handshake if 'ExtExtended' is enabled on | ||
873 | -- both sides. | ||
874 | -- | ||
875 | -- This function can throw 'WireFailure' exception. | ||
876 | -- | ||
877 | connectWire :: PeerAddr IP -> ConnectionConfig s -> IO () | ||
878 | connectWire addr cfg = do | ||
879 | let catchRefusal m = try m >>= either (throwIO . ConnectionRefused) return | ||
880 | bracket (catchRefusal (peerSocket Stream addr)) close $ \ sock -> do | ||
881 | let hs = configHandshake cfg | ||
882 | hs' <- initiateHandshake sock hs | ||
883 | let hpair = HandshakePair hs hs' | ||
884 | validatePair hpair addr | ||
885 | afterHandshaking ThisPeer addr sock hpair cfg | ||
886 | |||
731 | -- | Accept 'Wire' connection using already 'Network.Socket.accept'ed | 887 | -- | Accept 'Wire' connection using already 'Network.Socket.accept'ed |
732 | -- socket. For peer listener loop the 'acceptSafe' should be | 888 | -- socket. For peer listener loop the 'acceptSafe' should be |
733 | -- prefered against 'accept'. The socket will be closed at exit. | 889 | -- prefered against 'accept'. The socket will be closed at exit. |
734 | -- | 890 | -- |
735 | -- This function can throw 'WireFailure' exception. | 891 | -- This function can throw 'WireFailure' exception. |
736 | -- | 892 | -- |
737 | acceptWire :: Socket -> PeerAddr IP -> Wire s () -> IO () | 893 | acceptWire :: PendingConnection -> ConnectionConfig s -> IO () |
738 | acceptWire sock peerAddr wire = do | 894 | acceptWire pc @ PendingConnection {..} cfg = do |
739 | bracket (return sock) close $ \ _ -> do | 895 | bracket (return pendingSock) close $ \ _ -> do |
740 | error "acceptWire: not implemented" | 896 | unless (sessionTopic (cfgSession cfg) == pendingTopic) $ do |
897 | throwIO (ProtocolError (UnexpectedTopic pendingTopic)) | ||
898 | |||
899 | let hs = configHandshake cfg | ||
900 | sendHandshake pendingSock hs | ||
901 | let hpair = HandshakePair hs (pendingHandshake pc) | ||
902 | |||
903 | afterHandshaking RemotePeer pendingPeer pendingSock hpair cfg | ||
741 | 904 | ||
742 | -- | Used when size of bitfield becomes known. | 905 | -- | Used when size of bitfield becomes known. |
743 | resizeBitfield :: Int -> Connected s () | 906 | resizeBitfield :: Int -> Connected s () |