diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Network/BitTorrent.hs | 36 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT.hs | 336 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Protocol.hs | 339 | ||||
-rw-r--r-- | src/Network/BitTorrent/Discovery.hs | 56 | ||||
-rw-r--r-- | src/Network/BitTorrent/Internal.lhs | 82 |
5 files changed, 469 insertions, 380 deletions
diff --git a/src/Network/BitTorrent.hs b/src/Network/BitTorrent.hs index 3602dd7e..06df77dd 100644 --- a/src/Network/BitTorrent.hs +++ b/src/Network/BitTorrent.hs | |||
@@ -17,8 +17,7 @@ module Network.BitTorrent | |||
17 | -- ** Client | 17 | -- ** Client |
18 | , ClientSession( clientPeerId, allowedExtensions ) | 18 | , ClientSession( clientPeerId, allowedExtensions ) |
19 | 19 | ||
20 | , newClient | 20 | , withDefaultClient |
21 | , defaultClient | ||
22 | 21 | ||
23 | , Progress(..) | 22 | , Progress(..) |
24 | , getCurrentProgress | 23 | , getCurrentProgress |
@@ -91,36 +90,25 @@ import Network | |||
91 | import Data.Bitfield as BF | 90 | import Data.Bitfield as BF |
92 | import Data.Torrent | 91 | import Data.Torrent |
93 | import Network.BitTorrent.Internal | 92 | import Network.BitTorrent.Internal |
93 | import Network.BitTorrent.Peer | ||
94 | import Network.BitTorrent.Extension | ||
94 | import Network.BitTorrent.Exchange | 95 | import Network.BitTorrent.Exchange |
95 | import Network.BitTorrent.Exchange.Protocol | 96 | import Network.BitTorrent.Exchange.Protocol |
96 | import Network.BitTorrent.Tracker | 97 | import Network.BitTorrent.Tracker |
97 | import Network.BitTorrent.Extension | 98 | import Network.BitTorrent.Discovery |
98 | import Network.BitTorrent.Peer | ||
99 | 99 | ||
100 | import System.Torrent.Storage | 100 | import System.Torrent.Storage |
101 | 101 | ||
102 | -- TODO remove fork from Network.BitTorrent.Exchange | ||
103 | -- TODO make all forks in Internal. | ||
102 | 104 | ||
103 | -- | Client session with default parameters. Use it for testing only. | 105 | -- | Client session with default parameters. Use it for testing only. |
104 | defaultClient :: IO ClientSession | 106 | withDefaultClient :: PortNumber -> PortNumber -> (ClientSession -> IO ()) -> IO () |
105 | defaultClient = newClient defaultThreadCount defaultExtensions | 107 | withDefaultClient dhtPort listPort action = do |
106 | 108 | withClientSession defaultThreadCount defaultExtensions $ \client -> do | |
107 | -- discover should hide tracker and DHT communication under the hood | 109 | startListener client listPort |
108 | -- thus we can obtain an unified interface | 110 | startDHT client dhtPort |
109 | 111 | action client | |
110 | discover :: SwarmSession -> P2P () -> IO () | ||
111 | discover swarm @ SwarmSession {..} action = {-# SCC discover #-} do | ||
112 | let conn = TConnection (tAnnounce torrentMeta) | ||
113 | (tInfoHash torrentMeta) | ||
114 | (clientPeerId clientSession) | ||
115 | (listenerPort clientSession) | ||
116 | |||
117 | progress <- getCurrentProgress clientSession | ||
118 | |||
119 | withTracker progress conn $ \tses -> do | ||
120 | forever $ do | ||
121 | addr <- getPeerAddr tses | ||
122 | spawnP2P swarm addr $ do | ||
123 | action | ||
124 | 112 | ||
125 | {----------------------------------------------------------------------- | 113 | {----------------------------------------------------------------------- |
126 | Torrent management | 114 | Torrent management |
diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs index f3c993c3..b0aac002 100644 --- a/src/Network/BitTorrent/DHT.hs +++ b/src/Network/BitTorrent/DHT.hs | |||
@@ -1,338 +1,6 @@ | |||
1 | {-# LANGUAGE OverloadedStrings #-} | ||
2 | {-# LANGUAGE RecordWildCards #-} | ||
3 | module Network.BitTorrent.DHT | 1 | module Network.BitTorrent.DHT |
4 | ( | 2 | ( newNodeSession |
5 | newNodeSession | ||
6 | |||
7 | -- * Tracker | ||
8 | , ping | ||
9 | , findNode | ||
10 | , getPeers | ||
11 | , announcePeer | ||
12 | |||
13 | -- * Server | ||
14 | , dhtServer | 3 | , dhtServer |
15 | ) where | 4 | ) where |
16 | 5 | ||
17 | import Control.Applicative | 6 | import Network.BitTorrent.DHT.Protocol \ No newline at end of file |
18 | import Control.Concurrent | ||
19 | import Control.Concurrent.STM | ||
20 | import Control.Monad | ||
21 | import Control.Exception | ||
22 | import Data.ByteString | ||
23 | import Data.Serialize as S | ||
24 | import Data.Function | ||
25 | import Data.Ord | ||
26 | import Data.Maybe | ||
27 | import Data.List as L | ||
28 | import Data.Map as M | ||
29 | import Data.HashMap.Strict as HM | ||
30 | import Network | ||
31 | import Network.Socket | ||
32 | import System.Entropy | ||
33 | |||
34 | import Remote.KRPC | ||
35 | import Remote.KRPC.Protocol | ||
36 | import Data.BEncode | ||
37 | import Data.Torrent | ||
38 | import Network.BitTorrent.Peer | ||
39 | |||
40 | {----------------------------------------------------------------------- | ||
41 | Node | ||
42 | -----------------------------------------------------------------------} | ||
43 | |||
44 | type NodeId = ByteString | ||
45 | |||
46 | -- WARN is the 'system' random suitable for this? | ||
47 | -- | Generate random NodeID used for the entire session. | ||
48 | -- Distribution of ID's should be as uniform as possible. | ||
49 | -- | ||
50 | genNodeId :: IO NodeId | ||
51 | genNodeId = getEntropy 20 | ||
52 | |||
53 | instance Serialize PortNumber where | ||
54 | get = fromIntegral <$> getWord16be | ||
55 | put = putWord16be . fromIntegral | ||
56 | |||
57 | |||
58 | data NodeAddr = NodeAddr { | ||
59 | nodeIP :: {-# UNPACK #-} !HostAddress | ||
60 | , nodePort :: {-# UNPACK #-} !PortNumber | ||
61 | } deriving (Show, Eq) | ||
62 | |||
63 | instance Serialize NodeAddr where | ||
64 | get = NodeAddr <$> getWord32be <*> get | ||
65 | put NodeAddr {..} = do | ||
66 | putWord32be nodeIP | ||
67 | put nodePort | ||
68 | |||
69 | |||
70 | data NodeInfo = NodeInfo { | ||
71 | nodeID :: !NodeId | ||
72 | , nodeAddr :: !NodeAddr | ||
73 | } deriving (Show, Eq) | ||
74 | |||
75 | instance Serialize NodeInfo where | ||
76 | get = NodeInfo <$> getByteString 20 <*> get | ||
77 | put NodeInfo {..} = put nodeID >> put nodeAddr | ||
78 | |||
79 | type CompactInfo = ByteString | ||
80 | |||
81 | decodeCompact :: CompactInfo -> [NodeInfo] | ||
82 | decodeCompact = either (const []) id . S.runGet (many get) | ||
83 | |||
84 | encodeCompact :: [NodeId] -> CompactInfo | ||
85 | encodeCompact = S.runPut . mapM_ put | ||
86 | |||
87 | decodePeerList :: [BEncode] -> [PeerAddr] | ||
88 | decodePeerList = undefined | ||
89 | |||
90 | encodePeerList :: [PeerAddr] -> [BEncode] | ||
91 | encodePeerList = undefined | ||
92 | |||
93 | type Distance = NodeId | ||
94 | |||
95 | |||
96 | {----------------------------------------------------------------------- | ||
97 | Tokens | ||
98 | -----------------------------------------------------------------------} | ||
99 | |||
100 | type Secret = Int | ||
101 | |||
102 | genSecret :: IO Secret | ||
103 | genSecret = undefined | ||
104 | |||
105 | -- | Instead of periodically loop over the all nodes in the routing | ||
106 | -- table with some given interval (or some other tricky method | ||
107 | -- e.g. using timeouts) we can just update tokens on demand - if no | ||
108 | -- one asks for a token then the token _should_ not change at all. | ||
109 | -- | ||
110 | type Token = ByteString | ||
111 | |||
112 | genToken :: NodeAddr -> Secret -> Token | ||
113 | genToken = return undefined | ||
114 | |||
115 | defaultToken :: Token | ||
116 | defaultToken = "0xdeadbeef" | ||
117 | |||
118 | {----------------------------------------------------------------------- | ||
119 | Routing table | ||
120 | -----------------------------------------------------------------------} | ||
121 | |||
122 | type ContactInfo = HashMap InfoHash [PeerAddr] | ||
123 | |||
124 | insertPeer :: InfoHash -> PeerAddr -> ContactInfo -> ContactInfo | ||
125 | insertPeer ih addr = HM.insertWith (++) ih [addr] | ||
126 | |||
127 | lookupPeers :: InfoHash -> ContactInfo -> [PeerAddr] | ||
128 | lookupPeers ih = fromMaybe [] . HM.lookup ih | ||
129 | |||
130 | -- TODO use more compact routing table | ||
131 | type RoutingTable = HashMap NodeId NodeAddr | ||
132 | |||
133 | insertNode :: NodeId -> NodeAddr -> RoutingTable -> RoutingTable | ||
134 | insertNode = HM.insert | ||
135 | |||
136 | type Alpha = Int | ||
137 | |||
138 | defaultAlpha :: Alpha | ||
139 | defaultAlpha = 8 | ||
140 | |||
141 | -- TODO | ||
142 | kclosest :: Int -> NodeId -> RoutingTable -> [NodeId] | ||
143 | kclosest = undefined | ||
144 | |||
145 | {----------------------------------------------------------------------- | ||
146 | Node session | ||
147 | -----------------------------------------------------------------------} | ||
148 | |||
149 | data NodeSession = NodeSession { | ||
150 | nodeId :: !NodeId | ||
151 | , routingTable :: !(TVar RoutingTable) | ||
152 | , contactInfo :: !(TVar ContactInfo) | ||
153 | , alpha :: !Alpha | ||
154 | , listenerPort :: !PortNumber | ||
155 | } | ||
156 | |||
157 | instance Eq NodeSession where | ||
158 | (==) = (==) `on` nodeId | ||
159 | |||
160 | instance Ord NodeSession where | ||
161 | compare = comparing nodeId | ||
162 | |||
163 | newNodeSession :: PortNumber -> IO NodeSession | ||
164 | newNodeSession lport | ||
165 | = NodeSession | ||
166 | <$> genNodeId | ||
167 | <*> newTVarIO HM.empty | ||
168 | <*> newTVarIO HM.empty | ||
169 | <*> pure defaultAlpha | ||
170 | <*> pure lport | ||
171 | |||
172 | assignToken :: NodeSession -> NodeId -> IO Token | ||
173 | assignToken _ _ = return "" | ||
174 | |||
175 | -- TODO | ||
176 | checkToken :: NodeId -> Token -> NodeSession -> IO Bool | ||
177 | checkToken nid token _ = return True | ||
178 | |||
179 | updateTimestamp :: NodeSession -> NodeId -> IO () | ||
180 | updateTimestamp = error "updateTimestamp" | ||
181 | |||
182 | updateToken :: NodeSession -> NodeId -> Token -> IO () | ||
183 | updateToken _ _ _ = error "updateToken" | ||
184 | |||
185 | {----------------------------------------------------------------------- | ||
186 | DHT Queries | ||
187 | -----------------------------------------------------------------------} | ||
188 | |||
189 | pingM :: Method NodeId NodeId | ||
190 | pingM = method "ping" ["id"] ["id"] | ||
191 | |||
192 | findNodeM :: Method (NodeId, NodeId) (NodeId, CompactInfo) | ||
193 | findNodeM = method "find_node" ["id", "target"] ["id", "nodes"] | ||
194 | |||
195 | -- | Lookup peers by a torrent infohash. This method might return | ||
196 | -- different kind of responses depending on the routing table of | ||
197 | -- queried node: | ||
198 | -- | ||
199 | -- * If quieried node contains a peer list for the given infohash | ||
200 | -- then the node should return the list in a "value" key. Note that | ||
201 | -- list is encoded as compact peer address, not a compact node info. | ||
202 | -- The result of 'get_peers' method have the following scheme: | ||
203 | -- | ||
204 | -- > { "id" : "dht_server_node_id" | ||
205 | -- > , "token" : "assigned_token" | ||
206 | -- > , "values" : ["_IP_PO", "_ip_po"] | ||
207 | -- > } | ||
208 | -- | ||
209 | -- * If quieried node does not contain a list of peers associated | ||
210 | -- with the given infohash, then node should return | ||
211 | -- | ||
212 | -- > { "id" : "dht_server_node_id" | ||
213 | -- > , "token" : "assigned_token" | ||
214 | -- > , "nodes" : "compact_nodes_info" | ||
215 | -- > } | ||
216 | -- | ||
217 | -- The resulting dictionaries might differ only in a values\/nodes | ||
218 | -- keys. | ||
219 | -- | ||
220 | getPeersM :: Method (NodeId, InfoHash) BEncode | ||
221 | getPeersM = method "get_peers" ["id", "info_hash"] [] | ||
222 | |||
223 | -- | Used to announce that the peer, controlling the quering node is | ||
224 | -- downloading a torrent on a port. | ||
225 | announcePeerM :: Method (NodeId, InfoHash, PortNumber, Token) NodeId | ||
226 | announcePeerM = method "announce_peer" ["id", "info_hash", "port", "token"] ["id"] | ||
227 | |||
228 | {----------------------------------------------------------------------- | ||
229 | DHT Tracker | ||
230 | -----------------------------------------------------------------------} | ||
231 | -- TODO: update node timestamp on each successful call | ||
232 | |||
233 | -- | Note that tracker side query functions could throw RPCException. | ||
234 | type DHT a b = NodeSession -> NodeAddr -> a -> IO b | ||
235 | |||
236 | ping :: DHT () () | ||
237 | ping NodeSession {..} addr @ NodeAddr {..} () = do | ||
238 | nid <- call (nodeIP, nodePort) pingM nodeId | ||
239 | atomically $ modifyTVar' routingTable $ HM.insert nid addr | ||
240 | |||
241 | findNode :: DHT NodeId [NodeInfo] | ||
242 | findNode ses @ NodeSession {..} NodeAddr {..} qnid = do | ||
243 | (nid, info) <- call (nodeIP, nodePort) findNodeM (nodeId, qnid) | ||
244 | updateTimestamp ses nid | ||
245 | return (decodeCompact info) | ||
246 | |||
247 | getPeers :: DHT InfoHash (Either [NodeInfo] [PeerAddr]) | ||
248 | getPeers ses @ NodeSession {..} NodeAddr {..} ih = do | ||
249 | resp <- call (nodeIP, nodePort) getPeersM (nodeId, ih) | ||
250 | (nid, tok, res) <- extrResp resp | ||
251 | updateTimestamp ses nid | ||
252 | updateToken ses nid tok | ||
253 | return res | ||
254 | where | ||
255 | extrResp (BDict d) | ||
256 | | Just (BString nid ) <- M.lookup "id" d | ||
257 | , Just (BString tok ) <- M.lookup "token" d | ||
258 | , Just (BList values) <- M.lookup "values" d | ||
259 | = return $ (nid, tok, Right $ decodePeerList values) | ||
260 | |||
261 | | Just (BString nid ) <- M.lookup "id" d | ||
262 | , Just (BString tok ) <- M.lookup "token" d | ||
263 | , Just (BString nodes) <- M.lookup "nodes" d | ||
264 | = return (nid, tok, Left $ decodeCompact nodes) | ||
265 | |||
266 | extrResp _ = throw $ RPCException msg | ||
267 | where msg = ProtocolError "unable to extract getPeers resp" | ||
268 | |||
269 | -- remove token from signature, handle the all token stuff by NodeSession | ||
270 | |||
271 | -- | Note that before ever calling this method you should call the | ||
272 | -- getPeerList. | ||
273 | announcePeer :: DHT (InfoHash, Token) NodeId | ||
274 | announcePeer ses @ NodeSession {..} NodeAddr {..} (ih, tok) = do | ||
275 | nid <- call (nodeIP, nodePort) announcePeerM (nodeId, ih, listenerPort, tok) | ||
276 | updateTimestamp ses nid | ||
277 | return nid | ||
278 | |||
279 | {----------------------------------------------------------------------- | ||
280 | DHT Server | ||
281 | -----------------------------------------------------------------------} | ||
282 | -- TODO: update node timestamp on each successful call | ||
283 | -- NOTE: ensure all server operations run in O(1) | ||
284 | |||
285 | type ServerHandler a b = NodeSession -> NodeAddr -> a -> IO b | ||
286 | |||
287 | pingS :: ServerHandler NodeId NodeId | ||
288 | pingS NodeSession {..} addr nid = do | ||
289 | atomically $ modifyTVar' routingTable $ insertNode nid addr | ||
290 | return nodeId | ||
291 | |||
292 | findNodeS :: ServerHandler (NodeId, NodeId) (NodeId, CompactInfo) | ||
293 | findNodeS ses @ NodeSession {..} _ (nid, qnid) = do | ||
294 | updateTimestamp ses nid | ||
295 | rt <- atomically $ readTVar routingTable | ||
296 | return (nodeId, encodeCompact $ kclosest alpha qnid rt) | ||
297 | |||
298 | getPeersS :: ServerHandler (NodeId, InfoHash) BEncode | ||
299 | getPeersS ses @ NodeSession {..} _ (nid, ih) = do | ||
300 | updateTimestamp ses nid | ||
301 | mkResp <$> assignToken ses nid <*> findPeers | ||
302 | where | ||
303 | findPeers = do | ||
304 | list <- lookupPeers ih <$> readTVarIO contactInfo | ||
305 | if not (L.null list) | ||
306 | then return $ Right list | ||
307 | else do | ||
308 | rt <- readTVarIO routingTable | ||
309 | let nodes = kclosest alpha (getInfoHash ih) rt | ||
310 | return $ Left nodes | ||
311 | |||
312 | mkDict tok res = [("id",BString nodeId), ("token", BString tok), res] | ||
313 | mkResult (Left nodes ) = ("nodes", BString $ encodeCompact nodes) | ||
314 | mkResult (Right values) = ("values", BList $ encodePeerList values) | ||
315 | mkResp tok = BDict . M.fromList . mkDict tok . mkResult | ||
316 | |||
317 | announcePeerS :: ServerHandler (NodeId, InfoHash, PortNumber, Token) NodeId | ||
318 | announcePeerS ses @ NodeSession {..} NodeAddr {..} (nid, ih, port, token) = do | ||
319 | updateTimestamp ses nid | ||
320 | registered <- checkToken nid token ses | ||
321 | when registered $ do | ||
322 | atomically $ do | ||
323 | let peerAddr = PeerAddr Nothing nodeIP port | ||
324 | modifyTVar contactInfo $ insertPeer ih peerAddr | ||
325 | return nodeId | ||
326 | |||
327 | dhtTracker :: NodeSession -> InfoHash -> Chan PeerAddr -> IO () | ||
328 | dhtTracker = undefined | ||
329 | |||
330 | dhtServer :: NodeSession -> PortNumber -> IO () | ||
331 | dhtServer s p = server p methods | ||
332 | where | ||
333 | methods = | ||
334 | [ pingM ==> pingS s undefined | ||
335 | , findNodeM ==> findNodeS s undefined | ||
336 | , getPeersM ==> getPeersS s undefined | ||
337 | , announcePeerM ==> announcePeerS s undefined | ||
338 | ] \ No newline at end of file | ||
diff --git a/src/Network/BitTorrent/DHT/Protocol.hs b/src/Network/BitTorrent/DHT/Protocol.hs new file mode 100644 index 00000000..5267a916 --- /dev/null +++ b/src/Network/BitTorrent/DHT/Protocol.hs | |||
@@ -0,0 +1,339 @@ | |||
1 | {-# LANGUAGE OverloadedStrings #-} | ||
2 | {-# LANGUAGE RecordWildCards #-} | ||
3 | module Network.BitTorrent.DHT.Protocol | ||
4 | ( | ||
5 | newNodeSession | ||
6 | |||
7 | -- * Tracker | ||
8 | , ping | ||
9 | , findNode | ||
10 | , getPeers | ||
11 | , announcePeer | ||
12 | |||
13 | -- * Server | ||
14 | , dhtServer | ||
15 | ) where | ||
16 | |||
17 | import Control.Applicative | ||
18 | import Control.Concurrent | ||
19 | import Control.Concurrent.STM | ||
20 | import Control.Monad | ||
21 | import Control.Exception | ||
22 | import Data.ByteString | ||
23 | import Data.Serialize as S | ||
24 | import Data.Function | ||
25 | import Data.Ord | ||
26 | import Data.Maybe | ||
27 | import Data.List as L | ||
28 | import Data.Map as M | ||
29 | import Data.HashMap.Strict as HM | ||
30 | import Network | ||
31 | import Network.Socket | ||
32 | import System.Entropy | ||
33 | |||
34 | import Remote.KRPC | ||
35 | import Remote.KRPC.Protocol | ||
36 | import Data.BEncode | ||
37 | import Data.Torrent | ||
38 | import Network.BitTorrent.Peer | ||
39 | |||
40 | {----------------------------------------------------------------------- | ||
41 | Node | ||
42 | -----------------------------------------------------------------------} | ||
43 | |||
44 | type NodeId = ByteString | ||
45 | |||
46 | -- WARN is the 'system' random suitable for this? | ||
47 | -- | Generate random NodeID used for the entire session. | ||
48 | -- Distribution of ID's should be as uniform as possible. | ||
49 | -- | ||
50 | genNodeId :: IO NodeId | ||
51 | genNodeId = getEntropy 20 | ||
52 | |||
53 | instance Serialize PortNumber where | ||
54 | get = fromIntegral <$> getWord16be | ||
55 | put = putWord16be . fromIntegral | ||
56 | |||
57 | |||
58 | data NodeAddr = NodeAddr { | ||
59 | nodeIP :: {-# UNPACK #-} !HostAddress | ||
60 | , nodePort :: {-# UNPACK #-} !PortNumber | ||
61 | } deriving (Show, Eq) | ||
62 | |||
63 | instance Serialize NodeAddr where | ||
64 | get = NodeAddr <$> getWord32be <*> get | ||
65 | put NodeAddr {..} = do | ||
66 | putWord32be nodeIP | ||
67 | put nodePort | ||
68 | |||
69 | |||
70 | data NodeInfo = NodeInfo { | ||
71 | nodeID :: !NodeId | ||
72 | , nodeAddr :: !NodeAddr | ||
73 | } deriving (Show, Eq) | ||
74 | |||
75 | instance Serialize NodeInfo where | ||
76 | get = NodeInfo <$> getByteString 20 <*> get | ||
77 | put NodeInfo {..} = put nodeID >> put nodeAddr | ||
78 | |||
79 | type CompactInfo = ByteString | ||
80 | |||
81 | decodeCompact :: CompactInfo -> [NodeInfo] | ||
82 | decodeCompact = either (const []) id . S.runGet (many get) | ||
83 | |||
84 | encodeCompact :: [NodeId] -> CompactInfo | ||
85 | encodeCompact = S.runPut . mapM_ put | ||
86 | |||
87 | decodePeerList :: [BEncode] -> [PeerAddr] | ||
88 | decodePeerList = undefined | ||
89 | |||
90 | encodePeerList :: [PeerAddr] -> [BEncode] | ||
91 | encodePeerList = undefined | ||
92 | |||
93 | type Distance = NodeId | ||
94 | |||
95 | {----------------------------------------------------------------------- | ||
96 | Tokens | ||
97 | -----------------------------------------------------------------------} | ||
98 | |||
99 | type Secret = Int | ||
100 | |||
101 | genSecret :: IO Secret | ||
102 | genSecret = error "secret" | ||
103 | |||
104 | -- | Instead of periodically loop over the all nodes in the routing | ||
105 | -- table with some given interval (or some other tricky method | ||
106 | -- e.g. using timeouts) we can just update tokens on demand - if no | ||
107 | -- one asks for a token then the token _should_ not change at all. | ||
108 | -- | ||
109 | type Token = ByteString | ||
110 | |||
111 | defaultToken :: Token | ||
112 | defaultToken = "0xdeadbeef" | ||
113 | |||
114 | genToken :: NodeAddr -> Secret -> Token | ||
115 | genToken _ _ = defaultToken | ||
116 | |||
117 | {----------------------------------------------------------------------- | ||
118 | Routing table | ||
119 | -----------------------------------------------------------------------} | ||
120 | |||
121 | type ContactInfo = HashMap InfoHash [PeerAddr] | ||
122 | |||
123 | insertPeer :: InfoHash -> PeerAddr -> ContactInfo -> ContactInfo | ||
124 | insertPeer ih addr = HM.insertWith (++) ih [addr] | ||
125 | |||
126 | lookupPeers :: InfoHash -> ContactInfo -> [PeerAddr] | ||
127 | lookupPeers ih = fromMaybe [] . HM.lookup ih | ||
128 | |||
129 | -- TODO use more compact routing table | ||
130 | type RoutingTable = HashMap NodeId NodeAddr | ||
131 | |||
132 | insertNode :: NodeId -> NodeAddr -> RoutingTable -> RoutingTable | ||
133 | insertNode = HM.insert | ||
134 | |||
135 | type Alpha = Int | ||
136 | |||
137 | defaultAlpha :: Alpha | ||
138 | defaultAlpha = 8 | ||
139 | |||
140 | -- TODO | ||
141 | kclosest :: Int -> NodeId -> RoutingTable -> [NodeId] | ||
142 | kclosest = undefined | ||
143 | |||
144 | {----------------------------------------------------------------------- | ||
145 | Node session | ||
146 | -----------------------------------------------------------------------} | ||
147 | |||
148 | data NodeSession = NodeSession { | ||
149 | nodeId :: !NodeId | ||
150 | , routingTable :: !(TVar RoutingTable) | ||
151 | , contactInfo :: !(TVar ContactInfo) | ||
152 | -- , currentSecret :: !(TVar Secret) | ||
153 | -- , secretTimestamp :: !(TVar Timestamp) | ||
154 | , alpha :: !Alpha | ||
155 | , listenerPort :: !PortNumber | ||
156 | } | ||
157 | |||
158 | instance Eq NodeSession where | ||
159 | (==) = (==) `on` nodeId | ||
160 | |||
161 | instance Ord NodeSession where | ||
162 | compare = comparing nodeId | ||
163 | |||
164 | newNodeSession :: PortNumber -> IO NodeSession | ||
165 | newNodeSession lport | ||
166 | = NodeSession | ||
167 | <$> genNodeId | ||
168 | <*> newTVarIO HM.empty | ||
169 | <*> newTVarIO HM.empty | ||
170 | <*> pure defaultAlpha | ||
171 | <*> pure lport | ||
172 | |||
173 | assignToken :: NodeSession -> NodeId -> IO Token | ||
174 | assignToken _ _ = return "" | ||
175 | |||
176 | -- TODO | ||
177 | checkToken :: NodeId -> Token -> NodeSession -> IO Bool | ||
178 | checkToken nid token _ = return True | ||
179 | |||
180 | updateTimestamp :: NodeSession -> NodeId -> IO () | ||
181 | updateTimestamp = error "updateTimestamp" | ||
182 | |||
183 | updateToken :: NodeSession -> NodeId -> Token -> IO () | ||
184 | updateToken _ _ _ = error "updateToken" | ||
185 | |||
186 | {----------------------------------------------------------------------- | ||
187 | DHT Queries | ||
188 | -----------------------------------------------------------------------} | ||
189 | |||
190 | pingM :: Method NodeId NodeId | ||
191 | pingM = method "ping" ["id"] ["id"] | ||
192 | |||
193 | findNodeM :: Method (NodeId, NodeId) (NodeId, CompactInfo) | ||
194 | findNodeM = method "find_node" ["id", "target"] ["id", "nodes"] | ||
195 | |||
196 | -- | Lookup peers by a torrent infohash. This method might return | ||
197 | -- different kind of responses depending on the routing table of | ||
198 | -- queried node: | ||
199 | -- | ||
200 | -- * If quieried node contains a peer list for the given infohash | ||
201 | -- then the node should return the list in a "value" key. Note that | ||
202 | -- list is encoded as compact peer address, not a compact node info. | ||
203 | -- The result of 'get_peers' method have the following scheme: | ||
204 | -- | ||
205 | -- > { "id" : "dht_server_node_id" | ||
206 | -- > , "token" : "assigned_token" | ||
207 | -- > , "values" : ["_IP_PO", "_ip_po"] | ||
208 | -- > } | ||
209 | -- | ||
210 | -- * If quieried node does not contain a list of peers associated | ||
211 | -- with the given infohash, then node should return | ||
212 | -- | ||
213 | -- > { "id" : "dht_server_node_id" | ||
214 | -- > , "token" : "assigned_token" | ||
215 | -- > , "nodes" : "compact_nodes_info" | ||
216 | -- > } | ||
217 | -- | ||
218 | -- The resulting dictionaries might differ only in a values\/nodes | ||
219 | -- keys. | ||
220 | -- | ||
221 | getPeersM :: Method (NodeId, InfoHash) BEncode | ||
222 | getPeersM = method "get_peers" ["id", "info_hash"] [] | ||
223 | |||
224 | -- | Used to announce that the peer, controlling the quering node is | ||
225 | -- downloading a torrent on a port. | ||
226 | announcePeerM :: Method (NodeId, InfoHash, PortNumber, Token) NodeId | ||
227 | announcePeerM = method "announce_peer" ["id", "info_hash", "port", "token"] ["id"] | ||
228 | |||
229 | {----------------------------------------------------------------------- | ||
230 | DHT Tracker | ||
231 | -----------------------------------------------------------------------} | ||
232 | -- TODO: update node timestamp on each successful call | ||
233 | |||
234 | -- | Note that tracker side query functions could throw RPCException. | ||
235 | type DHT a b = NodeSession -> NodeAddr -> a -> IO b | ||
236 | |||
237 | ping :: DHT () () | ||
238 | ping NodeSession {..} addr @ NodeAddr {..} () = do | ||
239 | nid <- call (nodeIP, nodePort) pingM nodeId | ||
240 | atomically $ modifyTVar' routingTable $ HM.insert nid addr | ||
241 | |||
242 | findNode :: DHT NodeId [NodeInfo] | ||
243 | findNode ses @ NodeSession {..} NodeAddr {..} qnid = do | ||
244 | (nid, info) <- call (nodeIP, nodePort) findNodeM (nodeId, qnid) | ||
245 | updateTimestamp ses nid | ||
246 | return (decodeCompact info) | ||
247 | |||
248 | getPeers :: DHT InfoHash (Either [NodeInfo] [PeerAddr]) | ||
249 | getPeers ses @ NodeSession {..} NodeAddr {..} ih = do | ||
250 | resp <- call (nodeIP, nodePort) getPeersM (nodeId, ih) | ||
251 | (nid, tok, res) <- extrResp resp | ||
252 | updateTimestamp ses nid | ||
253 | updateToken ses nid tok | ||
254 | return res | ||
255 | where | ||
256 | extrResp (BDict d) | ||
257 | | Just (BString nid ) <- M.lookup "id" d | ||
258 | , Just (BString tok ) <- M.lookup "token" d | ||
259 | , Just (BList values) <- M.lookup "values" d | ||
260 | = return $ (nid, tok, Right $ decodePeerList values) | ||
261 | |||
262 | | Just (BString nid ) <- M.lookup "id" d | ||
263 | , Just (BString tok ) <- M.lookup "token" d | ||
264 | , Just (BString nodes) <- M.lookup "nodes" d | ||
265 | = return (nid, tok, Left $ decodeCompact nodes) | ||
266 | |||
267 | extrResp _ = throw $ RPCException msg | ||
268 | where msg = ProtocolError "unable to extract getPeers resp" | ||
269 | |||
270 | -- remove token from signature, handle the all token stuff by NodeSession | ||
271 | |||
272 | -- | Note that before ever calling this method you should call the | ||
273 | -- getPeerList. | ||
274 | announcePeer :: DHT (InfoHash, Token) NodeId | ||
275 | announcePeer ses @ NodeSession {..} NodeAddr {..} (ih, tok) = do | ||
276 | nid <- call (nodeIP, nodePort) announcePeerM (nodeId, ih, listenerPort, tok) | ||
277 | updateTimestamp ses nid | ||
278 | return nid | ||
279 | |||
280 | {----------------------------------------------------------------------- | ||
281 | DHT Server | ||
282 | -----------------------------------------------------------------------} | ||
283 | -- TODO: update node timestamp on each successful call | ||
284 | -- NOTE: ensure all server operations run in O(1) | ||
285 | |||
286 | type ServerHandler a b = NodeSession -> NodeAddr -> a -> IO b | ||
287 | |||
288 | pingS :: ServerHandler NodeId NodeId | ||
289 | pingS NodeSession {..} addr nid = do | ||
290 | atomically $ modifyTVar' routingTable $ insertNode nid addr | ||
291 | return nodeId | ||
292 | |||
293 | findNodeS :: ServerHandler (NodeId, NodeId) (NodeId, CompactInfo) | ||
294 | findNodeS ses @ NodeSession {..} _ (nid, qnid) = do | ||
295 | updateTimestamp ses nid | ||
296 | rt <- atomically $ readTVar routingTable | ||
297 | return (nodeId, encodeCompact $ kclosest alpha qnid rt) | ||
298 | |||
299 | getPeersS :: ServerHandler (NodeId, InfoHash) BEncode | ||
300 | getPeersS ses @ NodeSession {..} _ (nid, ih) = do | ||
301 | updateTimestamp ses nid | ||
302 | mkResp <$> assignToken ses nid <*> findPeers | ||
303 | where | ||
304 | findPeers = do | ||
305 | list <- lookupPeers ih <$> readTVarIO contactInfo | ||
306 | if not (L.null list) | ||
307 | then return $ Right list | ||
308 | else do | ||
309 | rt <- readTVarIO routingTable | ||
310 | let nodes = kclosest alpha (getInfoHash ih) rt | ||
311 | return $ Left nodes | ||
312 | |||
313 | mkDict tok res = [("id",BString nodeId), ("token", BString tok), res] | ||
314 | mkResult (Left nodes ) = ("nodes", BString $ encodeCompact nodes) | ||
315 | mkResult (Right values) = ("values", BList $ encodePeerList values) | ||
316 | mkResp tok = BDict . M.fromList . mkDict tok . mkResult | ||
317 | |||
318 | announcePeerS :: ServerHandler (NodeId, InfoHash, PortNumber, Token) NodeId | ||
319 | announcePeerS ses @ NodeSession {..} NodeAddr {..} (nid, ih, port, token) = do | ||
320 | updateTimestamp ses nid | ||
321 | registered <- checkToken nid token ses | ||
322 | when registered $ do | ||
323 | atomically $ do | ||
324 | let peerAddr = PeerAddr Nothing nodeIP port | ||
325 | modifyTVar contactInfo $ insertPeer ih peerAddr | ||
326 | return nodeId | ||
327 | |||
328 | dhtTracker :: NodeSession -> InfoHash -> Chan PeerAddr -> IO () | ||
329 | dhtTracker = undefined | ||
330 | |||
331 | dhtServer :: NodeSession -> PortNumber -> IO () | ||
332 | dhtServer s p = server p methods | ||
333 | where | ||
334 | methods = | ||
335 | [ pingM ==> pingS s undefined | ||
336 | , findNodeM ==> findNodeS s undefined | ||
337 | , getPeersM ==> getPeersS s undefined | ||
338 | , announcePeerM ==> announcePeerS s undefined | ||
339 | ] \ No newline at end of file | ||
diff --git a/src/Network/BitTorrent/Discovery.hs b/src/Network/BitTorrent/Discovery.hs new file mode 100644 index 00000000..770ae818 --- /dev/null +++ b/src/Network/BitTorrent/Discovery.hs | |||
@@ -0,0 +1,56 @@ | |||
1 | {-# LANGUAGE RecordWildCards #-} | ||
2 | {-# LANGUAGE OverloadedStrings #-} | ||
3 | module Network.BitTorrent.Discovery | ||
4 | (discover, startListener, startDHT | ||
5 | ) where | ||
6 | |||
7 | import Control.Monad | ||
8 | import Control.Concurrent | ||
9 | import Control.Exception | ||
10 | import Network.Socket | ||
11 | |||
12 | import Data.Torrent | ||
13 | import Network.BitTorrent.Peer | ||
14 | import Network.BitTorrent.Internal | ||
15 | import Network.BitTorrent.Exchange | ||
16 | import Network.BitTorrent.Tracker | ||
17 | import Network.BitTorrent.DHT | ||
18 | |||
19 | |||
20 | -- discover should hide tracker and DHT communication under the hood | ||
21 | -- thus we can obtain an unified interface | ||
22 | |||
23 | discover :: SwarmSession -> P2P () -> IO () | ||
24 | discover swarm @ SwarmSession {..} action = {-# SCC discover #-} do | ||
25 | port <- listenerPort clientSession | ||
26 | |||
27 | let conn = TConnection { | ||
28 | tconnAnnounce = tAnnounce torrentMeta | ||
29 | , tconnInfoHash = tInfoHash torrentMeta | ||
30 | , tconnPeerId = clientPeerId clientSession | ||
31 | , tconnPort = port | ||
32 | } | ||
33 | |||
34 | progress <- getCurrentProgress clientSession | ||
35 | |||
36 | withTracker progress conn $ \tses -> do | ||
37 | forever $ do | ||
38 | addr <- getPeerAddr tses | ||
39 | spawnP2P swarm addr $ do | ||
40 | action | ||
41 | |||
42 | startListener :: ClientSession -> PortNumber -> IO () | ||
43 | startListener cs @ ClientSession {..} port = | ||
44 | putMVar peerListener =<< startService port (listener cs (error "listener")) | ||
45 | |||
46 | startDHT :: ClientSession -> PortNumber -> IO () | ||
47 | startDHT ClientSession {..} nodePort = do | ||
48 | maybe failure start =<< tryTakeMVar peerListener | ||
49 | where | ||
50 | start ClientService {..} = do | ||
51 | ses <- newNodeSession servPort | ||
52 | serv <- startService nodePort (dhtServer ses) | ||
53 | putMVar nodeListener serv | ||
54 | |||
55 | failure = throwIO $ userError msg | ||
56 | msg = "unable to start DHT server: peer listener is not running" | ||
diff --git a/src/Network/BitTorrent/Internal.lhs b/src/Network/BitTorrent/Internal.lhs index 15606c57..8461a841 100644 --- a/src/Network/BitTorrent/Internal.lhs +++ b/src/Network/BitTorrent/Internal.lhs | |||
@@ -23,10 +23,18 @@ | |||
23 | > ( -- * Progress | 23 | > ( -- * Progress |
24 | > Progress(..), startProgress | 24 | > Progress(..), startProgress |
25 | > | 25 | > |
26 | > , ClientService(..) | ||
27 | > | ||
26 | > -- * Client | 28 | > -- * Client |
27 | > , ClientSession (clientPeerId, allowedExtensions) | 29 | > , ClientSession ( ClientSession |
30 | > , clientPeerId, allowedExtensions | ||
31 | > , nodeListener, peerListener | ||
32 | > ) | ||
33 | > , withClientSession | ||
28 | > , listenerPort, dhtPort | 34 | > , listenerPort, dhtPort |
29 | > | 35 | > |
36 | > , startService | ||
37 | > | ||
30 | > , ThreadCount | 38 | > , ThreadCount |
31 | > , defaultThreadCount | 39 | > , defaultThreadCount |
32 | > | 40 | > |
@@ -34,8 +42,6 @@ | |||
34 | > , registerTorrent | 42 | > , registerTorrent |
35 | > , unregisterTorrent | 43 | > , unregisterTorrent |
36 | > | 44 | > |
37 | > , newClient | ||
38 | > | ||
39 | > , getCurrentProgress | 45 | > , getCurrentProgress |
40 | > , getSwarmCount | 46 | > , getSwarmCount |
41 | > , getPeerCount | 47 | > , getPeerCount |
@@ -64,6 +70,7 @@ | |||
64 | > , SessionState | 70 | > , SessionState |
65 | > , initiatePeerSession | 71 | > , initiatePeerSession |
66 | > , acceptPeerSession | 72 | > , acceptPeerSession |
73 | > , listener | ||
67 | > | 74 | > |
68 | > -- ** Broadcasting | 75 | > -- ** Broadcasting |
69 | > , available | 76 | > , available |
@@ -89,7 +96,7 @@ | |||
89 | > import Control.Concurrent.STM | 96 | > import Control.Concurrent.STM |
90 | > import Control.Concurrent.MSem as MSem | 97 | > import Control.Concurrent.MSem as MSem |
91 | > import Control.Lens | 98 | > import Control.Lens |
92 | > import Control.Monad (when) | 99 | > import Control.Monad (when, forever) |
93 | > import Control.Exception | 100 | > import Control.Exception |
94 | > import Control.Monad.Trans | 101 | > import Control.Monad.Trans |
95 | 102 | ||
@@ -106,7 +113,7 @@ | |||
106 | > import Data.Serialize hiding (get) | 113 | > import Data.Serialize hiding (get) |
107 | > import Text.PrettyPrint | 114 | > import Text.PrettyPrint |
108 | 115 | ||
109 | > import Network | 116 | > import Network hiding (accept) |
110 | > import Network.Socket | 117 | > import Network.Socket |
111 | > import Network.Socket.ByteString | 118 | > import Network.Socket.ByteString |
112 | 119 | ||
@@ -118,6 +125,7 @@ | |||
118 | > import Network.BitTorrent.Peer | 125 | > import Network.BitTorrent.Peer |
119 | > import Network.BitTorrent.Exchange.Protocol as BT | 126 | > import Network.BitTorrent.Exchange.Protocol as BT |
120 | > import Network.BitTorrent.Tracker.Protocol as BT | 127 | > import Network.BitTorrent.Tracker.Protocol as BT |
128 | > import Network.BitTorrent.DHT.Protocol as BT | ||
121 | 129 | ||
122 | Progress | 130 | Progress |
123 | ------------------------------------------------------------------------ | 131 | ------------------------------------------------------------------------ |
@@ -195,7 +203,7 @@ Peer session is one always forked thread. | |||
195 | When client\/swarm\/peer session gets closed kill the corresponding | 203 | When client\/swarm\/peer session gets closed kill the corresponding |
196 | threads, but flush data to disc. (for e.g. storage block map) | 204 | threads, but flush data to disc. (for e.g. storage block map) |
197 | 205 | ||
198 | So for e.g., in order to obtain our first block we need to run at | 206 | So for e.g., in order to obtain our first block we need to spawn at |
199 | least 7 threads: main thread, 2 client session threads, 3 swarm session | 207 | least 7 threads: main thread, 2 client session threads, 3 swarm session |
200 | threads and PeerSession thread. | 208 | threads and PeerSession thread. |
201 | 209 | ||
@@ -296,10 +304,8 @@ so we can abstract out into ClientService: | |||
296 | > , servThread :: !ThreadId | 304 | > , servThread :: !ThreadId |
297 | > } deriving Show | 305 | > } deriving Show |
298 | 306 | ||
299 | startService :: PortNumber -> IO a -> IO ClientService | 307 | > startService :: PortNumber -> (PortNumber -> IO ()) -> IO ClientService |
300 | startService p m = forkIO $ handle $ m p | 308 | > startService port m = ClientService port <$> forkIO (m port) |
301 | where | ||
302 | handle :: IOError -> IO () | ||
303 | 309 | ||
304 | > stopService :: ClientService -> IO () | 310 | > stopService :: ClientService -> IO () |
305 | > stopService ClientService {..} = killThread servThread | 311 | > stopService ClientService {..} = killThread servThread |
@@ -339,8 +345,8 @@ and different enabled extensions at the same time. | |||
339 | > -- 'PeerSession'. | 345 | > -- 'PeerSession'. |
340 | > , allowedExtensions :: [Extension] | 346 | > , allowedExtensions :: [Extension] |
341 | 347 | ||
342 | > , peerListener :: !ClientService | 348 | > , peerListener :: !(MVar ClientService) |
343 | > , nodeListener :: !ClientService | 349 | > , nodeListener :: !(MVar ClientService) |
344 | 350 | ||
345 | > -- | Semaphor used to bound number of active P2P sessions. | 351 | > -- | Semaphor used to bound number of active P2P sessions. |
346 | > , activeThreads :: !(MSem ThreadCount) | 352 | > , activeThreads :: !(MSem ThreadCount) |
@@ -407,20 +413,20 @@ Retrieving client info | |||
407 | 413 | ||
408 | > -- | Create a new client session. The data passed to this function are | 414 | > -- | Create a new client session. The data passed to this function are |
409 | > -- usually loaded from configuration file. | 415 | > -- usually loaded from configuration file. |
410 | > newClient :: SessionCount -- ^ Maximum count of active P2P Sessions. | 416 | > openClientSession :: SessionCount -- ^ Maximum count of active P2P Sessions. |
411 | > -> [Extension] -- ^ Extensions allowed to use. | 417 | > -> [Extension] -- ^ Extensions allowed to use. |
412 | > -> IO ClientSession -- ^ Client with unique peer ID. | 418 | > -> IO ClientSession -- ^ Client with unique peer ID. |
413 | 419 | ||
414 | > newClient n exts = do | 420 | > openClientSession n exts = do |
415 | > mgr <- Ev.new | 421 | > mgr <- Ev.new |
416 | > -- TODO kill this thread when leave client | 422 | > -- TODO kill this thread when leave client |
417 | > _ <- forkIO $ loop mgr | 423 | > _ <- forkIO $ loop mgr |
418 | 424 | > | |
419 | > ClientSession | 425 | > ClientSession |
420 | > <$> genPeerId | 426 | > <$> genPeerId |
421 | > <*> pure exts | 427 | > <*> pure exts |
422 | > <*> pure (ClientService 10 undefined) -- TODO | 428 | > <*> newEmptyMVar |
423 | > <*> pure (ClientService 20 undefined) -- TODO | 429 | > <*> newEmptyMVar |
424 | > <*> MSem.new n | 430 | > <*> MSem.new n |
425 | > <*> pure n | 431 | > <*> pure n |
426 | > <*> newTVarIO M.empty | 432 | > <*> newTVarIO M.empty |
@@ -428,11 +434,21 @@ Retrieving client info | |||
428 | > <*> newTVarIO (startProgress 0) | 434 | > <*> newTVarIO (startProgress 0) |
429 | > <*> newTVarIO HM.empty | 435 | > <*> newTVarIO HM.empty |
430 | 436 | ||
431 | > listenerPort :: ClientSession -> PortNumber | 437 | > closeClientSession :: ClientSession -> IO () |
432 | > listenerPort = servPort . peerListener | 438 | > closeClientSession ClientSession {..} = |
439 | > maybeStop (tryTakeMVar peerListener) `finally` | ||
440 | > maybeStop (tryTakeMVar nodeListener) | ||
441 | > where | ||
442 | > maybeStop m = maybe (return ()) stopService =<< m | ||
443 | |||
444 | > withClientSession :: SessionCount -> [Extension] -> (ClientSession -> IO ()) -> IO () | ||
445 | > withClientSession c es = bracket (openClientSession c es) closeClientSession | ||
433 | 446 | ||
434 | > dhtPort :: ClientSession -> PortNumber | 447 | > listenerPort :: ClientSession -> IO PortNumber |
435 | > dhtPort = servPort . nodeListener | 448 | > listenerPort ClientSession {..} = servPort <$> readMVar peerListener |
449 | |||
450 | > dhtPort :: ClientSession -> IO PortNumber | ||
451 | > dhtPort ClientSession {..} = servPort <$> readMVar nodeListener | ||
436 | 452 | ||
437 | Swarm sessions | 453 | Swarm sessions |
438 | ------------------------------------------------------------------------ | 454 | ------------------------------------------------------------------------ |
@@ -736,8 +752,10 @@ TODO: use STM semaphore | |||
736 | > sendClientStatus (sock, PeerSession {..}) = do | 752 | > sendClientStatus (sock, PeerSession {..}) = do |
737 | > cbf <- readTVarIO $ clientBitfield $ swarmSession | 753 | > cbf <- readTVarIO $ clientBitfield $ swarmSession |
738 | > sendAll sock $ encode $ Bitfield cbf | 754 | > sendAll sock $ encode $ Bitfield cbf |
755 | > | ||
756 | > port <- dhtPort $ clientSession swarmSession | ||
739 | > when (ExtDHT `elem` enabledExtensions) $ do | 757 | > when (ExtDHT `elem` enabledExtensions) $ do |
740 | > sendAll sock $ encode $ Port $ dhtPort $ clientSession swarmSession | 758 | > sendAll sock $ encode $ Port port |
741 | 759 | ||
742 | Exchange action depends on session and socket, whereas session depends | 760 | Exchange action depends on session and socket, whereas session depends |
743 | on socket: | 761 | on socket: |
@@ -786,6 +804,26 @@ Used the a peer want to connect to the client. | |||
786 | > sendClientStatus (sock, ps) | 804 | > sendClientStatus (sock, ps) |
787 | > return ps | 805 | > return ps |
788 | 806 | ||
807 | |||
808 | > listener :: ClientSession -> Exchange -> PortNumber -> IO () | ||
809 | > listener cs action serverPort = bracket openListener close loop | ||
810 | > where | ||
811 | > loop sock = forever $ handle isIOError $ do | ||
812 | > (conn, addr) <- accept sock | ||
813 | > case addr of | ||
814 | > SockAddrInet port host -> do | ||
815 | > acceptPeerSession cs (PeerAddr Nothing host port) conn action | ||
816 | > _ -> return () | ||
817 | > | ||
818 | > isIOError :: IOError -> IO () | ||
819 | > isIOError _ = return () | ||
820 | > | ||
821 | > openListener = do | ||
822 | > sock <- socket AF_INET Stream defaultProtocol | ||
823 | > bindSocket sock (SockAddrInet serverPort 0) | ||
824 | > listen sock 1 | ||
825 | > return sock | ||
826 | |||
789 | Broadcasting: Have, Cancel, Bitfield, SuggestPiece | 827 | Broadcasting: Have, Cancel, Bitfield, SuggestPiece |
790 | ------------------------------------------------------------------------ | 828 | ------------------------------------------------------------------------ |
791 | 829 | ||