diff options
author | Sam Truzjan <pxqr.sta@gmail.com> | 2014-02-24 14:21:10 +0400 |
---|---|---|
committer | Sam Truzjan <pxqr.sta@gmail.com> | 2014-02-24 14:21:10 +0400 |
commit | be9c82cd6d3351d02e5f944f041837709d97fa39 (patch) | |
tree | a77798caf6bce0e232a2e044050d952469ef46aa /src | |
parent | 42eaee8dbcd1cfb922d94e974043d8d564dbd353 (diff) |
Implement acceptWire function
Diffstat (limited to 'src')
-rw-r--r-- | src/Network/BitTorrent/Exchange/Session.hs | 111 | ||||
-rw-r--r-- | src/Network/BitTorrent/Exchange/Wire.hs | 263 |
2 files changed, 296 insertions, 78 deletions
diff --git a/src/Network/BitTorrent/Exchange/Session.hs b/src/Network/BitTorrent/Exchange/Session.hs index 88554807..1537efe1 100644 --- a/src/Network/BitTorrent/Exchange/Session.hs +++ b/src/Network/BitTorrent/Exchange/Session.hs | |||
@@ -7,11 +7,16 @@ module Network.BitTorrent.Exchange.Session | |||
7 | , newSession | 7 | , newSession |
8 | , closeSession | 8 | , closeSession |
9 | 9 | ||
10 | -- * Connections | ||
10 | , Network.BitTorrent.Exchange.Session.insert | 11 | , Network.BitTorrent.Exchange.Session.insert |
12 | , Network.BitTorrent.Exchange.Session.attach | ||
13 | , Network.BitTorrent.Exchange.Session.delete | ||
14 | , Network.BitTorrent.Exchange.Session.deleteAll | ||
11 | ) where | 15 | ) where |
12 | 16 | ||
13 | import Control.Applicative | 17 | import Control.Applicative |
14 | import Control.Concurrent | 18 | import Control.Concurrent |
19 | import Control.Concurrent.STM | ||
15 | import Control.Exception hiding (Handler) | 20 | import Control.Exception hiding (Handler) |
16 | import Control.Lens | 21 | import Control.Lens |
17 | import Control.Monad.Logger | 22 | import Control.Monad.Logger |
@@ -23,6 +28,7 @@ import Data.Conduit.List as CL (iterM) | |||
23 | import Data.Maybe | 28 | import Data.Maybe |
24 | import Data.Map as M | 29 | import Data.Map as M |
25 | import Data.Monoid | 30 | import Data.Monoid |
31 | import Data.Set as S | ||
26 | import Data.Text as T | 32 | import Data.Text as T |
27 | import Data.Typeable | 33 | import Data.Typeable |
28 | import Text.PrettyPrint hiding ((<>)) | 34 | import Text.PrettyPrint hiding ((<>)) |
@@ -70,11 +76,6 @@ data Cached a = Cached | |||
70 | cache :: BEncode a => a -> Cached a | 76 | cache :: BEncode a => a -> Cached a |
71 | cache s = Cached s (BE.encode s) | 77 | cache s = Cached s (BE.encode s) |
72 | 78 | ||
73 | data ConnectionEntry = ConnectionEntry | ||
74 | { initiatedBy :: !ChannelSide | ||
75 | , connection :: !(Connection Session) | ||
76 | } | ||
77 | |||
78 | data Session = Session | 79 | data Session = Session |
79 | { tpeerId :: PeerId | 80 | { tpeerId :: PeerId |
80 | , infohash :: InfoHash | 81 | , infohash :: InfoHash |
@@ -82,12 +83,15 @@ data Session = Session | |||
82 | , storage :: Storage | 83 | , storage :: Storage |
83 | , status :: MVar SessionStatus | 84 | , status :: MVar SessionStatus |
84 | , unchoked :: [PeerAddr IP] | 85 | , unchoked :: [PeerAddr IP] |
85 | , connections :: MVar (Map (PeerAddr IP) ConnectionEntry) | 86 | , pendingConnections :: TVar (Set (PeerAddr IP)) |
87 | , establishedConnections :: TVar (Map (PeerAddr IP) (Connection Session)) | ||
86 | , broadcast :: Chan Message | 88 | , broadcast :: Chan Message |
87 | , logger :: LogFun | 89 | , logger :: LogFun |
88 | , infodict :: MVar (Cached InfoDict) | 90 | , infodict :: MVar (Cached InfoDict) |
89 | } | 91 | } |
90 | 92 | ||
93 | instance Ord IP | ||
94 | |||
91 | -- | Logger function. | 95 | -- | Logger function. |
92 | type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () | 96 | type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () |
93 | 97 | ||
@@ -97,18 +101,21 @@ newSession :: LogFun | |||
97 | -> InfoDict -- ^ torrent info dictionary; | 101 | -> InfoDict -- ^ torrent info dictionary; |
98 | -> IO Session -- ^ | 102 | -> IO Session -- ^ |
99 | newSession logFun addr rootPath dict = do | 103 | newSession logFun addr rootPath dict = do |
100 | connVar <- newMVar M.empty | 104 | pconnVar <- newTVarIO S.empty |
105 | econnVar <- newTVarIO M.empty | ||
101 | store <- openInfoDict ReadWriteEx rootPath dict | 106 | store <- openInfoDict ReadWriteEx rootPath dict |
102 | statusVar <- newMVar $ sessionStatus (BF.haveNone (totalPieces store)) | 107 | statusVar <- newMVar $ sessionStatus (BF.haveNone (totalPieces store)) |
103 | (piPieceLength (idPieceInfo dict)) | 108 | (piPieceLength (idPieceInfo dict)) |
104 | chan <- newChan | 109 | chan <- newChan |
105 | return Session | 110 | return Session |
106 | { tpeerId = fromMaybe (error "newSession: impossible") (peerId addr) | 111 | { tpeerId = fromMaybe (error "newSession: impossible") |
107 | , infohash = idInfoHash dict | 112 | (peerId addr) |
108 | , status = statusVar | 113 | , infohash = idInfoHash dict |
109 | , storage = store | 114 | , status = statusVar |
110 | , unchoked = [] | 115 | , storage = store |
111 | , connections = connVar | 116 | , unchoked = [] |
117 | , pendingConnections = pconnVar | ||
118 | , establishedConnections = econnVar | ||
112 | , broadcast = chan | 119 | , broadcast = chan |
113 | , logger = logFun | 120 | , logger = logFun |
114 | } | 121 | } |
@@ -137,31 +144,79 @@ logEvent :: MonadLogger m => Text -> m () | |||
137 | logEvent = logInfoN | 144 | logEvent = logInfoN |
138 | 145 | ||
139 | {----------------------------------------------------------------------- | 146 | {----------------------------------------------------------------------- |
147 | -- Connection slots | ||
148 | -----------------------------------------------------------------------} | ||
149 | --- pending -> established -> closed | ||
150 | --- | /|\ | ||
151 | --- \-------------------------| | ||
152 | |||
153 | pendingConnection :: PeerAddr IP -> Session -> IO Bool | ||
154 | pendingConnection addr Session {..} = atomically $ do | ||
155 | pSet <- readTVar pendingConnections | ||
156 | eSet <- readTVar establishedConnections | ||
157 | if (addr `S.member` pSet) || (addr `M.member` eSet) | ||
158 | then return False | ||
159 | else do | ||
160 | modifyTVar' pendingConnections (S.insert addr) | ||
161 | return True | ||
162 | |||
163 | establishedConnection :: Connected Session () | ||
164 | establishedConnection = undefined --atomically $ do | ||
165 | -- pSet <- readTVar pendingConnections | ||
166 | -- eSet <- readTVar | ||
167 | undefined | ||
168 | |||
169 | finishedConnection :: Connected Session () | ||
170 | finishedConnection = return () | ||
171 | |||
172 | -- | There are no state for this connection, remove it. | ||
173 | closedConnection :: PeerAddr IP -> Session -> IO () | ||
174 | closedConnection addr Session {..} = atomically $ do | ||
175 | modifyTVar pendingConnections $ S.delete addr | ||
176 | modifyTVar establishedConnections $ M.delete addr | ||
177 | |||
178 | {----------------------------------------------------------------------- | ||
140 | -- Connections | 179 | -- Connections |
141 | -----------------------------------------------------------------------} | 180 | -----------------------------------------------------------------------} |
142 | -- TODO unmap storage on zero connections | 181 | -- TODO unmap storage on zero connections |
143 | 182 | ||
144 | insert :: PeerAddr IP | 183 | mainWire :: Wire Session () |
145 | -> {- Maybe Socket | 184 | mainWire = do |
146 | -> -} Session -> IO () | 185 | lift establishedConnection |
186 | Session {..} <- asks connSession | ||
187 | lift $ resizeBitfield (totalPieces storage) | ||
188 | logEvent "Connection established" | ||
189 | iterM logMessage =$= exchange =$= iterM logMessage | ||
190 | lift finishedConnection | ||
191 | |||
192 | getConnectionConfig :: Session -> IO (ConnectionConfig Session) | ||
193 | getConnectionConfig s @ Session {..} = undefined --ConnectionConfig | ||
194 | -- let caps = def | ||
195 | -- let ecaps = def | ||
196 | -- let hs = Handshake def caps infohash tpeerId | ||
197 | -- chan <- dupChan broadcast | ||
198 | |||
199 | -- { cfgPrefs = undefined | ||
200 | -- , cfgSession = ConnectionSession undefined undefined s | ||
201 | -- , cfgWire = mainWire | ||
202 | -- } | ||
203 | |||
204 | insert :: PeerAddr IP -> Session -> IO () | ||
147 | insert addr ses @ Session {..} = void $ forkIO (action `finally` cleanup) | 205 | insert addr ses @ Session {..} = void $ forkIO (action `finally` cleanup) |
148 | where | 206 | where |
207 | action = do | ||
208 | pendingConnection addr ses | ||
209 | cfg <- getConnectionConfig ses | ||
210 | connectWire addr cfg | ||
211 | |||
149 | cleanup = do | 212 | cleanup = do |
150 | runStatusUpdates status (SS.resetPending addr) | 213 | runStatusUpdates status (SS.resetPending addr) |
151 | -- TODO Metata.resetPending addr | 214 | -- TODO Metata.resetPending addr |
215 | closedConnection addr ses | ||
152 | 216 | ||
153 | action = do | 217 | -- TODO closePending on error |
154 | let caps = def | 218 | attach :: PendingConnection -> Session -> IO () |
155 | let ecaps = def | 219 | attach = undefined |
156 | let hs = Handshake def caps infohash tpeerId | ||
157 | chan <- dupChan broadcast | ||
158 | connectWire ses hs addr ecaps chan $ do | ||
159 | conn <- ask | ||
160 | -- liftIO $ modifyMVar_ connections $ pure . M.insert addr conn | ||
161 | lift $ resizeBitfield (totalPieces storage) | ||
162 | logEvent "Connection established" | ||
163 | iterM logMessage =$= exchange =$= iterM logMessage | ||
164 | -- liftIO $ modifyMVar_ connections $ pure . M.delete addr | ||
165 | 220 | ||
166 | delete :: PeerAddr IP -> Session -> IO () | 221 | delete :: PeerAddr IP -> Session -> IO () |
167 | delete = undefined | 222 | delete = undefined |
diff --git a/src/Network/BitTorrent/Exchange/Wire.hs b/src/Network/BitTorrent/Exchange/Wire.hs index 27915724..d1bed146 100644 --- a/src/Network/BitTorrent/Exchange/Wire.hs +++ b/src/Network/BitTorrent/Exchange/Wire.hs | |||
@@ -7,6 +7,11 @@ | |||
7 | -- Stability : experimental | 7 | -- Stability : experimental |
8 | -- Portability : portable | 8 | -- Portability : portable |
9 | -- | 9 | -- |
10 | -- Each peer wire connection is identified by triple @(topic, | ||
11 | -- remote_addr, this_addr)@. This means that connections are the | ||
12 | -- same if and only if their 'ConnectionId' are the same. Of course, | ||
13 | -- you /must/ avoid duplicated connections. | ||
14 | -- | ||
10 | -- This module control /integrity/ of data send and received. | 15 | -- This module control /integrity/ of data send and received. |
11 | -- | 16 | -- |
12 | {-# LANGUAGE DeriveDataTypeable #-} | 17 | {-# LANGUAGE DeriveDataTypeable #-} |
@@ -21,6 +26,7 @@ module Network.BitTorrent.Exchange.Wire | |||
21 | 26 | ||
22 | -- * Connection | 27 | -- * Connection |
23 | , Connection | 28 | , Connection |
29 | , connInitiatedBy | ||
24 | 30 | ||
25 | -- ** Identity | 31 | -- ** Identity |
26 | , connRemoteAddr | 32 | , connRemoteAddr |
@@ -44,9 +50,23 @@ module Network.BitTorrent.Exchange.Wire | |||
44 | , connStats | 50 | , connStats |
45 | 51 | ||
46 | -- * Setup | 52 | -- * Setup |
47 | , runWire | 53 | , ConnectionPrefs (..) |
54 | , ConnectionSession (..) | ||
55 | , ConnectionConfig (..) | ||
56 | |||
57 | -- ** Initiate | ||
48 | , connectWire | 58 | , connectWire |
59 | |||
60 | -- ** Accept | ||
61 | , PendingConnection | ||
62 | , newPendingConnection | ||
63 | , pendingPeer | ||
64 | , pendingCaps | ||
65 | , pendingTopic | ||
66 | , closePending | ||
49 | , acceptWire | 67 | , acceptWire |
68 | |||
69 | -- ** Post setup actions | ||
50 | , resizeBitfield | 70 | , resizeBitfield |
51 | 71 | ||
52 | -- * Messaging | 72 | -- * Messaging |
@@ -143,7 +163,7 @@ data ProtocolError | |||
143 | 163 | ||
144 | -- | /Remote/ peer replied with invalid 'hsInfoHash' which do not | 164 | -- | /Remote/ peer replied with invalid 'hsInfoHash' which do not |
145 | -- match with 'hsInfoHash' /this/ peer have sent. Can occur in | 165 | -- match with 'hsInfoHash' /this/ peer have sent. Can occur in |
146 | -- 'connectWire' only. | 166 | -- 'connectWire' or 'acceptWire' only. |
147 | | UnexpectedTopic InfoHash | 167 | | UnexpectedTopic InfoHash |
148 | 168 | ||
149 | -- | Some trackers or DHT can return 'PeerId' of a peer. If a | 169 | -- | Some trackers or DHT can return 'PeerId' of a peer. If a |
@@ -470,9 +490,20 @@ data ConnectionState = ConnectionState { | |||
470 | 490 | ||
471 | makeLenses ''ConnectionState | 491 | makeLenses ''ConnectionState |
472 | 492 | ||
493 | instance Default ConnectionState where | ||
494 | def = ConnectionState | ||
495 | { _connExtCaps = def | ||
496 | , _connRemoteEhs = def | ||
497 | , _connStats = def | ||
498 | , _connStatus = def | ||
499 | , _connBitfield = BF.haveNone 0 | ||
500 | } | ||
501 | |||
473 | -- | Connection keep various info about both peers. | 502 | -- | Connection keep various info about both peers. |
474 | data Connection s = Connection | 503 | data Connection s = Connection |
475 | { connRemoteAddr :: !(PeerAddr IP) | 504 | { connInitiatedBy :: !ChannelSide |
505 | |||
506 | , connRemoteAddr :: !(PeerAddr IP) | ||
476 | 507 | ||
477 | -- | /Both/ peers handshaked with this protocol string. The only | 508 | -- | /Both/ peers handshaked with this protocol string. The only |
478 | -- value is \"Bittorrent Protocol\" but this can be changed in | 509 | -- value is \"Bittorrent Protocol\" but this can be changed in |
@@ -561,6 +592,29 @@ initiateHandshake sock hs = do | |||
561 | sendHandshake sock hs | 592 | sendHandshake sock hs |
562 | recvHandshake sock | 593 | recvHandshake sock |
563 | 594 | ||
595 | data HandshakePair = HandshakePair | ||
596 | { handshakeSent :: !Handshake | ||
597 | , handshakeRecv :: !Handshake | ||
598 | } deriving (Show, Eq) | ||
599 | |||
600 | validatePair :: HandshakePair -> PeerAddr IP -> IO () | ||
601 | validatePair (HandshakePair hs hs') addr = Prelude.mapM_ checkProp | ||
602 | [ (def == hsProtocol hs', InvalidProtocol $ hsProtocol hs') | ||
603 | , (hsProtocol hs == hsProtocol hs', UnexpectedProtocol $ hsProtocol hs') | ||
604 | , (hsInfoHash hs == hsInfoHash hs', UnexpectedTopic $ hsInfoHash hs') | ||
605 | , (hsPeerId hs' == fromMaybe (hsPeerId hs') (peerId addr) | ||
606 | , UnexpectedPeerId $ hsPeerId hs') | ||
607 | ] | ||
608 | where | ||
609 | checkProp (t, e) = unless t $ throwIO $ ProtocolError e | ||
610 | |||
611 | -- | Connection state /right/ after handshaking. | ||
612 | establishedStats :: HandshakePair -> ConnectionStats | ||
613 | establishedStats HandshakePair {..} = ConnectionStats | ||
614 | { outcomingFlow = FlowStats 1 $ handshakeStats handshakeSent | ||
615 | , incomingFlow = FlowStats 1 $ handshakeStats handshakeRecv | ||
616 | } | ||
617 | |||
564 | {----------------------------------------------------------------------- | 618 | {----------------------------------------------------------------------- |
565 | -- Wire | 619 | -- Wire |
566 | -----------------------------------------------------------------------} | 620 | -----------------------------------------------------------------------} |
@@ -670,53 +724,129 @@ rehandshake caps = undefined | |||
670 | reconnect :: Wire s () | 724 | reconnect :: Wire s () |
671 | reconnect = undefined | 725 | reconnect = undefined |
672 | 726 | ||
673 | -- | Initiate 'Wire' connection and handshake with a peer. This function will | 727 | data ConnectionId = ConnectionId |
674 | -- also do the BEP10 extension protocol handshake if 'ExtExtended' is enabled on | 728 | { topic :: !InfoHash |
675 | -- both sides. | 729 | , remoteAddr :: !(PeerAddr IP) |
730 | , thisAddr :: !(PeerAddr (Maybe IP)) -- ^ foreign address of this node. | ||
731 | } | ||
732 | |||
733 | -- | /Preffered/ settings of wire. To get the real use 'ask'. | ||
734 | data ConnectionPrefs = ConnectionPrefs | ||
735 | { prefOptions :: !Options | ||
736 | , prefProtocol :: !ProtocolName | ||
737 | , prefCaps :: !Caps | ||
738 | , prefExtCaps :: !ExtendedCaps | ||
739 | } | ||
740 | |||
741 | instance Default ConnectionPrefs where | ||
742 | def = ConnectionPrefs | ||
743 | { prefOptions = def | ||
744 | , prefProtocol = def | ||
745 | , prefCaps = def | ||
746 | , prefExtCaps = def | ||
747 | } | ||
748 | |||
749 | normalize :: ConnectionPrefs -> ConnectionPrefs | ||
750 | normalize = undefined | ||
751 | |||
752 | data ConnectionSession s = ConnectionSession | ||
753 | { sessionTopic :: !(InfoHash) | ||
754 | , sessionPeerId :: !(PeerId) | ||
755 | , metadataSize :: !(Maybe Int) | ||
756 | , outputChan :: !(Maybe (Chan Message)) | ||
757 | , connectionSession :: !(s) | ||
758 | } | ||
759 | |||
760 | data ConnectionConfig s = ConnectionConfig | ||
761 | { cfgPrefs :: !(ConnectionPrefs) | ||
762 | , cfgSession :: !(ConnectionSession s) | ||
763 | , cfgWire :: !(Wire s ()) | ||
764 | } | ||
765 | |||
766 | configHandshake :: ConnectionConfig s -> Handshake | ||
767 | configHandshake ConnectionConfig {..} = Handshake | ||
768 | { hsProtocol = prefProtocol cfgPrefs | ||
769 | , hsReserved = prefCaps cfgPrefs | ||
770 | , hsInfoHash = sessionTopic cfgSession | ||
771 | , hsPeerId = sessionPeerId cfgSession | ||
772 | } | ||
773 | |||
774 | {----------------------------------------------------------------------- | ||
775 | -- Pending connections | ||
776 | -----------------------------------------------------------------------} | ||
777 | |||
778 | -- | Connection in half opened state. A normal usage scenario: | ||
676 | -- | 779 | -- |
677 | -- This function can throw 'WireFailure' exception. | 780 | -- * Opened using 'newPendingConnection', usually in the listener |
781 | -- loop; | ||
678 | -- | 782 | -- |
679 | connectWire :: s -> Handshake -> PeerAddr IP -> ExtendedCaps -> Chan Message | 783 | -- * Closed using 'closePending' if 'pendingPeer' is banned, |
680 | -> Wire s () -> IO () | 784 | -- 'pendingCaps' is prohibited or pendingTopic is unknown; |
681 | connectWire session hs addr extCaps chan wire = do | 785 | -- |
682 | let catchRefusal m = try m >>= either (throwIO . ConnectionRefused) return | 786 | -- * Accepted using 'acceptWire' otherwise. |
683 | bracket (catchRefusal (peerSocket Stream addr)) close $ \ sock -> do | 787 | -- |
684 | hs' <- initiateHandshake sock hs | 788 | data PendingConnection = PendingConnection |
789 | { pendingSock :: Socket | ||
790 | , pendingPeer :: PeerAddr IP -- ^ 'peerId' is always non empty; | ||
791 | , pendingCaps :: Caps -- ^ advertised by the peer; | ||
792 | , pendingTopic :: InfoHash -- ^ possible non-existent topic. | ||
793 | } | ||
685 | 794 | ||
686 | Prelude.mapM_ (\(t,e) -> unless t $ throwIO $ ProtocolError e) [ | 795 | -- | Reconstruct handshake sent by the remote peer. |
687 | (def == hsProtocol hs' | 796 | pendingHandshake :: PendingConnection -> Handshake |
688 | , InvalidProtocol $ hsProtocol hs'), | 797 | pendingHandshake PendingConnection {..} = Handshake |
689 | (hsProtocol hs == hsProtocol hs' | 798 | { hsProtocol = def |
690 | , UnexpectedProtocol $ hsProtocol hs'), | 799 | , hsReserved = pendingCaps |
691 | (hsInfoHash hs == hsInfoHash hs' | 800 | , hsInfoHash = pendingTopic |
692 | , UnexpectedTopic $ hsInfoHash hs'), | 801 | , hsPeerId = fromMaybe (error "pendingHandshake: impossible") |
693 | (hsPeerId hs' == fromMaybe (hsPeerId hs') (peerId addr) | 802 | (peerId pendingPeer) |
694 | , UnexpectedPeerId $ hsPeerId hs') | 803 | } |
695 | ] | ||
696 | |||
697 | let caps = hsReserved hs <> hsReserved hs' | ||
698 | wire' = if ExtExtended `allowed` caps | ||
699 | then extendedHandshake extCaps >> wire | ||
700 | else wire | ||
701 | 804 | ||
702 | cstate <- newIORef $ ConnectionState { | 805 | -- | |
703 | _connExtCaps = def | 806 | -- |
704 | , _connRemoteEhs = def | 807 | -- This function can throw 'WireFailure' exception. |
705 | , _connStats = ConnectionStats { | 808 | -- |
706 | outcomingFlow = FlowStats 1 $ handshakeStats hs | 809 | newPendingConnection :: Socket -> PeerAddr IP -> IO PendingConnection |
707 | , incomingFlow = FlowStats 1 $ handshakeStats hs' | 810 | newPendingConnection sock addr = do |
708 | } | 811 | Handshake {..} <- recvHandshake sock |
709 | , _connStatus = def | 812 | unless (hsProtocol == def) $ do |
710 | , _connBitfield = BF.haveNone 0 | 813 | throwIO $ ProtocolError $ InvalidProtocol hsProtocol |
711 | } | 814 | return PendingConnection |
815 | { pendingSock = sock | ||
816 | , pendingPeer = addr { peerId = Just hsPeerId } | ||
817 | , pendingCaps = hsReserved | ||
818 | , pendingTopic = hsInfoHash | ||
819 | } | ||
712 | 820 | ||
713 | -- TODO make KA interval configurable | 821 | -- | Release all resources associated with the given connection. Note |
714 | let kaInterval = defaultKeepAliveInterval | 822 | -- that you /must not/ 'closePending' if you 'acceptWire'. |
715 | bracket | 823 | closePending :: PendingConnection -> IO () |
716 | (forkIO $ sourceChan kaInterval chan $= conduitPut S.put $$ sinkSocket sock) | 824 | closePending PendingConnection {..} = do |
717 | (killThread) $ \ _ -> | 825 | close pendingSock |
718 | runWire wire' sock chan $ Connection | 826 | |
719 | { connRemoteAddr = addr | 827 | {----------------------------------------------------------------------- |
828 | -- Connection setup | ||
829 | -----------------------------------------------------------------------} | ||
830 | |||
831 | chanToSock :: Int -> Chan Message -> Socket -> IO () | ||
832 | chanToSock ka chan sock = | ||
833 | sourceChan ka chan $= conduitPut S.put $$ sinkSocket sock | ||
834 | |||
835 | afterHandshaking :: ChannelSide -> PeerAddr IP -> Socket -> HandshakePair | ||
836 | -> ConnectionConfig s -> IO () | ||
837 | afterHandshaking initiator addr sock | ||
838 | hpair @ (HandshakePair hs hs') | ||
839 | (ConnectionConfig | ||
840 | { cfgPrefs = ConnectionPrefs {..} | ||
841 | , cfgSession = ConnectionSession {..} | ||
842 | , cfgWire = wire | ||
843 | }) = do | ||
844 | let caps = hsReserved hs <> hsReserved hs' | ||
845 | cstate <- newIORef def { _connStats = establishedStats hpair } | ||
846 | chan <- maybe newChan return outputChan | ||
847 | let conn = Connection { | ||
848 | connInitiatedBy = initiator | ||
849 | , connRemoteAddr = addr | ||
720 | , connProtocol = hsProtocol hs | 850 | , connProtocol = hsProtocol hs |
721 | , connCaps = caps | 851 | , connCaps = caps |
722 | , connTopic = hsInfoHash hs | 852 | , connTopic = hsInfoHash hs |
@@ -724,20 +854,53 @@ connectWire session hs addr extCaps chan wire = do | |||
724 | , connThisPeerId = hsPeerId hs | 854 | , connThisPeerId = hsPeerId hs |
725 | , connOptions = def | 855 | , connOptions = def |
726 | , connState = cstate | 856 | , connState = cstate |
727 | , connSession = session | 857 | , connSession = connectionSession |
728 | , connChan = chan | 858 | , connChan = chan |
729 | } | 859 | } |
730 | 860 | ||
861 | -- TODO make KA interval configurable | ||
862 | let kaInterval = defaultKeepAliveInterval | ||
863 | wire' = if ExtExtended `allowed` caps | ||
864 | then extendedHandshake prefExtCaps >> wire | ||
865 | else wire | ||
866 | |||
867 | bracket (forkIO (chanToSock kaInterval chan sock)) | ||
868 | (killThread) | ||
869 | (\ _ -> runWire wire' sock chan conn) | ||
870 | |||
871 | -- | Initiate 'Wire' connection and handshake with a peer. This function will | ||
872 | -- also do the BEP10 extension protocol handshake if 'ExtExtended' is enabled on | ||
873 | -- both sides. | ||
874 | -- | ||
875 | -- This function can throw 'WireFailure' exception. | ||
876 | -- | ||
877 | connectWire :: PeerAddr IP -> ConnectionConfig s -> IO () | ||
878 | connectWire addr cfg = do | ||
879 | let catchRefusal m = try m >>= either (throwIO . ConnectionRefused) return | ||
880 | bracket (catchRefusal (peerSocket Stream addr)) close $ \ sock -> do | ||
881 | let hs = configHandshake cfg | ||
882 | hs' <- initiateHandshake sock hs | ||
883 | let hpair = HandshakePair hs hs' | ||
884 | validatePair hpair addr | ||
885 | afterHandshaking ThisPeer addr sock hpair cfg | ||
886 | |||
731 | -- | Accept 'Wire' connection using already 'Network.Socket.accept'ed | 887 | -- | Accept 'Wire' connection using already 'Network.Socket.accept'ed |
732 | -- socket. For peer listener loop the 'acceptSafe' should be | 888 | -- socket. For peer listener loop the 'acceptSafe' should be |
733 | -- prefered against 'accept'. The socket will be closed at exit. | 889 | -- prefered against 'accept'. The socket will be closed at exit. |
734 | -- | 890 | -- |
735 | -- This function can throw 'WireFailure' exception. | 891 | -- This function can throw 'WireFailure' exception. |
736 | -- | 892 | -- |
737 | acceptWire :: Socket -> PeerAddr IP -> Wire s () -> IO () | 893 | acceptWire :: PendingConnection -> ConnectionConfig s -> IO () |
738 | acceptWire sock peerAddr wire = do | 894 | acceptWire pc @ PendingConnection {..} cfg = do |
739 | bracket (return sock) close $ \ _ -> do | 895 | bracket (return pendingSock) close $ \ _ -> do |
740 | error "acceptWire: not implemented" | 896 | unless (sessionTopic (cfgSession cfg) == pendingTopic) $ do |
897 | throwIO (ProtocolError (UnexpectedTopic pendingTopic)) | ||
898 | |||
899 | let hs = configHandshake cfg | ||
900 | sendHandshake pendingSock hs | ||
901 | let hpair = HandshakePair hs (pendingHandshake pc) | ||
902 | |||
903 | afterHandshaking RemotePeer pendingPeer pendingSock hpair cfg | ||
741 | 904 | ||
742 | -- | Used when size of bitfield becomes known. | 905 | -- | Used when size of bitfield becomes known. |
743 | resizeBitfield :: Int -> Connected s () | 906 | resizeBitfield :: Int -> Connected s () |