diff options
Diffstat (limited to 'src/Network/BitTorrent/DHT.hs')
-rw-r--r-- | src/Network/BitTorrent/DHT.hs | 336 |
1 files changed, 2 insertions, 334 deletions
diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs index f3c993c3..b0aac002 100644 --- a/src/Network/BitTorrent/DHT.hs +++ b/src/Network/BitTorrent/DHT.hs | |||
@@ -1,338 +1,6 @@ | |||
1 | {-# LANGUAGE OverloadedStrings #-} | ||
2 | {-# LANGUAGE RecordWildCards #-} | ||
3 | module Network.BitTorrent.DHT | 1 | module Network.BitTorrent.DHT |
4 | ( | 2 | ( newNodeSession |
5 | newNodeSession | ||
6 | |||
7 | -- * Tracker | ||
8 | , ping | ||
9 | , findNode | ||
10 | , getPeers | ||
11 | , announcePeer | ||
12 | |||
13 | -- * Server | ||
14 | , dhtServer | 3 | , dhtServer |
15 | ) where | 4 | ) where |
16 | 5 | ||
17 | import Control.Applicative | 6 | import Network.BitTorrent.DHT.Protocol \ No newline at end of file |
18 | import Control.Concurrent | ||
19 | import Control.Concurrent.STM | ||
20 | import Control.Monad | ||
21 | import Control.Exception | ||
22 | import Data.ByteString | ||
23 | import Data.Serialize as S | ||
24 | import Data.Function | ||
25 | import Data.Ord | ||
26 | import Data.Maybe | ||
27 | import Data.List as L | ||
28 | import Data.Map as M | ||
29 | import Data.HashMap.Strict as HM | ||
30 | import Network | ||
31 | import Network.Socket | ||
32 | import System.Entropy | ||
33 | |||
34 | import Remote.KRPC | ||
35 | import Remote.KRPC.Protocol | ||
36 | import Data.BEncode | ||
37 | import Data.Torrent | ||
38 | import Network.BitTorrent.Peer | ||
39 | |||
40 | {----------------------------------------------------------------------- | ||
41 | Node | ||
42 | -----------------------------------------------------------------------} | ||
43 | |||
44 | type NodeId = ByteString | ||
45 | |||
46 | -- WARN is the 'system' random suitable for this? | ||
47 | -- | Generate random NodeID used for the entire session. | ||
48 | -- Distribution of ID's should be as uniform as possible. | ||
49 | -- | ||
50 | genNodeId :: IO NodeId | ||
51 | genNodeId = getEntropy 20 | ||
52 | |||
53 | instance Serialize PortNumber where | ||
54 | get = fromIntegral <$> getWord16be | ||
55 | put = putWord16be . fromIntegral | ||
56 | |||
57 | |||
58 | data NodeAddr = NodeAddr { | ||
59 | nodeIP :: {-# UNPACK #-} !HostAddress | ||
60 | , nodePort :: {-# UNPACK #-} !PortNumber | ||
61 | } deriving (Show, Eq) | ||
62 | |||
63 | instance Serialize NodeAddr where | ||
64 | get = NodeAddr <$> getWord32be <*> get | ||
65 | put NodeAddr {..} = do | ||
66 | putWord32be nodeIP | ||
67 | put nodePort | ||
68 | |||
69 | |||
70 | data NodeInfo = NodeInfo { | ||
71 | nodeID :: !NodeId | ||
72 | , nodeAddr :: !NodeAddr | ||
73 | } deriving (Show, Eq) | ||
74 | |||
75 | instance Serialize NodeInfo where | ||
76 | get = NodeInfo <$> getByteString 20 <*> get | ||
77 | put NodeInfo {..} = put nodeID >> put nodeAddr | ||
78 | |||
79 | type CompactInfo = ByteString | ||
80 | |||
81 | decodeCompact :: CompactInfo -> [NodeInfo] | ||
82 | decodeCompact = either (const []) id . S.runGet (many get) | ||
83 | |||
84 | encodeCompact :: [NodeId] -> CompactInfo | ||
85 | encodeCompact = S.runPut . mapM_ put | ||
86 | |||
87 | decodePeerList :: [BEncode] -> [PeerAddr] | ||
88 | decodePeerList = undefined | ||
89 | |||
90 | encodePeerList :: [PeerAddr] -> [BEncode] | ||
91 | encodePeerList = undefined | ||
92 | |||
93 | type Distance = NodeId | ||
94 | |||
95 | |||
96 | {----------------------------------------------------------------------- | ||
97 | Tokens | ||
98 | -----------------------------------------------------------------------} | ||
99 | |||
100 | type Secret = Int | ||
101 | |||
102 | genSecret :: IO Secret | ||
103 | genSecret = undefined | ||
104 | |||
105 | -- | Instead of periodically loop over the all nodes in the routing | ||
106 | -- table with some given interval (or some other tricky method | ||
107 | -- e.g. using timeouts) we can just update tokens on demand - if no | ||
108 | -- one asks for a token then the token _should_ not change at all. | ||
109 | -- | ||
110 | type Token = ByteString | ||
111 | |||
112 | genToken :: NodeAddr -> Secret -> Token | ||
113 | genToken = return undefined | ||
114 | |||
115 | defaultToken :: Token | ||
116 | defaultToken = "0xdeadbeef" | ||
117 | |||
118 | {----------------------------------------------------------------------- | ||
119 | Routing table | ||
120 | -----------------------------------------------------------------------} | ||
121 | |||
122 | type ContactInfo = HashMap InfoHash [PeerAddr] | ||
123 | |||
124 | insertPeer :: InfoHash -> PeerAddr -> ContactInfo -> ContactInfo | ||
125 | insertPeer ih addr = HM.insertWith (++) ih [addr] | ||
126 | |||
127 | lookupPeers :: InfoHash -> ContactInfo -> [PeerAddr] | ||
128 | lookupPeers ih = fromMaybe [] . HM.lookup ih | ||
129 | |||
130 | -- TODO use more compact routing table | ||
131 | type RoutingTable = HashMap NodeId NodeAddr | ||
132 | |||
133 | insertNode :: NodeId -> NodeAddr -> RoutingTable -> RoutingTable | ||
134 | insertNode = HM.insert | ||
135 | |||
136 | type Alpha = Int | ||
137 | |||
138 | defaultAlpha :: Alpha | ||
139 | defaultAlpha = 8 | ||
140 | |||
141 | -- TODO | ||
142 | kclosest :: Int -> NodeId -> RoutingTable -> [NodeId] | ||
143 | kclosest = undefined | ||
144 | |||
145 | {----------------------------------------------------------------------- | ||
146 | Node session | ||
147 | -----------------------------------------------------------------------} | ||
148 | |||
149 | data NodeSession = NodeSession { | ||
150 | nodeId :: !NodeId | ||
151 | , routingTable :: !(TVar RoutingTable) | ||
152 | , contactInfo :: !(TVar ContactInfo) | ||
153 | , alpha :: !Alpha | ||
154 | , listenerPort :: !PortNumber | ||
155 | } | ||
156 | |||
157 | instance Eq NodeSession where | ||
158 | (==) = (==) `on` nodeId | ||
159 | |||
160 | instance Ord NodeSession where | ||
161 | compare = comparing nodeId | ||
162 | |||
163 | newNodeSession :: PortNumber -> IO NodeSession | ||
164 | newNodeSession lport | ||
165 | = NodeSession | ||
166 | <$> genNodeId | ||
167 | <*> newTVarIO HM.empty | ||
168 | <*> newTVarIO HM.empty | ||
169 | <*> pure defaultAlpha | ||
170 | <*> pure lport | ||
171 | |||
172 | assignToken :: NodeSession -> NodeId -> IO Token | ||
173 | assignToken _ _ = return "" | ||
174 | |||
175 | -- TODO | ||
176 | checkToken :: NodeId -> Token -> NodeSession -> IO Bool | ||
177 | checkToken nid token _ = return True | ||
178 | |||
179 | updateTimestamp :: NodeSession -> NodeId -> IO () | ||
180 | updateTimestamp = error "updateTimestamp" | ||
181 | |||
182 | updateToken :: NodeSession -> NodeId -> Token -> IO () | ||
183 | updateToken _ _ _ = error "updateToken" | ||
184 | |||
185 | {----------------------------------------------------------------------- | ||
186 | DHT Queries | ||
187 | -----------------------------------------------------------------------} | ||
188 | |||
189 | pingM :: Method NodeId NodeId | ||
190 | pingM = method "ping" ["id"] ["id"] | ||
191 | |||
192 | findNodeM :: Method (NodeId, NodeId) (NodeId, CompactInfo) | ||
193 | findNodeM = method "find_node" ["id", "target"] ["id", "nodes"] | ||
194 | |||
195 | -- | Lookup peers by a torrent infohash. This method might return | ||
196 | -- different kind of responses depending on the routing table of | ||
197 | -- queried node: | ||
198 | -- | ||
199 | -- * If quieried node contains a peer list for the given infohash | ||
200 | -- then the node should return the list in a "value" key. Note that | ||
201 | -- list is encoded as compact peer address, not a compact node info. | ||
202 | -- The result of 'get_peers' method have the following scheme: | ||
203 | -- | ||
204 | -- > { "id" : "dht_server_node_id" | ||
205 | -- > , "token" : "assigned_token" | ||
206 | -- > , "values" : ["_IP_PO", "_ip_po"] | ||
207 | -- > } | ||
208 | -- | ||
209 | -- * If quieried node does not contain a list of peers associated | ||
210 | -- with the given infohash, then node should return | ||
211 | -- | ||
212 | -- > { "id" : "dht_server_node_id" | ||
213 | -- > , "token" : "assigned_token" | ||
214 | -- > , "nodes" : "compact_nodes_info" | ||
215 | -- > } | ||
216 | -- | ||
217 | -- The resulting dictionaries might differ only in a values\/nodes | ||
218 | -- keys. | ||
219 | -- | ||
220 | getPeersM :: Method (NodeId, InfoHash) BEncode | ||
221 | getPeersM = method "get_peers" ["id", "info_hash"] [] | ||
222 | |||
223 | -- | Used to announce that the peer, controlling the quering node is | ||
224 | -- downloading a torrent on a port. | ||
225 | announcePeerM :: Method (NodeId, InfoHash, PortNumber, Token) NodeId | ||
226 | announcePeerM = method "announce_peer" ["id", "info_hash", "port", "token"] ["id"] | ||
227 | |||
228 | {----------------------------------------------------------------------- | ||
229 | DHT Tracker | ||
230 | -----------------------------------------------------------------------} | ||
231 | -- TODO: update node timestamp on each successful call | ||
232 | |||
233 | -- | Note that tracker side query functions could throw RPCException. | ||
234 | type DHT a b = NodeSession -> NodeAddr -> a -> IO b | ||
235 | |||
236 | ping :: DHT () () | ||
237 | ping NodeSession {..} addr @ NodeAddr {..} () = do | ||
238 | nid <- call (nodeIP, nodePort) pingM nodeId | ||
239 | atomically $ modifyTVar' routingTable $ HM.insert nid addr | ||
240 | |||
241 | findNode :: DHT NodeId [NodeInfo] | ||
242 | findNode ses @ NodeSession {..} NodeAddr {..} qnid = do | ||
243 | (nid, info) <- call (nodeIP, nodePort) findNodeM (nodeId, qnid) | ||
244 | updateTimestamp ses nid | ||
245 | return (decodeCompact info) | ||
246 | |||
247 | getPeers :: DHT InfoHash (Either [NodeInfo] [PeerAddr]) | ||
248 | getPeers ses @ NodeSession {..} NodeAddr {..} ih = do | ||
249 | resp <- call (nodeIP, nodePort) getPeersM (nodeId, ih) | ||
250 | (nid, tok, res) <- extrResp resp | ||
251 | updateTimestamp ses nid | ||
252 | updateToken ses nid tok | ||
253 | return res | ||
254 | where | ||
255 | extrResp (BDict d) | ||
256 | | Just (BString nid ) <- M.lookup "id" d | ||
257 | , Just (BString tok ) <- M.lookup "token" d | ||
258 | , Just (BList values) <- M.lookup "values" d | ||
259 | = return $ (nid, tok, Right $ decodePeerList values) | ||
260 | |||
261 | | Just (BString nid ) <- M.lookup "id" d | ||
262 | , Just (BString tok ) <- M.lookup "token" d | ||
263 | , Just (BString nodes) <- M.lookup "nodes" d | ||
264 | = return (nid, tok, Left $ decodeCompact nodes) | ||
265 | |||
266 | extrResp _ = throw $ RPCException msg | ||
267 | where msg = ProtocolError "unable to extract getPeers resp" | ||
268 | |||
269 | -- remove token from signature, handle the all token stuff by NodeSession | ||
270 | |||
271 | -- | Note that before ever calling this method you should call the | ||
272 | -- getPeerList. | ||
273 | announcePeer :: DHT (InfoHash, Token) NodeId | ||
274 | announcePeer ses @ NodeSession {..} NodeAddr {..} (ih, tok) = do | ||
275 | nid <- call (nodeIP, nodePort) announcePeerM (nodeId, ih, listenerPort, tok) | ||
276 | updateTimestamp ses nid | ||
277 | return nid | ||
278 | |||
279 | {----------------------------------------------------------------------- | ||
280 | DHT Server | ||
281 | -----------------------------------------------------------------------} | ||
282 | -- TODO: update node timestamp on each successful call | ||
283 | -- NOTE: ensure all server operations run in O(1) | ||
284 | |||
285 | type ServerHandler a b = NodeSession -> NodeAddr -> a -> IO b | ||
286 | |||
287 | pingS :: ServerHandler NodeId NodeId | ||
288 | pingS NodeSession {..} addr nid = do | ||
289 | atomically $ modifyTVar' routingTable $ insertNode nid addr | ||
290 | return nodeId | ||
291 | |||
292 | findNodeS :: ServerHandler (NodeId, NodeId) (NodeId, CompactInfo) | ||
293 | findNodeS ses @ NodeSession {..} _ (nid, qnid) = do | ||
294 | updateTimestamp ses nid | ||
295 | rt <- atomically $ readTVar routingTable | ||
296 | return (nodeId, encodeCompact $ kclosest alpha qnid rt) | ||
297 | |||
298 | getPeersS :: ServerHandler (NodeId, InfoHash) BEncode | ||
299 | getPeersS ses @ NodeSession {..} _ (nid, ih) = do | ||
300 | updateTimestamp ses nid | ||
301 | mkResp <$> assignToken ses nid <*> findPeers | ||
302 | where | ||
303 | findPeers = do | ||
304 | list <- lookupPeers ih <$> readTVarIO contactInfo | ||
305 | if not (L.null list) | ||
306 | then return $ Right list | ||
307 | else do | ||
308 | rt <- readTVarIO routingTable | ||
309 | let nodes = kclosest alpha (getInfoHash ih) rt | ||
310 | return $ Left nodes | ||
311 | |||
312 | mkDict tok res = [("id",BString nodeId), ("token", BString tok), res] | ||
313 | mkResult (Left nodes ) = ("nodes", BString $ encodeCompact nodes) | ||
314 | mkResult (Right values) = ("values", BList $ encodePeerList values) | ||
315 | mkResp tok = BDict . M.fromList . mkDict tok . mkResult | ||
316 | |||
317 | announcePeerS :: ServerHandler (NodeId, InfoHash, PortNumber, Token) NodeId | ||
318 | announcePeerS ses @ NodeSession {..} NodeAddr {..} (nid, ih, port, token) = do | ||
319 | updateTimestamp ses nid | ||
320 | registered <- checkToken nid token ses | ||
321 | when registered $ do | ||
322 | atomically $ do | ||
323 | let peerAddr = PeerAddr Nothing nodeIP port | ||
324 | modifyTVar contactInfo $ insertPeer ih peerAddr | ||
325 | return nodeId | ||
326 | |||
327 | dhtTracker :: NodeSession -> InfoHash -> Chan PeerAddr -> IO () | ||
328 | dhtTracker = undefined | ||
329 | |||
330 | dhtServer :: NodeSession -> PortNumber -> IO () | ||
331 | dhtServer s p = server p methods | ||
332 | where | ||
333 | methods = | ||
334 | [ pingM ==> pingS s undefined | ||
335 | , findNodeM ==> findNodeS s undefined | ||
336 | , getPeersM ==> getPeersS s undefined | ||
337 | , announcePeerM ==> announcePeerS s undefined | ||
338 | ] \ No newline at end of file | ||