diff options
Diffstat (limited to 'src/Network/BitTorrent/DHT/Query.hs')
-rw-r--r-- | src/Network/BitTorrent/DHT/Query.hs | 750 |
1 files changed, 0 insertions, 750 deletions
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs deleted file mode 100644 index 003bb5b9..00000000 --- a/src/Network/BitTorrent/DHT/Query.hs +++ /dev/null | |||
@@ -1,750 +0,0 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2014 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- This module provides functions to interact with other nodes. | ||
9 | -- Normally, you don't need to import this module, use | ||
10 | -- "Network.BitTorrent.DHT" instead. | ||
11 | -- | ||
12 | {-# LANGUAGE CPP #-} | ||
13 | {-# LANGUAGE FlexibleContexts #-} | ||
14 | {-# LANGUAGE ScopedTypeVariables #-} | ||
15 | {-# LANGUAGE TemplateHaskell #-} | ||
16 | {-# LANGUAGE TupleSections #-} | ||
17 | {-# LANGUAGE PartialTypeSignatures #-} | ||
18 | {-# LANGUAGE GADTs #-} | ||
19 | {-# LANGUAGE RankNTypes #-} | ||
20 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
21 | module Network.BitTorrent.DHT.Query | ||
22 | ( -- * Handler | ||
23 | -- | To bind specific set of handlers you need to pass | ||
24 | -- handler list to the 'startNode' function. | ||
25 | pingH | ||
26 | , findNodeH | ||
27 | , getPeersH | ||
28 | , announceH | ||
29 | , defaultHandlers | ||
30 | |||
31 | -- * Query | ||
32 | -- ** Basic | ||
33 | -- | A basic query perform a single request expecting a | ||
34 | -- single response. | ||
35 | , Iteration | ||
36 | , pingQ | ||
37 | , coldPingQ | ||
38 | , findNodeQ | ||
39 | , getPeersQ | ||
40 | , announceQ | ||
41 | |||
42 | -- ** Iterative | ||
43 | -- | An iterative query perform multiple basic queries, | ||
44 | -- concatenate its responses, optionally yielding result and | ||
45 | -- continue to the next iteration. | ||
46 | , Search | ||
47 | -- , search | ||
48 | , publish | ||
49 | , ioFindNode | ||
50 | , ioFindNodes | ||
51 | , ioGetPeers | ||
52 | , isearch | ||
53 | , bgsearch | ||
54 | |||
55 | -- ** Routing table | ||
56 | , insertNode | ||
57 | , refreshNodes | ||
58 | |||
59 | -- ** Messaging | ||
60 | , queryNode | ||
61 | , queryNode' | ||
62 | , (<@>) | ||
63 | ) where | ||
64 | |||
65 | import Data.Bits | ||
66 | import Data.Default | ||
67 | #ifdef THREAD_DEBUG | ||
68 | import Control.Concurrent.Lifted.Instrument hiding (yield) | ||
69 | #else | ||
70 | import GHC.Conc (labelThread) | ||
71 | import Control.Concurrent.Lifted hiding (yield) | ||
72 | #endif | ||
73 | import Control.Exception.Lifted hiding (Handler) | ||
74 | import Control.Monad.Reader | ||
75 | import Control.Monad.Logger | ||
76 | import Data.Maybe | ||
77 | import Data.Conduit | ||
78 | import Data.Conduit.List as C hiding (mapMaybe, mapM_) | ||
79 | import Data.Either | ||
80 | import Data.List as L | ||
81 | import Data.Monoid | ||
82 | import Data.Text as T | ||
83 | import qualified Data.Set as Set | ||
84 | ;import Data.Set (Set) | ||
85 | import Network | ||
86 | import Text.PrettyPrint as PP hiding ((<>), ($$)) | ||
87 | import Text.PrettyPrint.HughesPJClass hiding ((<>),($$)) | ||
88 | import Data.Time | ||
89 | import Data.Time.Clock.POSIX | ||
90 | import Data.Hashable (Hashable) | ||
91 | import Data.Serialize | ||
92 | import Data.Hashable | ||
93 | |||
94 | import Network.DatagramServer as KRPC hiding (Options, def) | ||
95 | import Network.KRPC.Method as KRPC | ||
96 | import Network.DatagramServer.Mainline (ReflectedIP(..), QueryExtra(..), ResponseExtra(..)) | ||
97 | import Network.DatagramServer (QueryFailure(..)) | ||
98 | import Data.Torrent | ||
99 | import qualified Network.DHT as DHT | ||
100 | import Network.DHT.Mainline | ||
101 | import Network.DHT.Routing as R | ||
102 | import Network.BitTorrent.DHT.Session | ||
103 | import Control.Concurrent.STM | ||
104 | import qualified Network.BitTorrent.DHT.Search as Search | ||
105 | #ifdef VERSION_bencoding | ||
106 | import Data.BEncode (BValue) | ||
107 | import Network.DatagramServer.Mainline (KMessageOf) | ||
108 | #else | ||
109 | import Data.ByteString (ByteString) | ||
110 | import Network.DatagramServer.Tox | ||
111 | #endif | ||
112 | import Network.Address hiding (NodeId) | ||
113 | import Network.DatagramServer.Types as RPC hiding (Query,Response) | ||
114 | import Network.DHT.Types | ||
115 | import Control.Monad.Trans.Control | ||
116 | import Data.Typeable | ||
117 | import Data.Serialize | ||
118 | import System.IO.Unsafe (unsafeInterleaveIO) | ||
119 | import Data.String | ||
120 | |||
121 | |||
122 | {----------------------------------------------------------------------- | ||
123 | -- Handlers | ||
124 | -----------------------------------------------------------------------} | ||
125 | |||
126 | {- | ||
127 | nodeHandler :: ( Address ip | ||
128 | , KRPC dht (Query KMessageOf a) (Response KMessageOf b) | ||
129 | ) | ||
130 | => (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) -> (NodeAddr ip -> IO (NodeId KMessageOf)) -> (Char -> String -> Text -> IO ()) -> QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler | ||
131 | -} | ||
132 | nodeHandler :: forall raw dht addr u t q r. | ||
133 | (Address addr, WireFormat raw dht, Pretty (NodeInfo dht addr u), | ||
134 | Default u, | ||
135 | IsString t, Functor dht, | ||
136 | KRPC dht (Query dht q) (Response dht r), | ||
137 | SerializableTo raw (Response dht r), | ||
138 | SerializableTo raw (Query dht q), | ||
139 | Show (QueryMethod dht)) => | ||
140 | (NodeInfo dht addr u -> Maybe ReflectedIP -> IO ()) | ||
141 | -> (NodeAddr addr -> IO (NodeId dht)) | ||
142 | -> (Char -> t -> Text -> IO ()) | ||
143 | -> DHTData dht addr | ||
144 | -> QueryMethod dht | ||
145 | -> (NodeAddr addr -> q -> IO r) | ||
146 | -> Handler IO dht raw | ||
147 | nodeHandler insertNode myNodeIdAccordingTo logm dta method action = handler (\sockaddr -> myNodeIdAccordingTo (error "todo")) method $ \ sockAddr msg -> do | ||
148 | let remoteId = messageSender (msg :: dht (Query dht q)) resptype | ||
149 | qextra = queryExtra qry | ||
150 | resptype = Proxy :: Proxy (Response dht r) | ||
151 | q = queryParams qry | ||
152 | qry = envelopePayload msg :: Query dht q | ||
153 | case fromSockAddr sockAddr of | ||
154 | Nothing -> throwIO BadAddress | ||
155 | Just naddr -> do | ||
156 | logm 'D' "nodeHandler" $ "Received query: " <> T.pack (show $ method) | ||
157 | me <- myNodeIdAccordingTo naddr | ||
158 | rextra <- liftIO $ makeResponseExtra dta me qry resptype | ||
159 | let ni = NodeInfo remoteId naddr def | ||
160 | -- Do not route read-only nodes. (bep 43) | ||
161 | if fromRoutableNode qextra | ||
162 | then insertNode ni Nothing >> return () -- TODO need to block. why? | ||
163 | else logm 'W' "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni) | ||
164 | Response | ||
165 | <$> pure rextra | ||
166 | <*> action naddr q | ||
167 | |||
168 | -- | Default 'Ping' handler. | ||
169 | pingH :: DHT.Kademlia dht => Proxy dht -> NodeAddr ip -> Ping dht -> IO (Ping dht) | ||
170 | pingH dht _ _ = return (DHT.pongMessage dht) | ||
171 | -- pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True } | ||
172 | |||
173 | -- | Default 'FindNode' handler. | ||
174 | findNodeH :: Kademlia dht => (NodeId dht -> IO [NodeInfo dht ip u]) -> NodeAddr ip -> FindNode dht ip -> IO (NodeFound dht ip) | ||
175 | findNodeH getclosest _ msg = foundNodesMessage . L.map (fmap (const ())) <$> getclosest (findWho msg) | ||
176 | |||
177 | -- | Default 'GetPeers' handler. | ||
178 | getPeersH :: Hashable ip => (InfoHash -> IO (PeerList ip)) -> TVar SessionTokens -> NodeAddr ip -> GetPeers ip -> IO (GotPeers ip) | ||
179 | getPeersH getPeerList toks naddr (GetPeers ih) = do | ||
180 | ps <- getPeerList ih | ||
181 | tok <- grantToken toks naddr | ||
182 | return $ GotPeers ps tok | ||
183 | |||
184 | -- | Default 'Announce' handler. | ||
185 | announceH :: ( Ord ip, Hashable ip ) => TVar (PeerStore ip) -> TVar SessionTokens -> NodeAddr ip -> Announce -> IO Announced | ||
186 | announceH peers toks naddr @ NodeAddr {..} (Announce {..}) = do | ||
187 | valid <- checkToken toks naddr sessionToken | ||
188 | unless valid $ do | ||
189 | throwIO $ InvalidParameter "token" | ||
190 | |||
191 | let annPort = if impliedPort then nodePort else port | ||
192 | peerAddr = PeerAddr Nothing nodeHost annPort | ||
193 | insertPeer peers topic announcedName peerAddr | ||
194 | return Announced | ||
195 | |||
196 | -- | Includes all Kademlia-related handlers. | ||
197 | kademliaHandlers :: forall raw dht u ip. (Eq ip, Ord ip, Address ip | ||
198 | , Ord (TransactionID dht) | ||
199 | , Ord (NodeId dht) | ||
200 | , Show u | ||
201 | , SerializableTo raw (Response dht (Ping dht)) | ||
202 | , SerializableTo raw (Query dht (Ping dht)) | ||
203 | , Show (QueryMethod dht) | ||
204 | , Show (NodeId dht) | ||
205 | , FiniteBits (NodeId dht) | ||
206 | , Default u | ||
207 | , Serialize (TransactionID dht) | ||
208 | , WireFormat raw dht | ||
209 | , Kademlia dht | ||
210 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
211 | , Functor dht | ||
212 | , Pretty (NodeInfo dht ip u) | ||
213 | , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) | ||
214 | , SerializableTo raw (Response dht (NodeFound dht ip)) | ||
215 | , SerializableTo raw (Query dht (FindNode dht ip)) | ||
216 | ) => LogFun -> DHT raw dht u ip [Handler IO dht raw] | ||
217 | -- kademliaHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT BValue KMessageOf () ip [NodeHandler] | ||
218 | kademliaHandlers logger = do | ||
219 | groknode <- insertNode1 | ||
220 | mynid <- myNodeIdAccordingTo1 | ||
221 | dta <- asks dhtData | ||
222 | let handler :: ( KRPC dht (Query dht a) (Response dht b) | ||
223 | , SerializableTo raw (Response dht b) | ||
224 | , SerializableTo raw (Query dht a) | ||
225 | ) => QueryMethod dht -> (NodeAddr ip -> a -> IO b) -> Handler IO dht raw | ||
226 | handler = nodeHandler groknode mynid (logt logger) dta | ||
227 | dht = Proxy :: Proxy dht | ||
228 | getclosest <- getClosest1 | ||
229 | return [ handler (namePing dht) $ pingH dht | ||
230 | , handler (nameFindNodes dht) $ findNodeH getclosest | ||
231 | ] | ||
232 | |||
233 | instance DataHandlers BValue KMessageOf where | ||
234 | dataHandlers = bthandlers | ||
235 | |||
236 | bthandlers :: | ||
237 | ( Ord ip , Hashable ip, Typeable ip, Serialize ip) => | ||
238 | (NodeId KMessageOf -> IO [NodeInfo KMessageOf ip ()]) | ||
239 | -> DHTData KMessageOf ip | ||
240 | -> [MethodHandler BValue KMessageOf ip] | ||
241 | bthandlers getclosest dta = | ||
242 | [ MethodHandler "get_peers" $ getPeersH (getpeers dta) (sessionTokens dta) | ||
243 | , MethodHandler "announce_peer" $ announceH (contactInfo dta) (sessionTokens dta) | ||
244 | ] | ||
245 | where | ||
246 | getpeers dta ih = do | ||
247 | ps <- lookupPeers (contactInfo dta) ih | ||
248 | if L.null ps | ||
249 | then Left <$> getclosest (toNodeId ih) | ||
250 | else return (Right ps) | ||
251 | |||
252 | |||
253 | -- | Includes all default query handlers. | ||
254 | defaultHandlers :: forall raw dht u ip. | ||
255 | ( Ord (TransactionID dht) | ||
256 | , Ord (NodeId dht) | ||
257 | , Show u | ||
258 | , SerializableTo raw (Response dht (Ping dht)) | ||
259 | , SerializableTo raw (Query dht (Ping dht)) | ||
260 | , Show (QueryMethod dht) | ||
261 | , Show (NodeId dht) | ||
262 | , FiniteBits (NodeId dht) | ||
263 | , Default u | ||
264 | , Serialize (TransactionID dht) | ||
265 | , WireFormat raw dht | ||
266 | , Kademlia dht | ||
267 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
268 | , Functor dht | ||
269 | , Pretty (NodeInfo dht ip u) | ||
270 | , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) | ||
271 | , SerializableTo raw (Response dht (NodeFound dht ip)) | ||
272 | , SerializableTo raw (Query dht (FindNode dht ip)) | ||
273 | , Eq ip, Ord ip, Address ip, DataHandlers raw dht | ||
274 | ) => LogFun -> DHT raw dht u ip [Handler IO dht raw] | ||
275 | defaultHandlers logger = do | ||
276 | groknode <- insertNode1 | ||
277 | mynid <- myNodeIdAccordingTo1 | ||
278 | dta <- asks dhtData | ||
279 | let handler :: MethodHandler raw dht ip -> Handler IO dht raw | ||
280 | handler (MethodHandler name action) = nodeHandler groknode mynid (logt logger) dta name action | ||
281 | getclosest <- getClosest1 | ||
282 | hs <- kademliaHandlers logger | ||
283 | return $ hs ++ L.map handler (dataHandlers (fmap (fmap (fmap (const ()))) . getclosest) dta) | ||
284 | |||
285 | {----------------------------------------------------------------------- | ||
286 | -- Basic queries | ||
287 | -----------------------------------------------------------------------} | ||
288 | |||
289 | type Iteration raw dht u ip o = NodeInfo dht ip u -> DHT raw dht u ip (Either [NodeInfo dht ip u] [o ip]) | ||
290 | |||
291 | -- | The most basic query. May be used to check if the given node is | ||
292 | -- alive or get its 'NodeId'. | ||
293 | pingQ :: forall raw dht u ip. | ||
294 | ( DHT.Kademlia dht | ||
295 | , Address ip | ||
296 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
297 | , Default u | ||
298 | , Show u | ||
299 | , Ord (TransactionID dht) | ||
300 | , Serialize (TransactionID dht) | ||
301 | , WireFormat raw dht | ||
302 | , SerializableTo raw (Response dht (Ping dht)) | ||
303 | , SerializableTo raw (Query dht (Ping dht)) | ||
304 | , Ord (NodeId dht) | ||
305 | , FiniteBits (NodeId dht) | ||
306 | , Show (NodeId dht) | ||
307 | , Show (QueryMethod dht) | ||
308 | ) => NodeInfo dht ip u -> DHT raw dht u ip (NodeInfo dht ip u , Maybe ReflectedIP) | ||
309 | pingQ ni = do | ||
310 | let ping = DHT.pingMessage (Proxy :: Proxy dht) | ||
311 | (nid, pong, mip) <- queryNode' ni ping | ||
312 | let _ = pong `asTypeOf` ping | ||
313 | -- (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid} | ||
314 | return (NodeInfo nid (nodeAddr ni) def, mip) | ||
315 | |||
316 | -- | The most basic query. May be used to check if the given node is | ||
317 | -- alive or get its 'NodeId'. | ||
318 | coldPingQ :: forall raw dht u ip. | ||
319 | ( DHT.Kademlia dht | ||
320 | , Address ip | ||
321 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
322 | , Default u | ||
323 | , Show u | ||
324 | , Ord (TransactionID dht) | ||
325 | , Serialize (TransactionID dht) | ||
326 | , WireFormat raw dht | ||
327 | , SerializableTo raw (Response dht (Ping dht)) | ||
328 | , SerializableTo raw (Query dht (Ping dht)) | ||
329 | , Ord (NodeId dht) | ||
330 | , FiniteBits (NodeId dht) | ||
331 | , Show (NodeId dht) | ||
332 | , Show (QueryMethod dht) | ||
333 | ) => PacketDestination dht -> DHT raw dht u ip (NodeInfo dht ip u , Maybe ReflectedIP) | ||
334 | coldPingQ dest = do | ||
335 | let ping = DHT.pingMessage (Proxy :: Proxy dht) | ||
336 | naddr <- maybe (throwIO $ QueryFailed ProtocolError "unable to construct NodeAddr from PacketDestination") | ||
337 | return | ||
338 | $ fromAddr dest | ||
339 | (nid, pong, mip) <- coldQueryNode' naddr dest ping | ||
340 | let _ = pong `asTypeOf` ping | ||
341 | -- (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid} | ||
342 | return (NodeInfo nid naddr def, mip) | ||
343 | |||
344 | -- TODO [robustness] match range of returned node ids with the | ||
345 | -- expected range and either filter bad nodes or discard response at | ||
346 | -- all throwing an exception | ||
347 | -- findNodeQ :: Address ip => TableKey key => key -> IterationI ip NodeInfo | ||
348 | findNodeQ proxy key ni = do | ||
349 | closest <- fmap DHT.foundNodes $ DHT.findNodeMessage proxy key <@> ni | ||
350 | $(logInfoS) "findNodeQ" $ "NodeFound\n" | ||
351 | <> T.pack (L.unlines $ L.map ((' ' :) . show . pPrint) closest) | ||
352 | return $ Right closest | ||
353 | |||
354 | #ifdef VERSION_bencoding | ||
355 | getPeersQ :: Address ip => InfoHash -> Iteration BValue KMessageOf () ip PeerAddr | ||
356 | getPeersQ topic ni = do | ||
357 | GotPeers {..} <- GetPeers topic <@> ni | ||
358 | let dist = distance (toNodeId topic) (nodeId ni) | ||
359 | $(logInfoS) "getPeersQ" $ T.pack | ||
360 | $ "distance: " <> render (pPrint dist) <> " , result: " | ||
361 | <> case peers of { Left _ -> "NODES"; Right _ -> "PEERS" } | ||
362 | return peers | ||
363 | |||
364 | announceQ :: Address ip => InfoHash -> PortNumber -> Iteration BValue KMessageOf () ip NodeAddr | ||
365 | announceQ ih p ni = do | ||
366 | GotPeers {..} <- GetPeers ih <@> ni | ||
367 | case peers of | ||
368 | Left ns | ||
369 | | False -> undefined -- TODO check if we can announce | ||
370 | | otherwise -> return (Left ns) | ||
371 | Right _ -> do -- TODO *probably* add to peer cache | ||
372 | Announced <- Announce False ih Nothing p grantedToken <@> ni | ||
373 | return (Right [nodeAddr ni]) | ||
374 | #endif | ||
375 | |||
376 | {----------------------------------------------------------------------- | ||
377 | -- Iterative queries | ||
378 | -----------------------------------------------------------------------} | ||
379 | |||
380 | |||
381 | ioGetPeers :: Address ip => InfoHash -> DHT BValue KMessageOf () ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [PeerAddr ip])) | ||
382 | ioGetPeers ih = do | ||
383 | session <- ask | ||
384 | return $ \ni -> runDHT session $ do | ||
385 | r <- try $ getPeersQ ih ni | ||
386 | case r of | ||
387 | Right e -> return $ either (,[]) ([],) e | ||
388 | Left e -> let _ = e :: QueryFailure in return ([],[]) | ||
389 | |||
390 | ioFindNode :: ( DHT.Kademlia dht | ||
391 | , WireFormat raw dht | ||
392 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
393 | , Address ip | ||
394 | , Default u | ||
395 | , Show u | ||
396 | , Show (QueryMethod dht) | ||
397 | , TableKey dht infohash | ||
398 | , Eq (NodeId dht) | ||
399 | , Ord (NodeId dht) | ||
400 | , FiniteBits (NodeId dht) | ||
401 | , Show (NodeId dht) | ||
402 | , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) | ||
403 | , Ord (TransactionID dht) | ||
404 | , Serialize (TransactionID dht) | ||
405 | , SerializableTo raw (Response dht (NodeFound dht ip)) | ||
406 | , SerializableTo raw (Query dht (FindNode dht ip)) | ||
407 | , SerializableTo raw (Response dht (Ping dht)) | ||
408 | , SerializableTo raw (Query dht (Ping dht)) | ||
409 | ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u])) | ||
410 | ioFindNode ih = do | ||
411 | session <- ask | ||
412 | return $ \ni -> runDHT session $ do | ||
413 | ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> ni | ||
414 | let ns' = L.map (fmap (const def)) ns | ||
415 | return $ L.partition (\n -> nodeId n /= toNodeId ih) ns' | ||
416 | |||
417 | |||
418 | -- | Like ioFindNode, but considers all found nodes to be 'Right' results. | ||
419 | ioFindNodes :: ( DHT.Kademlia dht | ||
420 | , WireFormat raw dht | ||
421 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
422 | , Address ip | ||
423 | , Default u | ||
424 | , Show u | ||
425 | , Show (QueryMethod dht) | ||
426 | , TableKey dht infohash | ||
427 | , Eq (NodeId dht) | ||
428 | , Ord (NodeId dht) | ||
429 | , FiniteBits (NodeId dht) | ||
430 | , Show (NodeId dht) | ||
431 | , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) | ||
432 | , Ord (TransactionID dht) | ||
433 | , Serialize (TransactionID dht) | ||
434 | , SerializableTo raw (Response dht (NodeFound dht ip)) | ||
435 | , SerializableTo raw (Query dht (FindNode dht ip)) | ||
436 | , SerializableTo raw (Response dht (Ping dht)) | ||
437 | , SerializableTo raw (Query dht (Ping dht)) | ||
438 | ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u])) | ||
439 | ioFindNodes ih = do | ||
440 | session <- ask | ||
441 | return $ \ni -> runDHT session $ do | ||
442 | ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> ni | ||
443 | let ns' = L.map (fmap (const def)) ns | ||
444 | return ([], ns') | ||
445 | |||
446 | isearch :: ( Ord r | ||
447 | , Ord ip | ||
448 | , Ord (NodeId dht) | ||
449 | , FiniteBits (NodeId dht) | ||
450 | , TableKey dht ih | ||
451 | , Show ih) => | ||
452 | (ih -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r]))) | ||
453 | -> ih | ||
454 | -> DHT raw dht u ip (ThreadId, Search.IterativeSearch dht u ip r) | ||
455 | isearch f ih = do | ||
456 | qry <- f ih | ||
457 | ns <- kclosest 8 ih <$> getTable | ||
458 | liftIO $ do s <- Search.newSearch qry (toNodeId ih) ns | ||
459 | a <- fork $ do | ||
460 | tid <- myThreadId | ||
461 | labelThread tid ("search."++show ih) | ||
462 | Search.search s | ||
463 | -- atomically \$ readTVar (Search.searchResults s) | ||
464 | return (a, s) | ||
465 | |||
466 | -- | Background search: fill a lazy list using a background thread. | ||
467 | bgsearch f ih = do | ||
468 | (tid, s) <- isearch f ih | ||
469 | let again shown = do | ||
470 | (chk,fin) <- atomically $ do | ||
471 | r <- (Set.\\ shown) <$> readTVar (Search.searchResults s) | ||
472 | if not $ Set.null r | ||
473 | then (,) r <$> Search.searchIsFinished s | ||
474 | else Search.searchIsFinished s >>= check >> return (Set.empty,True) | ||
475 | let ps = Set.toList chk | ||
476 | if fin then return ps | ||
477 | else do | ||
478 | xs <- unsafeInterleaveIO $ again (shown `Set.union` chk) | ||
479 | return $ ps ++ xs | ||
480 | liftIO $ again Set.empty | ||
481 | |||
482 | type Search raw dht u ip o = Conduit [NodeInfo dht ip u] (DHT raw dht u ip) [o dht ip u] | ||
483 | |||
484 | #if 0 | ||
485 | |||
486 | -- TODO: use reorder and filter (Traversal option) leftovers | ||
487 | -- search :: k -> IterationI ip o -> Search ip o | ||
488 | search _ action = do | ||
489 | awaitForever $ \ batch -> unless (L.null batch) $ do | ||
490 | $(logWarnS) "search" "start query" | ||
491 | responses <- lift $ queryParallel (action <$> batch) | ||
492 | let (nodes, results) = partitionEithers responses | ||
493 | $(logWarnS) "search" ("done query more:" <> T.pack (show (L.length nodes, L.length results))) | ||
494 | leftover $ L.concat nodes | ||
495 | let r = mapM_ yield results | ||
496 | _ = (action,r) :: (a -> DHT raw dht u ip (Either [a] o), ConduitM [a] o (DHT raw dht u ip) ()) | ||
497 | r | ||
498 | |||
499 | #endif | ||
500 | |||
501 | publish = error "todo" | ||
502 | -- publish :: Address ip => InfoHash -> PortNumber -> DHT BValue KMessageOf () ip () | ||
503 | -- publish ih p = do | ||
504 | -- nodes <- getClosest ih | ||
505 | -- r <- asks (optReplication . options) | ||
506 | -- _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r | ||
507 | -- return () | ||
508 | |||
509 | |||
510 | probeNode :: ( Default u | ||
511 | , Show u | ||
512 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
513 | , DHT.Kademlia dht | ||
514 | , Address ip | ||
515 | , Ord (TransactionID dht) | ||
516 | , Serialize (TransactionID dht) | ||
517 | , WireFormat raw dht | ||
518 | , SerializableTo raw (Response dht (Ping dht)) | ||
519 | , SerializableTo raw (Query dht (Ping dht)) | ||
520 | , Ord (NodeId dht) | ||
521 | , FiniteBits (NodeId dht) | ||
522 | , Show (NodeId dht) | ||
523 | , Show (QueryMethod dht) | ||
524 | ) => NodeInfo dht ip u -> DHT raw dht u ip (Bool , Maybe ReflectedIP) | ||
525 | probeNode addr = do | ||
526 | $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint $ nodeAddr addr))) | ||
527 | result <- try $ pingQ addr | ||
528 | let _ = fmap (const ()) result :: Either QueryFailure () | ||
529 | return $ either (const (False,Nothing)) (\(_,mip)->(True,mip)) result | ||
530 | |||
531 | |||
532 | refreshNodes :: forall raw dht u ip. | ||
533 | ( Address ip | ||
534 | , Ord (NodeId dht) | ||
535 | , Default u | ||
536 | , FiniteBits (NodeId dht) | ||
537 | , Pretty (NodeId dht) | ||
538 | , DHT.Kademlia dht | ||
539 | , Ord ip | ||
540 | , Ord (TransactionID dht) | ||
541 | , SerializableTo raw (Response dht (NodeFound dht ip)) | ||
542 | , SerializableTo raw (Query dht (FindNode dht ip)) | ||
543 | , SerializableTo raw (Response dht (Ping dht)) | ||
544 | , SerializableTo raw (Query dht (Ping dht)) | ||
545 | , Pretty (NodeInfo dht ip u) | ||
546 | , Show (NodeId dht) | ||
547 | , Show u | ||
548 | , Show (QueryMethod dht) | ||
549 | , Serialize (TransactionID dht) | ||
550 | , WireFormat raw dht | ||
551 | , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip)) | ||
552 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
553 | ) => NodeId dht -> DHT raw dht u ip () -- [NodeInfo KMessageOf ip ()] | ||
554 | -- FIXME do not use getClosest sinse we should /refresh/ them | ||
555 | refreshNodes nid = do | ||
556 | $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid))) | ||
557 | nodes <- getClosest nid | ||
558 | do | ||
559 | -- forM (L.take 1 nodes) \$ \ addr -> do | ||
560 | -- NodeFound ns <- FindNode nid <@> addr | ||
561 | -- Expected type: ConduitM [NodeAddr ip] [NodeInfo KMessageOf ip ()] (DHT ip) () | ||
562 | -- Actual type: ConduitM [NodeInfo KMessageOf ip ()] [NodeInfo KMessageOf ip ()] (DHT ip) () | ||
563 | -- nss <- sourceList [[addr]] \$= search nid (findNodeQ nid) $$ C.consume | ||
564 | -- nss <- sourceList [nodes] \$= search nid (findNodeQ (Proxy :: Proxy dht) nid) $$ C.consume | ||
565 | ns <- bgsearch ioFindNodes nid | ||
566 | $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length ns)) <> " nodes." | ||
567 | _ <- queryParallel $ flip L.map ns $ \n -> do | ||
568 | $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n)) | ||
569 | pingQ n | ||
570 | -- pingQ takes care of inserting the node. | ||
571 | return () | ||
572 | return () -- \$ L.concat nss | ||
573 | |||
574 | logc :: Char -> String -> DHT raw dht u ip () | ||
575 | logc 'D' = $(logDebugS) "insertNode" . T.pack | ||
576 | logc 'W' = $(logWarnS) "insertNode" . T.pack | ||
577 | logc 'I' = $(logInfoS) "insertNode" . T.pack | ||
578 | logc c = $(logInfoS) "insertNode" . T.pack . (c :) . (':' :) | ||
579 | |||
580 | -- | This operation do not block but acquire exclusive access to | ||
581 | -- routing table. | ||
582 | insertNode :: forall raw dht u ip. | ||
583 | ( Address ip | ||
584 | , Ord (NodeId dht) | ||
585 | , FiniteBits (NodeId dht) | ||
586 | , Show (NodeId dht) | ||
587 | , Default u | ||
588 | , Show u | ||
589 | , DHT.Kademlia dht | ||
590 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
591 | , Ord (TransactionID dht) | ||
592 | , WireFormat raw dht | ||
593 | , Serialize (TransactionID dht) | ||
594 | , SerializableTo raw (Response dht (Ping dht)) | ||
595 | , SerializableTo raw (Query dht (Ping dht)) | ||
596 | , Ord (NodeId dht) | ||
597 | , Show (NodeId dht) | ||
598 | , Show (QueryMethod dht) | ||
599 | ) => NodeInfo dht ip u -> Maybe ReflectedIP -> DHT raw dht u ip () | ||
600 | insertNode info witnessed_ip0 = do | ||
601 | f <- insertNode1 | ||
602 | liftIO $ f info witnessed_ip0 | ||
603 | |||
604 | insertNode1 :: forall raw dht u ip. | ||
605 | ( Address ip | ||
606 | , Default u | ||
607 | , Show u | ||
608 | , Ord (NodeId dht) | ||
609 | , FiniteBits (NodeId dht) | ||
610 | , Show (NodeId dht) | ||
611 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
612 | , DHT.Kademlia dht | ||
613 | , Ord (TransactionID dht) | ||
614 | , WireFormat raw dht | ||
615 | , Serialize (TransactionID dht) | ||
616 | , SerializableTo raw (Response dht (Ping dht)) | ||
617 | , SerializableTo raw (Query dht (Ping dht)) | ||
618 | , Ord (NodeId dht) | ||
619 | , Show (NodeId dht) | ||
620 | , Show (QueryMethod dht) | ||
621 | ) => DHT raw dht u ip (NodeInfo dht ip u -> Maybe ReflectedIP -> IO ()) | ||
622 | insertNode1 = do | ||
623 | bc <- optBucketCount <$> asks options | ||
624 | nid <- asks tentativeNodeId | ||
625 | logm0 <- embed_ (uncurry logc) | ||
626 | let logm c = logm0 . (c,) | ||
627 | dht_node_state <- ask -- XXX: This prevents ping probe from modifying the Node state. | ||
628 | probe0 <- embed probeNode -- probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP) | ||
629 | let probe n = probe0 n >>= runDHT dht_node_state . restoreM | ||
630 | {- | ||
631 | changeip ip0 arrival = fromMaybe (DHT.fallbackID params) $ do -- warning: recursive | ||
632 | ip <- fromSockAddr ip0 :: Maybe ip | ||
633 | listToMaybe | ||
634 | $ rank id (nodeId $ foreignNode arrival) | ||
635 | $ bep42s ip (DHT.fallbackID params) -- warning: recursive | ||
636 | -} | ||
637 | params = DHT.TableParameters | ||
638 | { maxBuckets = bc :: Int | ||
639 | , fallbackID = nid :: NodeId dht | ||
640 | , adjustID = dhtAdjustID Proxy (DHT.fallbackID params) :: SockAddr -> Event dht ip u -> NodeId dht | ||
641 | , logMessage = logm :: Char -> String -> IO () | ||
642 | , pingProbe = probe :: NodeInfo dht ip u -> IO (Bool, Maybe ReflectedIP) | ||
643 | } | ||
644 | tbl <- asks routingInfo | ||
645 | let state = DHT.TableKeeper | ||
646 | { routingInfo = tbl | ||
647 | , grokNode = DHT.insertNode params state | ||
648 | , grokAddress = \_ _ -> return () -- :: Maybe SockAddr -> ReflectedIP -> IO () | ||
649 | } | ||
650 | return $ \info witnessed_ip0 -> DHT.insertNode params state info witnessed_ip0 | ||
651 | |||
652 | -- | Throws exception if node is not responding. | ||
653 | queryNode :: forall raw dht u a b ip. | ||
654 | ( Address ip | ||
655 | , KRPC dht (Query dht a) (Response dht b) | ||
656 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
657 | , Default u | ||
658 | , Show u | ||
659 | , DHT.Kademlia dht | ||
660 | , Ord (TransactionID dht) | ||
661 | , Serialize (TransactionID dht) | ||
662 | , WireFormat raw dht | ||
663 | , SerializableTo raw (Response dht b) | ||
664 | , SerializableTo raw (Query dht a) | ||
665 | , Ord (NodeId dht) | ||
666 | , FiniteBits (NodeId dht) | ||
667 | , Show (NodeId dht) | ||
668 | , Show (QueryMethod dht) | ||
669 | , SerializableTo raw (Response dht (Ping dht)) | ||
670 | , SerializableTo raw (Query dht (Ping dht)) | ||
671 | ) => NodeInfo dht ip u -> a -> DHT raw dht u ip (NodeId dht, b) | ||
672 | queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q | ||
673 | |||
674 | queryNode' :: forall raw dht u a b ip. | ||
675 | ( Address ip | ||
676 | , Default u | ||
677 | , Show u | ||
678 | , DHT.Kademlia dht | ||
679 | , KRPC dht (Query dht a) (Response dht b) | ||
680 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
681 | , Ord (TransactionID dht) | ||
682 | , Serialize (TransactionID dht) | ||
683 | , WireFormat raw dht | ||
684 | , SerializableTo raw (Response dht b) | ||
685 | , SerializableTo raw (Query dht a) | ||
686 | , Ord (NodeId dht) | ||
687 | , FiniteBits (NodeId dht) | ||
688 | , Show (NodeId dht) | ||
689 | , Show (QueryMethod dht) | ||
690 | , SerializableTo raw (Response dht (Ping dht)) | ||
691 | , SerializableTo raw (Query dht (Ping dht)) | ||
692 | ) => NodeInfo dht ip u -> a -> DHT raw dht u ip (NodeId dht, b, Maybe ReflectedIP) | ||
693 | queryNode' ni q = do | ||
694 | let addr = nodeAddr ni | ||
695 | dest = makeAddress (Left $ nodeId ni) (toSockAddr addr) | ||
696 | coldQueryNode' addr dest q | ||
697 | |||
698 | coldQueryNode' :: forall raw dht u a b ip. | ||
699 | ( Address ip | ||
700 | , Default u | ||
701 | , Show u | ||
702 | , DHT.Kademlia dht | ||
703 | , KRPC dht (Query dht a) (Response dht b) | ||
704 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
705 | , Ord (TransactionID dht) | ||
706 | , Serialize (TransactionID dht) | ||
707 | , WireFormat raw dht | ||
708 | , SerializableTo raw (Response dht b) | ||
709 | , SerializableTo raw (Query dht a) | ||
710 | , Ord (NodeId dht) | ||
711 | , FiniteBits (NodeId dht) | ||
712 | , Show (NodeId dht) | ||
713 | , Show (QueryMethod dht) | ||
714 | , SerializableTo raw (Response dht (Ping dht)) | ||
715 | , SerializableTo raw (Query dht (Ping dht)) | ||
716 | ) => NodeAddr ip -> PacketDestination dht -> a -> DHT raw dht u ip (NodeId dht, b, Maybe ReflectedIP) | ||
717 | coldQueryNode' addr dest q = do | ||
718 | nid <- myNodeIdAccordingTo $ fromMaybe (error "TODO: coldQueryNode' myNodeIdAccordingTo") $ fromAddr dest | ||
719 | dta <- asks dhtData | ||
720 | qextra <- liftIO $ makeQueryExtra dta nid (Proxy :: Proxy (Query dht q)) (Proxy :: Proxy (Response dht b)) | ||
721 | let read_only = False -- TODO: check for NAT issues. (BEP 43) | ||
722 | -- let KRPC.Method name = KRPC.method :: KRPC.Method dht (Query dht a) (Response dht b) | ||
723 | mgr <- asks manager | ||
724 | (Response rextra r, remoteId, witnessed_ip) <- liftIO $ query' mgr dest (Query qextra q) | ||
725 | -- \$(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip) | ||
726 | -- <> " by " <> T.pack (show (toSockAddr addr)) | ||
727 | _ <- insertNode (NodeInfo remoteId addr def) witnessed_ip | ||
728 | return (remoteId, r, witnessed_ip) | ||
729 | |||
730 | -- | Infix version of 'queryNode' function. | ||
731 | (<@>) :: ( Address ip | ||
732 | , KRPC dht (Query dht a) (Response dht b) | ||
733 | , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht)) | ||
734 | , Default u | ||
735 | , Show u | ||
736 | , Show (QueryMethod dht) | ||
737 | , Ord (NodeId dht) | ||
738 | , FiniteBits (NodeId dht) | ||
739 | , Show (NodeId dht) | ||
740 | , Ord (TransactionID dht) | ||
741 | , Serialize (TransactionID dht) | ||
742 | , SerializableTo raw (Response dht b) | ||
743 | , SerializableTo raw (Query dht a) | ||
744 | , SerializableTo raw (Response dht (Ping dht)) | ||
745 | , SerializableTo raw (Query dht (Ping dht)) | ||
746 | , WireFormat raw dht | ||
747 | , Kademlia dht | ||
748 | ) => a -> NodeInfo dht ip u -> DHT raw dht u ip b | ||
749 | q <@> addr = snd <$> queryNode addr q | ||
750 | {-# INLINE (<@>) #-} | ||