diff options
Diffstat (limited to 'src/Network/BitTorrent/DHT')
-rw-r--r-- | src/Network/BitTorrent/DHT/Query.hs | 750 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Session.hs | 571 |
2 files changed, 0 insertions, 1321 deletions
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs deleted file mode 100644 index 003bb5b9..00000000 --- a/src/Network/BitTorrent/DHT/Query.hs +++ /dev/null | |||
@@ -1,750 +0,0 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2014 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- This module provides functions to interact with other nodes. | ||
9 | -- Normally, you don't need to import this module, use | ||
10 | -- "Network.BitTorrent.DHT" instead. | ||
11 | -- | ||
12 | {-# LANGUAGE CPP #-} | ||
13 | {-# LANGUAGE FlexibleContexts #-} | ||
14 | {-# LANGUAGE ScopedTypeVariables #-} | ||
15 | {-# LANGUAGE TemplateHaskell #-} | ||
16 | {-# LANGUAGE TupleSections #-} | ||
17 | {-# LANGUAGE PartialTypeSignatures #-} | ||
18 | {-# LANGUAGE GADTs #-} | ||
19 | {-# LANGUAGE RankNTypes #-} | ||
20 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
21 | module Network.BitTorrent.DHT.Query | ||
22 | ( -- * Handler | ||
23 | -- | To bind specific set of handlers you need to pass | ||
24 | -- handler list to the 'startNode' function. | ||
25 | pingH | ||
26 | , findNodeH | ||
27 | , getPeersH | ||
28 | , announceH | ||
29 | , defaultHandlers | ||
30 | |||
31 | -- * Query | ||
32 | -- ** Basic | ||
33 | -- | A basic query perform a single request expecting a | ||
34 | -- single response. | ||
35 | , Iteration | ||
36 | , pingQ | ||
37 | , coldPingQ | ||
38 | , findNodeQ | ||
39 | , getPeersQ | ||
40 | , announceQ | ||
41 | |||
42 | -- ** Iterative | ||
43 | -- | An iterative query perform multiple basic queries, | ||
44 | -- concatenate its responses, optionally yielding result and | ||
45 | -- continue to the next iteration. | ||
46 | , Search | ||
47 | -- , search | ||
48 | , publish | ||
49 | , ioFindNode | ||
50 | , ioFindNodes | ||
51 | , ioGetPeers | ||
52 | , isearch | ||
53 | , bgsearch | ||
54 | |||
55 | -- ** Routing table | ||
56 | , insertNode | ||
57 | , refreshNodes | ||
58 | |||
59 | -- ** Messaging | ||
60 | , queryNode | ||
61 | , queryNode' | ||
62 | , (<@>) | ||
63 | ) where | ||
64 | |||
65 | import Data.Bits | ||
66 | import Data.Default | ||
67 | #ifdef THREAD_DEBUG | ||
68 | import Control.Concurrent.Lifted.Instrument hiding (yield) | ||
69 | #else | ||
70 | import GHC.Conc (labelThread) | ||
71 | import Control.Concurrent.Lifted hiding (yield) | ||
72 | #endif | ||
73 | import Control.Exception.Lifted hiding (Handler) | ||
74 | import Control.Monad.Reader | ||
75 | import Control.Monad.Logger | ||
76 | import Data.Maybe | ||
77 | import Data.Conduit | ||
78 | import Data.Conduit.List as C hiding (mapMaybe, mapM_) | ||
79 | import Data.Either | ||
80 | import Data.List as L | ||
81 | import Data.Monoid | ||
82 | import Data.Text as T | ||
83 | import qualified Data.Set as Set | ||
84 | ;import Data.Set (Set) | ||
85 | import Network | ||
86 | import Text.PrettyPrint as PP hiding ((<>), ($$)) | ||
87 | import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) | ||
88 | import Data.Time | ||
89 | import Data.Time.Clock.POSIX | ||
90 | import Data.Hashable (Hashable) | ||
91 | import Data.Serialize | ||
92 | import Data.Hashable | ||
93 | |||
94 | import Network.DatagramServer as KRPC hiding (Options, def) | ||
95 | import Network.KRPC.Method as KRPC | ||
96 | import Network.DatagramServer.Mainline (ReflectedIP(..), QueryExtra(..), ResponseExtra(..)) | ||
97 | import Network.DatagramServer (QueryFailure(..)) | ||
98 | import Data.Torrent | ||
99 | import qualified Network.DHT as DHT | ||
100 | import Network.DHT.Mainline | ||
101 | import Network.DHT.Routing as R | ||
102 | import Network.BitTorrent.DHT.Session | ||
103 | import Control.Concurrent.STM | ||
104 | import qualified Network.BitTorrent.DHT.Search as Search | ||
105 | #ifdef VERSION_bencoding | ||
106 | import Data.BEncode (BValue) | ||
107 | import Network.DatagramServer.Mainline (KMessageOf) | ||
108 | #else | ||
109 | import Data.ByteString (ByteString) | ||
110 | import Network.DatagramServer.Tox | ||
111 | #endif | ||
112 | import Network.Address hiding (NodeId) | ||
113 | import Network.DatagramServer.Types as RPC hiding (Query,Response) | ||
114 | import Network.DHT.Types | ||
115 | import Control.Monad.Trans.Control | ||
116 | import Data.Typeable | ||
117 | import Data.Serialize | ||
118 | import System.IO.Unsafe (unsafeInterleaveIO) | ||
119 | import Data.String | ||
120 | |||
121 | |||
122 | {----------------------------------------------------------------------- | ||
123 | -- Handlers | ||
124 | -----------------------------------------------------------------------} | ||
125 | |||
126 | {- | ||
127 | nodeHandler :: ( Address ip | ||
128 | , KRPC dht (Query KMessageOf a) (Response KMessageOf b) | ||
129 | ) | ||
130 | => (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) -> (NodeAddr ip -> IO (NodeId KMessageOf)) -> (Char -> String -> Text -> IO ()) -> QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler | ||
131 | -} | ||
132 | nodeHandler :: forall raw dht addr u t q r. | ||
133 | (Address addr, WireFormat raw dht, Pretty (NodeInfo dht addr u), | ||
134 | Default u, | ||
135 | IsString t, Functor dht, | ||
136 | KRPC dht (Query dht q) (Response dht r), | ||
137 | SerializableTo raw (Response dht r), | ||
138 | SerializableTo raw (Query dht q), | ||
139 | Show (QueryMethod dht)) => | ||
140 | (NodeInfo dht addr u -> Maybe ReflectedIP -> IO ()) | ||
141 | -> (NodeAddr addr -> IO (NodeId dht)) | ||
142 | -> (Char -> t -> Text -> IO ()) | ||
143 | -> DHTData dht addr | ||
144 | -> QueryMethod dht | ||
145 | -> (NodeAddr addr -> q -> IO r) | ||
146 | -> Handler IO dht raw | ||
147 | nodeHandler insertNode myNodeIdAccordingTo logm dta method action = handler (\sockaddr -> myNodeIdAccordingTo (error "todo")) method $ \ sockAddr msg -> do | ||
148 | let remoteId = messageSender (msg :: dht (Query dht q)) resptype | ||
149 | qextra = queryExtra qry | ||
150 | resptype = Proxy :: Proxy (Response dht r) | ||
151 | q = queryParams qry | ||
152 | qry = envelopePayload msg :: Query dht q | ||
153 | case fromSockAddr sockAddr of | ||
154 | Nothing -> throwIO BadAddress | ||
155 | Just naddr -> do | ||
156 | logm 'D' "nodeHandler" $ "Received query: " <> T.pack (show $ method) | ||
157 | me <- myNodeIdAccordingTo naddr | ||
158 | rextra <- liftIO $ makeResponseExtra dta me qry resptype | ||
159 | let ni = NodeInfo remoteId naddr def | ||
160 | -- Do not route read-only nodes. (bep 43) | ||
161 | if fromRoutableNode qextra | ||
162 | then insertNode ni Nothing >> return () -- TODO need to block. why? | ||
163 | else logm 'W' "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni) | ||
164 | Response | ||
165 | <$> pure rextra | ||
166 | <*> action naddr q | ||
167 | |||
168 | -- | Default 'Ping' handler. | ||
169 | pingH :: DHT.Kademlia dht => Proxy dht -> NodeAddr ip -> Ping dht -> IO (Ping dht) | ||
170 | pingH dht _ _ = return (DHT.pongMessage dht) | ||
171 | -- pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True } | ||
172 | |||
173 | -- | Default 'FindNode' handler. | ||
174 | findNodeH :: Kademlia dht => (NodeId dht -> IO [NodeInfo dht ip u]) -> NodeAddr ip -> FindNode dht ip -> IO (NodeFound dht ip) | ||
175 | findNodeH getclosest _ msg = foundNodesMessage . L.map (fmap (const ())) <$> getclosest (findWho msg) | ||
176 | |||
177 | -- | Default 'GetPeers' handler. | ||
178 | getPeersH :: Hashable ip => (InfoHash -> IO (PeerList ip)) -> TVar SessionTokens -> NodeAddr ip -> GetPeers ip -> IO (GotPeers ip) | ||
179 | getPeersH getPeerList toks naddr (GetPeers ih) = do | ||
180 | ps <- getPeerList ih | ||
181 | tok <- grantToken toks naddr | ||
182 | return $ GotPeers ps tok | ||
183 | |||
184 | -- | Default 'Announce' handler. | ||
185 | announceH :: ( Ord ip, Hashable ip ) => TVar (PeerStore ip) -> TVar SessionTokens -> NodeAddr ip -> Announce -> IO Announced | ||
186 | announceH peers toks naddr @ NodeAddr {..} (Announce {..}) = do | ||
187 | valid <- checkToken toks naddr sessionToken | ||
188 | unless valid $ do | ||
189 | throwIO $ InvalidParameter "token" | ||
190 | |||
191 | let annPort = if impliedPort then nodePort else port | ||
192 | peerAddr = PeerAddr Nothing nodeHost annPort | ||
193 | insertPeer peers topic announcedName peerAddr | ||
194 | return Announced | ||
195 | |||
196 | -- | Includes all Kademlia-related handlers. | ||
197 | kademliaHandlers :: forall raw dht u ip. (Eq ip, Ord ip, Address ip | ||
198 | , Ord (TransactionID dht) | ||
199 | , Ord (NodeId dht) | ||
200 | , Show u | ||
201 | , SerializableTo raw (Response dht (Ping dht)) | ||
202 | , SerializableTo raw (Query dht (Ping dht)) | ||
203 | , Show (QueryMethod dht) | ||
204 | , Show (NodeId dht) | ||
205 | , FiniteBits (NodeId dht) | ||
206 | , Default u | ||
207 | , Serialize (TransactionID dht) | ||
208 | , WireFormat raw dht | ||
209 | , Kademlia dht | ||
210 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
211 | , Functor dht | ||
212 | , Pretty (NodeInfo dht ip u) | ||
213 | , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) | ||
214 | , SerializableTo raw (Response dht (NodeFound dht ip)) | ||
215 | , SerializableTo raw (Query dht (FindNode dht ip)) | ||
216 | ) => LogFun -> DHT raw dht u ip [Handler IO dht raw] | ||
217 | -- kademliaHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT BValue KMessageOf () ip [NodeHandler] | ||
218 | kademliaHandlers logger = do | ||
219 | groknode <- insertNode1 | ||
220 | mynid <- myNodeIdAccordingTo1 | ||
221 | dta <- asks dhtData | ||
222 | let handler :: ( KRPC dht (Query dht a) (Response dht b) | ||
223 | , SerializableTo raw (Response dht b) | ||
224 | , SerializableTo raw (Query dht a) | ||
225 | ) => QueryMethod dht -> (NodeAddr ip -> a -> IO b) -> Handler IO dht raw | ||
226 | handler = nodeHandler groknode mynid (logt logger) dta | ||
227 | dht = Proxy :: Proxy dht | ||
228 | getclosest <- getClosest1 | ||
229 | return [ handler (namePing dht) $ pingH dht | ||
230 | , handler (nameFindNodes dht) $ findNodeH getclosest | ||
231 | ] | ||
232 | |||
233 | instance DataHandlers BValue KMessageOf where | ||
234 | dataHandlers = bthandlers | ||
235 | |||
236 | bthandlers :: | ||
237 | ( Ord ip , Hashable ip, Typeable ip, Serialize ip) => | ||
238 | (NodeId KMessageOf -> IO [NodeInfo KMessageOf ip ()]) | ||
239 | -> DHTData KMessageOf ip | ||
240 | -> [MethodHandler BValue KMessageOf ip] | ||
241 | bthandlers getclosest dta = | ||
242 | [ MethodHandler "get_peers" $ getPeersH (getpeers dta) (sessionTokens dta) | ||
243 | , MethodHandler "announce_peer" $ announceH (contactInfo dta) (sessionTokens dta) | ||
244 | ] | ||
245 | where | ||
246 | getpeers dta ih = do | ||
247 | ps <- lookupPeers (contactInfo dta) ih | ||
248 | if L.null ps | ||
249 | then Left <$> getclosest (toNodeId ih) | ||
250 | else return (Right ps) | ||
251 | |||
252 | |||
253 | -- | Includes all default query handlers. | ||
254 | defaultHandlers :: forall raw dht u ip. | ||
255 | ( Ord (TransactionID dht) | ||
256 | , Ord (NodeId dht) | ||
257 | , Show u | ||
258 | , SerializableTo raw (Response dht (Ping dht)) | ||
259 | , SerializableTo raw (Query dht (Ping dht)) | ||
260 | , Show (QueryMethod dht) | ||
261 | , Show (NodeId dht) | ||
262 | , FiniteBits (NodeId dht) | ||
263 | , Default u | ||
264 | , Serialize (TransactionID dht) | ||
265 | , WireFormat raw dht | ||
266 | , Kademlia dht | ||
267 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
268 | , Functor dht | ||
269 | , Pretty (NodeInfo dht ip u) | ||
270 | , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) | ||
271 | , SerializableTo raw (Response dht (NodeFound dht ip)) | ||
272 | , SerializableTo raw (Query dht (FindNode dht ip)) | ||
273 | , Eq ip, Ord ip, Address ip, DataHandlers raw dht | ||
274 | ) => LogFun -> DHT raw dht u ip [Handler IO dht raw] | ||
275 | defaultHandlers logger = do | ||
276 | groknode <- insertNode1 | ||
277 | mynid <- myNodeIdAccordingTo1 | ||
278 | dta <- asks dhtData | ||
279 | let handler :: MethodHandler raw dht ip -> Handler IO dht raw | ||
280 | handler (MethodHandler name action) = nodeHandler groknode mynid (logt logger) dta name action | ||
281 | getclosest <- getClosest1 | ||
282 | hs <- kademliaHandlers logger | ||
283 | return $ hs ++ L.map handler (dataHandlers (fmap (fmap (fmap (const ()))) . getclosest) dta) | ||
284 | |||
285 | {----------------------------------------------------------------------- | ||
286 | -- Basic queries | ||
287 | -----------------------------------------------------------------------} | ||
288 | |||
289 | type Iteration raw dht u ip o = NodeInfo dht ip u -> DHT raw dht u ip (Either [NodeInfo dht ip u] [o ip]) | ||
290 | |||
291 | -- | The most basic query. May be used to check if the given node is | ||
292 | -- alive or get its 'NodeId'. | ||
293 | pingQ :: forall raw dht u ip. | ||
294 | ( DHT.Kademlia dht | ||
295 | , Address ip | ||
296 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
297 | , Default u | ||
298 | , Show u | ||
299 | , Ord (TransactionID dht) | ||
300 | , Serialize (TransactionID dht) | ||
301 | , WireFormat raw dht | ||
302 | , SerializableTo raw (Response dht (Ping dht)) | ||
303 | , SerializableTo raw (Query dht (Ping dht)) | ||
304 | , Ord (NodeId dht) | ||
305 | , FiniteBits (NodeId dht) | ||
306 | , Show (NodeId dht) | ||
307 | , Show (QueryMethod dht) | ||
308 | ) => NodeInfo dht ip u -> DHT raw dht u ip (NodeInfo dht ip u , Maybe ReflectedIP) | ||
309 | pingQ ni = do | ||
310 | let ping = DHT.pingMessage (Proxy :: Proxy dht) | ||
311 | (nid, pong, mip) <- queryNode' ni ping | ||
312 | let _ = pong `asTypeOf` ping | ||
313 | -- (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid} | ||
314 | return (NodeInfo nid (nodeAddr ni) def, mip) | ||
315 | |||
316 | -- | The most basic query. May be used to check if the given node is | ||
317 | -- alive or get its 'NodeId'. | ||
318 | coldPingQ :: forall raw dht u ip. | ||
319 | ( DHT.Kademlia dht | ||
320 | , Address ip | ||
321 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
322 | , Default u | ||
323 | , Show u | ||
324 | , Ord (TransactionID dht) | ||
325 | , Serialize (TransactionID dht) | ||
326 | , WireFormat raw dht | ||
327 | , SerializableTo raw (Response dht (Ping dht)) | ||
328 | , SerializableTo raw (Query dht (Ping dht)) | ||
329 | , Ord (NodeId dht) | ||
330 | , FiniteBits (NodeId dht) | ||
331 | , Show (NodeId dht) | ||
332 | , Show (QueryMethod dht) | ||
333 | ) => PacketDestination dht -> DHT raw dht u ip (NodeInfo dht ip u , Maybe ReflectedIP) | ||
334 | coldPingQ dest = do | ||
335 | let ping = DHT.pingMessage (Proxy :: Proxy dht) | ||
336 | naddr <- maybe (throwIO $ QueryFailed ProtocolError "unable to construct NodeAddr from PacketDestination") | ||
337 | return | ||
338 | $ fromAddr dest | ||
339 | (nid, pong, mip) <- coldQueryNode' naddr dest ping | ||
340 | let _ = pong `asTypeOf` ping | ||
341 | -- (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid} | ||
342 | return (NodeInfo nid naddr def, mip) | ||
343 | |||
344 | -- TODO [robustness] match range of returned node ids with the | ||
345 | -- expected range and either filter bad nodes or discard response at | ||
346 | -- all throwing an exception | ||
347 | -- findNodeQ :: Address ip => TableKey key => key -> IterationI ip NodeInfo | ||
348 | findNodeQ proxy key ni = do | ||
349 | closest <- fmap DHT.foundNodes $ DHT.findNodeMessage proxy key <@> ni | ||
350 | $(logInfoS) "findNodeQ" $ "NodeFound\n" | ||
351 | <> T.pack (L.unlines $ L.map ((' ' :) . show . pPrint) closest) | ||
352 | return $ Right closest | ||
353 | |||
354 | #ifdef VERSION_bencoding | ||
355 | getPeersQ :: Address ip => InfoHash -> Iteration BValue KMessageOf () ip PeerAddr | ||
356 | getPeersQ topic ni = do | ||
357 | GotPeers {..} <- GetPeers topic <@> ni | ||
358 | let dist = distance (toNodeId topic) (nodeId ni) | ||
359 | $(logInfoS) "getPeersQ" $ T.pack | ||
360 | $ "distance: " <> render (pPrint dist) <> " , result: " | ||
361 | <> case peers of { Left _ -> "NODES"; Right _ -> "PEERS" } | ||
362 | return peers | ||
363 | |||
364 | announceQ :: Address ip => InfoHash -> PortNumber -> Iteration BValue KMessageOf () ip NodeAddr | ||
365 | announceQ ih p ni = do | ||
366 | GotPeers {..} <- GetPeers ih <@> ni | ||
367 | case peers of | ||
368 | Left ns | ||
369 | | False -> undefined -- TODO check if we can announce | ||
370 | | otherwise -> return (Left ns) | ||
371 | Right _ -> do -- TODO *probably* add to peer cache | ||
372 | Announced <- Announce False ih Nothing p grantedToken <@> ni | ||
373 | return (Right [nodeAddr ni]) | ||
374 | #endif | ||
375 | |||
376 | {----------------------------------------------------------------------- | ||
377 | -- Iterative queries | ||
378 | -----------------------------------------------------------------------} | ||
379 | |||
380 | |||
381 | ioGetPeers :: Address ip => InfoHash -> DHT BValue KMessageOf () ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [PeerAddr ip])) | ||
382 | ioGetPeers ih = do | ||
383 | session <- ask | ||
384 | return $ \ni -> runDHT session $ do | ||
385 | r <- try $ getPeersQ ih ni | ||
386 | case r of | ||
387 | Right e -> return $ either (,[]) ([],) e | ||
388 | Left e -> let _ = e :: QueryFailure in return ([],[]) | ||
389 | |||
390 | ioFindNode :: ( DHT.Kademlia dht | ||
391 | , WireFormat raw dht | ||
392 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
393 | , Address ip | ||
394 | , Default u | ||
395 | , Show u | ||
396 | , Show (QueryMethod dht) | ||
397 | , TableKey dht infohash | ||
398 | , Eq (NodeId dht) | ||
399 | , Ord (NodeId dht) | ||
400 | , FiniteBits (NodeId dht) | ||
401 | , Show (NodeId dht) | ||
402 | , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) | ||
403 | , Ord (TransactionID dht) | ||
404 | , Serialize (TransactionID dht) | ||
405 | , SerializableTo raw (Response dht (NodeFound dht ip)) | ||
406 | , SerializableTo raw (Query dht (FindNode dht ip)) | ||
407 | , SerializableTo raw (Response dht (Ping dht)) | ||
408 | , SerializableTo raw (Query dht (Ping dht)) | ||
409 | ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u])) | ||
410 | ioFindNode ih = do | ||
411 | session <- ask | ||
412 | return $ \ni -> runDHT session $ do | ||
413 | ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> ni | ||
414 | let ns' = L.map (fmap (const def)) ns | ||
415 | return $ L.partition (\n -> nodeId n /= toNodeId ih) ns' | ||
416 | |||
417 | |||
418 | -- | Like ioFindNode, but considers all found nodes to be 'Right' results. | ||
419 | ioFindNodes :: ( DHT.Kademlia dht | ||
420 | , WireFormat raw dht | ||
421 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
422 | , Address ip | ||
423 | , Default u | ||
424 | , Show u | ||
425 | , Show (QueryMethod dht) | ||
426 | , TableKey dht infohash | ||
427 | , Eq (NodeId dht) | ||
428 | , Ord (NodeId dht) | ||
429 | , FiniteBits (NodeId dht) | ||
430 | , Show (NodeId dht) | ||
431 | , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) | ||
432 | , Ord (TransactionID dht) | ||
433 | , Serialize (TransactionID dht) | ||
434 | , SerializableTo raw (Response dht (NodeFound dht ip)) | ||
435 | , SerializableTo raw (Query dht (FindNode dht ip)) | ||
436 | , SerializableTo raw (Response dht (Ping dht)) | ||
437 | , SerializableTo raw (Query dht (Ping dht)) | ||
438 | ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u])) | ||
439 | ioFindNodes ih = do | ||
440 | session <- ask | ||
441 | return $ \ni -> runDHT session $ do | ||
442 | ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> ni | ||
443 | let ns' = L.map (fmap (const def)) ns | ||
444 | return ([], ns') | ||
445 | |||
446 | isearch :: ( Ord r | ||
447 | , Ord ip | ||
448 | , Ord (NodeId dht) | ||
449 | , FiniteBits (NodeId dht) | ||
450 | , TableKey dht ih | ||
451 | , Show ih) => | ||
452 | (ih -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r]))) | ||
453 | -> ih | ||
454 | -> DHT raw dht u ip (ThreadId, Search.IterativeSearch dht u ip r) | ||
455 | isearch f ih = do | ||
456 | qry <- f ih | ||
457 | ns <- kclosest 8 ih <$> getTable | ||
458 | liftIO $ do s <- Search.newSearch qry (toNodeId ih) ns | ||
459 | a <- fork $ do | ||
460 | tid <- myThreadId | ||
461 | labelThread tid ("search."++show ih) | ||
462 | Search.search s | ||
463 | -- atomically \$ readTVar (Search.searchResults s) | ||
464 | return (a, s) | ||
465 | |||
466 | -- | Background search: fill a lazy list using a background thread. | ||
467 | bgsearch f ih = do | ||
468 | (tid, s) <- isearch f ih | ||
469 | let again shown = do | ||
470 | (chk,fin) <- atomically $ do | ||
471 | r <- (Set.\\ shown) <$> readTVar (Search.searchResults s) | ||
472 | if not $ Set.null r | ||
473 | then (,) r <$> Search.searchIsFinished s | ||
474 | else Search.searchIsFinished s >>= check >> return (Set.empty,True) | ||
475 | let ps = Set.toList chk | ||
476 | if fin then return ps | ||
477 | else do | ||
478 | xs <- unsafeInterleaveIO $ again (shown `Set.union` chk) | ||
479 | return $ ps ++ xs | ||
480 | liftIO $ again Set.empty | ||
481 | |||
482 | type Search raw dht u ip o = Conduit [NodeInfo dht ip u] (DHT raw dht u ip) [o dht ip u] | ||
483 | |||
484 | #if 0 | ||
485 | |||
486 | -- TODO: use reorder and filter (Traversal option) leftovers | ||
487 | -- search :: k -> IterationI ip o -> Search ip o | ||
488 | search _ action = do | ||
489 | awaitForever $ \ batch -> unless (L.null batch) $ do | ||
490 | $(logWarnS) "search" "start query" | ||
491 | responses <- lift $ queryParallel (action <$> batch) | ||
492 | let (nodes, results) = partitionEithers responses | ||
493 | $(logWarnS) "search" ("done query more:" <> T.pack (show (L.length nodes, L.length results))) | ||
494 | leftover $ L.concat nodes | ||
495 | let r = mapM_ yield results | ||
496 | _ = (action,r) :: (a -> DHT raw dht u ip (Either [a] o), ConduitM [a] o (DHT raw dht u ip) ()) | ||
497 | r | ||
498 | |||
499 | #endif | ||
500 | |||
501 | publish = error "todo" | ||
502 | -- publish :: Address ip => InfoHash -> PortNumber -> DHT BValue KMessageOf () ip () | ||
503 | -- publish ih p = do | ||
504 | -- nodes <- getClosest ih | ||
505 | -- r <- asks (optReplication . options) | ||
506 | -- _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r | ||
507 | -- return () | ||
508 | |||
509 | |||
510 | probeNode :: ( Default u | ||
511 | , Show u | ||
512 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
513 | , DHT.Kademlia dht | ||
514 | , Address ip | ||
515 | , Ord (TransactionID dht) | ||
516 | , Serialize (TransactionID dht) | ||
517 | , WireFormat raw dht | ||
518 | , SerializableTo raw (Response dht (Ping dht)) | ||
519 | , SerializableTo raw (Query dht (Ping dht)) | ||
520 | , Ord (NodeId dht) | ||
521 | , FiniteBits (NodeId dht) | ||
522 | , Show (NodeId dht) | ||
523 | , Show (QueryMethod dht) | ||
524 | ) => NodeInfo dht ip u -> DHT raw dht u ip (Bool , Maybe ReflectedIP) | ||
525 | probeNode addr = do | ||
526 | $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint $ nodeAddr addr))) | ||
527 | result <- try $ pingQ addr | ||
528 | let _ = fmap (const ()) result :: Either QueryFailure () | ||
529 | return $ either (const (False,Nothing)) (\(_,mip)->(True,mip)) result | ||
530 | |||
531 | |||
532 | refreshNodes :: forall raw dht u ip. | ||
533 | ( Address ip | ||
534 | , Ord (NodeId dht) | ||
535 | , Default u | ||
536 | , FiniteBits (NodeId dht) | ||
537 | , Pretty (NodeId dht) | ||
538 | , DHT.Kademlia dht | ||
539 | , Ord ip | ||
540 | , Ord (TransactionID dht) | ||
541 | , SerializableTo raw (Response dht (NodeFound dht ip)) | ||
542 | , SerializableTo raw (Query dht (FindNode dht ip)) | ||
543 | , SerializableTo raw (Response dht (Ping dht)) | ||
544 | , SerializableTo raw (Query dht (Ping dht)) | ||
545 | , Pretty (NodeInfo dht ip u) | ||
546 | , Show (NodeId dht) | ||
547 | , Show u | ||
548 | , Show (QueryMethod dht) | ||
549 | , Serialize (TransactionID dht) | ||
550 | , WireFormat raw dht | ||
551 | , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) | ||
552 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
553 | ) => NodeId dht -> DHT raw dht u ip () -- [NodeInfo KMessageOf ip ()] | ||
554 | -- FIXME do not use getClosest sinse we should /refresh/ them | ||
555 | refreshNodes nid = do | ||
556 | $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid))) | ||
557 | nodes <- getClosest nid | ||
558 | do | ||
559 | -- forM (L.take 1 nodes) \$ \ addr -> do | ||
560 | -- NodeFound ns <- FindNode nid <@> addr | ||
561 | -- Expected type: ConduitM [NodeAddr ip] [NodeInfo KMessageOf ip ()] (DHT ip) () | ||
562 | -- Actual type: ConduitM [NodeInfo KMessageOf ip ()] [NodeInfo KMessageOf ip ()] (DHT ip) () | ||
563 | -- nss <- sourceList [[addr]] \$= search nid (findNodeQ nid) $$ C.consume | ||
564 | -- nss <- sourceList [nodes] \$= search nid (findNodeQ (Proxy :: Proxy dht) nid) $$ C.consume | ||
565 | ns <- bgsearch ioFindNodes nid | ||
566 | $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length ns)) <> " nodes." | ||
567 | _ <- queryParallel $ flip L.map ns $ \n -> do | ||
568 | $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n)) | ||
569 | pingQ n | ||
570 | -- pingQ takes care of inserting the node. | ||
571 | return () | ||
572 | return () -- \$ L.concat nss | ||
573 | |||
574 | logc :: Char -> String -> DHT raw dht u ip () | ||
575 | logc 'D' = $(logDebugS) "insertNode" . T.pack | ||
576 | logc 'W' = $(logWarnS) "insertNode" . T.pack | ||
577 | logc 'I' = $(logInfoS) "insertNode" . T.pack | ||
578 | logc c = $(logInfoS) "insertNode" . T.pack . (c :) . (':' :) | ||
579 | |||
580 | -- | This operation do not block but acquire exclusive access to | ||
581 | -- routing table. | ||
582 | insertNode :: forall raw dht u ip. | ||
583 | ( Address ip | ||
584 | , Ord (NodeId dht) | ||
585 | , FiniteBits (NodeId dht) | ||
586 | , Show (NodeId dht) | ||
587 | , Default u | ||
588 | , Show u | ||
589 | , DHT.Kademlia dht | ||
590 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
591 | , Ord (TransactionID dht) | ||
592 | , WireFormat raw dht | ||
593 | , Serialize (TransactionID dht) | ||
594 | , SerializableTo raw (Response dht (Ping dht)) | ||
595 | , SerializableTo raw (Query dht (Ping dht)) | ||
596 | , Ord (NodeId dht) | ||
597 | , Show (NodeId dht) | ||
598 | , Show (QueryMethod dht) | ||
599 | ) => NodeInfo dht ip u -> Maybe ReflectedIP -> DHT raw dht u ip () | ||
600 | insertNode info witnessed_ip0 = do | ||
601 | f <- insertNode1 | ||
602 | liftIO $ f info witnessed_ip0 | ||
603 | |||
604 | insertNode1 :: forall raw dht u ip. | ||
605 | ( Address ip | ||
606 | , Default u | ||
607 | , Show u | ||
608 | , Ord (NodeId dht) | ||
609 | , FiniteBits (NodeId dht) | ||
610 | , Show (NodeId dht) | ||
611 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
612 | , DHT.Kademlia dht | ||
613 | , Ord (TransactionID dht) | ||
614 | , WireFormat raw dht | ||
615 | , Serialize (TransactionID dht) | ||
616 | , SerializableTo raw (Response dht (Ping dht)) | ||
617 | , SerializableTo raw (Query dht (Ping dht)) | ||
618 | , Ord (NodeId dht) | ||
619 | , Show (NodeId dht) | ||
620 | , Show (QueryMethod dht) | ||
621 | ) => DHT raw dht u ip (NodeInfo dht ip u -> Maybe ReflectedIP -> IO ()) | ||
622 | insertNode1 = do | ||
623 | bc <- optBucketCount <$> asks options | ||
624 | nid <- asks tentativeNodeId | ||
625 | logm0 <- embed_ (uncurry logc) | ||
626 | let logm c = logm0 . (c,) | ||
627 | dht_node_state <- ask -- XXX: This prevents ping probe from modifying the Node state. | ||
628 | probe0 <- embed probeNode -- probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP) | ||
629 | let probe n = probe0 n >>= runDHT dht_node_state . restoreM | ||
630 | {- | ||
631 | changeip ip0 arrival = fromMaybe (DHT.fallbackID params) $ do -- warning: recursive | ||
632 | ip <- fromSockAddr ip0 :: Maybe ip | ||
633 | listToMaybe | ||
634 | $ rank id (nodeId $ foreignNode arrival) | ||
635 | $ bep42s ip (DHT.fallbackID params) -- warning: recursive | ||
636 | -} | ||
637 | params = DHT.TableParameters | ||
638 | { maxBuckets = bc :: Int | ||
639 | , fallbackID = nid :: NodeId dht | ||
640 | , adjustID = dhtAdjustID Proxy (DHT.fallbackID params) :: SockAddr -> Event dht ip u -> NodeId dht | ||
641 | , logMessage = logm :: Char -> String -> IO () | ||
642 | , pingProbe = probe :: NodeInfo dht ip u -> IO (Bool, Maybe ReflectedIP) | ||
643 | } | ||
644 | tbl <- asks routingInfo | ||
645 | let state = DHT.TableKeeper | ||
646 | { routingInfo = tbl | ||
647 | , grokNode = DHT.insertNode params state | ||
648 | , grokAddress = \_ _ -> return () -- :: Maybe SockAddr -> ReflectedIP -> IO () | ||
649 | } | ||
650 | return $ \info witnessed_ip0 -> DHT.insertNode params state info witnessed_ip0 | ||
651 | |||
652 | -- | Throws exception if node is not responding. | ||
653 | queryNode :: forall raw dht u a b ip. | ||
654 | ( Address ip | ||
655 | , KRPC dht (Query dht a) (Response dht b) | ||
656 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
657 | , Default u | ||
658 | , Show u | ||
659 | , DHT.Kademlia dht | ||
660 | , Ord (TransactionID dht) | ||
661 | , Serialize (TransactionID dht) | ||
662 | , WireFormat raw dht | ||
663 | , SerializableTo raw (Response dht b) | ||
664 | , SerializableTo raw (Query dht a) | ||
665 | , Ord (NodeId dht) | ||
666 | , FiniteBits (NodeId dht) | ||
667 | , Show (NodeId dht) | ||
668 | , Show (QueryMethod dht) | ||
669 | , SerializableTo raw (Response dht (Ping dht)) | ||
670 | , SerializableTo raw (Query dht (Ping dht)) | ||
671 | ) => NodeInfo dht ip u -> a -> DHT raw dht u ip (NodeId dht, b) | ||
672 | queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q | ||
673 | |||
674 | queryNode' :: forall raw dht u a b ip. | ||
675 | ( Address ip | ||
676 | , Default u | ||
677 | , Show u | ||
678 | , DHT.Kademlia dht | ||
679 | , KRPC dht (Query dht a) (Response dht b) | ||
680 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
681 | , Ord (TransactionID dht) | ||
682 | , Serialize (TransactionID dht) | ||
683 | , WireFormat raw dht | ||
684 | , SerializableTo raw (Response dht b) | ||
685 | , SerializableTo raw (Query dht a) | ||
686 | , Ord (NodeId dht) | ||
687 | , FiniteBits (NodeId dht) | ||
688 | , Show (NodeId dht) | ||
689 | , Show (QueryMethod dht) | ||
690 | , SerializableTo raw (Response dht (Ping dht)) | ||
691 | , SerializableTo raw (Query dht (Ping dht)) | ||
692 | ) => NodeInfo dht ip u -> a -> DHT raw dht u ip (NodeId dht, b, Maybe ReflectedIP) | ||
693 | queryNode' ni q = do | ||
694 | let addr = nodeAddr ni | ||
695 | dest = makeAddress (Left $ nodeId ni) (toSockAddr addr) | ||
696 | coldQueryNode' addr dest q | ||
697 | |||
698 | coldQueryNode' :: forall raw dht u a b ip. | ||
699 | ( Address ip | ||
700 | , Default u | ||
701 | , Show u | ||
702 | , DHT.Kademlia dht | ||
703 | , KRPC dht (Query dht a) (Response dht b) | ||
704 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
705 | , Ord (TransactionID dht) | ||
706 | , Serialize (TransactionID dht) | ||
707 | , WireFormat raw dht | ||
708 | , SerializableTo raw (Response dht b) | ||
709 | , SerializableTo raw (Query dht a) | ||
710 | , Ord (NodeId dht) | ||
711 | , FiniteBits (NodeId dht) | ||
712 | , Show (NodeId dht) | ||
713 | , Show (QueryMethod dht) | ||
714 | , SerializableTo raw (Response dht (Ping dht)) | ||
715 | , SerializableTo raw (Query dht (Ping dht)) | ||
716 | ) => NodeAddr ip -> PacketDestination dht -> a -> DHT raw dht u ip (NodeId dht, b, Maybe ReflectedIP) | ||
717 | coldQueryNode' addr dest q = do | ||
718 | nid <- myNodeIdAccordingTo $ fromMaybe (error "TODO: coldQueryNode' myNodeIdAccordingTo") $ fromAddr dest | ||
719 | dta <- asks dhtData | ||
720 | qextra <- liftIO $ makeQueryExtra dta nid (Proxy :: Proxy (Query dht q)) (Proxy :: Proxy (Response dht b)) | ||
721 | let read_only = False -- TODO: check for NAT issues. (BEP 43) | ||
722 | -- let KRPC.Method name = KRPC.method :: KRPC.Method dht (Query dht a) (Response dht b) | ||
723 | mgr <- asks manager | ||
724 | (Response rextra r, remoteId, witnessed_ip) <- liftIO $ query' mgr dest (Query qextra q) | ||
725 | -- \$(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip) | ||
726 | -- <> " by " <> T.pack (show (toSockAddr addr)) | ||
727 | _ <- insertNode (NodeInfo remoteId addr def) witnessed_ip | ||
728 | return (remoteId, r, witnessed_ip) | ||
729 | |||
730 | -- | Infix version of 'queryNode' function. | ||
731 | (<@>) :: ( Address ip | ||
732 | , KRPC dht (Query dht a) (Response dht b) | ||
733 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
734 | , Default u | ||
735 | , Show u | ||
736 | , Show (QueryMethod dht) | ||
737 | , Ord (NodeId dht) | ||
738 | , FiniteBits (NodeId dht) | ||
739 | , Show (NodeId dht) | ||
740 | , Ord (TransactionID dht) | ||
741 | , Serialize (TransactionID dht) | ||
742 | , SerializableTo raw (Response dht b) | ||
743 | , SerializableTo raw (Query dht a) | ||
744 | , SerializableTo raw (Response dht (Ping dht)) | ||
745 | , SerializableTo raw (Query dht (Ping dht)) | ||
746 | , WireFormat raw dht | ||
747 | , Kademlia dht | ||
748 | ) => a -> NodeInfo dht ip u -> DHT raw dht u ip b | ||
749 | q <@> addr = snd <$> queryNode addr q | ||
750 | {-# INLINE (<@>) #-} | ||
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs deleted file mode 100644 index b775e7d3..00000000 --- a/src/Network/BitTorrent/DHT/Session.hs +++ /dev/null | |||
@@ -1,571 +0,0 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013-2014 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- This module defines internal state of a node instance. You can | ||
9 | -- have multiple nodes per application but usually you don't have | ||
10 | -- to. Normally, you don't need to import this module, use | ||
11 | -- "Network.BitTorrent.DHT" instead. | ||
12 | -- | ||
13 | {-# LANGUAGE CPP #-} | ||
14 | {-# LANGUAGE RecordWildCards #-} | ||
15 | {-# LANGUAGE FlexibleContexts #-} | ||
16 | {-# LANGUAGE FlexibleInstances #-} | ||
17 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
18 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
19 | {-# LANGUAGE ScopedTypeVariables #-} | ||
20 | {-# LANGUAGE TypeFamilies #-} | ||
21 | {-# LANGUAGE TemplateHaskell #-} | ||
22 | module Network.BitTorrent.DHT.Session | ||
23 | ( -- * Options | ||
24 | -- | Use @optFooBar def@ to get default 'Alpha' or 'K'. | ||
25 | Alpha | ||
26 | , K | ||
27 | , Options (..) | ||
28 | |||
29 | -- * Session | ||
30 | , Node | ||
31 | , options | ||
32 | , tentativeNodeId | ||
33 | , myNodeIdAccordingTo | ||
34 | , myNodeIdAccordingTo1 | ||
35 | , routingInfo | ||
36 | , routableAddress | ||
37 | , getTimestamp | ||
38 | -- , SessionTokens | ||
39 | -- , sessionTokens | ||
40 | -- , contactInfo | ||
41 | , dhtData | ||
42 | , PeerStore | ||
43 | , manager | ||
44 | |||
45 | -- ** Initialization | ||
46 | , LogFun | ||
47 | , logt | ||
48 | , NodeHandler | ||
49 | , newNode | ||
50 | , closeNode | ||
51 | |||
52 | -- * DHT | ||
53 | -- | Use @asks options@ to get options passed to 'startNode' | ||
54 | -- or @asks thisNodeId@ to get id of locally running node. | ||
55 | , DHT | ||
56 | , runDHT | ||
57 | |||
58 | -- ** Tokens | ||
59 | -- , grantToken | ||
60 | -- , checkToken | ||
61 | |||
62 | -- ** Routing table | ||
63 | , getTable | ||
64 | , getClosest | ||
65 | , getClosest1 | ||
66 | |||
67 | #ifdef VERSION_bencoding | ||
68 | -- ** Peer storage | ||
69 | , insertPeer | ||
70 | , getPeerList | ||
71 | , getPeerList1 | ||
72 | , lookupPeers | ||
73 | , insertTopic | ||
74 | , deleteTopic | ||
75 | , getSwarms | ||
76 | , savePeerStore | ||
77 | , mergeSavedPeers | ||
78 | , allPeers | ||
79 | #endif | ||
80 | |||
81 | -- ** Messaging | ||
82 | , queryParallel | ||
83 | ) where | ||
84 | |||
85 | import Prelude hiding (ioError) | ||
86 | |||
87 | import Control.Concurrent.STM | ||
88 | #ifdef THREAD_DEBUG | ||
89 | import Control.Concurrent.Async.Lifted.Instrument | ||
90 | #else | ||
91 | import Control.Concurrent.Async.Lifted | ||
92 | #endif | ||
93 | import Control.Exception.Lifted hiding (Handler) | ||
94 | import Control.Monad.Base | ||
95 | import Control.Monad.Logger | ||
96 | import Control.Monad.Reader | ||
97 | import Control.Monad.Trans.Control | ||
98 | import Control.Monad.Trans.Resource | ||
99 | import Data.Typeable | ||
100 | import Data.String | ||
101 | import Data.Bits | ||
102 | import Data.ByteString | ||
103 | import Data.Conduit.Lazy | ||
104 | import Data.Default | ||
105 | import Data.Fixed | ||
106 | import Data.Hashable | ||
107 | import Data.List as L | ||
108 | import Data.Maybe | ||
109 | import Data.Monoid | ||
110 | import Data.Set as S | ||
111 | import Data.Time | ||
112 | import Network (PortNumber) | ||
113 | import System.Random (randomIO) | ||
114 | import Data.Time.Clock.POSIX | ||
115 | import Data.Text as Text | ||
116 | import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) | ||
117 | import Data.Serialize as S | ||
118 | import Network.DHT.Types | ||
119 | import Network.DatagramServer.Types | ||
120 | |||
121 | |||
122 | import Data.Torrent as Torrent | ||
123 | import Network.DatagramServer as KRPC hiding (Options, def) | ||
124 | import qualified Network.DatagramServer as KRPC (def) | ||
125 | #ifdef VERSION_bencoding | ||
126 | import Data.BEncode (BValue) | ||
127 | import Network.DatagramServer.Mainline (KMessageOf) | ||
128 | #else | ||
129 | import Network.DatagramServer.Tox as Tox | ||
130 | #endif | ||
131 | import Network.Address | ||
132 | import Network.BitTorrent.DHT.ContactInfo (PeerStore) | ||
133 | import qualified Network.BitTorrent.DHT.ContactInfo as P | ||
134 | import Network.DHT.Mainline | ||
135 | import Network.DHT.Routing as R | ||
136 | import Network.BitTorrent.DHT.Token as T | ||
137 | import GHC.Stack as GHC | ||
138 | |||
139 | {----------------------------------------------------------------------- | ||
140 | -- Options | ||
141 | -----------------------------------------------------------------------} | ||
142 | |||
143 | -- | Node lookups can proceed asynchronously. | ||
144 | type Alpha = Int | ||
145 | |||
146 | -- NOTE: libtorrent uses 5, azureus uses 10 | ||
147 | -- | The quantity of simultaneous lookups is typically three. | ||
148 | defaultAlpha :: Alpha | ||
149 | defaultAlpha = 3 | ||
150 | |||
151 | -- TODO add replication loop | ||
152 | |||
153 | -- TODO do not insert infohash -> peeraddr if infohash is too far from | ||
154 | -- this node id | ||
155 | {- | ||
156 | data Order | ||
157 | = NearFirst | ||
158 | | FarFirst | ||
159 | | Random | ||
160 | |||
161 | data Traversal | ||
162 | = Greedy -- ^ aggressive short-circuit traversal | ||
163 | | Exhaustive -- ^ | ||
164 | -} | ||
165 | |||
166 | -- | Original Kamelia DHT uses term /publish/ for data replication | ||
167 | -- process. BitTorrent DHT uses term /announce/ since the purpose of | ||
168 | -- the DHT is peer discovery. Later in documentation, we use terms | ||
169 | -- /publish/ and /announce/ interchangible. | ||
170 | data Options = Options | ||
171 | { -- | The degree of parallelism in 'find_node' queries. More | ||
172 | -- parallism lead to faster bootstrapping and lookup operations, | ||
173 | -- but also increase resource usage. | ||
174 | -- | ||
175 | -- Normally this parameter should not exceed 'optK'. | ||
176 | optAlpha :: {-# UNPACK #-} !Alpha | ||
177 | |||
178 | -- | /K/ parameter - number of nodes to return in 'find_node' | ||
179 | -- responses. | ||
180 | , optK :: {-# UNPACK #-} !K | ||
181 | |||
182 | -- | Number of buckets to maintain. This parameter depends on | ||
183 | -- amount of nodes in the DHT network. | ||
184 | , optBucketCount :: {-# UNPACK #-} !BucketCount | ||
185 | |||
186 | -- | RPC timeout. | ||
187 | , optTimeout :: !NominalDiffTime | ||
188 | |||
189 | -- | /R/ parameter - how many target nodes the 'announce' query | ||
190 | -- should affect. | ||
191 | -- | ||
192 | -- A large replica set compensates for inconsistent routing and | ||
193 | -- reduces the need to frequently republish data for | ||
194 | -- persistence. This comes at an increased cost for | ||
195 | -- 'Network.BitTorrent.DHT.insert' in terms of time, nodes | ||
196 | -- contacted, and storage. | ||
197 | , optReplication :: {-# UNPACK #-} !NodeCount | ||
198 | |||
199 | -- | How often this node should republish (or reannounce) its | ||
200 | -- data. | ||
201 | -- | ||
202 | -- Large replica set ('optReplication') should require | ||
203 | -- smaller reannounce intervals ('optReannounce'). | ||
204 | , optReannounce :: !NominalDiffTime | ||
205 | |||
206 | -- | The time it takes for data to expire in the | ||
207 | -- network. Publisher of the data should republish (or | ||
208 | -- reannounce) data to keep it in the network. | ||
209 | -- | ||
210 | -- The /data expired timeout/ should be more than 'optReannounce' | ||
211 | -- interval. | ||
212 | , optDataExpired :: !NominalDiffTime | ||
213 | } deriving (Show, Eq) | ||
214 | |||
215 | -- | Optimal options for bittorrent client. For short-lifetime | ||
216 | -- utilities you most likely need to tune 'optAlpha' and | ||
217 | -- 'optBucketCount'. | ||
218 | instance Default Options where | ||
219 | def = Options | ||
220 | { optAlpha = defaultAlpha | ||
221 | , optK = defaultK | ||
222 | |||
223 | -- see Fig.2 from "BitTorrent Mainline DHT Measurement" paper. | ||
224 | , optBucketCount = defaultBucketCount | ||
225 | |||
226 | -- see Fig.4 from "Profiling a Million User DHT" paper. | ||
227 | , optTimeout = 5 -- seconds | ||
228 | , optReplication = 20 -- nodes | ||
229 | , optReannounce = 15 * 60 | ||
230 | , optDataExpired = 60 * 60 | ||
231 | } | ||
232 | |||
233 | seconds :: NominalDiffTime -> Int | ||
234 | seconds dt = fromEnum (realToFrac dt :: Uni) | ||
235 | {----------------------------------------------------------------------- | ||
236 | -- Session | ||
237 | -----------------------------------------------------------------------} | ||
238 | |||
239 | -- | A set of torrents this peer intends to share. | ||
240 | type AnnounceSet = Set (InfoHash, PortNumber) | ||
241 | |||
242 | -- | Logger function. | ||
243 | type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () | ||
244 | |||
245 | -- | DHT session keep track state of /this/ node. | ||
246 | data Node raw dht u ip = Node | ||
247 | { -- | Session configuration; | ||
248 | options :: !Options | ||
249 | |||
250 | -- | Pseudo-unique self-assigned session identifier. This value is | ||
251 | -- constant during DHT session and (optionally) between sessions. | ||
252 | , tentativeNodeId :: !(NodeId dht) | ||
253 | |||
254 | , resources :: !InternalState | ||
255 | , manager :: !(Manager raw dht) -- ^ RPC manager; | ||
256 | , routingInfo :: !(TVar (Maybe (R.Info dht ip u))) -- ^ search table; | ||
257 | , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; | ||
258 | , dhtData :: DHTData dht ip | ||
259 | , loggerFun :: !LogFun | ||
260 | } | ||
261 | |||
262 | -- | DHT keep track current session and proper resource allocation for | ||
263 | -- safe multithreading. | ||
264 | newtype DHT raw dht u ip a = DHT { unDHT :: ReaderT (Node raw dht u ip) IO a } | ||
265 | deriving ( Functor, Applicative, Monad, MonadIO | ||
266 | , MonadBase IO, MonadReader (Node raw dht u ip), MonadThrow | ||
267 | ) | ||
268 | |||
269 | #if MIN_VERSION_monad_control(1,0,0) | ||
270 | newtype DHTStM raw dht u ip a = StM { | ||
271 | unSt :: StM (ReaderT (Node raw dht u ip) IO) a | ||
272 | } | ||
273 | #endif | ||
274 | |||
275 | instance MonadBaseControl IO (DHT raw dht u ip) where | ||
276 | #if MIN_VERSION_monad_control(1,0,0) | ||
277 | type StM (DHT raw dht u ip) a = DHTStM raw dht u ip a | ||
278 | #else | ||
279 | newtype StM (DHT raw dht u ip) a = StM { | ||
280 | unSt :: StM (ReaderT (Node raw dht u ip) IO) a | ||
281 | } | ||
282 | #endif | ||
283 | liftBaseWith cc = DHT $ liftBaseWith $ \ cc' -> | ||
284 | cc $ \ (DHT m) -> StM <$> cc' m | ||
285 | {-# INLINE liftBaseWith #-} | ||
286 | |||
287 | restoreM = DHT . restoreM . unSt | ||
288 | {-# INLINE restoreM #-} | ||
289 | |||
290 | -- | Check is it is possible to run 'queryNode' or handle pending | ||
291 | -- query from remote node. | ||
292 | instance MonadActive (DHT raw dht u ip) where | ||
293 | monadActive = getManager >>= liftIO . isActive | ||
294 | {-# INLINE monadActive #-} | ||
295 | |||
296 | -- | All allocated resources will be closed at 'closeNode'. | ||
297 | instance MonadResource (DHT raw dht u ip) where | ||
298 | liftResourceT m = do | ||
299 | s <- asks resources | ||
300 | liftIO $ runInternalState m s | ||
301 | |||
302 | -- instance MonadKRPC (DHT raw dht u ip) (DHT raw dht u ip) BValue KMessageOf where | ||
303 | |||
304 | getManager :: DHT raw dht u ip (Manager raw dht) | ||
305 | getManager = asks manager | ||
306 | |||
307 | instance MonadLogger (DHT raw dht u ip) where | ||
308 | monadLoggerLog loc src lvl msg = do | ||
309 | logger <- asks loggerFun | ||
310 | liftIO $ logger loc src lvl (toLogStr msg) | ||
311 | |||
312 | #ifdef VERSION_bencoding | ||
313 | type NodeHandler = Handler IO KMessageOf BValue | ||
314 | #else | ||
315 | type NodeHandler ip = Handler (DHT raw dht u ip) Tox.Message ByteString | ||
316 | #endif | ||
317 | |||
318 | logt :: HasCallStack => LogFun -> Char -> String -> Text -> IO () | ||
319 | logt lf c m txt = lf (locFromCS callStack) (fromString m) (lvl c) (fromString $ Text.unpack txt) | ||
320 | where | ||
321 | lvl 'D' = LevelDebug | ||
322 | lvl 'I' = LevelInfo | ||
323 | lvl 'W' = LevelWarn | ||
324 | lvl 'E' = LevelError | ||
325 | lvl ch = LevelOther $ Text.cons ch Text.empty | ||
326 | |||
327 | mkLoggerLoc :: GHC.SrcLoc -> Loc | ||
328 | mkLoggerLoc loc = | ||
329 | Loc { loc_filename = GHC.srcLocFile loc | ||
330 | , loc_package = GHC.srcLocPackage loc | ||
331 | , loc_module = GHC.srcLocModule loc | ||
332 | , loc_start = ( GHC.srcLocStartLine loc | ||
333 | , GHC.srcLocStartCol loc) | ||
334 | , loc_end = ( GHC.srcLocEndLine loc | ||
335 | , GHC.srcLocEndCol loc) | ||
336 | } | ||
337 | |||
338 | locFromCS :: GHC.CallStack -> Loc | ||
339 | locFromCS cs = case getCallStack cs of | ||
340 | ((_, loc):_) -> mkLoggerLoc loc | ||
341 | _ -> Loc "<unknown>" "<unknown>" "<unknown>" (0,0) (0,0) | ||
342 | |||
343 | |||
344 | -- | Run DHT session. You /must/ properly close session using | ||
345 | -- 'closeNode' function, otherwise socket or other scarce resources may | ||
346 | -- leak. | ||
347 | newNode :: forall raw dht ip u. | ||
348 | ( Address ip | ||
349 | , FiniteBits (NodeId dht) | ||
350 | , Serialize (NodeId dht) | ||
351 | , Kademlia dht | ||
352 | , WireFormat raw dht | ||
353 | ) | ||
354 | => -- [NodeHandler] -- ^ handlers to run on accepted queries; | ||
355 | Options -- ^ various dht options; | ||
356 | -> NodeAddr ip -- ^ node address to bind; | ||
357 | -> LogFun -- ^ invoked on log messages; | ||
358 | -> Maybe (NodeId dht) -- ^ use this NodeId, if not given a new one is generated. | ||
359 | -> IO (Node raw dht u ip) -- ^ a new DHT node running at given address. | ||
360 | newNode opts naddr logger mbid = do | ||
361 | s <- createInternalState | ||
362 | runInternalState initNode s | ||
363 | `onException` closeInternalState s | ||
364 | where | ||
365 | rpcOpts = KRPC.def { optQueryTimeout = seconds (optTimeout opts) } | ||
366 | nodeAddr = toSockAddr naddr | ||
367 | initNode = do | ||
368 | s <- getInternalState | ||
369 | (myId, infovar, getst) <- liftIO $ do | ||
370 | (i, ctx) <- initializeServerState (Proxy :: Proxy (dht raw)) mbid | ||
371 | var <- atomically (newTVar Nothing) | ||
372 | let getst dest = do | ||
373 | info <- atomically . readTVar $ var | ||
374 | return ( maybe i myNodeId info, ctx) | ||
375 | return (i, var, getst) | ||
376 | (_, m) <- allocate (newManager rpcOpts (logt logger) nodeAddr getst []) closeManager | ||
377 | liftIO $ do | ||
378 | dta <- initializeDHTData | ||
379 | node <- Node opts myId s m infovar | ||
380 | <$> newTVarIO S.empty | ||
381 | <*> pure dta | ||
382 | <*> pure logger | ||
383 | return node | ||
384 | |||
385 | -- | Some resources like listener thread may live for | ||
386 | -- some short period of time right after this DHT session closed. | ||
387 | closeNode :: Node raw dht u ip -> IO () | ||
388 | closeNode Node {..} = closeInternalState resources | ||
389 | |||
390 | -- | Run DHT operation on the given session. | ||
391 | runDHT :: Node raw dht u ip -> DHT raw dht u ip a -> IO a | ||
392 | runDHT node action = runReaderT (unDHT action) node | ||
393 | {-# INLINE runDHT #-} | ||
394 | |||
395 | {----------------------------------------------------------------------- | ||
396 | -- Routing | ||
397 | -----------------------------------------------------------------------} | ||
398 | |||
399 | -- /pick a random ID/ in the range of the bucket and perform a | ||
400 | -- find_nodes search on it. | ||
401 | |||
402 | |||
403 | {----------------------------------------------------------------------- | ||
404 | -- Routing table | ||
405 | -----------------------------------------------------------------------} | ||
406 | |||
407 | -- | This nodes externally routable address reported by remote peers. | ||
408 | routableAddress :: DHT raw dht u ip (Maybe SockAddr) | ||
409 | routableAddress = do | ||
410 | info <- asks routingInfo >>= liftIO . atomically . readTVar | ||
411 | return $ myAddress <$> info | ||
412 | |||
413 | -- | The current NodeId that the given remote node should know us by. | ||
414 | myNodeIdAccordingTo :: NodeAddr ip -> DHT raw dht u ip (NodeId dht) | ||
415 | myNodeIdAccordingTo _ = do | ||
416 | info <- asks routingInfo >>= liftIO . atomically . readTVar | ||
417 | maybe (asks tentativeNodeId) | ||
418 | (return . myNodeId) | ||
419 | info | ||
420 | |||
421 | myNodeIdAccordingTo1 :: DHT raw dht u ip ( NodeAddr ip -> IO (NodeId dht) ) | ||
422 | myNodeIdAccordingTo1 = do | ||
423 | var <- asks routingInfo | ||
424 | tid <- asks tentativeNodeId | ||
425 | return $ \ _ -> do | ||
426 | info <- atomically $ readTVar var | ||
427 | return $ maybe tid myNodeId info | ||
428 | |||
429 | -- | Get current routing table. Normally you don't need to use this | ||
430 | -- function, but it can be usefull for debugging and profiling purposes. | ||
431 | getTable :: Eq ip => DHT raw dht u ip (Table dht ip u) | ||
432 | getTable = do | ||
433 | Node { tentativeNodeId = myId | ||
434 | , routingInfo = var | ||
435 | , options = opts } <- ask | ||
436 | let nil = nullTable myId (optBucketCount opts) | ||
437 | liftIO (maybe nil R.myBuckets <$> atomically (readTVar var)) | ||
438 | |||
439 | getSwarms :: Ord ip => DHT raw KMessageOf u ip [ (InfoHash, Int, Maybe ByteString) ] | ||
440 | getSwarms = do | ||
441 | store <- asks (contactInfo . dhtData) >>= liftIO . atomically . readTVar | ||
442 | return $ P.knownSwarms store | ||
443 | |||
444 | savePeerStore :: (Ord ip, Address ip) => DHT raw KMessageOf u ip ByteString | ||
445 | savePeerStore = do | ||
446 | var <- asks (contactInfo . dhtData) | ||
447 | peers <- liftIO $ atomically $ readTVar var | ||
448 | return $ S.encode peers | ||
449 | |||
450 | mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT raw KMessageOf u ip () | ||
451 | mergeSavedPeers bs = do | ||
452 | var <- asks (contactInfo . dhtData) | ||
453 | case S.decode bs of | ||
454 | Right newbies -> liftIO $ atomically $ modifyTVar' var (<> newbies) | ||
455 | Left _ -> return () | ||
456 | |||
457 | |||
458 | allPeers :: Ord ip => InfoHash -> DHT raw KMessageOf u ip [ PeerAddr ip ] | ||
459 | allPeers ih = do | ||
460 | store <- asks (contactInfo . dhtData) >>= liftIO . atomically . readTVar | ||
461 | return $ P.lookup ih store | ||
462 | |||
463 | -- | Find a set of closest nodes from routing table of this node. (in | ||
464 | -- no particular order) | ||
465 | -- | ||
466 | -- This operation used for 'find_nodes' query. | ||
467 | -- | ||
468 | getClosest :: ( Eq ip | ||
469 | , Ord (NodeId dht) | ||
470 | , FiniteBits (NodeId dht) | ||
471 | , TableKey dht k ) => | ||
472 | k -> DHT raw dht u ip [NodeInfo dht ip u] | ||
473 | getClosest node = do | ||
474 | k <- asks (optK . options) | ||
475 | kclosest k node <$> getTable | ||
476 | |||
477 | getClosest1 :: ( Eq ip | ||
478 | , Ord (NodeId dht) | ||
479 | , FiniteBits (NodeId dht) | ||
480 | , TableKey dht k | ||
481 | ) => DHT raw dht u ip (k -> IO [NodeInfo dht ip u]) | ||
482 | getClosest1 = do | ||
483 | k <- asks (optK . options) | ||
484 | nobkts <- asks (optBucketCount . options) | ||
485 | myid <- asks tentativeNodeId | ||
486 | var <- asks routingInfo | ||
487 | return $ \node -> do nfo <- atomically $ readTVar var | ||
488 | let tbl = maybe (nullTable myid nobkts) R.myBuckets nfo | ||
489 | return $ kclosest k node tbl | ||
490 | |||
491 | {----------------------------------------------------------------------- | ||
492 | -- Peer storage | ||
493 | -----------------------------------------------------------------------} | ||
494 | |||
495 | refreshContacts :: Ord ip => TVar (PeerStore ip) -> IO () | ||
496 | refreshContacts var = | ||
497 | -- TODO limit dht peer store in size (probably by removing oldest peers) | ||
498 | return () | ||
499 | |||
500 | |||
501 | -- | Insert peer to peer store. Used to handle announce requests. | ||
502 | insertPeer :: Ord ip => TVar (PeerStore ip) -> InfoHash -> Maybe ByteString -> PeerAddr ip -> IO () | ||
503 | insertPeer var ih name addr = do | ||
504 | refreshContacts var | ||
505 | atomically $ modifyTVar' var (P.insertPeer ih name addr) | ||
506 | |||
507 | -- | Get peer set for specific swarm. | ||
508 | lookupPeers :: Ord ip => TVar (PeerStore ip) -> InfoHash -> IO [PeerAddr ip] | ||
509 | lookupPeers var ih = do | ||
510 | refreshContacts var | ||
511 | tm <- getTimestamp | ||
512 | atomically $ do | ||
513 | (ps,store') <- P.freshPeers ih tm <$> readTVar var | ||
514 | writeTVar var store' | ||
515 | return ps | ||
516 | |||
517 | getTimestamp :: IO Timestamp | ||
518 | getTimestamp = do | ||
519 | utcTime <- getCurrentTime | ||
520 | -- $(logDebugS) "routing.make_timestamp" (Text.pack (render (pPrint utcTime))) | ||
521 | return $ utcTimeToPOSIXSeconds utcTime | ||
522 | |||
523 | #ifdef VERSION_bencoding | ||
524 | -- | Prepare result for 'get_peers' query. | ||
525 | -- | ||
526 | -- This operation use 'getClosest' as failback so it may block. | ||
527 | -- | ||
528 | getPeerList :: Ord ip => InfoHash -> DHT raw KMessageOf () ip (PeerList ip) | ||
529 | getPeerList ih = do | ||
530 | var <- asks (contactInfo . dhtData) | ||
531 | ps <- liftIO $ lookupPeers var ih | ||
532 | if L.null ps | ||
533 | then Left <$> getClosest ih | ||
534 | else return (Right ps) | ||
535 | |||
536 | getPeerList1 :: Ord ip => DHT raw KMessageOf () ip (InfoHash -> IO (PeerList ip)) | ||
537 | getPeerList1 = do | ||
538 | var <- asks (contactInfo . dhtData) | ||
539 | getclosest <- getClosest1 | ||
540 | return $ \ih -> do | ||
541 | ps <- lookupPeers var ih | ||
542 | if L.null ps | ||
543 | then Left <$> getclosest ih | ||
544 | else return (Right ps) | ||
545 | |||
546 | |||
547 | insertTopic :: InfoHash -> PortNumber -> DHT raw dht u ip () | ||
548 | insertTopic ih p = do | ||
549 | var <- asks announceInfo | ||
550 | liftIO $ atomically $ modifyTVar' var (S.insert (ih, p)) | ||
551 | |||
552 | deleteTopic :: InfoHash -> PortNumber -> DHT raw dht u ip () | ||
553 | deleteTopic ih p = do | ||
554 | var <- asks announceInfo | ||
555 | liftIO $ atomically $ modifyTVar' var (S.delete (ih, p)) | ||
556 | |||
557 | #endif | ||
558 | |||
559 | {----------------------------------------------------------------------- | ||
560 | -- Messaging | ||
561 | -----------------------------------------------------------------------} | ||
562 | |||
563 | -- | Failed queries are ignored. | ||
564 | queryParallel :: [DHT raw dht u ip a] -> DHT raw dht u ip [a] | ||
565 | queryParallel queries = do | ||
566 | -- TODO: use alpha | ||
567 | -- alpha <- asks (optAlpha . options) | ||
568 | cleanup <$> mapConcurrently try queries | ||
569 | where | ||
570 | cleanup :: [Either QueryFailure a] -> [a] | ||
571 | cleanup = mapMaybe (either (const Nothing) Just) | ||