summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bittorrent.cabal6
-rw-r--r--src/Network/BitTorrent.hs36
-rw-r--r--src/Network/BitTorrent/DHT.hs336
-rw-r--r--src/Network/BitTorrent/DHT/Protocol.hs339
-rw-r--r--src/Network/BitTorrent/Discovery.hs56
-rw-r--r--src/Network/BitTorrent/Internal.lhs82
6 files changed, 473 insertions, 382 deletions
diff --git a/bittorrent.cabal b/bittorrent.cabal
index 22c2794a..fc19fef9 100644
--- a/bittorrent.cabal
+++ b/bittorrent.cabal
@@ -44,9 +44,10 @@ library
44 , Network.BitTorrent.Tracker 44 , Network.BitTorrent.Tracker
45 , Network.BitTorrent.Exchange 45 , Network.BitTorrent.Exchange
46 , Network.BitTorrent.DHT 46 , Network.BitTorrent.DHT
47 , Network.BitTorrent.Discovery
47 , System.Torrent.Storage 48 , System.Torrent.Storage
48 Network.BitTorrent.Internal 49
49 other-modules: 50 other-modules: Network.BitTorrent.Internal
50 if flag(testing) 51 if flag(testing)
51 exposed-modules: Network.BitTorrent.Exchange.Protocol 52 exposed-modules: Network.BitTorrent.Exchange.Protocol
52 , Network.BitTorrent.Tracker.Protocol 53 , Network.BitTorrent.Tracker.Protocol
@@ -54,6 +55,7 @@ library
54 if !flag(testing) 55 if !flag(testing)
55 other-modules: Network.BitTorrent.Exchange.Protocol 56 other-modules: Network.BitTorrent.Exchange.Protocol
56 , Network.BitTorrent.Tracker.Protocol 57 , Network.BitTorrent.Tracker.Protocol
58 , Network.BitTorrent.DHT.Protocol
57 , System.IO.MMap.Fixed 59 , System.IO.MMap.Fixed
58 60
59 build-depends: base == 4.* 61 build-depends: base == 4.*
diff --git a/src/Network/BitTorrent.hs b/src/Network/BitTorrent.hs
index 3602dd7e..06df77dd 100644
--- a/src/Network/BitTorrent.hs
+++ b/src/Network/BitTorrent.hs
@@ -17,8 +17,7 @@ module Network.BitTorrent
17 -- ** Client 17 -- ** Client
18 , ClientSession( clientPeerId, allowedExtensions ) 18 , ClientSession( clientPeerId, allowedExtensions )
19 19
20 , newClient 20 , withDefaultClient
21 , defaultClient
22 21
23 , Progress(..) 22 , Progress(..)
24 , getCurrentProgress 23 , getCurrentProgress
@@ -91,36 +90,25 @@ import Network
91import Data.Bitfield as BF 90import Data.Bitfield as BF
92import Data.Torrent 91import Data.Torrent
93import Network.BitTorrent.Internal 92import Network.BitTorrent.Internal
93import Network.BitTorrent.Peer
94import Network.BitTorrent.Extension
94import Network.BitTorrent.Exchange 95import Network.BitTorrent.Exchange
95import Network.BitTorrent.Exchange.Protocol 96import Network.BitTorrent.Exchange.Protocol
96import Network.BitTorrent.Tracker 97import Network.BitTorrent.Tracker
97import Network.BitTorrent.Extension 98import Network.BitTorrent.Discovery
98import Network.BitTorrent.Peer
99 99
100import System.Torrent.Storage 100import System.Torrent.Storage
101 101
102-- TODO remove fork from Network.BitTorrent.Exchange
103-- TODO make all forks in Internal.
102 104
103-- | Client session with default parameters. Use it for testing only. 105-- | Client session with default parameters. Use it for testing only.
104defaultClient :: IO ClientSession 106withDefaultClient :: PortNumber -> PortNumber -> (ClientSession -> IO ()) -> IO ()
105defaultClient = newClient defaultThreadCount defaultExtensions 107withDefaultClient dhtPort listPort action = do
106 108 withClientSession defaultThreadCount defaultExtensions $ \client -> do
107-- discover should hide tracker and DHT communication under the hood 109 startListener client listPort
108-- thus we can obtain an unified interface 110 startDHT client dhtPort
109 111 action client
110discover :: SwarmSession -> P2P () -> IO ()
111discover swarm @ SwarmSession {..} action = {-# SCC discover #-} do
112 let conn = TConnection (tAnnounce torrentMeta)
113 (tInfoHash torrentMeta)
114 (clientPeerId clientSession)
115 (listenerPort clientSession)
116
117 progress <- getCurrentProgress clientSession
118
119 withTracker progress conn $ \tses -> do
120 forever $ do
121 addr <- getPeerAddr tses
122 spawnP2P swarm addr $ do
123 action
124 112
125{----------------------------------------------------------------------- 113{-----------------------------------------------------------------------
126 Torrent management 114 Torrent management
diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs
index f3c993c3..b0aac002 100644
--- a/src/Network/BitTorrent/DHT.hs
+++ b/src/Network/BitTorrent/DHT.hs
@@ -1,338 +1,6 @@
1{-# LANGUAGE OverloadedStrings #-}
2{-# LANGUAGE RecordWildCards #-}
3module Network.BitTorrent.DHT 1module Network.BitTorrent.DHT
4 ( 2 ( newNodeSession
5 newNodeSession
6
7 -- * Tracker
8 , ping
9 , findNode
10 , getPeers
11 , announcePeer
12
13 -- * Server
14 , dhtServer 3 , dhtServer
15 ) where 4 ) where
16 5
17import Control.Applicative 6import Network.BitTorrent.DHT.Protocol \ No newline at end of file
18import Control.Concurrent
19import Control.Concurrent.STM
20import Control.Monad
21import Control.Exception
22import Data.ByteString
23import Data.Serialize as S
24import Data.Function
25import Data.Ord
26import Data.Maybe
27import Data.List as L
28import Data.Map as M
29import Data.HashMap.Strict as HM
30import Network
31import Network.Socket
32import System.Entropy
33
34import Remote.KRPC
35import Remote.KRPC.Protocol
36import Data.BEncode
37import Data.Torrent
38import Network.BitTorrent.Peer
39
40{-----------------------------------------------------------------------
41 Node
42-----------------------------------------------------------------------}
43
44type NodeId = ByteString
45
46-- WARN is the 'system' random suitable for this?
47-- | Generate random NodeID used for the entire session.
48-- Distribution of ID's should be as uniform as possible.
49--
50genNodeId :: IO NodeId
51genNodeId = getEntropy 20
52
53instance Serialize PortNumber where
54 get = fromIntegral <$> getWord16be
55 put = putWord16be . fromIntegral
56
57
58data NodeAddr = NodeAddr {
59 nodeIP :: {-# UNPACK #-} !HostAddress
60 , nodePort :: {-# UNPACK #-} !PortNumber
61 } deriving (Show, Eq)
62
63instance Serialize NodeAddr where
64 get = NodeAddr <$> getWord32be <*> get
65 put NodeAddr {..} = do
66 putWord32be nodeIP
67 put nodePort
68
69
70data NodeInfo = NodeInfo {
71 nodeID :: !NodeId
72 , nodeAddr :: !NodeAddr
73 } deriving (Show, Eq)
74
75instance Serialize NodeInfo where
76 get = NodeInfo <$> getByteString 20 <*> get
77 put NodeInfo {..} = put nodeID >> put nodeAddr
78
79type CompactInfo = ByteString
80
81decodeCompact :: CompactInfo -> [NodeInfo]
82decodeCompact = either (const []) id . S.runGet (many get)
83
84encodeCompact :: [NodeId] -> CompactInfo
85encodeCompact = S.runPut . mapM_ put
86
87decodePeerList :: [BEncode] -> [PeerAddr]
88decodePeerList = undefined
89
90encodePeerList :: [PeerAddr] -> [BEncode]
91encodePeerList = undefined
92
93type Distance = NodeId
94
95
96{-----------------------------------------------------------------------
97 Tokens
98-----------------------------------------------------------------------}
99
100type Secret = Int
101
102genSecret :: IO Secret
103genSecret = undefined
104
105-- | Instead of periodically loop over the all nodes in the routing
106-- table with some given interval (or some other tricky method
107-- e.g. using timeouts) we can just update tokens on demand - if no
108-- one asks for a token then the token _should_ not change at all.
109--
110type Token = ByteString
111
112genToken :: NodeAddr -> Secret -> Token
113genToken = return undefined
114
115defaultToken :: Token
116defaultToken = "0xdeadbeef"
117
118{-----------------------------------------------------------------------
119 Routing table
120-----------------------------------------------------------------------}
121
122type ContactInfo = HashMap InfoHash [PeerAddr]
123
124insertPeer :: InfoHash -> PeerAddr -> ContactInfo -> ContactInfo
125insertPeer ih addr = HM.insertWith (++) ih [addr]
126
127lookupPeers :: InfoHash -> ContactInfo -> [PeerAddr]
128lookupPeers ih = fromMaybe [] . HM.lookup ih
129
130-- TODO use more compact routing table
131type RoutingTable = HashMap NodeId NodeAddr
132
133insertNode :: NodeId -> NodeAddr -> RoutingTable -> RoutingTable
134insertNode = HM.insert
135
136type Alpha = Int
137
138defaultAlpha :: Alpha
139defaultAlpha = 8
140
141-- TODO
142kclosest :: Int -> NodeId -> RoutingTable -> [NodeId]
143kclosest = undefined
144
145{-----------------------------------------------------------------------
146 Node session
147-----------------------------------------------------------------------}
148
149data NodeSession = NodeSession {
150 nodeId :: !NodeId
151 , routingTable :: !(TVar RoutingTable)
152 , contactInfo :: !(TVar ContactInfo)
153 , alpha :: !Alpha
154 , listenerPort :: !PortNumber
155 }
156
157instance Eq NodeSession where
158 (==) = (==) `on` nodeId
159
160instance Ord NodeSession where
161 compare = comparing nodeId
162
163newNodeSession :: PortNumber -> IO NodeSession
164newNodeSession lport
165 = NodeSession
166 <$> genNodeId
167 <*> newTVarIO HM.empty
168 <*> newTVarIO HM.empty
169 <*> pure defaultAlpha
170 <*> pure lport
171
172assignToken :: NodeSession -> NodeId -> IO Token
173assignToken _ _ = return ""
174
175-- TODO
176checkToken :: NodeId -> Token -> NodeSession -> IO Bool
177checkToken nid token _ = return True
178
179updateTimestamp :: NodeSession -> NodeId -> IO ()
180updateTimestamp = error "updateTimestamp"
181
182updateToken :: NodeSession -> NodeId -> Token -> IO ()
183updateToken _ _ _ = error "updateToken"
184
185{-----------------------------------------------------------------------
186 DHT Queries
187-----------------------------------------------------------------------}
188
189pingM :: Method NodeId NodeId
190pingM = method "ping" ["id"] ["id"]
191
192findNodeM :: Method (NodeId, NodeId) (NodeId, CompactInfo)
193findNodeM = method "find_node" ["id", "target"] ["id", "nodes"]
194
195-- | Lookup peers by a torrent infohash. This method might return
196-- different kind of responses depending on the routing table of
197-- queried node:
198--
199-- * If quieried node contains a peer list for the given infohash
200-- then the node should return the list in a "value" key. Note that
201-- list is encoded as compact peer address, not a compact node info.
202-- The result of 'get_peers' method have the following scheme:
203--
204-- > { "id" : "dht_server_node_id"
205-- > , "token" : "assigned_token"
206-- > , "values" : ["_IP_PO", "_ip_po"]
207-- > }
208--
209-- * If quieried node does not contain a list of peers associated
210-- with the given infohash, then node should return
211--
212-- > { "id" : "dht_server_node_id"
213-- > , "token" : "assigned_token"
214-- > , "nodes" : "compact_nodes_info"
215-- > }
216--
217-- The resulting dictionaries might differ only in a values\/nodes
218-- keys.
219--
220getPeersM :: Method (NodeId, InfoHash) BEncode
221getPeersM = method "get_peers" ["id", "info_hash"] []
222
223-- | Used to announce that the peer, controlling the quering node is
224-- downloading a torrent on a port.
225announcePeerM :: Method (NodeId, InfoHash, PortNumber, Token) NodeId
226announcePeerM = method "announce_peer" ["id", "info_hash", "port", "token"] ["id"]
227
228{-----------------------------------------------------------------------
229 DHT Tracker
230-----------------------------------------------------------------------}
231-- TODO: update node timestamp on each successful call
232
233-- | Note that tracker side query functions could throw RPCException.
234type DHT a b = NodeSession -> NodeAddr -> a -> IO b
235
236ping :: DHT () ()
237ping NodeSession {..} addr @ NodeAddr {..} () = do
238 nid <- call (nodeIP, nodePort) pingM nodeId
239 atomically $ modifyTVar' routingTable $ HM.insert nid addr
240
241findNode :: DHT NodeId [NodeInfo]
242findNode ses @ NodeSession {..} NodeAddr {..} qnid = do
243 (nid, info) <- call (nodeIP, nodePort) findNodeM (nodeId, qnid)
244 updateTimestamp ses nid
245 return (decodeCompact info)
246
247getPeers :: DHT InfoHash (Either [NodeInfo] [PeerAddr])
248getPeers ses @ NodeSession {..} NodeAddr {..} ih = do
249 resp <- call (nodeIP, nodePort) getPeersM (nodeId, ih)
250 (nid, tok, res) <- extrResp resp
251 updateTimestamp ses nid
252 updateToken ses nid tok
253 return res
254 where
255 extrResp (BDict d)
256 | Just (BString nid ) <- M.lookup "id" d
257 , Just (BString tok ) <- M.lookup "token" d
258 , Just (BList values) <- M.lookup "values" d
259 = return $ (nid, tok, Right $ decodePeerList values)
260
261 | Just (BString nid ) <- M.lookup "id" d
262 , Just (BString tok ) <- M.lookup "token" d
263 , Just (BString nodes) <- M.lookup "nodes" d
264 = return (nid, tok, Left $ decodeCompact nodes)
265
266 extrResp _ = throw $ RPCException msg
267 where msg = ProtocolError "unable to extract getPeers resp"
268
269-- remove token from signature, handle the all token stuff by NodeSession
270
271-- | Note that before ever calling this method you should call the
272-- getPeerList.
273announcePeer :: DHT (InfoHash, Token) NodeId
274announcePeer ses @ NodeSession {..} NodeAddr {..} (ih, tok) = do
275 nid <- call (nodeIP, nodePort) announcePeerM (nodeId, ih, listenerPort, tok)
276 updateTimestamp ses nid
277 return nid
278
279{-----------------------------------------------------------------------
280 DHT Server
281-----------------------------------------------------------------------}
282-- TODO: update node timestamp on each successful call
283-- NOTE: ensure all server operations run in O(1)
284
285type ServerHandler a b = NodeSession -> NodeAddr -> a -> IO b
286
287pingS :: ServerHandler NodeId NodeId
288pingS NodeSession {..} addr nid = do
289 atomically $ modifyTVar' routingTable $ insertNode nid addr
290 return nodeId
291
292findNodeS :: ServerHandler (NodeId, NodeId) (NodeId, CompactInfo)
293findNodeS ses @ NodeSession {..} _ (nid, qnid) = do
294 updateTimestamp ses nid
295 rt <- atomically $ readTVar routingTable
296 return (nodeId, encodeCompact $ kclosest alpha qnid rt)
297
298getPeersS :: ServerHandler (NodeId, InfoHash) BEncode
299getPeersS ses @ NodeSession {..} _ (nid, ih) = do
300 updateTimestamp ses nid
301 mkResp <$> assignToken ses nid <*> findPeers
302 where
303 findPeers = do
304 list <- lookupPeers ih <$> readTVarIO contactInfo
305 if not (L.null list)
306 then return $ Right list
307 else do
308 rt <- readTVarIO routingTable
309 let nodes = kclosest alpha (getInfoHash ih) rt
310 return $ Left nodes
311
312 mkDict tok res = [("id",BString nodeId), ("token", BString tok), res]
313 mkResult (Left nodes ) = ("nodes", BString $ encodeCompact nodes)
314 mkResult (Right values) = ("values", BList $ encodePeerList values)
315 mkResp tok = BDict . M.fromList . mkDict tok . mkResult
316
317announcePeerS :: ServerHandler (NodeId, InfoHash, PortNumber, Token) NodeId
318announcePeerS ses @ NodeSession {..} NodeAddr {..} (nid, ih, port, token) = do
319 updateTimestamp ses nid
320 registered <- checkToken nid token ses
321 when registered $ do
322 atomically $ do
323 let peerAddr = PeerAddr Nothing nodeIP port
324 modifyTVar contactInfo $ insertPeer ih peerAddr
325 return nodeId
326
327dhtTracker :: NodeSession -> InfoHash -> Chan PeerAddr -> IO ()
328dhtTracker = undefined
329
330dhtServer :: NodeSession -> PortNumber -> IO ()
331dhtServer s p = server p methods
332 where
333 methods =
334 [ pingM ==> pingS s undefined
335 , findNodeM ==> findNodeS s undefined
336 , getPeersM ==> getPeersS s undefined
337 , announcePeerM ==> announcePeerS s undefined
338 ] \ No newline at end of file
diff --git a/src/Network/BitTorrent/DHT/Protocol.hs b/src/Network/BitTorrent/DHT/Protocol.hs
new file mode 100644
index 00000000..5267a916
--- /dev/null
+++ b/src/Network/BitTorrent/DHT/Protocol.hs
@@ -0,0 +1,339 @@
1{-# LANGUAGE OverloadedStrings #-}
2{-# LANGUAGE RecordWildCards #-}
3module Network.BitTorrent.DHT.Protocol
4 (
5 newNodeSession
6
7 -- * Tracker
8 , ping
9 , findNode
10 , getPeers
11 , announcePeer
12
13 -- * Server
14 , dhtServer
15 ) where
16
17import Control.Applicative
18import Control.Concurrent
19import Control.Concurrent.STM
20import Control.Monad
21import Control.Exception
22import Data.ByteString
23import Data.Serialize as S
24import Data.Function
25import Data.Ord
26import Data.Maybe
27import Data.List as L
28import Data.Map as M
29import Data.HashMap.Strict as HM
30import Network
31import Network.Socket
32import System.Entropy
33
34import Remote.KRPC
35import Remote.KRPC.Protocol
36import Data.BEncode
37import Data.Torrent
38import Network.BitTorrent.Peer
39
40{-----------------------------------------------------------------------
41 Node
42-----------------------------------------------------------------------}
43
44type NodeId = ByteString
45
46-- WARN is the 'system' random suitable for this?
47-- | Generate random NodeID used for the entire session.
48-- Distribution of ID's should be as uniform as possible.
49--
50genNodeId :: IO NodeId
51genNodeId = getEntropy 20
52
53instance Serialize PortNumber where
54 get = fromIntegral <$> getWord16be
55 put = putWord16be . fromIntegral
56
57
58data NodeAddr = NodeAddr {
59 nodeIP :: {-# UNPACK #-} !HostAddress
60 , nodePort :: {-# UNPACK #-} !PortNumber
61 } deriving (Show, Eq)
62
63instance Serialize NodeAddr where
64 get = NodeAddr <$> getWord32be <*> get
65 put NodeAddr {..} = do
66 putWord32be nodeIP
67 put nodePort
68
69
70data NodeInfo = NodeInfo {
71 nodeID :: !NodeId
72 , nodeAddr :: !NodeAddr
73 } deriving (Show, Eq)
74
75instance Serialize NodeInfo where
76 get = NodeInfo <$> getByteString 20 <*> get
77 put NodeInfo {..} = put nodeID >> put nodeAddr
78
79type CompactInfo = ByteString
80
81decodeCompact :: CompactInfo -> [NodeInfo]
82decodeCompact = either (const []) id . S.runGet (many get)
83
84encodeCompact :: [NodeId] -> CompactInfo
85encodeCompact = S.runPut . mapM_ put
86
87decodePeerList :: [BEncode] -> [PeerAddr]
88decodePeerList = undefined
89
90encodePeerList :: [PeerAddr] -> [BEncode]
91encodePeerList = undefined
92
93type Distance = NodeId
94
95{-----------------------------------------------------------------------
96 Tokens
97-----------------------------------------------------------------------}
98
99type Secret = Int
100
101genSecret :: IO Secret
102genSecret = error "secret"
103
104-- | Instead of periodically loop over the all nodes in the routing
105-- table with some given interval (or some other tricky method
106-- e.g. using timeouts) we can just update tokens on demand - if no
107-- one asks for a token then the token _should_ not change at all.
108--
109type Token = ByteString
110
111defaultToken :: Token
112defaultToken = "0xdeadbeef"
113
114genToken :: NodeAddr -> Secret -> Token
115genToken _ _ = defaultToken
116
117{-----------------------------------------------------------------------
118 Routing table
119-----------------------------------------------------------------------}
120
121type ContactInfo = HashMap InfoHash [PeerAddr]
122
123insertPeer :: InfoHash -> PeerAddr -> ContactInfo -> ContactInfo
124insertPeer ih addr = HM.insertWith (++) ih [addr]
125
126lookupPeers :: InfoHash -> ContactInfo -> [PeerAddr]
127lookupPeers ih = fromMaybe [] . HM.lookup ih
128
129-- TODO use more compact routing table
130type RoutingTable = HashMap NodeId NodeAddr
131
132insertNode :: NodeId -> NodeAddr -> RoutingTable -> RoutingTable
133insertNode = HM.insert
134
135type Alpha = Int
136
137defaultAlpha :: Alpha
138defaultAlpha = 8
139
140-- TODO
141kclosest :: Int -> NodeId -> RoutingTable -> [NodeId]
142kclosest = undefined
143
144{-----------------------------------------------------------------------
145 Node session
146-----------------------------------------------------------------------}
147
148data NodeSession = NodeSession {
149 nodeId :: !NodeId
150 , routingTable :: !(TVar RoutingTable)
151 , contactInfo :: !(TVar ContactInfo)
152-- , currentSecret :: !(TVar Secret)
153-- , secretTimestamp :: !(TVar Timestamp)
154 , alpha :: !Alpha
155 , listenerPort :: !PortNumber
156 }
157
158instance Eq NodeSession where
159 (==) = (==) `on` nodeId
160
161instance Ord NodeSession where
162 compare = comparing nodeId
163
164newNodeSession :: PortNumber -> IO NodeSession
165newNodeSession lport
166 = NodeSession
167 <$> genNodeId
168 <*> newTVarIO HM.empty
169 <*> newTVarIO HM.empty
170 <*> pure defaultAlpha
171 <*> pure lport
172
173assignToken :: NodeSession -> NodeId -> IO Token
174assignToken _ _ = return ""
175
176-- TODO
177checkToken :: NodeId -> Token -> NodeSession -> IO Bool
178checkToken nid token _ = return True
179
180updateTimestamp :: NodeSession -> NodeId -> IO ()
181updateTimestamp = error "updateTimestamp"
182
183updateToken :: NodeSession -> NodeId -> Token -> IO ()
184updateToken _ _ _ = error "updateToken"
185
186{-----------------------------------------------------------------------
187 DHT Queries
188-----------------------------------------------------------------------}
189
190pingM :: Method NodeId NodeId
191pingM = method "ping" ["id"] ["id"]
192
193findNodeM :: Method (NodeId, NodeId) (NodeId, CompactInfo)
194findNodeM = method "find_node" ["id", "target"] ["id", "nodes"]
195
196-- | Lookup peers by a torrent infohash. This method might return
197-- different kind of responses depending on the routing table of
198-- queried node:
199--
200-- * If quieried node contains a peer list for the given infohash
201-- then the node should return the list in a "value" key. Note that
202-- list is encoded as compact peer address, not a compact node info.
203-- The result of 'get_peers' method have the following scheme:
204--
205-- > { "id" : "dht_server_node_id"
206-- > , "token" : "assigned_token"
207-- > , "values" : ["_IP_PO", "_ip_po"]
208-- > }
209--
210-- * If quieried node does not contain a list of peers associated
211-- with the given infohash, then node should return
212--
213-- > { "id" : "dht_server_node_id"
214-- > , "token" : "assigned_token"
215-- > , "nodes" : "compact_nodes_info"
216-- > }
217--
218-- The resulting dictionaries might differ only in a values\/nodes
219-- keys.
220--
221getPeersM :: Method (NodeId, InfoHash) BEncode
222getPeersM = method "get_peers" ["id", "info_hash"] []
223
224-- | Used to announce that the peer, controlling the quering node is
225-- downloading a torrent on a port.
226announcePeerM :: Method (NodeId, InfoHash, PortNumber, Token) NodeId
227announcePeerM = method "announce_peer" ["id", "info_hash", "port", "token"] ["id"]
228
229{-----------------------------------------------------------------------
230 DHT Tracker
231-----------------------------------------------------------------------}
232-- TODO: update node timestamp on each successful call
233
234-- | Note that tracker side query functions could throw RPCException.
235type DHT a b = NodeSession -> NodeAddr -> a -> IO b
236
237ping :: DHT () ()
238ping NodeSession {..} addr @ NodeAddr {..} () = do
239 nid <- call (nodeIP, nodePort) pingM nodeId
240 atomically $ modifyTVar' routingTable $ HM.insert nid addr
241
242findNode :: DHT NodeId [NodeInfo]
243findNode ses @ NodeSession {..} NodeAddr {..} qnid = do
244 (nid, info) <- call (nodeIP, nodePort) findNodeM (nodeId, qnid)
245 updateTimestamp ses nid
246 return (decodeCompact info)
247
248getPeers :: DHT InfoHash (Either [NodeInfo] [PeerAddr])
249getPeers ses @ NodeSession {..} NodeAddr {..} ih = do
250 resp <- call (nodeIP, nodePort) getPeersM (nodeId, ih)
251 (nid, tok, res) <- extrResp resp
252 updateTimestamp ses nid
253 updateToken ses nid tok
254 return res
255 where
256 extrResp (BDict d)
257 | Just (BString nid ) <- M.lookup "id" d
258 , Just (BString tok ) <- M.lookup "token" d
259 , Just (BList values) <- M.lookup "values" d
260 = return $ (nid, tok, Right $ decodePeerList values)
261
262 | Just (BString nid ) <- M.lookup "id" d
263 , Just (BString tok ) <- M.lookup "token" d
264 , Just (BString nodes) <- M.lookup "nodes" d
265 = return (nid, tok, Left $ decodeCompact nodes)
266
267 extrResp _ = throw $ RPCException msg
268 where msg = ProtocolError "unable to extract getPeers resp"
269
270-- remove token from signature, handle the all token stuff by NodeSession
271
272-- | Note that before ever calling this method you should call the
273-- getPeerList.
274announcePeer :: DHT (InfoHash, Token) NodeId
275announcePeer ses @ NodeSession {..} NodeAddr {..} (ih, tok) = do
276 nid <- call (nodeIP, nodePort) announcePeerM (nodeId, ih, listenerPort, tok)
277 updateTimestamp ses nid
278 return nid
279
280{-----------------------------------------------------------------------
281 DHT Server
282-----------------------------------------------------------------------}
283-- TODO: update node timestamp on each successful call
284-- NOTE: ensure all server operations run in O(1)
285
286type ServerHandler a b = NodeSession -> NodeAddr -> a -> IO b
287
288pingS :: ServerHandler NodeId NodeId
289pingS NodeSession {..} addr nid = do
290 atomically $ modifyTVar' routingTable $ insertNode nid addr
291 return nodeId
292
293findNodeS :: ServerHandler (NodeId, NodeId) (NodeId, CompactInfo)
294findNodeS ses @ NodeSession {..} _ (nid, qnid) = do
295 updateTimestamp ses nid
296 rt <- atomically $ readTVar routingTable
297 return (nodeId, encodeCompact $ kclosest alpha qnid rt)
298
299getPeersS :: ServerHandler (NodeId, InfoHash) BEncode
300getPeersS ses @ NodeSession {..} _ (nid, ih) = do
301 updateTimestamp ses nid
302 mkResp <$> assignToken ses nid <*> findPeers
303 where
304 findPeers = do
305 list <- lookupPeers ih <$> readTVarIO contactInfo
306 if not (L.null list)
307 then return $ Right list
308 else do
309 rt <- readTVarIO routingTable
310 let nodes = kclosest alpha (getInfoHash ih) rt
311 return $ Left nodes
312
313 mkDict tok res = [("id",BString nodeId), ("token", BString tok), res]
314 mkResult (Left nodes ) = ("nodes", BString $ encodeCompact nodes)
315 mkResult (Right values) = ("values", BList $ encodePeerList values)
316 mkResp tok = BDict . M.fromList . mkDict tok . mkResult
317
318announcePeerS :: ServerHandler (NodeId, InfoHash, PortNumber, Token) NodeId
319announcePeerS ses @ NodeSession {..} NodeAddr {..} (nid, ih, port, token) = do
320 updateTimestamp ses nid
321 registered <- checkToken nid token ses
322 when registered $ do
323 atomically $ do
324 let peerAddr = PeerAddr Nothing nodeIP port
325 modifyTVar contactInfo $ insertPeer ih peerAddr
326 return nodeId
327
328dhtTracker :: NodeSession -> InfoHash -> Chan PeerAddr -> IO ()
329dhtTracker = undefined
330
331dhtServer :: NodeSession -> PortNumber -> IO ()
332dhtServer s p = server p methods
333 where
334 methods =
335 [ pingM ==> pingS s undefined
336 , findNodeM ==> findNodeS s undefined
337 , getPeersM ==> getPeersS s undefined
338 , announcePeerM ==> announcePeerS s undefined
339 ] \ No newline at end of file
diff --git a/src/Network/BitTorrent/Discovery.hs b/src/Network/BitTorrent/Discovery.hs
new file mode 100644
index 00000000..770ae818
--- /dev/null
+++ b/src/Network/BitTorrent/Discovery.hs
@@ -0,0 +1,56 @@
1{-# LANGUAGE RecordWildCards #-}
2{-# LANGUAGE OverloadedStrings #-}
3module Network.BitTorrent.Discovery
4 (discover, startListener, startDHT
5 ) where
6
7import Control.Monad
8import Control.Concurrent
9import Control.Exception
10import Network.Socket
11
12import Data.Torrent
13import Network.BitTorrent.Peer
14import Network.BitTorrent.Internal
15import Network.BitTorrent.Exchange
16import Network.BitTorrent.Tracker
17import Network.BitTorrent.DHT
18
19
20-- discover should hide tracker and DHT communication under the hood
21-- thus we can obtain an unified interface
22
23discover :: SwarmSession -> P2P () -> IO ()
24discover swarm @ SwarmSession {..} action = {-# SCC discover #-} do
25 port <- listenerPort clientSession
26
27 let conn = TConnection {
28 tconnAnnounce = tAnnounce torrentMeta
29 , tconnInfoHash = tInfoHash torrentMeta
30 , tconnPeerId = clientPeerId clientSession
31 , tconnPort = port
32 }
33
34 progress <- getCurrentProgress clientSession
35
36 withTracker progress conn $ \tses -> do
37 forever $ do
38 addr <- getPeerAddr tses
39 spawnP2P swarm addr $ do
40 action
41
42startListener :: ClientSession -> PortNumber -> IO ()
43startListener cs @ ClientSession {..} port =
44 putMVar peerListener =<< startService port (listener cs (error "listener"))
45
46startDHT :: ClientSession -> PortNumber -> IO ()
47startDHT ClientSession {..} nodePort = do
48 maybe failure start =<< tryTakeMVar peerListener
49 where
50 start ClientService {..} = do
51 ses <- newNodeSession servPort
52 serv <- startService nodePort (dhtServer ses)
53 putMVar nodeListener serv
54
55 failure = throwIO $ userError msg
56 msg = "unable to start DHT server: peer listener is not running"
diff --git a/src/Network/BitTorrent/Internal.lhs b/src/Network/BitTorrent/Internal.lhs
index 15606c57..8461a841 100644
--- a/src/Network/BitTorrent/Internal.lhs
+++ b/src/Network/BitTorrent/Internal.lhs
@@ -23,10 +23,18 @@
23> ( -- * Progress 23> ( -- * Progress
24> Progress(..), startProgress 24> Progress(..), startProgress
25> 25>
26> , ClientService(..)
27>
26> -- * Client 28> -- * Client
27> , ClientSession (clientPeerId, allowedExtensions) 29> , ClientSession ( ClientSession
30> , clientPeerId, allowedExtensions
31> , nodeListener, peerListener
32> )
33> , withClientSession
28> , listenerPort, dhtPort 34> , listenerPort, dhtPort
29> 35>
36> , startService
37>
30> , ThreadCount 38> , ThreadCount
31> , defaultThreadCount 39> , defaultThreadCount
32> 40>
@@ -34,8 +42,6 @@
34> , registerTorrent 42> , registerTorrent
35> , unregisterTorrent 43> , unregisterTorrent
36> 44>
37> , newClient
38>
39> , getCurrentProgress 45> , getCurrentProgress
40> , getSwarmCount 46> , getSwarmCount
41> , getPeerCount 47> , getPeerCount
@@ -64,6 +70,7 @@
64> , SessionState 70> , SessionState
65> , initiatePeerSession 71> , initiatePeerSession
66> , acceptPeerSession 72> , acceptPeerSession
73> , listener
67> 74>
68> -- ** Broadcasting 75> -- ** Broadcasting
69> , available 76> , available
@@ -89,7 +96,7 @@
89> import Control.Concurrent.STM 96> import Control.Concurrent.STM
90> import Control.Concurrent.MSem as MSem 97> import Control.Concurrent.MSem as MSem
91> import Control.Lens 98> import Control.Lens
92> import Control.Monad (when) 99> import Control.Monad (when, forever)
93> import Control.Exception 100> import Control.Exception
94> import Control.Monad.Trans 101> import Control.Monad.Trans
95 102
@@ -106,7 +113,7 @@
106> import Data.Serialize hiding (get) 113> import Data.Serialize hiding (get)
107> import Text.PrettyPrint 114> import Text.PrettyPrint
108 115
109> import Network 116> import Network hiding (accept)
110> import Network.Socket 117> import Network.Socket
111> import Network.Socket.ByteString 118> import Network.Socket.ByteString
112 119
@@ -118,6 +125,7 @@
118> import Network.BitTorrent.Peer 125> import Network.BitTorrent.Peer
119> import Network.BitTorrent.Exchange.Protocol as BT 126> import Network.BitTorrent.Exchange.Protocol as BT
120> import Network.BitTorrent.Tracker.Protocol as BT 127> import Network.BitTorrent.Tracker.Protocol as BT
128> import Network.BitTorrent.DHT.Protocol as BT
121 129
122Progress 130Progress
123------------------------------------------------------------------------ 131------------------------------------------------------------------------
@@ -195,7 +203,7 @@ Peer session is one always forked thread.
195When client\/swarm\/peer session gets closed kill the corresponding 203When client\/swarm\/peer session gets closed kill the corresponding
196threads, but flush data to disc. (for e.g. storage block map) 204threads, but flush data to disc. (for e.g. storage block map)
197 205
198So for e.g., in order to obtain our first block we need to run at 206So for e.g., in order to obtain our first block we need to spawn at
199least 7 threads: main thread, 2 client session threads, 3 swarm session 207least 7 threads: main thread, 2 client session threads, 3 swarm session
200threads and PeerSession thread. 208threads and PeerSession thread.
201 209
@@ -296,10 +304,8 @@ so we can abstract out into ClientService:
296> , servThread :: !ThreadId 304> , servThread :: !ThreadId
297> } deriving Show 305> } deriving Show
298 306
299startService :: PortNumber -> IO a -> IO ClientService 307> startService :: PortNumber -> (PortNumber -> IO ()) -> IO ClientService
300startService p m = forkIO $ handle $ m p 308> startService port m = ClientService port <$> forkIO (m port)
301 where
302 handle :: IOError -> IO ()
303 309
304> stopService :: ClientService -> IO () 310> stopService :: ClientService -> IO ()
305> stopService ClientService {..} = killThread servThread 311> stopService ClientService {..} = killThread servThread
@@ -339,8 +345,8 @@ and different enabled extensions at the same time.
339> -- 'PeerSession'. 345> -- 'PeerSession'.
340> , allowedExtensions :: [Extension] 346> , allowedExtensions :: [Extension]
341 347
342> , peerListener :: !ClientService 348> , peerListener :: !(MVar ClientService)
343> , nodeListener :: !ClientService 349> , nodeListener :: !(MVar ClientService)
344 350
345> -- | Semaphor used to bound number of active P2P sessions. 351> -- | Semaphor used to bound number of active P2P sessions.
346> , activeThreads :: !(MSem ThreadCount) 352> , activeThreads :: !(MSem ThreadCount)
@@ -407,20 +413,20 @@ Retrieving client info
407 413
408> -- | Create a new client session. The data passed to this function are 414> -- | Create a new client session. The data passed to this function are
409> -- usually loaded from configuration file. 415> -- usually loaded from configuration file.
410> newClient :: SessionCount -- ^ Maximum count of active P2P Sessions. 416> openClientSession :: SessionCount -- ^ Maximum count of active P2P Sessions.
411> -> [Extension] -- ^ Extensions allowed to use. 417> -> [Extension] -- ^ Extensions allowed to use.
412> -> IO ClientSession -- ^ Client with unique peer ID. 418> -> IO ClientSession -- ^ Client with unique peer ID.
413 419
414> newClient n exts = do 420> openClientSession n exts = do
415> mgr <- Ev.new 421> mgr <- Ev.new
416> -- TODO kill this thread when leave client 422> -- TODO kill this thread when leave client
417> _ <- forkIO $ loop mgr 423> _ <- forkIO $ loop mgr
418 424>
419> ClientSession 425> ClientSession
420> <$> genPeerId 426> <$> genPeerId
421> <*> pure exts 427> <*> pure exts
422> <*> pure (ClientService 10 undefined) -- TODO 428> <*> newEmptyMVar
423> <*> pure (ClientService 20 undefined) -- TODO 429> <*> newEmptyMVar
424> <*> MSem.new n 430> <*> MSem.new n
425> <*> pure n 431> <*> pure n
426> <*> newTVarIO M.empty 432> <*> newTVarIO M.empty
@@ -428,11 +434,21 @@ Retrieving client info
428> <*> newTVarIO (startProgress 0) 434> <*> newTVarIO (startProgress 0)
429> <*> newTVarIO HM.empty 435> <*> newTVarIO HM.empty
430 436
431> listenerPort :: ClientSession -> PortNumber 437> closeClientSession :: ClientSession -> IO ()
432> listenerPort = servPort . peerListener 438> closeClientSession ClientSession {..} =
439> maybeStop (tryTakeMVar peerListener) `finally`
440> maybeStop (tryTakeMVar nodeListener)
441> where
442> maybeStop m = maybe (return ()) stopService =<< m
443
444> withClientSession :: SessionCount -> [Extension] -> (ClientSession -> IO ()) -> IO ()
445> withClientSession c es = bracket (openClientSession c es) closeClientSession
433 446
434> dhtPort :: ClientSession -> PortNumber 447> listenerPort :: ClientSession -> IO PortNumber
435> dhtPort = servPort . nodeListener 448> listenerPort ClientSession {..} = servPort <$> readMVar peerListener
449
450> dhtPort :: ClientSession -> IO PortNumber
451> dhtPort ClientSession {..} = servPort <$> readMVar nodeListener
436 452
437Swarm sessions 453Swarm sessions
438------------------------------------------------------------------------ 454------------------------------------------------------------------------
@@ -736,8 +752,10 @@ TODO: use STM semaphore
736> sendClientStatus (sock, PeerSession {..}) = do 752> sendClientStatus (sock, PeerSession {..}) = do
737> cbf <- readTVarIO $ clientBitfield $ swarmSession 753> cbf <- readTVarIO $ clientBitfield $ swarmSession
738> sendAll sock $ encode $ Bitfield cbf 754> sendAll sock $ encode $ Bitfield cbf
755>
756> port <- dhtPort $ clientSession swarmSession
739> when (ExtDHT `elem` enabledExtensions) $ do 757> when (ExtDHT `elem` enabledExtensions) $ do
740> sendAll sock $ encode $ Port $ dhtPort $ clientSession swarmSession 758> sendAll sock $ encode $ Port port
741 759
742Exchange action depends on session and socket, whereas session depends 760Exchange action depends on session and socket, whereas session depends
743on socket: 761on socket:
@@ -786,6 +804,26 @@ Used the a peer want to connect to the client.
786> sendClientStatus (sock, ps) 804> sendClientStatus (sock, ps)
787> return ps 805> return ps
788 806
807
808> listener :: ClientSession -> Exchange -> PortNumber -> IO ()
809> listener cs action serverPort = bracket openListener close loop
810> where
811> loop sock = forever $ handle isIOError $ do
812> (conn, addr) <- accept sock
813> case addr of
814> SockAddrInet port host -> do
815> acceptPeerSession cs (PeerAddr Nothing host port) conn action
816> _ -> return ()
817>
818> isIOError :: IOError -> IO ()
819> isIOError _ = return ()
820>
821> openListener = do
822> sock <- socket AF_INET Stream defaultProtocol
823> bindSocket sock (SockAddrInet serverPort 0)
824> listen sock 1
825> return sock
826
789Broadcasting: Have, Cancel, Bitfield, SuggestPiece 827Broadcasting: Have, Cancel, Bitfield, SuggestPiece
790------------------------------------------------------------------------ 828------------------------------------------------------------------------
791 829