summaryrefslogtreecommitdiff
path: root/src/Network/BitTorrent/DHT
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/BitTorrent/DHT')
-rw-r--r--src/Network/BitTorrent/DHT/Query.hs750
-rw-r--r--src/Network/BitTorrent/DHT/Session.hs571
2 files changed, 0 insertions, 1321 deletions
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
deleted file mode 100644
index 003bb5b9..00000000
--- a/src/Network/BitTorrent/DHT/Query.hs
+++ /dev/null
@@ -1,750 +0,0 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2014
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- This module provides functions to interact with other nodes.
9-- Normally, you don't need to import this module, use
10-- "Network.BitTorrent.DHT" instead.
11--
12{-# LANGUAGE CPP #-}
13{-# LANGUAGE FlexibleContexts #-}
14{-# LANGUAGE ScopedTypeVariables #-}
15{-# LANGUAGE TemplateHaskell #-}
16{-# LANGUAGE TupleSections #-}
17{-# LANGUAGE PartialTypeSignatures #-}
18{-# LANGUAGE GADTs #-}
19{-# LANGUAGE RankNTypes #-}
20{-# LANGUAGE MultiParamTypeClasses #-}
21module Network.BitTorrent.DHT.Query
22 ( -- * Handler
23 -- | To bind specific set of handlers you need to pass
24 -- handler list to the 'startNode' function.
25 pingH
26 , findNodeH
27 , getPeersH
28 , announceH
29 , defaultHandlers
30
31 -- * Query
32 -- ** Basic
33 -- | A basic query perform a single request expecting a
34 -- single response.
35 , Iteration
36 , pingQ
37 , coldPingQ
38 , findNodeQ
39 , getPeersQ
40 , announceQ
41
42 -- ** Iterative
43 -- | An iterative query perform multiple basic queries,
44 -- concatenate its responses, optionally yielding result and
45 -- continue to the next iteration.
46 , Search
47 -- , search
48 , publish
49 , ioFindNode
50 , ioFindNodes
51 , ioGetPeers
52 , isearch
53 , bgsearch
54
55 -- ** Routing table
56 , insertNode
57 , refreshNodes
58
59 -- ** Messaging
60 , queryNode
61 , queryNode'
62 , (<@>)
63 ) where
64
65import Data.Bits
66import Data.Default
67#ifdef THREAD_DEBUG
68import Control.Concurrent.Lifted.Instrument hiding (yield)
69#else
70import GHC.Conc (labelThread)
71import Control.Concurrent.Lifted hiding (yield)
72#endif
73import Control.Exception.Lifted hiding (Handler)
74import Control.Monad.Reader
75import Control.Monad.Logger
76import Data.Maybe
77import Data.Conduit
78import Data.Conduit.List as C hiding (mapMaybe, mapM_)
79import Data.Either
80import Data.List as L
81import Data.Monoid
82import Data.Text as T
83import qualified Data.Set as Set
84 ;import Data.Set (Set)
85import Network
86import Text.PrettyPrint as PP hiding ((<>), ($$))
87import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
88import Data.Time
89import Data.Time.Clock.POSIX
90import Data.Hashable (Hashable)
91import Data.Serialize
92import Data.Hashable
93
94import Network.DatagramServer as KRPC hiding (Options, def)
95import Network.KRPC.Method as KRPC
96import Network.DatagramServer.Mainline (ReflectedIP(..), QueryExtra(..), ResponseExtra(..))
97import Network.DatagramServer (QueryFailure(..))
98import Data.Torrent
99import qualified Network.DHT as DHT
100import Network.DHT.Mainline
101import Network.DHT.Routing as R
102import Network.BitTorrent.DHT.Session
103import Control.Concurrent.STM
104import qualified Network.BitTorrent.DHT.Search as Search
105#ifdef VERSION_bencoding
106import Data.BEncode (BValue)
107import Network.DatagramServer.Mainline (KMessageOf)
108#else
109import Data.ByteString (ByteString)
110import Network.DatagramServer.Tox
111#endif
112import Network.Address hiding (NodeId)
113import Network.DatagramServer.Types as RPC hiding (Query,Response)
114import Network.DHT.Types
115import Control.Monad.Trans.Control
116import Data.Typeable
117import Data.Serialize
118import System.IO.Unsafe (unsafeInterleaveIO)
119import Data.String
120
121
122{-----------------------------------------------------------------------
123-- Handlers
124-----------------------------------------------------------------------}
125
126{-
127nodeHandler :: ( Address ip
128 , KRPC dht (Query KMessageOf a) (Response KMessageOf b)
129 )
130 => (NodeInfo KMessageOf ip () -> Maybe ReflectedIP -> IO ()) -> (NodeAddr ip -> IO (NodeId KMessageOf)) -> (Char -> String -> Text -> IO ()) -> QueryMethod KMessageOf -> (NodeAddr ip -> a -> IO b) -> NodeHandler
131-}
132nodeHandler :: forall raw dht addr u t q r.
133 (Address addr, WireFormat raw dht, Pretty (NodeInfo dht addr u),
134 Default u,
135 IsString t, Functor dht,
136 KRPC dht (Query dht q) (Response dht r),
137 SerializableTo raw (Response dht r),
138 SerializableTo raw (Query dht q),
139 Show (QueryMethod dht)) =>
140 (NodeInfo dht addr u -> Maybe ReflectedIP -> IO ())
141 -> (NodeAddr addr -> IO (NodeId dht))
142 -> (Char -> t -> Text -> IO ())
143 -> DHTData dht addr
144 -> QueryMethod dht
145 -> (NodeAddr addr -> q -> IO r)
146 -> Handler IO dht raw
147nodeHandler insertNode myNodeIdAccordingTo logm dta method action = handler (\sockaddr -> myNodeIdAccordingTo (error "todo")) method $ \ sockAddr msg -> do
148 let remoteId = messageSender (msg :: dht (Query dht q)) resptype
149 qextra = queryExtra qry
150 resptype = Proxy :: Proxy (Response dht r)
151 q = queryParams qry
152 qry = envelopePayload msg :: Query dht q
153 case fromSockAddr sockAddr of
154 Nothing -> throwIO BadAddress
155 Just naddr -> do
156 logm 'D' "nodeHandler" $ "Received query: " <> T.pack (show $ method)
157 me <- myNodeIdAccordingTo naddr
158 rextra <- liftIO $ makeResponseExtra dta me qry resptype
159 let ni = NodeInfo remoteId naddr def
160 -- Do not route read-only nodes. (bep 43)
161 if fromRoutableNode qextra
162 then insertNode ni Nothing >> return () -- TODO need to block. why?
163 else logm 'W' "nodeHandler" $ "READ-ONLY " <> T.pack (show $ pPrint ni)
164 Response
165 <$> pure rextra
166 <*> action naddr q
167
168-- | Default 'Ping' handler.
169pingH :: DHT.Kademlia dht => Proxy dht -> NodeAddr ip -> Ping dht -> IO (Ping dht)
170pingH dht _ _ = return (DHT.pongMessage dht)
171-- pingH = nodeHandler $ \ _ p@PingPayload{} -> return p { isPong = True }
172
173-- | Default 'FindNode' handler.
174findNodeH :: Kademlia dht => (NodeId dht -> IO [NodeInfo dht ip u]) -> NodeAddr ip -> FindNode dht ip -> IO (NodeFound dht ip)
175findNodeH getclosest _ msg = foundNodesMessage . L.map (fmap (const ())) <$> getclosest (findWho msg)
176
177-- | Default 'GetPeers' handler.
178getPeersH :: Hashable ip => (InfoHash -> IO (PeerList ip)) -> TVar SessionTokens -> NodeAddr ip -> GetPeers ip -> IO (GotPeers ip)
179getPeersH getPeerList toks naddr (GetPeers ih) = do
180 ps <- getPeerList ih
181 tok <- grantToken toks naddr
182 return $ GotPeers ps tok
183
184-- | Default 'Announce' handler.
185announceH :: ( Ord ip, Hashable ip ) => TVar (PeerStore ip) -> TVar SessionTokens -> NodeAddr ip -> Announce -> IO Announced
186announceH peers toks naddr @ NodeAddr {..} (Announce {..}) = do
187 valid <- checkToken toks naddr sessionToken
188 unless valid $ do
189 throwIO $ InvalidParameter "token"
190
191 let annPort = if impliedPort then nodePort else port
192 peerAddr = PeerAddr Nothing nodeHost annPort
193 insertPeer peers topic announcedName peerAddr
194 return Announced
195
196-- | Includes all Kademlia-related handlers.
197kademliaHandlers :: forall raw dht u ip. (Eq ip, Ord ip, Address ip
198 , Ord (TransactionID dht)
199 , Ord (NodeId dht)
200 , Show u
201 , SerializableTo raw (Response dht (Ping dht))
202 , SerializableTo raw (Query dht (Ping dht))
203 , Show (QueryMethod dht)
204 , Show (NodeId dht)
205 , FiniteBits (NodeId dht)
206 , Default u
207 , Serialize (TransactionID dht)
208 , WireFormat raw dht
209 , Kademlia dht
210 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
211 , Functor dht
212 , Pretty (NodeInfo dht ip u)
213 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
214 , SerializableTo raw (Response dht (NodeFound dht ip))
215 , SerializableTo raw (Query dht (FindNode dht ip))
216 ) => LogFun -> DHT raw dht u ip [Handler IO dht raw]
217-- kademliaHandlers :: forall ip. (Eq ip, Ord ip, Address ip) => LogFun -> DHT BValue KMessageOf () ip [NodeHandler]
218kademliaHandlers logger = do
219 groknode <- insertNode1
220 mynid <- myNodeIdAccordingTo1
221 dta <- asks dhtData
222 let handler :: ( KRPC dht (Query dht a) (Response dht b)
223 , SerializableTo raw (Response dht b)
224 , SerializableTo raw (Query dht a)
225 ) => QueryMethod dht -> (NodeAddr ip -> a -> IO b) -> Handler IO dht raw
226 handler = nodeHandler groknode mynid (logt logger) dta
227 dht = Proxy :: Proxy dht
228 getclosest <- getClosest1
229 return [ handler (namePing dht) $ pingH dht
230 , handler (nameFindNodes dht) $ findNodeH getclosest
231 ]
232
233instance DataHandlers BValue KMessageOf where
234 dataHandlers = bthandlers
235
236bthandlers ::
237 ( Ord ip , Hashable ip, Typeable ip, Serialize ip) =>
238 (NodeId KMessageOf -> IO [NodeInfo KMessageOf ip ()])
239 -> DHTData KMessageOf ip
240 -> [MethodHandler BValue KMessageOf ip]
241bthandlers getclosest dta =
242 [ MethodHandler "get_peers" $ getPeersH (getpeers dta) (sessionTokens dta)
243 , MethodHandler "announce_peer" $ announceH (contactInfo dta) (sessionTokens dta)
244 ]
245 where
246 getpeers dta ih = do
247 ps <- lookupPeers (contactInfo dta) ih
248 if L.null ps
249 then Left <$> getclosest (toNodeId ih)
250 else return (Right ps)
251
252
253-- | Includes all default query handlers.
254defaultHandlers :: forall raw dht u ip.
255 ( Ord (TransactionID dht)
256 , Ord (NodeId dht)
257 , Show u
258 , SerializableTo raw (Response dht (Ping dht))
259 , SerializableTo raw (Query dht (Ping dht))
260 , Show (QueryMethod dht)
261 , Show (NodeId dht)
262 , FiniteBits (NodeId dht)
263 , Default u
264 , Serialize (TransactionID dht)
265 , WireFormat raw dht
266 , Kademlia dht
267 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
268 , Functor dht
269 , Pretty (NodeInfo dht ip u)
270 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
271 , SerializableTo raw (Response dht (NodeFound dht ip))
272 , SerializableTo raw (Query dht (FindNode dht ip))
273 , Eq ip, Ord ip, Address ip, DataHandlers raw dht
274 ) => LogFun -> DHT raw dht u ip [Handler IO dht raw]
275defaultHandlers logger = do
276 groknode <- insertNode1
277 mynid <- myNodeIdAccordingTo1
278 dta <- asks dhtData
279 let handler :: MethodHandler raw dht ip -> Handler IO dht raw
280 handler (MethodHandler name action) = nodeHandler groknode mynid (logt logger) dta name action
281 getclosest <- getClosest1
282 hs <- kademliaHandlers logger
283 return $ hs ++ L.map handler (dataHandlers (fmap (fmap (fmap (const ()))) . getclosest) dta)
284
285{-----------------------------------------------------------------------
286-- Basic queries
287-----------------------------------------------------------------------}
288
289type Iteration raw dht u ip o = NodeInfo dht ip u -> DHT raw dht u ip (Either [NodeInfo dht ip u] [o ip])
290
291-- | The most basic query. May be used to check if the given node is
292-- alive or get its 'NodeId'.
293pingQ :: forall raw dht u ip.
294 ( DHT.Kademlia dht
295 , Address ip
296 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
297 , Default u
298 , Show u
299 , Ord (TransactionID dht)
300 , Serialize (TransactionID dht)
301 , WireFormat raw dht
302 , SerializableTo raw (Response dht (Ping dht))
303 , SerializableTo raw (Query dht (Ping dht))
304 , Ord (NodeId dht)
305 , FiniteBits (NodeId dht)
306 , Show (NodeId dht)
307 , Show (QueryMethod dht)
308 ) => NodeInfo dht ip u -> DHT raw dht u ip (NodeInfo dht ip u , Maybe ReflectedIP)
309pingQ ni = do
310 let ping = DHT.pingMessage (Proxy :: Proxy dht)
311 (nid, pong, mip) <- queryNode' ni ping
312 let _ = pong `asTypeOf` ping
313 -- (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid}
314 return (NodeInfo nid (nodeAddr ni) def, mip)
315
316-- | The most basic query. May be used to check if the given node is
317-- alive or get its 'NodeId'.
318coldPingQ :: forall raw dht u ip.
319 ( DHT.Kademlia dht
320 , Address ip
321 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
322 , Default u
323 , Show u
324 , Ord (TransactionID dht)
325 , Serialize (TransactionID dht)
326 , WireFormat raw dht
327 , SerializableTo raw (Response dht (Ping dht))
328 , SerializableTo raw (Query dht (Ping dht))
329 , Ord (NodeId dht)
330 , FiniteBits (NodeId dht)
331 , Show (NodeId dht)
332 , Show (QueryMethod dht)
333 ) => PacketDestination dht -> DHT raw dht u ip (NodeInfo dht ip u , Maybe ReflectedIP)
334coldPingQ dest = do
335 let ping = DHT.pingMessage (Proxy :: Proxy dht)
336 naddr <- maybe (throwIO $ QueryFailed ProtocolError "unable to construct NodeAddr from PacketDestination")
337 return
338 $ fromAddr dest
339 (nid, pong, mip) <- coldQueryNode' naddr dest ping
340 let _ = pong `asTypeOf` ping
341 -- (nid, PingPayload{}, mip) <- queryNode' addr PingPayload {isPong=False, pingId=pid}
342 return (NodeInfo nid naddr def, mip)
343
344-- TODO [robustness] match range of returned node ids with the
345-- expected range and either filter bad nodes or discard response at
346-- all throwing an exception
347-- findNodeQ :: Address ip => TableKey key => key -> IterationI ip NodeInfo
348findNodeQ proxy key ni = do
349 closest <- fmap DHT.foundNodes $ DHT.findNodeMessage proxy key <@> ni
350 $(logInfoS) "findNodeQ" $ "NodeFound\n"
351 <> T.pack (L.unlines $ L.map ((' ' :) . show . pPrint) closest)
352 return $ Right closest
353
354#ifdef VERSION_bencoding
355getPeersQ :: Address ip => InfoHash -> Iteration BValue KMessageOf () ip PeerAddr
356getPeersQ topic ni = do
357 GotPeers {..} <- GetPeers topic <@> ni
358 let dist = distance (toNodeId topic) (nodeId ni)
359 $(logInfoS) "getPeersQ" $ T.pack
360 $ "distance: " <> render (pPrint dist) <> " , result: "
361 <> case peers of { Left _ -> "NODES"; Right _ -> "PEERS" }
362 return peers
363
364announceQ :: Address ip => InfoHash -> PortNumber -> Iteration BValue KMessageOf () ip NodeAddr
365announceQ ih p ni = do
366 GotPeers {..} <- GetPeers ih <@> ni
367 case peers of
368 Left ns
369 | False -> undefined -- TODO check if we can announce
370 | otherwise -> return (Left ns)
371 Right _ -> do -- TODO *probably* add to peer cache
372 Announced <- Announce False ih Nothing p grantedToken <@> ni
373 return (Right [nodeAddr ni])
374#endif
375
376{-----------------------------------------------------------------------
377-- Iterative queries
378-----------------------------------------------------------------------}
379
380
381ioGetPeers :: Address ip => InfoHash -> DHT BValue KMessageOf () ip (NodeInfo KMessageOf ip () -> IO ([NodeInfo KMessageOf ip ()], [PeerAddr ip]))
382ioGetPeers ih = do
383 session <- ask
384 return $ \ni -> runDHT session $ do
385 r <- try $ getPeersQ ih ni
386 case r of
387 Right e -> return $ either (,[]) ([],) e
388 Left e -> let _ = e :: QueryFailure in return ([],[])
389
390ioFindNode :: ( DHT.Kademlia dht
391 , WireFormat raw dht
392 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
393 , Address ip
394 , Default u
395 , Show u
396 , Show (QueryMethod dht)
397 , TableKey dht infohash
398 , Eq (NodeId dht)
399 , Ord (NodeId dht)
400 , FiniteBits (NodeId dht)
401 , Show (NodeId dht)
402 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
403 , Ord (TransactionID dht)
404 , Serialize (TransactionID dht)
405 , SerializableTo raw (Response dht (NodeFound dht ip))
406 , SerializableTo raw (Query dht (FindNode dht ip))
407 , SerializableTo raw (Response dht (Ping dht))
408 , SerializableTo raw (Query dht (Ping dht))
409 ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u]))
410ioFindNode ih = do
411 session <- ask
412 return $ \ni -> runDHT session $ do
413 ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> ni
414 let ns' = L.map (fmap (const def)) ns
415 return $ L.partition (\n -> nodeId n /= toNodeId ih) ns'
416
417
418-- | Like ioFindNode, but considers all found nodes to be 'Right' results.
419ioFindNodes :: ( DHT.Kademlia dht
420 , WireFormat raw dht
421 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
422 , Address ip
423 , Default u
424 , Show u
425 , Show (QueryMethod dht)
426 , TableKey dht infohash
427 , Eq (NodeId dht)
428 , Ord (NodeId dht)
429 , FiniteBits (NodeId dht)
430 , Show (NodeId dht)
431 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
432 , Ord (TransactionID dht)
433 , Serialize (TransactionID dht)
434 , SerializableTo raw (Response dht (NodeFound dht ip))
435 , SerializableTo raw (Query dht (FindNode dht ip))
436 , SerializableTo raw (Response dht (Ping dht))
437 , SerializableTo raw (Query dht (Ping dht))
438 ) => infohash -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [NodeInfo dht ip u]))
439ioFindNodes ih = do
440 session <- ask
441 return $ \ni -> runDHT session $ do
442 ns <- fmap DHT.foundNodes $ DHT.findNodeMessage Proxy ih <@> ni
443 let ns' = L.map (fmap (const def)) ns
444 return ([], ns')
445
446isearch :: ( Ord r
447 , Ord ip
448 , Ord (NodeId dht)
449 , FiniteBits (NodeId dht)
450 , TableKey dht ih
451 , Show ih) =>
452 (ih -> DHT raw dht u ip (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r])))
453 -> ih
454 -> DHT raw dht u ip (ThreadId, Search.IterativeSearch dht u ip r)
455isearch f ih = do
456 qry <- f ih
457 ns <- kclosest 8 ih <$> getTable
458 liftIO $ do s <- Search.newSearch qry (toNodeId ih) ns
459 a <- fork $ do
460 tid <- myThreadId
461 labelThread tid ("search."++show ih)
462 Search.search s
463 -- atomically \$ readTVar (Search.searchResults s)
464 return (a, s)
465
466-- | Background search: fill a lazy list using a background thread.
467bgsearch f ih = do
468 (tid, s) <- isearch f ih
469 let again shown = do
470 (chk,fin) <- atomically $ do
471 r <- (Set.\\ shown) <$> readTVar (Search.searchResults s)
472 if not $ Set.null r
473 then (,) r <$> Search.searchIsFinished s
474 else Search.searchIsFinished s >>= check >> return (Set.empty,True)
475 let ps = Set.toList chk
476 if fin then return ps
477 else do
478 xs <- unsafeInterleaveIO $ again (shown `Set.union` chk)
479 return $ ps ++ xs
480 liftIO $ again Set.empty
481
482type Search raw dht u ip o = Conduit [NodeInfo dht ip u] (DHT raw dht u ip) [o dht ip u]
483
484#if 0
485
486-- TODO: use reorder and filter (Traversal option) leftovers
487-- search :: k -> IterationI ip o -> Search ip o
488search _ action = do
489 awaitForever $ \ batch -> unless (L.null batch) $ do
490 $(logWarnS) "search" "start query"
491 responses <- lift $ queryParallel (action <$> batch)
492 let (nodes, results) = partitionEithers responses
493 $(logWarnS) "search" ("done query more:" <> T.pack (show (L.length nodes, L.length results)))
494 leftover $ L.concat nodes
495 let r = mapM_ yield results
496 _ = (action,r) :: (a -> DHT raw dht u ip (Either [a] o), ConduitM [a] o (DHT raw dht u ip) ())
497 r
498
499#endif
500
501publish = error "todo"
502-- publish :: Address ip => InfoHash -> PortNumber -> DHT BValue KMessageOf () ip ()
503-- publish ih p = do
504 -- nodes <- getClosest ih
505 -- r <- asks (optReplication . options)
506 -- _ <- sourceList [nodes] $= search ih (announceQ ih p) $$ C.take r
507 -- return ()
508
509
510probeNode :: ( Default u
511 , Show u
512 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
513 , DHT.Kademlia dht
514 , Address ip
515 , Ord (TransactionID dht)
516 , Serialize (TransactionID dht)
517 , WireFormat raw dht
518 , SerializableTo raw (Response dht (Ping dht))
519 , SerializableTo raw (Query dht (Ping dht))
520 , Ord (NodeId dht)
521 , FiniteBits (NodeId dht)
522 , Show (NodeId dht)
523 , Show (QueryMethod dht)
524 ) => NodeInfo dht ip u -> DHT raw dht u ip (Bool , Maybe ReflectedIP)
525probeNode addr = do
526 $(logDebugS) "routing.questionable_node" (T.pack (render (pPrint $ nodeAddr addr)))
527 result <- try $ pingQ addr
528 let _ = fmap (const ()) result :: Either QueryFailure ()
529 return $ either (const (False,Nothing)) (\(_,mip)->(True,mip)) result
530
531
532refreshNodes :: forall raw dht u ip.
533 ( Address ip
534 , Ord (NodeId dht)
535 , Default u
536 , FiniteBits (NodeId dht)
537 , Pretty (NodeId dht)
538 , DHT.Kademlia dht
539 , Ord ip
540 , Ord (TransactionID dht)
541 , SerializableTo raw (Response dht (NodeFound dht ip))
542 , SerializableTo raw (Query dht (FindNode dht ip))
543 , SerializableTo raw (Response dht (Ping dht))
544 , SerializableTo raw (Query dht (Ping dht))
545 , Pretty (NodeInfo dht ip u)
546 , Show (NodeId dht)
547 , Show u
548 , Show (QueryMethod dht)
549 , Serialize (TransactionID dht)
550 , WireFormat raw dht
551 , KRPC dht (Query dht (FindNode dht ip)) (Response dht (NodeFound dht ip))
552 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
553 ) => NodeId dht -> DHT raw dht u ip () -- [NodeInfo KMessageOf ip ()]
554-- FIXME do not use getClosest sinse we should /refresh/ them
555refreshNodes nid = do
556 $(logDebugS) "routing.refresh_bucket" (T.pack (render (pPrint nid)))
557 nodes <- getClosest nid
558 do
559 -- forM (L.take 1 nodes) \$ \ addr -> do
560 -- NodeFound ns <- FindNode nid <@> addr
561 -- Expected type: ConduitM [NodeAddr ip] [NodeInfo KMessageOf ip ()] (DHT ip) ()
562 -- Actual type: ConduitM [NodeInfo KMessageOf ip ()] [NodeInfo KMessageOf ip ()] (DHT ip) ()
563 -- nss <- sourceList [[addr]] \$= search nid (findNodeQ nid) $$ C.consume
564 -- nss <- sourceList [nodes] \$= search nid (findNodeQ (Proxy :: Proxy dht) nid) $$ C.consume
565 ns <- bgsearch ioFindNodes nid
566 $(logWarnS) "refreshNodes" $ "received " <> T.pack (show (L.length ns)) <> " nodes."
567 _ <- queryParallel $ flip L.map ns $ \n -> do
568 $(logWarnS) "refreshNodes" $ "received node: " <> T.pack (show (pPrint n))
569 pingQ n
570 -- pingQ takes care of inserting the node.
571 return ()
572 return () -- \$ L.concat nss
573
574logc :: Char -> String -> DHT raw dht u ip ()
575logc 'D' = $(logDebugS) "insertNode" . T.pack
576logc 'W' = $(logWarnS) "insertNode" . T.pack
577logc 'I' = $(logInfoS) "insertNode" . T.pack
578logc c = $(logInfoS) "insertNode" . T.pack . (c :) . (':' :)
579
580-- | This operation do not block but acquire exclusive access to
581-- routing table.
582insertNode :: forall raw dht u ip.
583 ( Address ip
584 , Ord (NodeId dht)
585 , FiniteBits (NodeId dht)
586 , Show (NodeId dht)
587 , Default u
588 , Show u
589 , DHT.Kademlia dht
590 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
591 , Ord (TransactionID dht)
592 , WireFormat raw dht
593 , Serialize (TransactionID dht)
594 , SerializableTo raw (Response dht (Ping dht))
595 , SerializableTo raw (Query dht (Ping dht))
596 , Ord (NodeId dht)
597 , Show (NodeId dht)
598 , Show (QueryMethod dht)
599 ) => NodeInfo dht ip u -> Maybe ReflectedIP -> DHT raw dht u ip ()
600insertNode info witnessed_ip0 = do
601 f <- insertNode1
602 liftIO $ f info witnessed_ip0
603
604insertNode1 :: forall raw dht u ip.
605 ( Address ip
606 , Default u
607 , Show u
608 , Ord (NodeId dht)
609 , FiniteBits (NodeId dht)
610 , Show (NodeId dht)
611 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
612 , DHT.Kademlia dht
613 , Ord (TransactionID dht)
614 , WireFormat raw dht
615 , Serialize (TransactionID dht)
616 , SerializableTo raw (Response dht (Ping dht))
617 , SerializableTo raw (Query dht (Ping dht))
618 , Ord (NodeId dht)
619 , Show (NodeId dht)
620 , Show (QueryMethod dht)
621 ) => DHT raw dht u ip (NodeInfo dht ip u -> Maybe ReflectedIP -> IO ())
622insertNode1 = do
623 bc <- optBucketCount <$> asks options
624 nid <- asks tentativeNodeId
625 logm0 <- embed_ (uncurry logc)
626 let logm c = logm0 . (c,)
627 dht_node_state <- ask -- XXX: This prevents ping probe from modifying the Node state.
628 probe0 <- embed probeNode -- probeNode :: Address ip => NodeAddr ip -> DHT ip (Bool, Maybe ReflectedIP)
629 let probe n = probe0 n >>= runDHT dht_node_state . restoreM
630 {-
631 changeip ip0 arrival = fromMaybe (DHT.fallbackID params) $ do -- warning: recursive
632 ip <- fromSockAddr ip0 :: Maybe ip
633 listToMaybe
634 $ rank id (nodeId $ foreignNode arrival)
635 $ bep42s ip (DHT.fallbackID params) -- warning: recursive
636 -}
637 params = DHT.TableParameters
638 { maxBuckets = bc :: Int
639 , fallbackID = nid :: NodeId dht
640 , adjustID = dhtAdjustID Proxy (DHT.fallbackID params) :: SockAddr -> Event dht ip u -> NodeId dht
641 , logMessage = logm :: Char -> String -> IO ()
642 , pingProbe = probe :: NodeInfo dht ip u -> IO (Bool, Maybe ReflectedIP)
643 }
644 tbl <- asks routingInfo
645 let state = DHT.TableKeeper
646 { routingInfo = tbl
647 , grokNode = DHT.insertNode params state
648 , grokAddress = \_ _ -> return () -- :: Maybe SockAddr -> ReflectedIP -> IO ()
649 }
650 return $ \info witnessed_ip0 -> DHT.insertNode params state info witnessed_ip0
651
652-- | Throws exception if node is not responding.
653queryNode :: forall raw dht u a b ip.
654 ( Address ip
655 , KRPC dht (Query dht a) (Response dht b)
656 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
657 , Default u
658 , Show u
659 , DHT.Kademlia dht
660 , Ord (TransactionID dht)
661 , Serialize (TransactionID dht)
662 , WireFormat raw dht
663 , SerializableTo raw (Response dht b)
664 , SerializableTo raw (Query dht a)
665 , Ord (NodeId dht)
666 , FiniteBits (NodeId dht)
667 , Show (NodeId dht)
668 , Show (QueryMethod dht)
669 , SerializableTo raw (Response dht (Ping dht))
670 , SerializableTo raw (Query dht (Ping dht))
671 ) => NodeInfo dht ip u -> a -> DHT raw dht u ip (NodeId dht, b)
672queryNode addr q = fmap (\(n,b,_) -> (n,b)) $ queryNode' addr q
673
674queryNode' :: forall raw dht u a b ip.
675 ( Address ip
676 , Default u
677 , Show u
678 , DHT.Kademlia dht
679 , KRPC dht (Query dht a) (Response dht b)
680 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
681 , Ord (TransactionID dht)
682 , Serialize (TransactionID dht)
683 , WireFormat raw dht
684 , SerializableTo raw (Response dht b)
685 , SerializableTo raw (Query dht a)
686 , Ord (NodeId dht)
687 , FiniteBits (NodeId dht)
688 , Show (NodeId dht)
689 , Show (QueryMethod dht)
690 , SerializableTo raw (Response dht (Ping dht))
691 , SerializableTo raw (Query dht (Ping dht))
692 ) => NodeInfo dht ip u -> a -> DHT raw dht u ip (NodeId dht, b, Maybe ReflectedIP)
693queryNode' ni q = do
694 let addr = nodeAddr ni
695 dest = makeAddress (Left $ nodeId ni) (toSockAddr addr)
696 coldQueryNode' addr dest q
697
698coldQueryNode' :: forall raw dht u a b ip.
699 ( Address ip
700 , Default u
701 , Show u
702 , DHT.Kademlia dht
703 , KRPC dht (Query dht a) (Response dht b)
704 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
705 , Ord (TransactionID dht)
706 , Serialize (TransactionID dht)
707 , WireFormat raw dht
708 , SerializableTo raw (Response dht b)
709 , SerializableTo raw (Query dht a)
710 , Ord (NodeId dht)
711 , FiniteBits (NodeId dht)
712 , Show (NodeId dht)
713 , Show (QueryMethod dht)
714 , SerializableTo raw (Response dht (Ping dht))
715 , SerializableTo raw (Query dht (Ping dht))
716 ) => NodeAddr ip -> PacketDestination dht -> a -> DHT raw dht u ip (NodeId dht, b, Maybe ReflectedIP)
717coldQueryNode' addr dest q = do
718 nid <- myNodeIdAccordingTo $ fromMaybe (error "TODO: coldQueryNode' myNodeIdAccordingTo") $ fromAddr dest
719 dta <- asks dhtData
720 qextra <- liftIO $ makeQueryExtra dta nid (Proxy :: Proxy (Query dht q)) (Proxy :: Proxy (Response dht b))
721 let read_only = False -- TODO: check for NAT issues. (BEP 43)
722 -- let KRPC.Method name = KRPC.method :: KRPC.Method dht (Query dht a) (Response dht b)
723 mgr <- asks manager
724 (Response rextra r, remoteId, witnessed_ip) <- liftIO $ query' mgr dest (Query qextra q)
725 -- \$(logDebugS) "queryNode" $ "Witnessed address: " <> T.pack (show witnessed_ip)
726 -- <> " by " <> T.pack (show (toSockAddr addr))
727 _ <- insertNode (NodeInfo remoteId addr def) witnessed_ip
728 return (remoteId, r, witnessed_ip)
729
730-- | Infix version of 'queryNode' function.
731(<@>) :: ( Address ip
732 , KRPC dht (Query dht a) (Response dht b)
733 , KRPC dht (Query dht (Ping dht)) (Response dht (Ping dht))
734 , Default u
735 , Show u
736 , Show (QueryMethod dht)
737 , Ord (NodeId dht)
738 , FiniteBits (NodeId dht)
739 , Show (NodeId dht)
740 , Ord (TransactionID dht)
741 , Serialize (TransactionID dht)
742 , SerializableTo raw (Response dht b)
743 , SerializableTo raw (Query dht a)
744 , SerializableTo raw (Response dht (Ping dht))
745 , SerializableTo raw (Query dht (Ping dht))
746 , WireFormat raw dht
747 , Kademlia dht
748 ) => a -> NodeInfo dht ip u -> DHT raw dht u ip b
749q <@> addr = snd <$> queryNode addr q
750{-# INLINE (<@>) #-}
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs
deleted file mode 100644
index b775e7d3..00000000
--- a/src/Network/BitTorrent/DHT/Session.hs
+++ /dev/null
@@ -1,571 +0,0 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013-2014
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- This module defines internal state of a node instance. You can
9-- have multiple nodes per application but usually you don't have
10-- to. Normally, you don't need to import this module, use
11-- "Network.BitTorrent.DHT" instead.
12--
13{-# LANGUAGE CPP #-}
14{-# LANGUAGE RecordWildCards #-}
15{-# LANGUAGE FlexibleContexts #-}
16{-# LANGUAGE FlexibleInstances #-}
17{-# LANGUAGE GeneralizedNewtypeDeriving #-}
18{-# LANGUAGE MultiParamTypeClasses #-}
19{-# LANGUAGE ScopedTypeVariables #-}
20{-# LANGUAGE TypeFamilies #-}
21{-# LANGUAGE TemplateHaskell #-}
22module Network.BitTorrent.DHT.Session
23 ( -- * Options
24 -- | Use @optFooBar def@ to get default 'Alpha' or 'K'.
25 Alpha
26 , K
27 , Options (..)
28
29 -- * Session
30 , Node
31 , options
32 , tentativeNodeId
33 , myNodeIdAccordingTo
34 , myNodeIdAccordingTo1
35 , routingInfo
36 , routableAddress
37 , getTimestamp
38 -- , SessionTokens
39 -- , sessionTokens
40 -- , contactInfo
41 , dhtData
42 , PeerStore
43 , manager
44
45 -- ** Initialization
46 , LogFun
47 , logt
48 , NodeHandler
49 , newNode
50 , closeNode
51
52 -- * DHT
53 -- | Use @asks options@ to get options passed to 'startNode'
54 -- or @asks thisNodeId@ to get id of locally running node.
55 , DHT
56 , runDHT
57
58 -- ** Tokens
59 -- , grantToken
60 -- , checkToken
61
62 -- ** Routing table
63 , getTable
64 , getClosest
65 , getClosest1
66
67#ifdef VERSION_bencoding
68 -- ** Peer storage
69 , insertPeer
70 , getPeerList
71 , getPeerList1
72 , lookupPeers
73 , insertTopic
74 , deleteTopic
75 , getSwarms
76 , savePeerStore
77 , mergeSavedPeers
78 , allPeers
79#endif
80
81 -- ** Messaging
82 , queryParallel
83 ) where
84
85import Prelude hiding (ioError)
86
87import Control.Concurrent.STM
88#ifdef THREAD_DEBUG
89import Control.Concurrent.Async.Lifted.Instrument
90#else
91import Control.Concurrent.Async.Lifted
92#endif
93import Control.Exception.Lifted hiding (Handler)
94import Control.Monad.Base
95import Control.Monad.Logger
96import Control.Monad.Reader
97import Control.Monad.Trans.Control
98import Control.Monad.Trans.Resource
99import Data.Typeable
100import Data.String
101import Data.Bits
102import Data.ByteString
103import Data.Conduit.Lazy
104import Data.Default
105import Data.Fixed
106import Data.Hashable
107import Data.List as L
108import Data.Maybe
109import Data.Monoid
110import Data.Set as S
111import Data.Time
112import Network (PortNumber)
113import System.Random (randomIO)
114import Data.Time.Clock.POSIX
115import Data.Text as Text
116import Text.PrettyPrint.HughesPJClass hiding ((<>),($$))
117import Data.Serialize as S
118import Network.DHT.Types
119import Network.DatagramServer.Types
120
121
122import Data.Torrent as Torrent
123import Network.DatagramServer as KRPC hiding (Options, def)
124import qualified Network.DatagramServer as KRPC (def)
125#ifdef VERSION_bencoding
126import Data.BEncode (BValue)
127import Network.DatagramServer.Mainline (KMessageOf)
128#else
129import Network.DatagramServer.Tox as Tox
130#endif
131import Network.Address
132import Network.BitTorrent.DHT.ContactInfo (PeerStore)
133import qualified Network.BitTorrent.DHT.ContactInfo as P
134import Network.DHT.Mainline
135import Network.DHT.Routing as R
136import Network.BitTorrent.DHT.Token as T
137import GHC.Stack as GHC
138
139{-----------------------------------------------------------------------
140-- Options
141-----------------------------------------------------------------------}
142
143-- | Node lookups can proceed asynchronously.
144type Alpha = Int
145
146-- NOTE: libtorrent uses 5, azureus uses 10
147-- | The quantity of simultaneous lookups is typically three.
148defaultAlpha :: Alpha
149defaultAlpha = 3
150
151-- TODO add replication loop
152
153-- TODO do not insert infohash -> peeraddr if infohash is too far from
154-- this node id
155{-
156data Order
157 = NearFirst
158 | FarFirst
159 | Random
160
161data Traversal
162 = Greedy -- ^ aggressive short-circuit traversal
163 | Exhaustive -- ^
164-}
165
166-- | Original Kamelia DHT uses term /publish/ for data replication
167-- process. BitTorrent DHT uses term /announce/ since the purpose of
168-- the DHT is peer discovery. Later in documentation, we use terms
169-- /publish/ and /announce/ interchangible.
170data Options = Options
171 { -- | The degree of parallelism in 'find_node' queries. More
172 -- parallism lead to faster bootstrapping and lookup operations,
173 -- but also increase resource usage.
174 --
175 -- Normally this parameter should not exceed 'optK'.
176 optAlpha :: {-# UNPACK #-} !Alpha
177
178 -- | /K/ parameter - number of nodes to return in 'find_node'
179 -- responses.
180 , optK :: {-# UNPACK #-} !K
181
182 -- | Number of buckets to maintain. This parameter depends on
183 -- amount of nodes in the DHT network.
184 , optBucketCount :: {-# UNPACK #-} !BucketCount
185
186 -- | RPC timeout.
187 , optTimeout :: !NominalDiffTime
188
189 -- | /R/ parameter - how many target nodes the 'announce' query
190 -- should affect.
191 --
192 -- A large replica set compensates for inconsistent routing and
193 -- reduces the need to frequently republish data for
194 -- persistence. This comes at an increased cost for
195 -- 'Network.BitTorrent.DHT.insert' in terms of time, nodes
196 -- contacted, and storage.
197 , optReplication :: {-# UNPACK #-} !NodeCount
198
199 -- | How often this node should republish (or reannounce) its
200 -- data.
201 --
202 -- Large replica set ('optReplication') should require
203 -- smaller reannounce intervals ('optReannounce').
204 , optReannounce :: !NominalDiffTime
205
206 -- | The time it takes for data to expire in the
207 -- network. Publisher of the data should republish (or
208 -- reannounce) data to keep it in the network.
209 --
210 -- The /data expired timeout/ should be more than 'optReannounce'
211 -- interval.
212 , optDataExpired :: !NominalDiffTime
213 } deriving (Show, Eq)
214
215-- | Optimal options for bittorrent client. For short-lifetime
216-- utilities you most likely need to tune 'optAlpha' and
217-- 'optBucketCount'.
218instance Default Options where
219 def = Options
220 { optAlpha = defaultAlpha
221 , optK = defaultK
222
223 -- see Fig.2 from "BitTorrent Mainline DHT Measurement" paper.
224 , optBucketCount = defaultBucketCount
225
226 -- see Fig.4 from "Profiling a Million User DHT" paper.
227 , optTimeout = 5 -- seconds
228 , optReplication = 20 -- nodes
229 , optReannounce = 15 * 60
230 , optDataExpired = 60 * 60
231 }
232
233seconds :: NominalDiffTime -> Int
234seconds dt = fromEnum (realToFrac dt :: Uni)
235{-----------------------------------------------------------------------
236-- Session
237-----------------------------------------------------------------------}
238
239-- | A set of torrents this peer intends to share.
240type AnnounceSet = Set (InfoHash, PortNumber)
241
242-- | Logger function.
243type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO ()
244
245-- | DHT session keep track state of /this/ node.
246data Node raw dht u ip = Node
247 { -- | Session configuration;
248 options :: !Options
249
250 -- | Pseudo-unique self-assigned session identifier. This value is
251 -- constant during DHT session and (optionally) between sessions.
252 , tentativeNodeId :: !(NodeId dht)
253
254 , resources :: !InternalState
255 , manager :: !(Manager raw dht) -- ^ RPC manager;
256 , routingInfo :: !(TVar (Maybe (R.Info dht ip u))) -- ^ search table;
257 , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node;
258 , dhtData :: DHTData dht ip
259 , loggerFun :: !LogFun
260 }
261
262-- | DHT keep track current session and proper resource allocation for
263-- safe multithreading.
264newtype DHT raw dht u ip a = DHT { unDHT :: ReaderT (Node raw dht u ip) IO a }
265 deriving ( Functor, Applicative, Monad, MonadIO
266 , MonadBase IO, MonadReader (Node raw dht u ip), MonadThrow
267 )
268
269#if MIN_VERSION_monad_control(1,0,0)
270newtype DHTStM raw dht u ip a = StM {
271 unSt :: StM (ReaderT (Node raw dht u ip) IO) a
272 }
273#endif
274
275instance MonadBaseControl IO (DHT raw dht u ip) where
276#if MIN_VERSION_monad_control(1,0,0)
277 type StM (DHT raw dht u ip) a = DHTStM raw dht u ip a
278#else
279 newtype StM (DHT raw dht u ip) a = StM {
280 unSt :: StM (ReaderT (Node raw dht u ip) IO) a
281 }
282#endif
283 liftBaseWith cc = DHT $ liftBaseWith $ \ cc' ->
284 cc $ \ (DHT m) -> StM <$> cc' m
285 {-# INLINE liftBaseWith #-}
286
287 restoreM = DHT . restoreM . unSt
288 {-# INLINE restoreM #-}
289
290-- | Check is it is possible to run 'queryNode' or handle pending
291-- query from remote node.
292instance MonadActive (DHT raw dht u ip) where
293 monadActive = getManager >>= liftIO . isActive
294 {-# INLINE monadActive #-}
295
296-- | All allocated resources will be closed at 'closeNode'.
297instance MonadResource (DHT raw dht u ip) where
298 liftResourceT m = do
299 s <- asks resources
300 liftIO $ runInternalState m s
301
302-- instance MonadKRPC (DHT raw dht u ip) (DHT raw dht u ip) BValue KMessageOf where
303
304getManager :: DHT raw dht u ip (Manager raw dht)
305getManager = asks manager
306
307instance MonadLogger (DHT raw dht u ip) where
308 monadLoggerLog loc src lvl msg = do
309 logger <- asks loggerFun
310 liftIO $ logger loc src lvl (toLogStr msg)
311
312#ifdef VERSION_bencoding
313type NodeHandler = Handler IO KMessageOf BValue
314#else
315type NodeHandler ip = Handler (DHT raw dht u ip) Tox.Message ByteString
316#endif
317
318logt :: HasCallStack => LogFun -> Char -> String -> Text -> IO ()
319logt lf c m txt = lf (locFromCS callStack) (fromString m) (lvl c) (fromString $ Text.unpack txt)
320 where
321 lvl 'D' = LevelDebug
322 lvl 'I' = LevelInfo
323 lvl 'W' = LevelWarn
324 lvl 'E' = LevelError
325 lvl ch = LevelOther $ Text.cons ch Text.empty
326
327mkLoggerLoc :: GHC.SrcLoc -> Loc
328mkLoggerLoc loc =
329 Loc { loc_filename = GHC.srcLocFile loc
330 , loc_package = GHC.srcLocPackage loc
331 , loc_module = GHC.srcLocModule loc
332 , loc_start = ( GHC.srcLocStartLine loc
333 , GHC.srcLocStartCol loc)
334 , loc_end = ( GHC.srcLocEndLine loc
335 , GHC.srcLocEndCol loc)
336 }
337
338locFromCS :: GHC.CallStack -> Loc
339locFromCS cs = case getCallStack cs of
340 ((_, loc):_) -> mkLoggerLoc loc
341 _ -> Loc "<unknown>" "<unknown>" "<unknown>" (0,0) (0,0)
342
343
344-- | Run DHT session. You /must/ properly close session using
345-- 'closeNode' function, otherwise socket or other scarce resources may
346-- leak.
347newNode :: forall raw dht ip u.
348 ( Address ip
349 , FiniteBits (NodeId dht)
350 , Serialize (NodeId dht)
351 , Kademlia dht
352 , WireFormat raw dht
353 )
354 => -- [NodeHandler] -- ^ handlers to run on accepted queries;
355 Options -- ^ various dht options;
356 -> NodeAddr ip -- ^ node address to bind;
357 -> LogFun -- ^ invoked on log messages;
358 -> Maybe (NodeId dht) -- ^ use this NodeId, if not given a new one is generated.
359 -> IO (Node raw dht u ip) -- ^ a new DHT node running at given address.
360newNode opts naddr logger mbid = do
361 s <- createInternalState
362 runInternalState initNode s
363 `onException` closeInternalState s
364 where
365 rpcOpts = KRPC.def { optQueryTimeout = seconds (optTimeout opts) }
366 nodeAddr = toSockAddr naddr
367 initNode = do
368 s <- getInternalState
369 (myId, infovar, getst) <- liftIO $ do
370 (i, ctx) <- initializeServerState (Proxy :: Proxy (dht raw)) mbid
371 var <- atomically (newTVar Nothing)
372 let getst dest = do
373 info <- atomically . readTVar $ var
374 return ( maybe i myNodeId info, ctx)
375 return (i, var, getst)
376 (_, m) <- allocate (newManager rpcOpts (logt logger) nodeAddr getst []) closeManager
377 liftIO $ do
378 dta <- initializeDHTData
379 node <- Node opts myId s m infovar
380 <$> newTVarIO S.empty
381 <*> pure dta
382 <*> pure logger
383 return node
384
385-- | Some resources like listener thread may live for
386-- some short period of time right after this DHT session closed.
387closeNode :: Node raw dht u ip -> IO ()
388closeNode Node {..} = closeInternalState resources
389
390-- | Run DHT operation on the given session.
391runDHT :: Node raw dht u ip -> DHT raw dht u ip a -> IO a
392runDHT node action = runReaderT (unDHT action) node
393{-# INLINE runDHT #-}
394
395{-----------------------------------------------------------------------
396-- Routing
397-----------------------------------------------------------------------}
398
399-- /pick a random ID/ in the range of the bucket and perform a
400-- find_nodes search on it.
401
402
403{-----------------------------------------------------------------------
404-- Routing table
405-----------------------------------------------------------------------}
406
407-- | This nodes externally routable address reported by remote peers.
408routableAddress :: DHT raw dht u ip (Maybe SockAddr)
409routableAddress = do
410 info <- asks routingInfo >>= liftIO . atomically . readTVar
411 return $ myAddress <$> info
412
413-- | The current NodeId that the given remote node should know us by.
414myNodeIdAccordingTo :: NodeAddr ip -> DHT raw dht u ip (NodeId dht)
415myNodeIdAccordingTo _ = do
416 info <- asks routingInfo >>= liftIO . atomically . readTVar
417 maybe (asks tentativeNodeId)
418 (return . myNodeId)
419 info
420
421myNodeIdAccordingTo1 :: DHT raw dht u ip ( NodeAddr ip -> IO (NodeId dht) )
422myNodeIdAccordingTo1 = do
423 var <- asks routingInfo
424 tid <- asks tentativeNodeId
425 return $ \ _ -> do
426 info <- atomically $ readTVar var
427 return $ maybe tid myNodeId info
428
429-- | Get current routing table. Normally you don't need to use this
430-- function, but it can be usefull for debugging and profiling purposes.
431getTable :: Eq ip => DHT raw dht u ip (Table dht ip u)
432getTable = do
433 Node { tentativeNodeId = myId
434 , routingInfo = var
435 , options = opts } <- ask
436 let nil = nullTable myId (optBucketCount opts)
437 liftIO (maybe nil R.myBuckets <$> atomically (readTVar var))
438
439getSwarms :: Ord ip => DHT raw KMessageOf u ip [ (InfoHash, Int, Maybe ByteString) ]
440getSwarms = do
441 store <- asks (contactInfo . dhtData) >>= liftIO . atomically . readTVar
442 return $ P.knownSwarms store
443
444savePeerStore :: (Ord ip, Address ip) => DHT raw KMessageOf u ip ByteString
445savePeerStore = do
446 var <- asks (contactInfo . dhtData)
447 peers <- liftIO $ atomically $ readTVar var
448 return $ S.encode peers
449
450mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT raw KMessageOf u ip ()
451mergeSavedPeers bs = do
452 var <- asks (contactInfo . dhtData)
453 case S.decode bs of
454 Right newbies -> liftIO $ atomically $ modifyTVar' var (<> newbies)
455 Left _ -> return ()
456
457
458allPeers :: Ord ip => InfoHash -> DHT raw KMessageOf u ip [ PeerAddr ip ]
459allPeers ih = do
460 store <- asks (contactInfo . dhtData) >>= liftIO . atomically . readTVar
461 return $ P.lookup ih store
462
463-- | Find a set of closest nodes from routing table of this node. (in
464-- no particular order)
465--
466-- This operation used for 'find_nodes' query.
467--
468getClosest :: ( Eq ip
469 , Ord (NodeId dht)
470 , FiniteBits (NodeId dht)
471 , TableKey dht k ) =>
472 k -> DHT raw dht u ip [NodeInfo dht ip u]
473getClosest node = do
474 k <- asks (optK . options)
475 kclosest k node <$> getTable
476
477getClosest1 :: ( Eq ip
478 , Ord (NodeId dht)
479 , FiniteBits (NodeId dht)
480 , TableKey dht k
481 ) => DHT raw dht u ip (k -> IO [NodeInfo dht ip u])
482getClosest1 = do
483 k <- asks (optK . options)
484 nobkts <- asks (optBucketCount . options)
485 myid <- asks tentativeNodeId
486 var <- asks routingInfo
487 return $ \node -> do nfo <- atomically $ readTVar var
488 let tbl = maybe (nullTable myid nobkts) R.myBuckets nfo
489 return $ kclosest k node tbl
490
491{-----------------------------------------------------------------------
492-- Peer storage
493-----------------------------------------------------------------------}
494
495refreshContacts :: Ord ip => TVar (PeerStore ip) -> IO ()
496refreshContacts var =
497 -- TODO limit dht peer store in size (probably by removing oldest peers)
498 return ()
499
500
501-- | Insert peer to peer store. Used to handle announce requests.
502insertPeer :: Ord ip => TVar (PeerStore ip) -> InfoHash -> Maybe ByteString -> PeerAddr ip -> IO ()
503insertPeer var ih name addr = do
504 refreshContacts var
505 atomically $ modifyTVar' var (P.insertPeer ih name addr)
506
507-- | Get peer set for specific swarm.
508lookupPeers :: Ord ip => TVar (PeerStore ip) -> InfoHash -> IO [PeerAddr ip]
509lookupPeers var ih = do
510 refreshContacts var
511 tm <- getTimestamp
512 atomically $ do
513 (ps,store') <- P.freshPeers ih tm <$> readTVar var
514 writeTVar var store'
515 return ps
516
517getTimestamp :: IO Timestamp
518getTimestamp = do
519 utcTime <- getCurrentTime
520 -- $(logDebugS) "routing.make_timestamp" (Text.pack (render (pPrint utcTime)))
521 return $ utcTimeToPOSIXSeconds utcTime
522
523#ifdef VERSION_bencoding
524-- | Prepare result for 'get_peers' query.
525--
526-- This operation use 'getClosest' as failback so it may block.
527--
528getPeerList :: Ord ip => InfoHash -> DHT raw KMessageOf () ip (PeerList ip)
529getPeerList ih = do
530 var <- asks (contactInfo . dhtData)
531 ps <- liftIO $ lookupPeers var ih
532 if L.null ps
533 then Left <$> getClosest ih
534 else return (Right ps)
535
536getPeerList1 :: Ord ip => DHT raw KMessageOf () ip (InfoHash -> IO (PeerList ip))
537getPeerList1 = do
538 var <- asks (contactInfo . dhtData)
539 getclosest <- getClosest1
540 return $ \ih -> do
541 ps <- lookupPeers var ih
542 if L.null ps
543 then Left <$> getclosest ih
544 else return (Right ps)
545
546
547insertTopic :: InfoHash -> PortNumber -> DHT raw dht u ip ()
548insertTopic ih p = do
549 var <- asks announceInfo
550 liftIO $ atomically $ modifyTVar' var (S.insert (ih, p))
551
552deleteTopic :: InfoHash -> PortNumber -> DHT raw dht u ip ()
553deleteTopic ih p = do
554 var <- asks announceInfo
555 liftIO $ atomically $ modifyTVar' var (S.delete (ih, p))
556
557#endif
558
559{-----------------------------------------------------------------------
560-- Messaging
561-----------------------------------------------------------------------}
562
563-- | Failed queries are ignored.
564queryParallel :: [DHT raw dht u ip a] -> DHT raw dht u ip [a]
565queryParallel queries = do
566 -- TODO: use alpha
567 -- alpha <- asks (optAlpha . options)
568 cleanup <$> mapConcurrently try queries
569 where
570 cleanup :: [Either QueryFailure a] -> [a]
571 cleanup = mapMaybe (either (const Nothing) Just)