1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
|
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Network.BitTorrent.DHT
(
newNodeSession
-- * Tracker
, ping
, findNode
, getPeers
, announcePeer
-- * Server
, dhtServer
) where
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import Control.Exception
import Data.ByteString
import Data.Serialize as S
import Data.Function
import Data.Ord
import Data.Maybe
import Data.List as L
import Data.Map as M
import Data.HashMap.Strict as HM
import Network
import Network.Socket
import System.Entropy
import Remote.KRPC
import Remote.KRPC.Protocol
import Data.BEncode
import Data.Torrent
import Network.BitTorrent.Peer
{-----------------------------------------------------------------------
Node
-----------------------------------------------------------------------}
-- | Identifier of a DHT node, kept as a raw 'ByteString'.
-- 'genNodeId' requests 20 bytes of entropy for it and the 'Serialize'
-- instance of 'NodeInfo' reads exactly 20 bytes.
type NodeId = ByteString
-- WARN: is the system entropy source suitable for this?
-- | Produce a fresh random node identifier, meant to be generated once
--   and kept for the entire session.  Identifiers should be distributed
--   as uniformly as possible.
--
genNodeId :: IO NodeId
genNodeId = getEntropy nodeIdBytes
  where
    -- Number of random bytes requested for one identifier.
    nodeIdBytes = 20
-- | A port is (de)serialized as an unsigned 16-bit big-endian integer.
--   Note that this instance is an orphan: neither 'Serialize' nor
--   'PortNumber' is defined in this module.
instance Serialize PortNumber where
  get      = fmap fromIntegral getWord16be
  put port = putWord16be (fromIntegral port)
-- | Network address of a DHT node: the host address ('nodeIP') paired
-- with the port ('nodePort'). Both fields are strict and unpacked.
data NodeAddr = NodeAddr {
nodeIP :: {-# UNPACK #-} !HostAddress
, nodePort :: {-# UNPACK #-} !PortNumber
} deriving (Show, Eq)
-- | Wire format: the 4 address bytes followed by the port as written
--   by the 'Serialize' instance of 'PortNumber'.
--
--   Per the network package documentation, a 'HostAddress' already
--   keeps its octets in network byte order in memory.  The word is
--   therefore copied with the host-endian accessors; the big-endian
--   accessors would reverse the octets on little-endian machines.
instance Serialize NodeAddr where
  get = NodeAddr <$> getWord32host <*> get
  put NodeAddr {..} = do
    putWord32host nodeIP
    put nodePort
-- | A node as exchanged between DHT nodes: its identifier ('nodeID')
-- together with the address it can be reached at ('nodeAddr').
-- Both fields are strict.
data NodeInfo = NodeInfo {
nodeID :: !NodeId
, nodeAddr :: !NodeAddr
} deriving (Show, Eq)
-- | Wire format: the raw 20-byte node id immediately followed by the
--   serialized 'NodeAddr'.
instance Serialize NodeInfo where
  get = NodeInfo <$> getByteString 20 <*> get
  -- The id is written raw with 'putByteString' so that it mirrors the
  -- 'getByteString' used by 'get'.  The generic 'put' for 'ByteString'
  -- prepends a length field, which 'get' would misread as id bytes.
  put NodeInfo {..} = putByteString nodeID >> put nodeAddr
-- | Serialized list of nodes, as produced by 'encodeCompact' and
-- consumed by 'decodeCompact'.
type CompactInfo = ByteString
-- | Parse as many consecutive 'NodeInfo' records as the input holds.
--   If the parser reports a failure the result is the empty list.
decodeCompact :: CompactInfo -> [NodeInfo]
decodeCompact bytes =
  case S.runGet (many get) bytes of
    Left _      -> []
    Right infos -> infos
-- | Serialize a list of node ids by concatenating the 'put' encoding
--   of each id, in order.
--
--   NOTE(review): this writes node ids only, whereas 'decodeCompact'
--   parses full 'NodeInfo' records — confirm the intended format.
encodeCompact :: [NodeId] -> CompactInfo
encodeCompact nids = S.runPut (forM_ nids put)
-- | Decode the bencoded entries found under the "values" key of a
-- get_peers response (this is how 'getPeers' uses it).
--
-- TODO: not implemented; evaluating the result raises an error.
decodePeerList :: [BEncode] -> [PeerAddr]
decodePeerList = undefined
-- | Encode peer addresses as the bencoded entries placed under the
-- "values" key of a get_peers response (this is how 'getPeersS' uses
-- it).
--
-- TODO: not implemented; evaluating the result raises an error.
encodePeerList :: [PeerAddr] -> [BEncode]
encodePeerList = undefined
-- | Distance between two node ids, represented with the same type as
-- a 'NodeId'.
-- NOTE(review): presumably the XOR metric; nothing in the visible code
-- uses this alias yet — confirm.
type Distance = NodeId
{-----------------------------------------------------------------------
Tokens
-----------------------------------------------------------------------}
-- | Secret value that 'genToken' takes together with a node address.
type Secret = Int
-- | Generate a new 'Secret'.
--
-- TODO: not implemented; using the action raises an error.
genSecret :: IO Secret
genSecret = undefined
-- | Opaque token carried by @get_peers@ responses and @announce_peer@
-- queries.
--
-- Instead of periodically looping over all the nodes in the routing
-- table at some given interval (or using some other tricky method,
-- e.g. timeouts) we can just update tokens on demand: if no one asks
-- for a token then the token _should_ not change at all.
--
type Token = ByteString
-- | Intended to compute the token for a node from its address and the
--   current secret.
--
--   Not implemented yet: applying the function to both arguments
--   raises an error.  The stub is written with explicit arguments; the
--   former @return undefined@ only type-checked through the function
--   monad and hid the fact that nothing is computed.
genToken :: NodeAddr -> Secret -> Token
genToken _addr _secret = error "genToken: not implemented"
-- | Placeholder token: the ten ASCII characters @0xdeadbeef@ (a string
-- literal, not four raw bytes).
-- NOTE(review): not referenced by any other definition visible here.
defaultToken :: Token
defaultToken = "0xdeadbeef"
{-----------------------------------------------------------------------
Routing table
-----------------------------------------------------------------------}
-- | Peers known for each torrent, keyed by infohash.
type ContactInfo = HashMap InfoHash [PeerAddr]
-- | Register a peer for the given infohash.  The new address is placed
--   in front of the peers already recorded for that infohash; no
--   deduplication is performed.
insertPeer :: InfoHash -> PeerAddr -> ContactInfo -> ContactInfo
insertPeer ih addr contacts = HM.insertWith prepend ih [addr] contacts
  where
    -- 'HM.insertWith' passes the new value first, the stored one second.
    prepend new old = new ++ old
-- | All peers recorded for the given infohash, or the empty list when
--   the infohash is unknown.
lookupPeers :: InfoHash -> ContactInfo -> [PeerAddr]
lookupPeers ih contacts =
  case HM.lookup ih contacts of
    Nothing    -> []
    Just peers -> peers
-- TODO use more compact routing table
-- | Known nodes: a flat map from node id to the address the node was
-- seen at.
type RoutingTable = HashMap NodeId NodeAddr
-- | Record a node in the routing table, replacing the address stored
--   under the same id, if any.
insertNode :: NodeId -> NodeAddr -> RoutingTable -> RoutingTable
insertNode nid addr table = HM.insert nid addr table
-- | Session parameter handed to 'kclosest' as its first argument by
-- the server handlers.
type Alpha = Int
-- | Value of 'Alpha' that 'newNodeSession' puts into a fresh session.
defaultAlpha :: Alpha
defaultAlpha = 8
-- TODO
-- | Select node ids from the routing table for a target id.
-- NOTE(review): judging by the name, the 'Int' is the number of
-- closest nodes to return — confirm once implemented.
--
-- Not implemented; evaluating the result raises an error.
kclosest :: Int -> NodeId -> RoutingTable -> [NodeId]
kclosest = undefined
{-----------------------------------------------------------------------
Node session
-----------------------------------------------------------------------}
-- | State of the local DHT node. All fields are strict.
--
-- * 'nodeId': identifier of this node ('newNodeSession' obtains it
--   from 'genNodeId').
-- * 'routingTable': known nodes, shared through a 'TVar'.
-- * 'contactInfo': peers known per infohash, shared through a 'TVar'.
-- * 'alpha': passed as the first argument of 'kclosest' by the server
--   handlers.
-- * 'listenerPort': port sent along with 'announcePeer' queries.
data NodeSession = NodeSession {
nodeId :: !NodeId
, routingTable :: !(TVar RoutingTable)
, contactInfo :: !(TVar ContactInfo)
, alpha :: !Alpha
, listenerPort :: !PortNumber
}
-- | Two sessions are equal exactly when their node ids are equal; the
--   remaining fields are not compared.
instance Eq NodeSession where
  a == b = nodeId a == nodeId b
-- | Sessions are ordered by their node ids only.
instance Ord NodeSession where
  compare a b = compare (nodeId a) (nodeId b)
-- | Create a session with a freshly generated node id, an empty
--   routing table, no known peers, the default 'Alpha' and the given
--   listener port.
newNodeSession :: PortNumber -> IO NodeSession
newNodeSession lport = do
  nid      <- genNodeId
  tableVar <- newTVarIO HM.empty
  peersVar <- newTVarIO HM.empty
  return NodeSession
    { nodeId       = nid
    , routingTable = tableVar
    , contactInfo  = peersVar
    , alpha        = defaultAlpha
    , listenerPort = lport
    }
-- | Produce the token to hand out to the given node.
--
-- Currently a stub: both arguments are ignored and the empty token is
-- always returned.
assignToken :: NodeSession -> NodeId -> IO Token
assignToken _ _ = return ""
-- TODO
-- | Decide whether the token presented by a node is acceptable.
--
-- Currently a stub: none of the arguments is inspected and every token
-- is accepted, so 'announcePeerS' registers any announcing peer.
checkToken :: NodeId -> Token -> NodeSession -> IO Bool
checkToken nid token _ = return True
-- | Intended to refresh the last-seen time of a node.
--
-- Not implemented: the binding is an 'error' call, so every use raises
-- an error. NOTE(review): 'findNode', 'getPeers', 'announcePeer' and
-- the server handlers other than 'pingS' all call this.
updateTimestamp :: NodeSession -> NodeId -> IO ()
updateTimestamp = error "updateTimestamp"
-- | Intended to remember the token received from a node.
--
-- Not implemented: applying it to its arguments raises an error
-- ('getPeers' calls it after every successful response).
updateToken :: NodeSession -> NodeId -> Token -> IO ()
updateToken _ _ _ = error "updateToken"
{-----------------------------------------------------------------------
DHT Queries
-----------------------------------------------------------------------}
-- | Signature of the @ping@ query: a node id is sent and a node id is
-- returned.
-- NOTE(review): presumably the two lists name the dictionary keys of
-- the arguments and of the result — confirm against 'method'.
pingM :: Method NodeId NodeId
pingM = method "ping" ["id"] ["id"]
-- | Signature of the @find_node@ query: the querying node id and the
-- target id are sent; a node id and compact node info are returned.
-- NOTE(review): presumably the two lists name the dictionary keys of
-- the arguments and of the result — confirm against 'method'.
findNodeM :: Method (NodeId, NodeId) (NodeId, CompactInfo)
findNodeM = method "find_node" ["id", "target"] ["id", "nodes"]
-- | Look up peers by a torrent infohash. This method might return
-- different kinds of responses depending on the routing table of the
-- queried node:
--
-- * If the queried node has a peer list for the given infohash
-- then the node should return the list in a "values" key. Note that
-- the list is encoded as compact peer addresses, not as compact node
-- info. The result of the 'get_peers' method has the following scheme:
--
-- > { "id" : "dht_server_node_id"
-- > , "token" : "assigned_token"
-- > , "values" : ["_IP_PO", "_ip_po"]
-- > }
--
-- * If the queried node does not have a list of peers associated
-- with the given infohash, then the node should return
--
-- > { "id" : "dht_server_node_id"
-- > , "token" : "assigned_token"
-- > , "nodes" : "compact_nodes_info"
-- > }
--
-- The resulting dictionaries differ only in the values\/nodes
-- keys.
--
-- The result type is a raw 'BEncode' value; 'getPeers' takes the
-- dictionary apart itself.
--
getPeersM :: Method (NodeId, InfoHash) BEncode
getPeersM = method "get_peers" ["id", "info_hash"] []
-- | Used to announce that the peer controlling the querying node is
-- downloading a torrent on a port. The query carries the node id, the
-- infohash, the port and a token; the result is a node id.
announcePeerM :: Method (NodeId, InfoHash, PortNumber, Token) NodeId
announcePeerM = method "announce_peer" ["id", "info_hash", "port", "token"] ["id"]
{-----------------------------------------------------------------------
DHT Tracker
-----------------------------------------------------------------------}
-- TODO: update node timestamp on each successful call
-- | Shape shared by the client-side query functions: the local
-- session, the address of the node to query, the query argument, and
-- the result in 'IO'.
--
-- Note that tracker side query functions could throw RPCException.
type DHT a b = NodeSession -> NodeAddr -> a -> IO b
-- | Send a @ping@ query to the node at the given address and record
--   the id it answers with, together with that address, in the routing
--   table of the session.
ping :: DHT () ()
ping NodeSession {..} addr@NodeAddr {..} () = do
  remoteId <- call (nodeIP, nodePort) pingM nodeId
  atomically (modifyTVar' routingTable (insertNode remoteId addr))
-- | Send a @find_node@ query for the given target id to the node at
--   the given address.  The timestamp of the answering node is updated
--   and the compact node info from the response is returned decoded.
findNode :: DHT NodeId [NodeInfo]
findNode ses@NodeSession {..} NodeAddr {..} qnid = do
  (remoteId, compact) <- call (nodeIP, nodePort) findNodeM (nodeId, qnid)
  decodeCompact compact <$ updateTimestamp ses remoteId
-- | Send a @get_peers@ query for the given infohash to the node at the
--   given address.
--
--   Returns @Right peers@ when the response carries a "values" list
--   and @Left nodes@ when it carries a "nodes" string; "values" wins
--   if both are present.  The timestamp and the token of the answering
--   node are updated before the result is returned.
--
--   Throws an 'RPCException' wrapping a 'ProtocolError' when the
--   response is not a dictionary with string "id" and "token" entries
--   plus one of the two payload keys.
getPeers :: DHT InfoHash (Either [NodeInfo] [PeerAddr])
getPeers ses@NodeSession {..} NodeAddr {..} ih = do
  resp            <- call (nodeIP, nodePort) getPeersM (nodeId, ih)
  (nid, tok, res) <- extrResp resp
  updateTimestamp ses nid
  updateToken ses nid tok
  return res
  where
    -- Take the response dictionary apart.  When any guard fails the
    -- match falls through to the catch-all equation below.
    extrResp (BDict d)
      | Just (BString nid) <- M.lookup "id"    d
      , Just (BString tok) <- M.lookup "token" d
      , Just res           <- extrResult d
      = return (nid, tok, res)
    -- 'throwIO' rather than 'throw': the exception is raised when the
    -- action is executed, in sequence with the surrounding IO.
    extrResp _ = throwIO $ RPCException msg
      where msg = ProtocolError "unable to extract getPeers resp"

    -- Payload of the response: a peer list if present, otherwise
    -- compact node info.
    extrResult d
      | Just (BList   values) <- M.lookup "values" d
      = Just (Right (decodePeerList values))
      | Just (BString nodes ) <- M.lookup "nodes"  d
      = Just (Left (decodeCompact nodes))
      | otherwise
      = Nothing
-- TODO: remove the token from the signature and let 'NodeSession'
-- handle all of the token bookkeeping.
-- | Send an @announce_peer@ query for the given infohash, using the
--   listener port of the session and the supplied token.  Returns the
--   id of the answering node after updating its timestamp.
--
--   Note that 'getPeers' should be called on the node before this
--   function is ever used.
announcePeer :: DHT (InfoHash, Token) NodeId
announcePeer ses@NodeSession {..} NodeAddr {..} (ih, tok) = do
  let query = (nodeId, ih, listenerPort, tok)
  remoteId <- call (nodeIP, nodePort) announcePeerM query
  remoteId <$ updateTimestamp ses remoteId
{-----------------------------------------------------------------------
DHT Server
-----------------------------------------------------------------------}
-- TODO: update node timestamp on each successful call
-- NOTE: ensure all server operations run in O(1)
-- | Shape shared by the server-side handlers: the local session, the
-- address of the querying node, the query argument, and the response
-- in 'IO'.
type ServerHandler a b = NodeSession -> NodeAddr -> a -> IO b
-- | Handle a @ping@ query: record the querying node under the address
--   it was received from and answer with the id of the local node.
pingS :: ServerHandler NodeId NodeId
pingS NodeSession {..} addr nid =
  nodeId <$ atomically (modifyTVar' routingTable (insertNode nid addr))
-- | Handle a @find_node@ query: update the timestamp of the querying
--   node, then answer with the local node id and the encoded result of
--   'kclosest' for the requested target.
findNodeS :: ServerHandler (NodeId, NodeId) (NodeId, CompactInfo)
findNodeS ses@NodeSession {..} _ (nid, qnid) = do
  updateTimestamp ses nid
  table <- readTVarIO routingTable
  let closest = kclosest alpha qnid table
  return (nodeId, encodeCompact closest)
-- | Handle a @get_peers@ query.  The response dictionary always holds
--   the local node id under "id" and a token under "token".  It also
--   holds either the known peers under "values" or, when no peer is
--   known for the infohash, the result of 'kclosest' under "nodes".
getPeersS :: ServerHandler (NodeId, InfoHash) BEncode
getPeersS ses@NodeSession {..} _ (nid, ih) = do
  updateTimestamp ses nid
  tok    <- assignToken ses nid
  answer <- peersOrNodes
  return (response tok answer)
  where
    -- Peers recorded for the infohash, or nodes selected from the
    -- routing table when there are none.
    peersOrNodes = do
      known <- lookupPeers ih <$> readTVarIO contactInfo
      if L.null known
        then do
          table <- readTVarIO routingTable
          return (Left (kclosest alpha (getInfoHash ih) table))
        else return (Right known)

    -- Assemble the response dictionary.
    response tok answer = BDict $ M.fromList
      [ ("id",    BString nodeId)
      , ("token", BString tok)
      , payload answer
      ]

    payload (Left  nodes ) = ("nodes",  BString (encodeCompact nodes))
    payload (Right values) = ("values", BList (encodePeerList values))
-- | Handle an @announce_peer@ query.  After updating the timestamp of
--   the querying node, the token is checked.  If it is accepted, the
--   peer (the IP of the querying node with the announced port) is
--   registered for the infohash.  The local node id is returned in
--   either case; a rejected token is not reported to the caller.
announcePeerS :: ServerHandler (NodeId, InfoHash, PortNumber, Token) NodeId
announcePeerS ses@NodeSession {..} NodeAddr {..} (nid, ih, port, token) = do
  updateTimestamp ses nid
  registered <- checkToken nid token ses
  -- Strict 'modifyTVar'' so that repeated announces do not pile up
  -- unevaluated updates inside the TVar.
  when registered $
    atomically $ modifyTVar' contactInfo $ insertPeer ih peerAddr
  return nodeId
  where
    peerAddr = PeerAddr Nothing nodeIP port
-- | Not implemented; using it raises an error.
-- NOTE(review): judging by the signature, it is meant to look up peers
-- for the infohash and deliver them through the channel — confirm.
-- It is not part of the module's export list.
dhtTracker :: NodeSession -> InfoHash -> Chan PeerAddr -> IO ()
dhtTracker = undefined
-- | Run a server on the given port that answers the @ping@,
--   @find_node@, @get_peers@ and @announce_peer@ queries with the
--   handlers of this module, all sharing the given session.
--
--   NOTE(review): every handler is given an 'undefined' address
--   instead of the address of the querying node.  'pingS' stores that
--   value in the routing table and 'announcePeerS' pattern-matches on
--   it, so both will fail once the address is forced.
dhtServer :: NodeSession -> PortNumber -> IO ()
dhtServer s p = server p
  [ pingM         ==> pingS         s unknownAddr
  , findNodeM     ==> findNodeS     s unknownAddr
  , getPeersM     ==> getPeersS     s unknownAddr
  , announcePeerM ==> announcePeerS s unknownAddr
  ]
  where
    -- Stand-in for the address of the querying node.
    unknownAddr :: NodeAddr
    unknownAddr = undefined
|