summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/DHT/Query.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/BitTorrent/DHT/Query.hs')
-rw-r--r--src/Network/BitTorrent/DHT/Query.hs750
1 files changed, 0 insertions, 750 deletions
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
deleted file mode 100644
index 003bb5b9..00000000
--- a/src/Network/BitTorrent/DHT/Query.hs
+++ /dev/null
@@ -1,750 +0,0 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2014
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- This module provides functions to interact with other nodes.
9-- Normally, you don't need to import this module, use
10-- "Network.BitTorrent.DHT" instead.
11--
12{-# LANGUAGE CPP #-}
13{-# LANGUAGE FlexibleContexts #-}
14{-# LANGUAGE ScopedTypeVariables #-}
15{-# LANGUAGE TemplateHaskell #-}
16{-# LANGUAGE TupleSections #-}
17{-# LANGUAGE PartialTypeSignatures #-}
18{-# LANGUAGE GADTs #-}
19{-# LANGUAGE RankNTypes #-}
20{-# LANGUAGE MultiParamTypeClasses #-}
21module Network.BitTorrent.DHT.Query
22 ( -- * Handler
23 -- | To bind specific set of handlers you need to pass
24 -- handler list to the 'startNode' function.
25 pingH
26 , findNodeH
27 , getPeersH
28 , announceH
29 , defaultHandlers
30
31 -- * Query
32 -- ** Basic
33 -- | A basic query perform a single request expecting a
34 -- single response.
35 , Iteration
36 , pingQ
37 , coldPingQ
38 , findNodeQ
39 , getPeersQ
40 , announceQ
41
42 -- ** Iterative
43 -- | An iterative query perform multiple basic queries,
44 -- concatenate its responses, optionally yielding result and
45 -- continue to the next iteration.
46 , Search
47 -- , search
48 , publish
49 , ioFindNode
50 , ioFindNodes
51 , ioGetPeers
52 , isearch
53 , bgsearch
54
55 -- ** Routing table
56 , insertNode
57 , refreshNodes
58
59 -- ** Messaging
60 , queryNode
61 , queryNode'
62 , (<@>)
63 ) where
64
65import Data.Bits
66import Data.Default
67#ifdef THREAD_DEBUG
68import Control.Concurrent.Lifted.Instrument hiding (yield)
69#else
70import GHC.Conc (labelThread)
71import Control.Concurrent.Lifted hiding (yield)
72#endif
73import Control.Exception.Lifted hiding (Handler)
74import Control.Monad.Reader
75import Control.Monad.Logger
76import Data.Maybe
77import Data.Conduit
78import Data.Conduit.List as C hiding (mapMaybe, mapM_)
79import Data.Either
80import Data.List as L
81import Data.Monoid
82import Data.Text as T
83import qualified Data.Set as Set
84 ;import Data.Set (Set)
85import Network
86import Text.PrettyPrint as PP hiding ((<>), ($$))
87import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
88import Data.Time
89import Data.Time.Clock.POSIX
90import Data.Hashable (Hashable)
91import Data.Serialize
92import Data.Hashable
93
94import Network.DatagramServer as KRPC hiding (Options, def)
95import Network.KRPC.Method as KRPC
96import Network.DatagramServer.Mainline (ReflectedIP(..), QueryExtra(..), ResponseExtra(..))
97import Network.DatagramServer (QueryFailure(..))
98import Data.Torrent
99import qualified Network.DHT as DHT
100import Network.DHT.Mainline
101import Network.DHT.Routing as R
102import Network.BitTorrent.DHT.Session
103import Control.Concurrent.STM
104import qualified Network.BitTorrent.DHT.Search as Search
105#ifdef VERSION_bencoding
106import Data.BEncode (BValue)
107import Network.DatagramServer.Mainline (KMessageOf)
108#else
109import Data.ByteString (ByteString)
110import Network.DatagramServer.Tox
111#endif
112import Network.Address hiding (NodeId)
113import Network.DatagramServer.Types as RPC hiding (Query,Response)
114import Network.DHT.Types
115import Control.Monad.Trans.Control
116import Data.Typeable
117import Data.Serialize
118import System.IO.Unsafe (unsafeInterleaveIO)
119import Data.String
120
121
122{-----------------------------------------------------------------------
123-- Handlers
124-----------------------------------------------------------------------}
125
126{-
127nodeHandler :: ( Address ip
128 , KRPC dht (Query KMessageOf a) (Response KMessageOf b)
129 )
130 => (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) -> (NodeAddr ip -> IO (NodeId KMessageOf)) -> (Char -> String -> Text -> IO ()) -> QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler
131-}
132nodeHandler :: forall raw dht addr u t q r.
133 (Address addr, WireFormat raw dht, Pretty (NodeInfo dht addr u),
134 Default u,
135 IsString t, Functor dht,
136 KRPC dht (Query dht q) (Response dht r),
137 SerializableTo raw (Response dht r),
138 SerializableTo raw (Query dht q),
139 Show (QueryMethod dht)) =>
140 (NodeInfo dht addr u -> Maybe ReflectedIP -> IO ())
141 -> (NodeAddr addr -> IO (NodeId dht))
142 -> (Char -> t -> Text -> IO ())
143 -> DHTData dht addr
144 -> QueryMethod dht
145 -> (NodeAddr addr -> q -> IO r)
146 -> Handler IO dht raw
147nodeHandler insertNode myNodeIdAccordingTo logm dta method action = handler (\sockaddr -> myNodeIdAccordingTo (error "todo")) method $ \ sockAddr msg -> do
148 let remoteId = messageSender (msg :: dht (Query dht q)) resptype
149 qextra = queryExtra qry
150 resptype = Proxy :: Proxy (Response dht r)
151 q = queryParams qry
152 qry = envelopePayload msg :: Query dht q
153 case fromSockAddr sockAddr of
154 Nothing -> throwIO BadAddress
155 Just naddr -> do
156 logm 'D' "nodeHandler" $ "Received query: " <> T.pack (show $ method)
157 me <- myNodeIdAccordingTo naddr
158 rextra <- liftIO $ makeResponseExtra dta me qry resptype
159 let ni = NodeInfo remoteId naddr def
160 -- Do not route read-only nodes. (bep 43)
161 if fromRoutableNode qextra
162 then insertNode ni Nothing >> return () -- TODO need to block. why?
163 else logm 'W' "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni)
164 Response
165 <$> pure rextra
166 <*> action naddr q
167
168-- | Default 'Ping' handler.
169pingH :: DHT.Kademlia dht => Proxy dht -> NodeAddr ip -> Ping dht -> IO (Ping dht)
170pingH dht _ _ = return (DHT.pongMessage dht)
171-- pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True }
172
173-- | Default 'FindNode' handler.
174findNodeH :: Kademlia dht => (NodeId dht -> IO [NodeInfo dht ip u]) -> NodeAddr ip -> FindNode dht ip -> IO (NodeFound dht ip)
175findNodeH getclosest _ msg = foundNodesMessage . L.map (fmap (const ())) <$> getclosest (findWho msg)
176
177-- | Default 'GetPeers' handler.
178getPeersH :: Hashable ip => (InfoHash -> IO (PeerList ip)) -> TVar SessionTokens -> NodeAddr ip -> GetPeers ip -> IO (GotPeers ip)
179getPeersH getPeerList toks naddr (GetPeers ih) = do
180 ps <- getPeerList ih
181 tok <- grantToken toks naddr
182 return $ GotPeers ps tok
183
184-- | Default 'Announce' handler.
185announceH :: ( Ord ip, Hashable ip ) => TVar (PeerStore ip) -> TVar SessionTokens -> NodeAddr ip -> Announce -> IO Announced
186announceH peers toks naddr @ NodeAddr {..} (Announce {..}) = do
187 valid <- checkToken toks naddr sessionToken
188 unless valid $ do
189 throwIO $ InvalidParameter "token"
190
191 let annPort = if impliedPort then nodePort else port
192 peerAddr = PeerAddr Nothing nodeHost annPort
193 insertPeer peers topic announcedName peerAddr
194 return Announced
195
196-- | Includes all Kademlia-related handlers.
197kademliaHandlers :: forall raw dht u ip. (Eq ip, Ord ip, Address ip
198 , Ord (TransactionID dht)
199 , Ord (NodeId dht)
200 , Show u
201 , SerializableTo raw (Response dht (Ping dht))
202 , SerializableTo raw (Query dht (Ping dht))
203 , Show (QueryMethod dht)
204 , Show (NodeId dht)
205 , FiniteBits (NodeId dht)
206 , Default u
207 , Serialize (TransactionID dht)
208 , WireFormat raw dht
209 , Kademlia dht
210 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
211 , Functor dht
212 , Pretty (NodeInfo dht ip u)
213 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
214 , SerializableTo raw (Response dht (NodeFound dht ip))
215 , SerializableTo raw (Query dht (FindNode dht ip))
216 ) => LogFun -> DHT raw dht u ip [Handler IO dht raw]
217-- kademliaHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT BValue KMessageOf () ip [NodeHandler]
218kademliaHandlers logger = do
219 groknode <- insertNode1
220 mynid <- myNodeIdAccordingTo1
221 dta <- asks dhtData
222 let handler :: ( KRPC dht (Query dht a) (Response dht b)
223 , SerializableTo raw (Response dht b)
224 , SerializableTo raw (Query dht a)
225 ) => QueryMethod dht -> (NodeAddr ip -> a -> IO b) -> Handler IO dht raw
226 handler = nodeHandler groknode mynid (logt logger) dta
227 dht = Proxy :: Proxy dht
228 getclosest <- getClosest1
229 return [ handler (namePing dht) $ pingH dht
230 , handler (nameFindNodes dht) $ findNodeH getclosest
231 ]
232
233instance DataHandlers BValue KMessageOf where
234 dataHandlers = bthandlers
235
236bthandlers ::
237 ( Ord ip , Hashable ip, Typeable ip, Serialize ip) =>
238 (NodeId KMessageOf -> IO [NodeInfo KMessageOf ip ()])
239 -> DHTData KMessageOf ip
240 -> [MethodHandler BValue KMessageOf ip]
241bthandlers getclosest dta =
242 [ MethodHandler "get_peers" $ getPeersH (getpeers dta) (sessionTokens dta)
243 , MethodHandler "announce_peer" $ announceH (contactInfo dta) (sessionTokens dta)
244 ]
245 where
246 getpeers dta ih = do
247 ps <- lookupPeers (contactInfo dta) ih
248 if L.null ps
249 then Left <$> getclosest (toNodeId ih)
250 else return (Right ps)
251
252
253-- | Includes all default query handlers.
254defaultHandlers :: forall raw dht u ip.
255 ( Ord (TransactionID dht)
256 , Ord (NodeId dht)
257 , Show u
258 , SerializableTo raw (Response dht (Ping dht))
259 , SerializableTo raw (Query dht (Ping dht))
260 , Show (QueryMethod dht)
261 , Show (NodeId dht)
262 , FiniteBits (NodeId dht)
263 , Default u
264 , Serialize (TransactionID dht)
265 , WireFormat raw dht
266 , Kademlia dht
267 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
268 , Functor dht
269 , Pretty (NodeInfo dht ip u)
270 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
271 , SerializableTo raw (Response dht (NodeFound dht ip))
272 , SerializableTo raw (Query dht (FindNode dht ip))
273 , Eq ip, Ord ip, Address ip, DataHandlers raw dht
274 ) => LogFun -> DHT raw dht u ip [Handler IO dht raw]
275defaultHandlers logger = do
276 groknode <- insertNode1
277 mynid <- myNodeIdAccordingTo1
278 dta <- asks dhtData
279 let handler :: MethodHandler raw dht ip -> Handler IO dht raw
280 handler (MethodHandler name action) = nodeHandler groknode mynid (logt logger) dta name action
281 getclosest <- getClosest1
282 hs <- kademliaHandlers logger
283 return $ hs ++ L.map handler (dataHandlers (fmap (fmap (fmap (const ()))) . getclosest) dta)
284
285{-----------------------------------------------------------------------
286-- Basic queries
287-----------------------------------------------------------------------}
288
289type Iteration raw dht u ip o = NodeInfo dht ip u -> DHT raw dht u ip (Either [NodeInfo dht ip u] [o ip])
290
291-- | The most basic query. May be used to check if the given node is
292-- alive or get its 'NodeId'.
293pingQ :: forall raw dht u ip.
294 ( DHT.Kademlia dht
295 , Address ip
296 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
297 , Default u
298 , Show u
299 , Ord (TransactionID dht)
300 , Serialize (TransactionID dht)
301 , WireFormat raw dht
302 , SerializableTo raw (Response dht (Ping dht))
303 , SerializableTo raw (Query dht (Ping dht))
304 , Ord (NodeId dht)
305 , FiniteBits (NodeId dht)
306 , Show (NodeId dht)
307 , Show (QueryMethod dht)
308 ) => NodeInfo dht ip u -> DHT raw dht u ip (NodeInfo dht ip u , Maybe ReflectedIP)
309pingQ ni = do
310 let ping = DHT.pingMessage (Proxy :: Proxy dht)
311 (nid, pong, mip) <- queryNode' ni ping
312 let _ = pong `asTypeOf` ping
313 -- (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid}
314 return (NodeInfo nid (nodeAddr ni) def, mip)
315
316-- | The most basic query. May be used to check if the given node is
317-- alive or get its 'NodeId'.
318coldPingQ :: forall raw dht u ip.
319 ( DHT.Kademlia dht
320 , Address ip
321 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
322 , Default u
323 , Show u
324 , Ord (TransactionID dht)
325 , Serialize (TransactionID dht)
326 , WireFormat raw dht
327 , SerializableTo raw (Response dht (Ping dht))
328 , SerializableTo raw (Query dht (Ping dht))
329 , Ord (NodeId dht)
330 , FiniteBits (NodeId dht)
331 , Show (NodeId dht)
332 , Show (QueryMethod dht)
333 ) => PacketDestination dht -> DHT raw dht u ip (NodeInfo dht ip u , Maybe ReflectedIP)
334coldPingQ dest = do
335 let ping = DHT.pingMessage (Proxy :: Proxy dht)
336 naddr <- maybe (throwIO $ QueryFailed ProtocolError "unable to construct NodeAddr from PacketDestination")
337 return
338 $ fromAddr dest
339 (nid, pong, mip) <- coldQueryNode' naddr dest ping
340 let _ = pong `asTypeOf` ping
341 -- (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid}
342 return (NodeInfo nid naddr def, mip)
343
344-- TODO [robustness] match range of returned node ids with the
345-- expected range and either filter bad nodes or discard response at
346-- all throwing an exception
347-- findNodeQ :: Address ip => TableKey key => key -> IterationI ip NodeInfo
348findNodeQ proxy key ni = do
349 closest <- fmap DHT.foundNodes $ DHT.findNodeMessage proxy key <@> ni
350 $(logInfoS) "findNodeQ" $ "NodeFound\n"
351 <> T.pack (L.unlines $ L.map ((' ' :) . show . pPrint) closest)
352 return $ Right closest
353
354#ifdef VERSION_bencoding
355getPeersQ :: Address ip => InfoHash -> Iteration BValue KMessageOf () ip PeerAddr
356getPeersQ topic ni = do
357 GotPeers {..} <- GetPeers topic <@> ni
358 let dist = distance (toNodeId topic) (nodeId ni)
359 $(logInfoS) "getPeersQ" $ T.pack
360 $ "distance: " <> render (pPrint dist) <> " , result: "
361 <> case peers of { Left _ -> "NODES"; Right _ -> "PEERS" }
362 return peers
363
364announceQ :: Address ip => InfoHash -> PortNumber -> Iteration BValue KMessageOf () ip NodeAddr
365announceQ ih p ni = do
366 GotPeers {..} <- GetPeers ih <@> ni
367 case peers of
368 Left ns
369 | False -> undefined -- TODO check if we can announce
370 | otherwise -> return (Left ns)
371 Right _ -> do -- TODO *probably* add to peer cache
372 Announced <- Announce False ih Nothing p grantedToken <@> ni
373 return (Right [nodeAddr ni])
374#endif
375
376{-----------------------------------------------------------------------
377-- Iterative queries
378-----------------------------------------------------------------------}
379
380
381ioGetPeers :: Address ip => InfoHash -> DHT BValue KMessageOf () ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [PeerAddr ip]))
382ioGetPeers ih = do
383 session <- ask
384 return $ \ni -> runDHT session $ do
385 r <- try $ getPeersQ ih ni
386 case r of
387 Right e -> return $ either (,[]) ([],) e
388 Left e -> let _ = e :: QueryFailure in return ([],[])
389
390ioFindNode :: ( DHT.Kademlia dht
391 , WireFormat raw dht
392 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
393 , Address ip
394 , Default u
395 , Show u
396 , Show (QueryMethod dht)
397 , TableKey dht infohash
398 , Eq (NodeId dht)
399 , Ord (NodeId dht)
400 , FiniteBits (NodeId dht)
401 , Show (NodeId dht)
402 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
403 , Ord (TransactionID dht)
404 , Serialize (TransactionID dht)
405 , SerializableTo raw (Response dht (NodeFound dht ip))
406 , SerializableTo raw (Query dht (FindNode dht ip))
407 , SerializableTo raw (Response dht (Ping dht))
408 , SerializableTo raw (Query dht (Ping dht))
409 ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u]))
410ioFindNode ih = do
411 session <- ask
412 return $ \ni -> runDHT session $ do
413 ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> ni
414 let ns' = L.map (fmap (const def)) ns
415 return $ L.partition (\n -> nodeId n /= toNodeId ih) ns'
416
417
418-- | Like ioFindNode, but considers all found nodes to be 'Right' results.
419ioFindNodes :: ( DHT.Kademlia dht
420 , WireFormat raw dht
421 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
422 , Address ip
423 , Default u
424 , Show u
425 , Show (QueryMethod dht)
426 , TableKey dht infohash
427 , Eq (NodeId dht)
428 , Ord (NodeId dht)
429 , FiniteBits (NodeId dht)
430 , Show (NodeId dht)
431 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
432 , Ord (TransactionID dht)
433 , Serialize (TransactionID dht)
434 , SerializableTo raw (Response dht (NodeFound dht ip))
435 , SerializableTo raw (Query dht (FindNode dht ip))
436 , SerializableTo raw (Response dht (Ping dht))
437 , SerializableTo raw (Query dht (Ping dht))
438 ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u]))
439ioFindNodes ih = do
440 session <- ask
441 return $ \ni -> runDHT session $ do
442 ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> ni
443 let ns' = L.map (fmap (const def)) ns
444 return ([], ns')
445
446isearch :: ( Ord r
447 , Ord ip
448 , Ord (NodeId dht)
449 , FiniteBits (NodeId dht)
450 , TableKey dht ih
451 , Show ih) =>
452 (ih -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r])))
453 -> ih
454 -> DHT raw dht u ip (ThreadId, Search.IterativeSearch dht u ip r)
455isearch f ih = do
456 qry <- f ih
457 ns <- kclosest 8 ih <$> getTable
458 liftIO $ do s <- Search.newSearch qry (toNodeId ih) ns
459 a <- fork $ do
460 tid <- myThreadId
461 labelThread tid ("search."++show ih)
462 Search.search s
463 -- atomically \$ readTVar (Search.searchResults s)
464 return (a, s)
465
466-- | Background search: fill a lazy list using a background thread.
467bgsearch f ih = do
468 (tid, s) <- isearch f ih
469 let again shown = do
470 (chk,fin) <- atomically $ do
471 r <- (Set.\\ shown) <$> readTVar (Search.searchResults s)
472 if not $ Set.null r
473 then (,) r <$> Search.searchIsFinished s
474 else Search.searchIsFinished s >>= check >> return (Set.empty,True)
475 let ps = Set.toList chk
476 if fin then return ps
477 else do
478 xs <- unsafeInterleaveIO $ again (shown `Set.union` chk)
479 return $ ps ++ xs
480 liftIO $ again Set.empty
481
482type Search raw dht u ip o = Conduit [NodeInfo dht ip u] (DHT raw dht u ip) [o dht ip u]
483
484#if 0
485
486-- TODO: use reorder and filter (Traversal option) leftovers
487-- search :: k -> IterationI ip o -> Search ip o
488search _ action = do
489 awaitForever $ \ batch -> unless (L.null batch) $ do
490 $(logWarnS) "search" "start query"
491 responses <- lift $ queryParallel (action <$> batch)
492 let (nodes, results) = partitionEithers responses
493 $(logWarnS) "search" ("done query more:" <> T.pack (show (L.length nodes, L.length results)))
494 leftover $ L.concat nodes
495 let r = mapM_ yield results
496 _ = (action,r) :: (a -> DHT raw dht u ip (Either [a] o), ConduitM [a] o (DHT raw dht u ip) ())
497 r
498
499#endif
500
501publish = error "todo"
502-- publish :: Address ip => InfoHash -> PortNumber -> DHT BValue KMessageOf () ip ()
503-- publish ih p = do
504 -- nodes <- getClosest ih
505 -- r <- asks (optReplication . options)
506 -- _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r
507 -- return ()
508
509
510probeNode :: ( Default u
511 , Show u
512 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
513 , DHT.Kademlia dht
514 , Address ip
515 , Ord (TransactionID dht)
516 , Serialize (TransactionID dht)
517 , WireFormat raw dht
518 , SerializableTo raw (Response dht (Ping dht))
519 , SerializableTo raw (Query dht (Ping dht))
520 , Ord (NodeId dht)
521 , FiniteBits (NodeId dht)
522 , Show (NodeId dht)
523 , Show (QueryMethod dht)
524 ) => NodeInfo dht ip u -> DHT raw dht u ip (Bool , Maybe ReflectedIP)
525probeNode addr = do
526 $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint $ nodeAddr addr)))
527 result <- try $ pingQ addr
528 let _ = fmap (const ()) result :: Either QueryFailure ()
529 return $ either (const (False,Nothing)) (\(_,mip)->(True,mip)) result
530
531
532refreshNodes :: forall raw dht u ip.
533 ( Address ip
534 , Ord (NodeId dht)
535 , Default u
536 , FiniteBits (NodeId dht)
537 , Pretty (NodeId dht)
538 , DHT.Kademlia dht
539 , Ord ip
540 , Ord (TransactionID dht)
541 , SerializableTo raw (Response dht (NodeFound dht ip))
542 , SerializableTo raw (Query dht (FindNode dht ip))
543 , SerializableTo raw (Response dht (Ping dht))
544 , SerializableTo raw (Query dht (Ping dht))
545 , Pretty (NodeInfo dht ip u)
546 , Show (NodeId dht)
547 , Show u
548 , Show (QueryMethod dht)
549 , Serialize (TransactionID dht)
550 , WireFormat raw dht
551 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
552 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
553 ) => NodeId dht -> DHT raw dht u ip () -- [NodeInfo KMessageOf ip ()]
554-- FIXME do not use getClosest sinse we should /refresh/ them
555refreshNodes nid = do
556 $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid)))
557 nodes <- getClosest nid
558 do
559 -- forM (L.take 1 nodes) \$ \ addr -> do
560 -- NodeFound ns <- FindNode nid <@> addr
561 -- Expected type: ConduitM [NodeAddr ip] [NodeInfo KMessageOf ip ()] (DHT ip) ()
562 -- Actual type: ConduitM [NodeInfo KMessageOf ip ()] [NodeInfo KMessageOf ip ()] (DHT ip) ()
563 -- nss <- sourceList [[addr]] \$= search nid (findNodeQ nid) $$ C.consume
564 -- nss <- sourceList [nodes] \$= search nid (findNodeQ (Proxy :: Proxy dht) nid) $$ C.consume
565 ns <- bgsearch ioFindNodes nid
566 $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length ns)) <> " nodes."
567 _ <- queryParallel $ flip L.map ns $ \n -> do
568 $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n))
569 pingQ n
570 -- pingQ takes care of inserting the node.
571 return ()
572 return () -- \$ L.concat nss
573
574logc :: Char -> String -> DHT raw dht u ip ()
575logc 'D' = $(logDebugS) "insertNode" . T.pack
576logc 'W' = $(logWarnS) "insertNode" . T.pack
577logc 'I' = $(logInfoS) "insertNode" . T.pack
578logc c = $(logInfoS) "insertNode" . T.pack . (c :) . (':' :)
579
580-- | This operation do not block but acquire exclusive access to
581-- routing table.
582insertNode :: forall raw dht u ip.
583 ( Address ip
584 , Ord (NodeId dht)
585 , FiniteBits (NodeId dht)
586 , Show (NodeId dht)
587 , Default u
588 , Show u
589 , DHT.Kademlia dht
590 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
591 , Ord (TransactionID dht)
592 , WireFormat raw dht
593 , Serialize (TransactionID dht)
594 , SerializableTo raw (Response dht (Ping dht))
595 , SerializableTo raw (Query dht (Ping dht))
596 , Ord (NodeId dht)
597 , Show (NodeId dht)
598 , Show (QueryMethod dht)
599 ) => NodeInfo dht ip u -> Maybe ReflectedIP -> DHT raw dht u ip ()
600insertNode info witnessed_ip0 = do
601 f <- insertNode1
602 liftIO $ f info witnessed_ip0
603
604insertNode1 :: forall raw dht u ip.
605 ( Address ip
606 , Default u
607 , Show u
608 , Ord (NodeId dht)
609 , FiniteBits (NodeId dht)
610 , Show (NodeId dht)
611 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
612 , DHT.Kademlia dht
613 , Ord (TransactionID dht)
614 , WireFormat raw dht
615 , Serialize (TransactionID dht)
616 , SerializableTo raw (Response dht (Ping dht))
617 , SerializableTo raw (Query dht (Ping dht))
618 , Ord (NodeId dht)
619 , Show (NodeId dht)
620 , Show (QueryMethod dht)
621 ) => DHT raw dht u ip (NodeInfo dht ip u -> Maybe ReflectedIP -> IO ())
622insertNode1 = do
623 bc <- optBucketCount <$> asks options
624 nid <- asks tentativeNodeId
625 logm0 <- embed_ (uncurry logc)
626 let logm c = logm0 . (c,)
627 dht_node_state <- ask -- XXX: This prevents ping probe from modifying the Node state.
628 probe0 <- embed probeNode -- probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP)
629 let probe n = probe0 n >>= runDHT dht_node_state . restoreM
630 {-
631 changeip ip0 arrival = fromMaybe (DHT.fallbackID params) $ do -- warning: recursive
632 ip <- fromSockAddr ip0 :: Maybe ip
633 listToMaybe
634 $ rank id (nodeId $ foreignNode arrival)
635 $ bep42s ip (DHT.fallbackID params) -- warning: recursive
636 -}
637 params = DHT.TableParameters
638 { maxBuckets = bc :: Int
639 , fallbackID = nid :: NodeId dht
640 , adjustID = dhtAdjustID Proxy (DHT.fallbackID params) :: SockAddr -> Event dht ip u -> NodeId dht
641 , logMessage = logm :: Char -> String -> IO ()
642 , pingProbe = probe :: NodeInfo dht ip u -> IO (Bool, Maybe ReflectedIP)
643 }
644 tbl <- asks routingInfo
645 let state = DHT.TableKeeper
646 { routingInfo = tbl
647 , grokNode = DHT.insertNode params state
648 , grokAddress = \_ _ -> return () -- :: Maybe SockAddr -> ReflectedIP -> IO ()
649 }
650 return $ \info witnessed_ip0 -> DHT.insertNode params state info witnessed_ip0
651
652-- | Throws exception if node is not responding.
653queryNode :: forall raw dht u a b ip.
654 ( Address ip
655 , KRPC dht (Query dht a) (Response dht b)
656 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
657 , Default u
658 , Show u
659 , DHT.Kademlia dht
660 , Ord (TransactionID dht)
661 , Serialize (TransactionID dht)
662 , WireFormat raw dht
663 , SerializableTo raw (Response dht b)
664 , SerializableTo raw (Query dht a)
665 , Ord (NodeId dht)
666 , FiniteBits (NodeId dht)
667 , Show (NodeId dht)
668 , Show (QueryMethod dht)
669 , SerializableTo raw (Response dht (Ping dht))
670 , SerializableTo raw (Query dht (Ping dht))
671 ) => NodeInfo dht ip u -> a -> DHT raw dht u ip (NodeId dht, b)
672queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q
673
674queryNode' :: forall raw dht u a b ip.
675 ( Address ip
676 , Default u
677 , Show u
678 , DHT.Kademlia dht
679 , KRPC dht (Query dht a) (Response dht b)
680 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
681 , Ord (TransactionID dht)
682 , Serialize (TransactionID dht)
683 , WireFormat raw dht
684 , SerializableTo raw (Response dht b)
685 , SerializableTo raw (Query dht a)
686 , Ord (NodeId dht)
687 , FiniteBits (NodeId dht)
688 , Show (NodeId dht)
689 , Show (QueryMethod dht)
690 , SerializableTo raw (Response dht (Ping dht))
691 , SerializableTo raw (Query dht (Ping dht))
692 ) => NodeInfo dht ip u -> a -> DHT raw dht u ip (NodeId dht, b, Maybe ReflectedIP)
693queryNode' ni q = do
694 let addr = nodeAddr ni
695 dest = makeAddress (Left $ nodeId ni) (toSockAddr addr)
696 coldQueryNode' addr dest q
697
698coldQueryNode' :: forall raw dht u a b ip.
699 ( Address ip
700 , Default u
701 , Show u
702 , DHT.Kademlia dht
703 , KRPC dht (Query dht a) (Response dht b)
704 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
705 , Ord (TransactionID dht)
706 , Serialize (TransactionID dht)
707 , WireFormat raw dht
708 , SerializableTo raw (Response dht b)
709 , SerializableTo raw (Query dht a)
710 , Ord (NodeId dht)
711 , FiniteBits (NodeId dht)
712 , Show (NodeId dht)
713 , Show (QueryMethod dht)
714 , SerializableTo raw (Response dht (Ping dht))
715 , SerializableTo raw (Query dht (Ping dht))
716 ) => NodeAddr ip -> PacketDestination dht -> a -> DHT raw dht u ip (NodeId dht, b, Maybe ReflectedIP)
717coldQueryNode' addr dest q = do
718 nid <- myNodeIdAccordingTo $ fromMaybe (error "TODO: coldQueryNode' myNodeIdAccordingTo") $ fromAddr dest
719 dta <- asks dhtData
720 qextra <- liftIO $ makeQueryExtra dta nid (Proxy :: Proxy (Query dht q)) (Proxy :: Proxy (Response dht b))
721 let read_only = False -- TODO: check for NAT issues. (BEP 43)
722 -- let KRPC.Method name = KRPC.method :: KRPC.Method dht (Query dht a) (Response dht b)
723 mgr <- asks manager
724 (Response rextra r, remoteId, witnessed_ip) <- liftIO $ query' mgr dest (Query qextra q)
725 -- \$(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip)
726 -- <> " by " <> T.pack (show (toSockAddr addr))
727 _ <- insertNode (NodeInfo remoteId addr def) witnessed_ip
728 return (remoteId, r, witnessed_ip)
729
730-- | Infix version of 'queryNode' function.
731(<@>) :: ( Address ip
732 , KRPC dht (Query dht a) (Response dht b)
733 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
734 , Default u
735 , Show u
736 , Show (QueryMethod dht)
737 , Ord (NodeId dht)
738 , FiniteBits (NodeId dht)
739 , Show (NodeId dht)
740 , Ord (TransactionID dht)
741 , Serialize (TransactionID dht)
742 , SerializableTo raw (Response dht b)
743 , SerializableTo raw (Query dht a)
744 , SerializableTo raw (Response dht (Ping dht))
745 , SerializableTo raw (Query dht (Ping dht))
746 , WireFormat raw dht
747 , Kademlia dht
748 ) => a -> NodeInfo dht ip u -> DHT raw dht u ip b
749q <@> addr = snd <$> queryNode addr q
750{-# INLINE (<@>) #-}