path: root/kad/src
author    James Crayne <jim.crayne@gmail.com>    2019-09-28 13:43:29 -0400
committer Joe Crayne <joe@jerkface.net>          2020-01-01 19:27:53 -0500
commit    11987749fc6e6d3e53ea737d46d5ab13a16faeb8 (patch)
tree      5716463275c2d3e902889db619908ded2a73971c /kad/src
parent    add2c76bced51fde5e9917e7449ef52be70faf87 (diff)
Factor out some new libraries
word64-map:        Data.Word64Map
network-addr:      Network.Address
tox-crypto:        Crypto.Tox
lifted-concurrent: Control.Concurrent.Lifted.Instrument
                   Control.Concurrent.Async.Lifted.Instrument
psq-wrap:          Data.Wrapper.PSQInt
                   Data.Wrapper.PSQ
minmax-psq:        Data.MinMaxPSQ
tasks:             Control.Concurrent.Tasks
kad:               Network.Kademlia
                   Network.Kademlia.Bootstrap
                   Network.Kademlia.Routing
                   Network.Kademlia.CommonAPI
                   Network.Kademlia.Persistence
                   Network.Kademlia.Search
Diffstat (limited to 'kad/src')
-rw-r--r--  kad/src/DebugTag.hs                       24
-rw-r--r--  kad/src/Network/Kademlia.hs              163
-rw-r--r--  kad/src/Network/Kademlia/Bootstrap.hs    439
-rw-r--r--  kad/src/Network/Kademlia/CommonAPI.hs     84
-rw-r--r--  kad/src/Network/Kademlia/Persistence.hs   52
-rw-r--r--  kad/src/Network/Kademlia/Routing.hs      809
-rw-r--r--  kad/src/Network/Kademlia/Search.hs       236
7 files changed, 1807 insertions, 0 deletions
diff --git a/kad/src/DebugTag.hs b/kad/src/DebugTag.hs
new file mode 100644
index 00000000..9ac04bb0
--- /dev/null
+++ b/kad/src/DebugTag.hs
@@ -0,0 +1,24 @@
1module DebugTag where
2
3import Data.Typeable
4
5-- | Debug tags. Add more as needed, but ensure XAnnounce is always first and XMisc last.
6data DebugTag
7 = XAnnounce
8 | XBitTorrent
9 | XDHT
10 | XLan
11 | XMan
12 | XNetCrypto
13 | XNetCryptoOut
14 | XOnion
15 | XRoutes
16 | XPing
17 | XRefresh
18 | XJabber
19 | XTCP
20 | XMisc
21 | XNodeinfoSearch
22 | XUnexpected -- Used only for special anomalous errors that we didn't expect to happen.
23 | XUnused -- Never commit code that uses XUnused.
24 deriving (Eq, Ord, Show, Read, Enum, Bounded,Typeable)
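
-- Illustrative usage (not part of this patch): these tags select a log channel
-- for the 'dput' helper from the DPut module, e.g. as used in
-- Network.Kademlia.Bootstrap:
--
-- > dput XRefresh $ "Start refresh " ++ show (n,sample)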
diff --git a/kad/src/Network/Kademlia.hs b/kad/src/Network/Kademlia.hs
new file mode 100644
index 00000000..e61afe9b
--- /dev/null
+++ b/kad/src/Network/Kademlia.hs
@@ -0,0 +1,163 @@
1{-# LANGUAGE CPP, ScopedTypeVariables, PartialTypeSignatures, FlexibleContexts #-}
2{-# LANGUAGE KindSignatures #-}
3{-# LANGUAGE DeriveFunctor, DeriveTraversable #-}
4-- {-# LANGUAGE TypeFamilies #-}
5{-# LANGUAGE GADTs #-}
6{-# LANGUAGE PatternSynonyms #-}
7module Network.Kademlia where
8
9import Data.Maybe
10import Data.Time.Clock.POSIX
11import Network.Kademlia.Routing as R
12#ifdef THREAD_DEBUG
13import Control.Concurrent.Lifted.Instrument
14#else
15import Control.Concurrent.Lifted
16import GHC.Conc (labelThread)
17#endif
18import Control.Concurrent.STM
19import Control.Monad
20import Data.Time.Clock.POSIX (POSIXTime)
21
22-- | The status of a given node with respect to a given routing table.
23data RoutingStatus
24 = Stranger -- ^ The node is unknown to the Kademlia routing table.
25 | Applicant -- ^ The node may be inserted pending a ping timeout.
26 | Accepted -- ^ The node has a slot in one of the Kademlia buckets.
27 deriving (Eq,Ord,Enum,Show,Read)
28
29-- | A change occurred in the Kademlia routing table.
30data RoutingTransition ni = RoutingTransition
31 { transitioningNode :: ni
32 , transitionedTo :: !RoutingStatus
33 }
34 deriving (Eq,Ord,Show,Read)
35
36data InsertionReporter ni = InsertionReporter
37 { -- | Called on every inbound packet. Accepts:
38 --
39 -- * Origin of packet.
40 --
41 -- * List of nodes to be pinged as a result.
42 reportArrival :: POSIXTime
43 -> ni
44 -> [ni]
45 -> IO ()
46 -- | Called on every ping probe. Accepts:
47 --
48 -- * Who was pinged.
49 --
50 -- * 'True' if they responded with a pong, 'False' otherwise.
51 , reportPingResult :: POSIXTime
52 -> ni
53 -> Bool
54 -> IO ()
55 }
56
57quietInsertions :: InsertionReporter ni
58quietInsertions = InsertionReporter
59 { reportArrival = \_ _ _ -> return ()
60 , reportPingResult = \_ _ _ -> return ()
61 }
62
63contramapIR :: (t -> ni) -> InsertionReporter ni -> InsertionReporter t
64contramapIR f ir = InsertionReporter
65 { reportArrival = \tm ni nis -> reportArrival ir tm (f ni) (map f nis)
66 , reportPingResult = \tm ni b -> reportPingResult ir tm (f ni) b
67 }
68
69-- | All the IO operations necessary to maintain a Kademlia routing table.
70data TableStateIO ni = TableStateIO
71 { -- | Write the routing table. Typically 'writeTVar'.
72 tblWrite :: R.BucketList ni -> STM ()
73
74 -- | Read the routing table. Typically 'readTVar'.
75 , tblRead :: STM (R.BucketList ni)
76
77 -- | Issue a ping to a remote node and report 'True' if the node
78 -- responded within an acceptable time and 'False' otherwise.
79 , tblPing :: ni -> IO Bool
80
81 -- | Convenience method provided to assist in maintaining state
82 -- consistent with the routing table. It will be invoked in the same
83 -- transaction that 'tblRead'\/'tblWrite' occurred but only when there was
84 -- an interesting change. The returned IO action will be triggered soon
85 -- afterward.
86 --
87 -- It is not necessary to do anything interesting here. The following
88 -- trivial implementation is fine:
89 --
90 -- > tblTransition = const $ return $ return ()
91 , tblTransition :: RoutingTransition ni -> STM (IO ())
92 }
93
94vanillaIO :: TVar (BucketList ni) -> (ni -> IO Bool) -> TableStateIO ni
95vanillaIO var ping = TableStateIO
96 { tblRead = readTVar var
97 , tblWrite = writeTVar var
98 , tblPing = ping
99 , tblTransition = const $ return $ return ()
100 }
101
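-- Illustrative sketch (not part of this patch): a routing-table state can be
-- built from a TVar and a ping action, assuming a hypothetical node type 'Ni'
-- with comparison 'cmpNi', hash function 'hashNi', and ping action
-- 'pingNi :: Ni -> IO Bool':
--
-- > newTableIO :: Ni -> IO (TableStateIO Ni)
-- > newTableIO self = do
-- >     var <- newTVarIO $ R.nullTable cmpNi hashNi self R.defaultBucketCount
-- >     return $ vanillaIO var pingNi
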
102-- | Everything necessary to maintain a routing table of /ni/ (node
103-- information) entries.
104data Kademlia nid ni = Kademlia { kademInsertionReporter :: InsertionReporter ni
105 , kademSpace :: KademliaSpace nid ni
106 , kademIO :: TableStateIO ni
107 }
108
109
110-- Helper to 'insertNode'.
111--
112-- Adapt return value from 'updateForPingResult' into a
113-- more easily grokked list of transitions.
114transition :: (ni,Maybe (t,ni)) -> [RoutingTransition ni]
115transition (x,m) =
116 -- Just _ <- m = Node transition: Accepted --> Stranger
117 -- Nothing <- m = Node transition: Applicant --> Stranger
118 RoutingTransition x Stranger
119 : maybeToList (accepted <$> m)
120
121-- Helper to 'transition'
122--
123-- Node transition: Applicant --> Accepted
124accepted :: (t,ni) -> RoutingTransition ni
125accepted (_,y) = RoutingTransition y Accepted
126
127
128insertNode :: Kademlia nid ni -> ni -> IO ()
129insertNode (Kademlia reporter space io) node = do
130
131 tm <- getPOSIXTime
132
133 (ps,reaction) <- atomically $ do
134 tbl <- tblRead io
135 let (inserted, ps,t') = R.updateForInbound space tm node tbl
136 tblWrite io t'
137 reaction <- case ps of
138 _ | inserted -> -- Node transition: Stranger --> Accepted
139 tblTransition io $ RoutingTransition node Accepted
140 (_:_) -> -- Node transition: Stranger --> Applicant
141 tblTransition io $ RoutingTransition node Applicant
142 _ -> return $ return ()
143 return (ps, reaction)
144
145 reportArrival reporter tm node ps
146 reaction
147
148 _ <- fork $ do
149 myThreadId >>= flip labelThread "pingResults"
150 forM_ ps $ \n -> do
151 b <- tblPing io n
152 reportPingResult reporter tm n b -- XXX: tm is timestamp of original triggering packet, not result
153 join $ atomically $ do
154 tbl <- tblRead io
155 let (replacements, t') = R.updateForPingResult space n b tbl
156 tblWrite io t'
157 ios <- sequence $ concatMap
158 (map (tblTransition io) . transition)
159 replacements
160 return $ sequence_ ios
161
162 return ()
163
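-- Illustrative usage (not part of this patch): given a 'KademliaSpace' for a
-- hypothetical node type 'Ni' (here 'niSpace') and the 'TableStateIO' sketched
-- above ('var', 'pingNi'), inbound packets can be fed to the routing table
-- with 'insertNode':
--
-- > let kad = Kademlia quietInsertions niSpace (vanillaIO var pingNi)
-- > insertNode kad senderNode   -- call for every packet received from senderNode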
diff --git a/kad/src/Network/Kademlia/Bootstrap.hs b/kad/src/Network/Kademlia/Bootstrap.hs
new file mode 100644
index 00000000..08ba3318
--- /dev/null
+++ b/kad/src/Network/Kademlia/Bootstrap.hs
@@ -0,0 +1,439 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE ConstraintKinds #-}
3{-# LANGUAGE DeriveFunctor #-}
4{-# LANGUAGE DeriveTraversable #-}
5{-# LANGUAGE FlexibleContexts #-}
6{-# LANGUAGE GADTs #-}
7{-# LANGUAGE KindSignatures #-}
8{-# LANGUAGE LambdaCase #-}
9{-# LANGUAGE NamedFieldPuns #-}
10{-# LANGUAGE RecordWildCards #-}
11{-# LANGUAGE NondecreasingIndentation #-}
12{-# LANGUAGE PartialTypeSignatures #-}
13{-# LANGUAGE PatternSynonyms #-}
14{-# LANGUAGE RankNTypes #-}
15{-# LANGUAGE ScopedTypeVariables #-}
16module Network.Kademlia.Bootstrap where
17
18import Data.Function
19import Data.Maybe
20import qualified Data.Set as Set
21import Data.Time.Clock.POSIX (getPOSIXTime)
22import Network.Kademlia.Routing as R
23#ifdef THREAD_DEBUG
24import Control.Concurrent.Lifted.Instrument
25#else
26import Control.Concurrent.Lifted
27import GHC.Conc (labelThread)
28#endif
29import Control.Concurrent.STM
30import Control.Monad
31import Data.Hashable
32import Data.Time.Clock.POSIX (POSIXTime)
33import Data.Ord
34import System.Entropy
35import System.Timeout
36import DPut
37import DebugTag
38
39import qualified Data.Wrapper.PSQInt as Int
40 ;import Data.Wrapper.PSQInt (pattern (:->))
41import Network.Address (bucketRange)
42import Network.Kademlia.Search
43import Control.Concurrent.Tasks
44import Network.Kademlia
45
46type SensibleNodeId nid ni =
47 ( Show nid
48 , Ord nid
49 , Ord ni
50 , Hashable nid
51 , Hashable ni )
52
53data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher
54 { -- | A staleness threshold (if a bucket goes this long without being
55 -- touched, a refresh will be triggered).
56 refreshInterval :: POSIXTime
57 -- | A TVar with the time-to-refresh schedule for each bucket.
58 --
59 -- To "touch" a bucket and prevent it from being refreshed, reschedule
60 -- its refresh time to some time into the future by modifying its
61 -- priority in this priority search queue.
62 , refreshQueue :: TVar (Int.PSQ POSIXTime)
63 -- | This is the kademlia node search specification.
64 , refreshSearch :: Search nid addr tok ni ni
65 -- | The current kademlia routing table buckets.
66 , refreshBuckets :: TVar (R.BucketList ni)
67 -- | Action to ping a node. This is used only during initial bootstrap
68 -- to get some nodes in our table. A 'True' result is interpreted as a
69 -- pong, while 'False' indicates a non-response.
70 , refreshPing :: ni -> IO Bool
71 , -- | Timestamp of last bucket event.
72 refreshLastTouch :: TVar POSIXTime
73 , -- | This variable indicates whether or not we are in bootstrapping mode.
74 bootstrapMode :: TVar Bool
75 , -- | When this countdown reaches 0, we exit bootstrap mode. It is decremented on
76 -- every finished refresh.
77 bootstrapCountdown :: TVar (Maybe Int)
78 }
79
80newBucketRefresher :: ( Ord addr, Hashable addr
81 , SensibleNodeId nid ni )
82 => TVar (R.BucketList ni)
83 -> Search nid addr tok ni ni
84 -> (ni -> IO Bool)
85 -> STM (BucketRefresher nid ni)
86newBucketRefresher bkts sch ping = do
87 let spc = searchSpace sch
88 nodeId = kademliaLocation spc
89 -- bkts <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) template_ni R.defaultBucketCount
90 sched <- newTVar Int.empty
91 lasttouch <- newTVar 0 -- Would use getPOSIXTime here, but it is not available in STM, and POSIXTime has no minBound.
92 bootstrapVar <- newTVar True -- Start in bootstrapping mode.
93 bootstrapCnt <- newTVar Nothing
94 return BucketRefresher
95 { refreshInterval = 15 * 60
96 , refreshQueue = sched
97 , refreshSearch = sch
98 , refreshBuckets = bkts
99 , refreshPing = ping
100 , refreshLastTouch = lasttouch
101 , bootstrapMode = bootstrapVar
102 , bootstrapCountdown = bootstrapCnt
103 }
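
-- Illustrative sketch (not part of this patch): a refresher is typically
-- created alongside the bucket TVar and then polled from its own thread,
-- assuming hypothetical values 'bkts', 'sch', and 'pingNi':
--
-- > r <- atomically $ newBucketRefresher bkts sch pingNi
-- > _ <- forkPollForRefresh r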
104
105-- | This was added to avoid the compile error "Record update for
106-- insufficiently polymorphic field" when trying to update the existentially
107-- quantified field 'refreshSearch'.
108updateRefresherIO :: Ord addr
109 => Search nid addr tok ni ni
110 -> (ni -> IO Bool)
111 -> BucketRefresher nid ni -> BucketRefresher nid ni
112updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher
113 { refreshSearch = sch
114 , refreshPing = ping
115 , refreshInterval = refreshInterval
116 , refreshBuckets = refreshBuckets
117 , refreshQueue = refreshQueue
118 , refreshLastTouch = refreshLastTouch
119 , bootstrapMode = bootstrapMode
120 , bootstrapCountdown = bootstrapCountdown
121 }
122
123-- | Fork a refresh loop. Kill the returned thread to terminate it.
124forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId
125forkPollForRefresh r@BucketRefresher{ refreshInterval
126 , refreshQueue
127 , refreshBuckets
128 , refreshSearch } = fork $ do
129 myThreadId >>= flip labelThread "pollForRefresh"
130 fix $ \again -> do
131 join $ atomically $ do
132 nextup <- Int.findMin <$> readTVar refreshQueue
133 maybe retry (return . go again) nextup
134 where
135 refresh :: Int -> IO Int
136 refresh n = do
137 -- dput XRefresh $ "Refresh time! "++ show n
138 refreshBucket r n
139
140 go again ( bktnum :-> refresh_time ) = do
141 now <- getPOSIXTime
142 case fromEnum (refresh_time - now) of
143 x | x <= 0 -> do -- Refresh time!
144 -- Move it to the back of the refresh queue.
145 atomically $ do
146 interval <- effectiveRefreshInterval r bktnum
147 modifyTVar' refreshQueue
148 $ Int.insert bktnum (now + interval)
149 -- Now fork the refresh operation.
150 -- TODO: We should probably propagate the kill signal to this thread.
151 fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum)
152 _ <- refresh bktnum
153 return ()
154 return ()
155 picoseconds -> do
156 -- dput XRefresh $ show (picoseconds `div` 10^12) ++ " seconds until refresh " ++ show bktnum
157 threadDelay ( picoseconds `div` 10^6 )
158 again
159
160
161-- | A helper to 'refreshBucket' which does some bookkeeping to decide
162-- whether or not a bucket is sufficiently refreshed. It will return
163-- 'False' when we can terminate a node search.
164checkBucketFull :: Ord ni => KademliaSpace nid ni -- ^ Obtain a node id from a node.
165 -> TVar (BucketList ni) -- ^ The current routing table.
166 -> TVar (Set.Set ni) -- ^ In-range nodes found so far.
167 -> TVar Bool -- ^ The result will also be written here.
168 -> Int -- ^ The bucket number of interest.
169 -> ni -- ^ A newly found node.
170 -> STM Bool
171checkBucketFull space var resultCounter fin n found_node = do
172 let fullcount = R.defaultBucketSize
173 saveit True = writeTVar fin True >> return True
174 saveit _ = return False
175 tbl <- readTVar var
176 let counts = R.shape tbl
177 nid = kademliaLocation space found_node
178 -- Update the result set with every found node that is in the
179 -- bucket of interest.
180 when (n == R.bucketNumber space nid tbl)
181 $ modifyTVar' resultCounter (Set.insert found_node)
182 resultCount <- readTVar resultCounter
183 saveit $ case drop (n - 1) counts of
184 (cnt:_) | cnt < fullcount -> True -- bucket not full, keep going
185 _ | Set.size resultCount < fullcount -> True -- we haven't got many results, keep going
186 _ -> False -- okay, good enough, let's quit.
187
188-- | Called from 'refreshBucket' with the current time when a refresh of the
189-- supplied bucket number finishes.
190onFinishedRefresh :: BucketRefresher nid ni -> Int -> POSIXTime -> STM (IO ())
191onFinishedRefresh BucketRefresher { bootstrapCountdown
192 , bootstrapMode
193 , refreshQueue
194 , refreshBuckets } num now = do
195 bootstrapping <- readTVar bootstrapMode
196 if not bootstrapping then return $ return () -- dput XRefresh $ "Finished non-bootstrapping refresh: "++show num
197 else do
198 tbl <- readTVar refreshBuckets
199 action <-
200 if num /= R.bktCount tbl - 1
201 then do modifyTVar' bootstrapCountdown (fmap pred)
202 return $ return () -- dput XRefresh $ "BOOTSTRAP decrement"
203 else do
204 -- The last bucket finished.
205 cnt <- readTVar bootstrapCountdown
206 case cnt of
207 Nothing -> do
208 let fullsize = R.defaultBucketSize
209 notfull (n,len) | n==num = False
210 | len>=fullsize = False
211 | otherwise = True
212 unfull = case filter notfull $ zip [0..] (R.shape tbl) of
213 [] -> [(0,0)] -- Schedule at least 1 more refresh.
214 xs -> xs
215 forM_ unfull $ \(n,_) -> do
216 -- Schedule immediate refresh for unfull buckets (other than this one).
217 modifyTVar' refreshQueue $ Int.insert n (now - 1)
218 writeTVar bootstrapCountdown $! Just $! length unfull
219 return $ return () -- dput XRefresh $ "BOOTSTRAP scheduling: "++show unfull
220 Just n -> do writeTVar bootstrapCountdown $! Just $! pred n
221 return $ return () -- dput XRefresh "BOOTSTRAP decrement (last bucket)"
222 cnt <- readTVar bootstrapCountdown
223 if (cnt == Just 0)
224 then do
225 -- Bootstrap finished!
226 writeTVar bootstrapMode False
227 writeTVar bootstrapCountdown Nothing
228 return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")."
229 else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt)
230
231refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) =>
232 BucketRefresher nid ni -> Int -> IO Int
233refreshBucket r@BucketRefresher{ refreshSearch = sch
234 , refreshBuckets = var }
235 n = do
236 tbl <- atomically (readTVar var)
237 let count = bktCount tbl
238 nid = kademliaLocation (searchSpace sch) (thisNode tbl)
239 sample <- if n+1 >= count -- Is this the last bucket?
240 then return nid -- Yes? Search our own id.
241 else kademliaSample (searchSpace sch) -- No? Generate a random id.
242 getEntropy
243 nid
244 (bucketRange n (n + 1 < count))
245 fin <- atomically $ newTVar False
246 resultCounter <- atomically $ newTVar Set.empty
247
248 dput XRefresh $ "Start refresh " ++ show (n,sample)
249
250 -- Set a 15-minute timeout in order to avoid overlapping refreshes.
251 s <- search sch tbl sample $ if n+1 == R.defaultBucketCount
252 then const $ return True -- Never short-circuit the last bucket.
253 else checkBucketFull (searchSpace sch) var resultCounter fin n
254 _ <- timeout (15*60*1000000) $ do
255 atomically $ searchIsFinished s >>= check
256 atomically $ searchCancel s
257 dput XDHT $ "Finish refresh " ++ show (n,sample)
258 now <- getPOSIXTime
259 join $ atomically $ onFinishedRefresh r n now
260 rcount <- atomically $ do
261 c <- Set.size <$> readTVar resultCounter
262 b <- readTVar fin
263 return $ if b then 1 else c
264 return rcount
265
266refreshLastBucket :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ()
267refreshLastBucket r@BucketRefresher { refreshBuckets
268 , refreshQueue } = do
269
270 now <- getPOSIXTime
271 atomically $ do
272 cnt <- bktCount <$> readTVar refreshBuckets
273 -- Schedule immediate refresh.
274 modifyTVar' refreshQueue $ Int.insert (cnt-1) (now - 1)
275
276restartBootstrap :: (Hashable ni, Hashable nid, Ord ni, Ord nid, Show nid) =>
277 BucketRefresher nid ni -> STM (IO ())
278restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do
279 unchanged <- readTVar bootstrapMode
280 writeTVar bootstrapMode True
281 writeTVar bootstrapCountdown Nothing
282 if not unchanged then return $ do
283 dput XRefresh "BOOTSTRAP entered bootstrap mode"
284 refreshLastBucket r
285 else return $ dput XRefresh "BOOTSTRAP already bootstrapping"
286
287bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) =>
288 BucketRefresher nid ni
289 -> t1 ni -- ^ Nodes to bootstrap from.
290 -> t ni -- ^ Fallback nodes; used only if the others are unresponsive.
291 -> IO ()
292bootstrap r@BucketRefresher { refreshSearch = sch
293 , refreshBuckets = var
294 , refreshPing = ping
295 , bootstrapMode } ns ns0 = do
296 gotPing <- atomically $ newTVar False
297
298 -- First, ping the given nodes so that they are added to
299 -- our routing table.
300 withTaskGroup "bootstrap.resume" 20 $ \g -> do
301 forM_ ns $ \n -> do
302 let lbl = show $ kademliaLocation (searchSpace sch) n
303 forkTask g lbl $ do
304 b <- ping n
305 when b $ atomically $ writeTVar gotPing True
306
307 -- We resort to the hardcoded fallback nodes only when we got no
308 -- responses. This is to lessen the burden on well-known bootstrap
309 -- nodes.
310 fallback <- atomically (readTVar gotPing) >>= return . when . not
311 fallback $ withTaskGroup "bootstrap.ping" 20 $ \g -> do
312 forM_ ns0 $ \n -> do
313 forkTask g (show $ kademliaLocation (searchSpace sch) n)
314 (void $ ping n)
315 dput XDHT "Finished bootstrap pings."
316 -- Now search our own Id by entering bootstrap mode from non-bootstrap mode.
317 join $ atomically $ do
318 writeTVar bootstrapMode False
319 restartBootstrap r
320 --
321 -- Hopefully 'forkPollForRefresh' was invoked and can take over
322 -- maintenance.
323
324
325effectiveRefreshInterval :: BucketRefresher nid ni -> Int -> STM POSIXTime
326effectiveRefreshInterval BucketRefresher{ refreshInterval
327 , refreshBuckets
328 , bootstrapMode } num = do
329 tbl <- readTVar refreshBuckets
330 bootstrapping <- readTVar bootstrapMode
331 case bootstrapping of
332 False -> return refreshInterval
333 True -> do
334 -- When bootstrapping, refresh interval for non-full buckets is only 15 seconds.
335 let fullcount = R.defaultBucketSize
336 count = fromMaybe fullcount $ listToMaybe $ drop (num - 1) $ R.shape tbl
337 if count == fullcount
338 then return refreshInterval
339 else return 15 -- seconds
340
341
342
343-- | Reschedule a bucket's refresh-time. It should be called whenever a bucket
344-- changes. This will typically be invoked from 'tblTransition'.
345--
346-- From BEP 05:
347--
348-- > Each bucket should maintain a "last changed" property to indicate how
349-- > "fresh" the contents are.
350--
351-- We will use a "time to next refresh" property instead and store it in
352-- a priority search queue.
353--
354-- In detail using an expository (not actually implemented) type
355-- 'BucketTouchEvent'...
356--
357-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
358-- >>> bucketEvents =
359-- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
360-- >>>
361-- >>> , Stranger :--> Accepted -- or a node is added to a bucket,
362-- >>>
363-- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced
364-- >>> , Applicant :--> Accepted -- with another node,
365-- >>> ]
366--
367-- the bucket's last changed property should be updated. Buckets that have not
368-- been changed in 15 minutes (see 'refreshInterval') should be "refreshed."
369-- This is done by picking a random ID in the range of the bucket and
370-- performing a find_nodes search on it.
371--
372-- The only other possible BucketTouchEvents are as follows:
373--
374-- >>> not_handled =
375 -- >>> , Stranger :--> Applicant -- A ping is pending; its result is covered:
376-- >>> -- (Applicant :--> Stranger)
377-- >>> -- (Applicant :--> Accepted)
378-- >>> , Accepted :--> Applicant -- Never happens
379-- >>> ]
380--
381-- Because this BucketTouchEvent type is not actually implemented and we only
382-- receive notifications of a node's new state, it suffices to reschedule the
383-- bucket refresh 'touchBucket' on every transition to a state other than
384-- 'Applicant'.
385--
386-- XXX: Unfortunately, this means redundantly triggering twice upon every node
387-- replacement because we do not currently distinguish between standalone
388-- insertion/deletion events and an insertion/deletion pair constituting
389-- replacement.
390--
391-- It might also be better to pass the timestamp of the transition here and
392-- keep the refresh queue in better sync with the routing table by updating it
393-- within the STM monad.
394--
395-- We embed the result in the STM monad but currently, no STM state changes
396-- occur until the returned IO action is invoked. TODO: simplify?
397touchBucket :: SensibleNodeId nid ni
398 => BucketRefresher nid ni
399 -> RoutingTransition ni -- ^ What happened to the bucket?
400 -> STM (IO ())
401touchBucket r@BucketRefresher{ refreshSearch
402 , refreshInterval
403 , refreshBuckets
404 , refreshQueue
405 , refreshLastTouch
406 , bootstrapMode
407 , bootstrapCountdown }
408 RoutingTransition{ transitionedTo
409 , transitioningNode }
410 = case transitionedTo of
411 Applicant -> return $ return () -- Ignore transition to applicant.
412 _ -> return $ do -- Reschedule for any other transition.
413 now <- getPOSIXTime
414 join $ atomically $ do
415 let space = searchSpace refreshSearch
416 nid = kademliaLocation space transitioningNode
417 tbl <- readTVar refreshBuckets
418 let num = R.bucketNumber space nid tbl
419 stamp <- readTVar refreshLastTouch
420 action <- case stamp /= 0 && (now - stamp > 60) of
421 True -> do
422 -- It has been over a minute since any bucket was touched; re-enter bootstrap mode.
423 restartBootstrap r
424 False -> return $ return ()
425 interval <- effectiveRefreshInterval r num
426 modifyTVar' refreshQueue $ Int.insert num (now + interval)
427 writeTVar refreshLastTouch now
428 return action
429
430refreshKademlia :: SensibleNodeId nid ni => BucketRefresher nid ni -> Kademlia nid ni
431refreshKademlia r@BucketRefresher { refreshSearch = sch
432 , refreshPing = ping
433 , refreshBuckets = bkts
434 }
435 = Kademlia quietInsertions (searchSpace sch) (vanillaIO bkts ping)
436 { tblTransition = \tr -> do
437 io <- touchBucket r tr
438 return io
439 }
diff --git a/kad/src/Network/Kademlia/CommonAPI.hs b/kad/src/Network/Kademlia/CommonAPI.hs
new file mode 100644
index 00000000..601be5d8
--- /dev/null
+++ b/kad/src/Network/Kademlia/CommonAPI.hs
@@ -0,0 +1,84 @@
1{-# LANGUAGE ExistentialQuantification #-}
2module Network.Kademlia.CommonAPI where
3
4
5import Control.Concurrent
6import Control.Concurrent.STM
7import Data.Aeson as J (FromJSON, ToJSON)
8import Data.Hashable
9import qualified Data.Map as Map
10import Data.Serialize as S
11import qualified Data.Set as Set
12import Data.Time.Clock.POSIX
13import Data.Typeable
14
15import Network.Kademlia.Search
16import Network.Kademlia.Routing as R
17import Crypto.Tox (SecretKey,PublicKey)
18
19data DHT = forall nid ni. ( Show ni
20 , Read ni
21 , ToJSON ni
22 , FromJSON ni
23 , Ord ni
24 , Hashable ni
25 , Show nid
26 , Ord nid
27 , Hashable nid
28 , Typeable ni
29 , S.Serialize nid
30 ) =>
31 DHT
32 { dhtBuckets :: TVar (BucketList ni)
33 , dhtSecretKey :: STM (Maybe SecretKey)
34 , dhtPing :: Map.Map String (DHTPing ni)
35 , dhtQuery :: Map.Map String (DHTQuery nid ni)
36 , dhtAnnouncables :: Map.Map String (DHTAnnouncable nid)
37 , dhtParseId :: String -> Either String nid
38 , dhtSearches :: TVar (Map.Map (String,nid) (DHTSearch nid ni))
39 , dhtFallbackNodes :: IO [ni]
40 , dhtBootstrap :: [ni] -> [ni] -> IO ()
41 }
42
43data DHTQuery nid ni = forall addr r tok.
44 ( Ord addr
45 , Typeable r
46 , Typeable tok
47 , Typeable ni
48 ) => DHTQuery
49 { qsearch :: Search nid addr tok ni r
50 , qhandler :: ni -> nid -> IO ([ni], [r], Maybe tok) -- ^ Invoked on local node, when there is no query destination.
51 , qshowR :: r -> String
52 , qshowTok :: tok -> Maybe String
53 }
54
55data DHTAnnouncable nid = forall dta tok ni r.
56 ( Show r
57 , Typeable dta -- information being announced
58 , Typeable tok -- token
59 , Typeable r -- search result
60 , Typeable ni -- node
61 ) => DHTAnnouncable
62 { announceParseData :: String -> Either String dta
63 , announceParseToken :: dta -> String -> Either String tok
64 , announceParseAddress :: String -> Either String ni
65 , announceSendData :: Either ( String {- search name -}
66 , String -> Either String r
67 , PublicKey {- me -} -> dta -> r -> IO ())
68 (dta -> tok -> Maybe ni -> IO (Maybe r))
69 , announceInterval :: POSIXTime
70 , announceTarget :: dta -> nid
71 }
72
73data DHTSearch nid ni = forall addr tok r. DHTSearch
74 { searchThread :: ThreadId
75 , searchState :: SearchState nid addr tok ni r
76 , searchShowTok :: tok -> Maybe String
77 , searchResults :: TVar (Set.Set String)
78 }
79
80data DHTPing ni = forall r. DHTPing
81 { pingQuery :: [String] -> ni -> IO (Maybe r)
82 , pingShowResult :: r -> String
83 }
84
diff --git a/kad/src/Network/Kademlia/Persistence.hs b/kad/src/Network/Kademlia/Persistence.hs
new file mode 100644
index 00000000..32ec169d
--- /dev/null
+++ b/kad/src/Network/Kademlia/Persistence.hs
@@ -0,0 +1,52 @@
1{-# LANGUAGE NamedFieldPuns #-}
2{-# LANGUAGE OverloadedStrings #-}
3module Network.Kademlia.Persistence where
4
5import Network.Kademlia.CommonAPI
6import Network.Kademlia.Routing as R
7
8import Control.Concurrent.STM
9import qualified Data.Aeson as J
10 ;import Data.Aeson as J (FromJSON)
11import qualified Data.ByteString.Lazy as L
12import qualified Data.HashMap.Strict as HashMap
13import Data.List
14import qualified Data.Vector as V
15import System.IO.Error
16
17saveNodes :: String -> DHT -> IO ()
18saveNodes netname DHT{dhtBuckets} = do
19 bkts <- atomically $ readTVar dhtBuckets
20 let ns = map fst $ concat $ R.toList bkts
21 bs = J.encode ns
22 fname = nodesFileName netname
23 L.writeFile fname bs
24
25loadNodes :: FromJSON ni => String -> IO [ni]
26loadNodes netname = do
27 let fname = nodesFileName netname
28 attempt <- tryIOError $ do
29 J.decode <$> L.readFile fname
30 >>= maybe (ioError $ userError "Nothing") return
31 either (const $ fallbackLoad fname) return attempt
32
33nodesFileName :: String -> String
34nodesFileName netname = netname ++ "-nodes.json"
35
36fallbackLoad :: FromJSON t => FilePath -> IO [t]
37fallbackLoad fname = do
38 attempt <- tryIOError $ do
39 J.decode <$> L.readFile fname
40 >>= maybe (ioError $ userError "Nothing") return
41 let go r = do
42 let m = HashMap.lookup "nodes" (r :: J.Object)
43 ns0 = case m of Just (J.Array v) -> V.toList v
44 Nothing -> []
45 ns1 = zip (map J.fromJSON ns0) ns0
46 issuc (J.Error _,_) = False
47 issuc _ = True
48 (ss,fs) = partition issuc ns1
49 ns = map (\(J.Success n,_) -> n) ss
50 mapM_ print (map snd fs) >> return ns
51 either (const $ return []) go attempt
52
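-- Illustrative usage (not part of this patch): given a 'DHT' value 'dht' for a
-- network named "tox" and a hypothetical node type 'Ni' with a FromJSON
-- instance, nodes can be saved and later restored:
--
-- > saveNodes "tox" dht                  -- writes tox-nodes.json
-- > ns <- loadNodes "tox" :: IO [Ni]     -- falls back to [] on failure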
diff --git a/kad/src/Network/Kademlia/Routing.hs b/kad/src/Network/Kademlia/Routing.hs
new file mode 100644
index 00000000..c7fdf028
--- /dev/null
+++ b/kad/src/Network/Kademlia/Routing.hs
@@ -0,0 +1,809 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013
3-- (c) Joe Crayne 2017
4-- License : BSD3
5-- Maintainer : pxqr.sta@gmail.com
6-- Stability : experimental
7-- Portability : portable
8--
9-- Every node maintains a routing table of known good nodes. The
10-- nodes in the routing table are used as starting points for
11-- queries in the DHT. Nodes from the routing table are returned in
12-- response to queries from other nodes.
13--
14-- For more info see:
15-- <http://www.bittorrent.org/beps/bep_0005.html#routing-table>
16--
17{-# LANGUAGE CPP #-}
18{-# LANGUAGE RecordWildCards #-}
19{-# LANGUAGE BangPatterns #-}
20{-# LANGUAGE RankNTypes #-}
21{-# LANGUAGE ViewPatterns #-}
22{-# LANGUAGE TypeOperators #-}
23{-# LANGUAGE DeriveGeneric #-}
24{-# LANGUAGE DeriveFunctor #-}
25{-# LANGUAGE GADTs #-}
26{-# LANGUAGE ScopedTypeVariables #-}
27{-# LANGUAGE TupleSections #-}
28{-# LANGUAGE OverloadedStrings #-}
29{-# LANGUAGE StandaloneDeriving, FlexibleContexts, MultiParamTypeClasses, FlexibleInstances #-}
30{-# OPTIONS_GHC -fno-warn-orphans #-}
31module Network.Kademlia.Routing
32 {-
33 ( -- * BucketList
34 BucketList
35 , Info(..)
36
37 -- * Attributes
38 , BucketCount
39 , defaultBucketCount
40 , BucketSize
41 , defaultBucketSize
42 , NodeCount
43
44 -- * Query
45 , Network.Kademlia.Routing.null
46 , Network.Kademlia.Routing.full
47 , thisId
48 , shape
49 , Network.Kademlia.Routing.size
50 , Network.Kademlia.Routing.depth
51 , compatibleNodeId
52
53 -- * Lookup
54 , K
55 , defaultK
56 , TableKey (..)
57 , kclosest
58
59 -- * Construction
60 , Network.Kademlia.Routing.nullTable
61 , Event(..)
62 , CheckPing(..)
63 , Network.Kademlia.Routing.insert
64
65 -- * Conversion
66 , Network.Kademlia.Routing.TableEntry
67 , Network.Kademlia.Routing.toList
68
69 -- * Routing
70 , Timestamp
71 , getTimestamp
72 ) -} where
73
74import Control.Applicative as A
75import Control.Arrow
76import Control.Monad
77import Data.Function
78import Data.Functor.Contravariant
79import Data.Functor.Identity
80import Data.List as L hiding (insert)
81import Data.Maybe
82import Data.Monoid
83import Data.Wrapper.PSQ as PSQ
84import Data.Serialize as S hiding (Result, Done)
85import qualified Data.Sequence as Seq
86import Data.Time
87import Data.Time.Clock.POSIX
88import Data.Word
89import GHC.Generics
90import Text.PrettyPrint as PP hiding ((<>))
91import Text.PrettyPrint.HughesPJClass (pPrint,Pretty)
92import qualified Data.ByteString as BS
93import Data.Bits
94import Data.Ord
95import Data.Reflection
96import Network.Address
97import Data.Typeable
98import Data.Coerce
99import Data.Hashable
100
101
102-- | Last time the node was responding to our queries.
103--
104-- Not all nodes that we learn about are equal. Some are \"good\" and
105-- some are not. Many nodes using the DHT are able to send queries
106-- and receive responses, but are not able to respond to queries
107 -- from other nodes. It is important that each node's routing table
108 -- contain only known good nodes. A good node is a node that has
109 -- responded to one of our queries within the last 15 minutes. A
110-- node is also good if it has ever responded to one of our queries
111-- and has sent us a query within the last 15 minutes. After 15
112-- minutes of inactivity, a node becomes questionable. Nodes become
113-- bad when they fail to respond to multiple queries in a row. Nodes
114-- that we know are good are given priority over nodes with unknown
115-- status.
116--
117type Timestamp = POSIXTime
118
119getTimestamp :: IO Timestamp
120getTimestamp = do
121 utcTime <- getCurrentTime
122 return $ utcTimeToPOSIXSeconds utcTime
123
124
125
126{-----------------------------------------------------------------------
127 Bucket
128-----------------------------------------------------------------------}
129--
130-- When a k-bucket is full and a new node is discovered for that
131-- k-bucket, the least recently seen node in the k-bucket is
132 -- PINGed. If the node is found to be still alive, the new node is
133 -- placed in a secondary list, a replacement cache. The replacement
134-- cache is used only if a node in the k-bucket stops responding. In
135-- other words: new nodes are used only when older nodes disappear.
136
137-- | Timestamp - last time this node was pinged.
138type NodeEntry ni = Binding ni Timestamp
139
140
141-- | Maximum number of 'NodeInfo's stored in a bucket. Most clients
142-- use this value.
143defaultBucketSize :: Int
144defaultBucketSize = 8
145
146data QueueMethods m elem fifo = QueueMethods
147 { pushBack :: elem -> fifo -> m fifo
148 , popFront :: fifo -> m (Maybe elem, fifo)
149 , emptyQueue :: m fifo
150 }
151
152{-
153fromQ :: Functor m =>
154 ( a -> b )
155 -> ( b -> a )
156 -> QueueMethods m elem a
157 -> QueueMethods m elem b
158fromQ embed project QueueMethods{..} =
159 QueueMethods { pushBack = \e -> fmap embed . pushBack e . project
160 , popFront = fmap (second embed) . popFront . project
161 , emptyQueue = fmap embed emptyQueue
162 }
163-}
164
165seqQ :: QueueMethods Identity ni (Seq.Seq ni)
166seqQ = QueueMethods
167 { pushBack = \e fifo -> pure (fifo Seq.|> e)
168 , popFront = \fifo -> case Seq.viewl fifo of
169 e Seq.:< fifo' -> pure (Just e, fifo')
170 Seq.EmptyL -> pure (Nothing, Seq.empty)
171 , emptyQueue = pure Seq.empty
172 }
173
174type BucketQueue ni = Seq.Seq ni
175
176bucketQ :: QueueMethods Identity ni (BucketQueue ni)
177bucketQ = seqQ
178
179
180data Compare a = Compare (a -> a -> Ordering) (Int -> a -> Int)
181
182contramapC :: (b -> a) -> Compare a -> Compare b
183contramapC f (Compare cmp hsh) = Compare (\a b -> cmp (f a) (f b))
184 (\s x -> hsh s (f x))
185
186newtype Ordered' s a = Ordered a
187 deriving (Show)
188
189-- | Hack to avoid UndecidableInstances
190newtype Shrink a = Shrink a
191 deriving (Show)
192
193type Ordered s a = Ordered' s (Shrink a)
194
195instance Reifies s (Compare a) => Eq (Ordered' s (Shrink a)) where
196 a == b = (compare a b == EQ)
197
198instance Reifies s (Compare a) => Ord (Ordered' s (Shrink a)) where
199 compare a b = cmp (coerce a) (coerce b)
200 where Compare cmp _ = reflect (Proxy :: Proxy s)
201
202instance Reifies s (Compare a) => Hashable (Ordered' s (Shrink a)) where
203 hashWithSalt salt x = hash salt (coerce x)
204 where Compare _ hash = reflect (Proxy :: Proxy s)
205
206-- | A bucket is also limited in its length, thus it is called a k-bucket.
207-- When a bucket becomes full, we split it into two lists by the
208-- current span bit. The span bit is defined by the depth in the routing
209-- table tree. The size of the bucket should be chosen such that it is
210-- very unlikely that all nodes in a bucket fail within an hour of
211-- each other.
212data Bucket s ni = Bucket
213 { bktNodes :: !(PSQ (Ordered s ni) Timestamp) -- current routing nodes
214 , bktQ :: !(BucketQueue (Timestamp,ni)) -- replacements pending time-outs
215 } deriving (Generic)
216
217#define CAN_SHOW_BUCKET 0
218
219#if CAN_SHOW_BUCKET
220deriving instance Show ni => Show (Bucket s ni)
221#endif
222
223bucketCompare :: forall p ni s. Reifies s (Compare ni) => p (Bucket s ni) -> Compare ni
224bucketCompare _ = reflect (Proxy :: Proxy s)
225
226mapBucket :: ( Reifies s (Compare a)
227 , Reifies t (Compare ni)
228 ) => (a -> ni) -> Bucket s a -> Bucket t ni
229mapBucket f (Bucket ns q) = Bucket (PSQ.fromList $ map (\(ni :-> tm) -> (f' ni :-> tm)) $ PSQ.toList ns)
230 (fmap (second f) q)
231 where f' = coerce . f . coerce
232
233
234#if 0
235
236{-
237getGenericNode :: ( Serialize (NodeId)
238 , Serialize ip
239 , Serialize u
240 ) => Get (NodeInfo)
241getGenericNode = do
242 nid <- get
243 naddr <- get
244 u <- get
245 return NodeInfo
246 { nodeId = nid
247 , nodeAddr = naddr
248 , nodeAnnotation = u
249 }
250
251putGenericNode :: ( Serialize (NodeId)
252 , Serialize ip
253 , Serialize u
254 ) => NodeInfo -> Put
255putGenericNode (NodeInfo nid naddr u) = do
256 put nid
257 put naddr
258 put u
259
260instance (Eq ip, Ord (NodeId), Serialize (NodeId), Serialize ip, Serialize u) => Serialize (Bucket) where
261 get = Bucket . psqFromPairList <$> getListOf ( (,) <$> getGenericNode <*> get ) <*> pure (runIdentity $ emptyQueue bucketQ)
262 put = putListOf (\(ni,stamp) -> putGenericNode ni >> put stamp) . psqToPairList . bktNodes
263-}
264
265#endif
266
267psqFromPairList :: (Ord p, PSQKey k) => [(k, p)] -> PSQ k p
268psqFromPairList xs = PSQ.fromList $ map (\(a,b) -> a :-> b) xs
269
270psqToPairList :: ( PSQKey t, Ord t1 ) => PSQ t t1 -> [(t, t1)]
271psqToPairList psq = map (\(a :-> b) -> (a,b)) $ PSQ.toList psq
272
273-- | Update interval, in seconds.
274delta :: NominalDiffTime
275delta = 15 * 60
276
277-- | Should maintain a set of stable, long-running nodes.
278--
279-- Note: pings are triggered only when a bucket is full.
280updateBucketForInbound :: ( Coercible t1 t
281 , Alternative f
282 , Reifies s (Compare t1)
283 ) => NominalDiffTime -> t1 -> Bucket s t1 -> f ([t], Bucket s t1)
284updateBucketForInbound curTime info bucket
285 -- Just update timestamp if a node is already in bucket.
286 --
287 -- Note PingResult events should only occur for nodes we requested a ping for,
288 -- and those will always already be in the routing queue and will get their
289 -- timestamp updated here, since 'TryInsert' is called on every inbound packet,
290 -- including ping results.
291 | already_have
292 = pure ( [], map_ns $ PSQ.insertWith max (coerce info) curTime )
293 -- bucket is good, but not full => we can insert a new node
294 | PSQ.size (bktNodes bucket) < defaultBucketSize
295 = pure ( [], map_ns $ PSQ.insert (coerce info) curTime )
296 -- If any questionable nodes in the bucket have not been
297 -- seen in the last 15 minutes, the least recently seen node is
298 -- pinged. If any nodes in the bucket are known to have become bad,
299 -- then one is replaced by the new node in the next insertBucket
300 -- iteration.
301 | not (L.null stales)
302 = pure ( stales
303 , bucket { -- Update timestamps so that we don't redundantly ping.
304 bktNodes = updateStamps curTime (coerce stales) $ bktNodes bucket
305 -- Update queue with the pending NodeInfo in case of ping fail.
306 , bktQ = runIdentity $ pushBack bucketQ (curTime,info) $ bktQ bucket } )
307 -- When the bucket is full of good nodes, the new node is simply discarded.
308 -- We must return 'A.empty' here to ensure that bucket splitting happens
309 -- inside 'modifyBucket'.
310 | otherwise = A.empty
311 where
312 -- We (take 1) to keep a 1-to-1 correspondence between pending pings and
313 -- waiting nodes in the bktQ. This way, we don't have to worry about what
314 -- to do with failed pings for which there are no ready replacements.
315 stales = -- One stale:
316 do (n :-> t) <- maybeToList $ PSQ.findMin (bktNodes bucket)
317 guard (t < curTime - delta)
318 return $ coerce n
319 -- All stale:
320 -- map key $ PSQ.atMost (curTime - delta) $ bktNodes bucket
321
322 already_have = maybe False (const True) $ PSQ.lookup (coerce info) (bktNodes bucket)
323
324 map_ns f = bucket { bktNodes = f (bktNodes bucket) }
325 -- map_q f = bucket { bktQ = runIdentity $ f (bktQ bucket) }
326
327updateBucketForPingResult :: (Applicative f, Reifies s (Compare a)) =>
328 a -> Bool -> Bucket s a -> f ([(a, Maybe (Timestamp, a))], Bucket s a)
329updateBucketForPingResult bad_node got_response bucket
330 = pure ( map (,Nothing) forgotten
331 ++ map (second Just) replacements
332 , Bucket (foldr replace
333 (bktNodes bucket)
334 replacements)
335 popped
336 )
337 where
338 (top, popped) = runIdentity $ popFront bucketQ (bktQ bucket)
339
340 -- Dropped from accepted, replaced by pending.
341 replacements | got_response = [] -- Timestamp was already updated by TryInsert.
342 | Just info <- top = do
343 -- Insert only if there's a removal.
344 _ <- maybeToList $ PSQ.lookup (coerce bad_node) (bktNodes bucket)
345 return (bad_node, info)
346 | otherwise = []
347
348 -- Dropped from the pending queue without replacing.
349 forgotten | got_response = maybeToList $ fmap snd top
350 | otherwise = []
351
352
353 replace (bad_node, (tm, info)) =
354 PSQ.insert (coerce info) tm
355 . PSQ.delete (coerce bad_node)
356
357
358updateStamps :: PSQKey ni => Timestamp -> [ni] -> PSQ ni Timestamp -> PSQ ni Timestamp
359updateStamps curTime stales nodes = foldl' (\q n -> PSQ.insert n curTime q) nodes stales
360
361type BitIx = Word
362
363partitionQ :: Monad f => QueueMethods f elem b -> (elem -> Bool) -> b -> f (b, b)
364partitionQ imp test q0 = do
365 pass0 <- emptyQueue imp
366 fail0 <- emptyQueue imp
367 let flipfix a b f = fix f a b
368 flipfix q0 (pass0,fail0) $ \rec q qs -> do
369 (mb,q') <- popFront imp q
370 case mb of
371 Nothing -> return qs
372 Just e -> do qs' <- select e (pushBack imp e) qs
373 rec q' qs'
374 where
375 -- The element is passed to 'select' explicitly: 'e' is bound inside the
376 -- case expression above and is not in scope in this where clause.
376 select e f = if test e then \(a,b) -> flip (,) b <$> f a
377 else \(a,b) -> (,) a <$> f b
378
379
380
381split :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) =>
382 forall ni s. ( Reifies s (Compare ni) ) =>
383 (ni -> Word -> Bool)
384 -> BitIx -> Bucket s ni -> (Bucket s ni, Bucket s ni)
385split testNodeIdBit i b = (Bucket ns qs, Bucket ms rs)
386 where
387 (ns,ms) = (PSQ.fromList *** PSQ.fromList) . partition (spanBit . coerce . key) . PSQ.toList $ bktNodes b
388 (qs,rs) = runIdentity $ partitionQ bucketQ (spanBit . snd) $ bktQ b
389
390 spanBit :: ni -> Bool
391 spanBit entry = testNodeIdBit entry i
392
393
394{-----------------------------------------------------------------------
395-- BucketList
396-----------------------------------------------------------------------}
397
398defaultBucketCount :: Int
399defaultBucketCount = 20
400
401defaultMaxBucketCount :: Word
402defaultMaxBucketCount = 24
403
404data Info ni nid = Info
405 { myBuckets :: BucketList ni
406 , myNodeId :: nid
407 , myAddress :: SockAddr
408 }
409 deriving Generic
410
411deriving instance (Eq ni, Eq nid) => Eq (Info ni nid)
412deriving instance (Show ni, Show nid) => Show (Info ni nid)
413
414-- instance (Eq ip, Serialize ip) => Serialize (Info ip)
415
416-- | The routing table covers the entire 'NodeId' space from 0 to 2 ^
417-- 160. The routing table is subdivided into 'Bucket's that each cover
418-- a portion of the space. An empty table has one bucket with an ID
419-- space range of @min = 0, max = 2 ^ 160@. When a node with ID \"N\"
420-- is inserted into the table, it is placed within the bucket that has
421-- @min <= N < max@. An empty table has only one bucket so any node
422-- must fit within it. Each bucket can only hold 'K' nodes, currently
423-- eight, before becoming 'Full'. When a bucket is full of known good
424-- nodes, no more nodes may be added unless our own 'NodeId' falls
425-- within the range of the 'Bucket'. In that case, the bucket is
426-- replaced by two new buckets each with half the range of the old
427-- bucket and the nodes from the old bucket are distributed among the
428-- two new ones. For a new table with only one bucket, the full bucket
429-- is always split into two new buckets covering the ranges @0..2 ^
430-- 159@ and @2 ^ 159..2 ^ 160@.
431--
432data BucketList ni = forall s. Reifies s (Compare ni) =>
433 BucketList { thisNode :: !ni
434 -- | Non-empty list of buckets.
435 , buckets :: [Bucket s ni]
436 }
437
438mapTable :: (b -> t) -> (t -> b) -> BucketList t -> BucketList b
439mapTable g f tbl@(BucketList self bkts) = reify (contramapC g $ bucketCompare bkts)
440 $ \p -> BucketList
441 { thisNode = f self
442 , buckets = map (resolve p . mapBucket f) bkts
443 }
444 where
445 resolve :: Proxy s -> Bucket s ni -> Bucket s ni
446 resolve = const id
447
448instance (Eq ni) => Eq (BucketList ni) where
449 (==) = (==) `on` Network.Kademlia.Routing.toList
450
451#if 0
452
453instance Serialize NominalDiffTime where
454 put = putWord32be . fromIntegral . fromEnum
455 get = (toEnum . fromIntegral) <$> getWord32be
456
457#endif
458
459#if CAN_SHOW_BUCKET
460deriving instance (Show ni) => Show (BucketList ni)
461#else
462instance Show ni => Show (BucketList ni) where
463 showsPrec d (BucketList self bkts) =
464 mappend "BucketList "
465 . showsPrec (d+1) self
466 . mappend " (fromList "
467 . showsPrec (d+1) (L.map (L.map tableEntry . PSQ.toList . bktNodes) $ bkts)
468 . mappend ") "
469#endif
470
471#if 0
472
473-- | Normally, routing table should be saved between invocations of
474-- the client software. Note that you don't need to store /this/
475-- 'NodeId' since it is already included in routing table.
476instance (Eq ip, Serialize ip, Ord (NodeId), Serialize (NodeId), Serialize u) => Serialize (BucketList)
477
478#endif
479
480-- | Shape of the table.
481instance Pretty (BucketList ni) where
482 pPrint t
483 | bucketCount < 6 = hcat $ punctuate ", " $ L.map PP.int ss
484 | otherwise = brackets $
485 PP.int (L.sum ss) <> " nodes, " <>
486 PP.int bucketCount <> " buckets"
487 where
488 bucketCount = L.length ss
489 ss = shape t
490
491-- | Empty table with specified /spine/ node id.
492--
493-- XXX: The comparison function argument is awkward here.
494nullTable :: (ni -> ni -> Ordering) -> (Int -> ni -> Int) -> ni -> Int -> BucketList ni
495nullTable cmp hsh ni n =
496 reify (Compare cmp hsh)
497 $ \p -> BucketList
498 ni
499 [Bucket (empty p) (runIdentity $ emptyQueue bucketQ)]
500 where
501 empty :: Reifies s (Compare ni) => Proxy s -> PSQ (Ordered s ni) Timestamp
502 empty = const $ PSQ.empty
503
504#if 0
505
506-- | Test if table is empty. In this case DHT should start
507-- bootstrapping process until table becomes 'full'.
508null :: BucketList -> Bool
509null (Tip _ _ b) = PSQ.null $ bktNodes b
510null _ = False
511
512-- | Test if table have maximum number of nodes. No more nodes can be
513-- 'insert'ed, except old ones becomes bad.
514full :: BucketList -> Bool
515full (Tip _ n _) = n == 0
516full (Zero t b) = PSQ.size (bktNodes b) == defaultBucketSize && full t
517full (One b t) = PSQ.size (bktNodes b) == defaultBucketSize && full t
518
519-- | Get the /spine/ node id.
520thisId :: BucketList -> NodeId
521thisId (Tip nid _ _) = nid
522thisId (Zero table _) = thisId table
523thisId (One _ table) = thisId table
524
525-- | Number of nodes in a bucket or a table.
526type NodeCount = Int
527
528#endif
529
530-- | Internally, routing table is similar to list of buckets or a
531-- /matrix/ of nodes. This function returns the shape of the matrix.
532shape :: BucketList ni -> [Int]
533shape (BucketList _ tbl) = map (PSQ.size . bktNodes) tbl
534
535#if 0
536
537-- | Get number of nodes in the table.
538size :: BucketList -> NodeCount
539size = L.sum . shape
540
541-- | Get number of buckets in the table.
542depth :: BucketList -> BucketCount
543depth = L.length . shape
544
545#endif
546
547lookupBucket :: forall ni nid x.
548 ( -- FiniteBits nid
549 Ord nid
550 ) => KademliaSpace nid ni -> nid -> (forall s. Reifies s (Compare ni) => [Bucket s ni] -> x) -> BucketList ni -> x
551lookupBucket space nid kont (BucketList self bkts) = kont $ go 0 [] bkts
552 where
553 d = kademliaXor space nid (kademliaLocation space self)
554
555 go :: Word -> [Bucket s ni] -> [Bucket s ni] -> [Bucket s ni]
556 go i bs (bucket : buckets)
557 | kademliaTestBit space d i = bucket : buckets ++ bs
558 | otherwise = go (succ i) (bucket:bs) buckets
559 go _ bs [] = bs
560
561bucketNumber :: forall ni nid.
562 KademliaSpace nid ni -> nid -> BucketList ni -> Int
563bucketNumber space nid (BucketList self bkts) = fromIntegral $ go 0 bkts
564 where
565 d = kademliaXor space nid (kademliaLocation space self)
566
567 go :: Word -> [Bucket s ni] -> Word
568 go i (bucket : buckets)
569 | kademliaTestBit space d i = i
570 | otherwise = go (succ i) buckets
571 go i [] = i
572
573
574compatibleNodeId :: forall ni nid.
575 ( Serialize nid, FiniteBits nid) =>
576 (ni -> nid) -> BucketList ni -> IO nid
577compatibleNodeId nodeId tbl = genBucketSample prefix br
578 where
579 br = bucketRange (L.length (shape tbl) - 1) True
580 nodeIdSize = finiteBitSize (undefined :: nid) `div` 8
581 bs = BS.pack $ take nodeIdSize $ tablePrefix (testIdBit . nodeId) tbl ++ repeat 0
582 prefix = either error id $ S.decode bs
583
584tablePrefix :: (ni -> Word -> Bool) -> BucketList ni -> [Word8]
585tablePrefix testbit = map (packByte . take 8 . (++repeat False))
586 . chunksOf 8
587 . tableBits testbit
588 where
589 packByte = foldl1' (.|.) . zipWith bitmask [7,6 .. 0]
590 bitmask ix True = bit ix
591 bitmask _ _ = 0
592
593tableBits :: (ni -> Word -> Bool) -> BucketList ni -> [Bool]
594tableBits testbit (BucketList self bkts) =
595 zipWith const (map (testbit self) [0..])
596 bkts
597
598selfNode :: BucketList ni -> ni
599selfNode (BucketList self _) = self
600
601chunksOf :: Int -> [e] -> [[e]]
602chunksOf i ls = map (take i) (build (splitter ls)) where
603 splitter :: [e] -> ([e] -> a -> a) -> a -> a
604 splitter [] _ n = n
605 splitter l c n = l `c` splitter (drop i l) c n
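-- For example (illustrative):
--
-- > chunksOf 3 [1..7] == [[1,2,3],[4,5,6],[7]]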
606
607build :: ((a -> [a] -> [a]) -> [a] -> [a]) -> [a]
608build g = g (:) []
609
610
611
612-- | Count of closest nodes in find_node reply.
613type K = Int
614
615-- | Default 'K' is equal to 'defaultBucketSize'.
616defaultK :: K
617defaultK = 8
618
619#if 0
620class TableKey dht k where
621 toNodeId :: k -> NodeId
622
623instance TableKey dht (NodeId) where
624 toNodeId = id
625
626#endif
627
628-- | In Kademlia, the distance metric is XOR and the result is
629-- interpreted as an unsigned integer.
630newtype NodeDistance nodeid = NodeDistance nodeid
631 deriving (Eq, Ord)
632
633-- | distance(A,B) = |A xor B|. Smaller values are closer.
634distance :: Bits nid => nid -> nid -> NodeDistance nid
635distance a b = NodeDistance $ xor a b
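-- For example (illustrative), with small numeric ids:
--
-- > distance 0x0c 0x0a == NodeDistance 0x06   -- closer than
-- > distance 0x0c 0x03 == NodeDistance 0x0f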
636
637-- | Order by closeness: nearest nodes first.
638rank :: ( Ord nid
639 ) => KademliaSpace nid ni -> nid -> [ni] -> [ni]
640rank space nid = L.sortBy (comparing (kademliaXor space nid . kademliaLocation space))
641
642
643-- | Get a list of /K/ closest nodes using XOR metric. Used in
644-- 'find_node' and 'get_peers' queries.
645kclosest :: ( -- FiniteBits nid
646 Ord nid
647 ) =>
648 KademliaSpace nid ni -> Int -> nid -> BucketList ni -> [ni]
649kclosest space k nid tbl = take k $ rank space nid (L.concat bucket)
650 ++ rank space nid (L.concat everyone)
651 where
652 (bucket,everyone) =
653 L.splitAt 1
654 . lookupBucket space nid (L.map (coerce . L.map PSQ.key . PSQ.toList . bktNodes))
655 $ tbl
656
657
658
659{-----------------------------------------------------------------------
660-- Routing
661-----------------------------------------------------------------------}
662
663splitTip :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) =>
664 ( Reifies s (Compare ni) ) =>
665 (ni -> Word -> Bool)
666 -> ni -> BitIx -> Bucket s ni -> [ Bucket s ni ]
667splitTip testNodeBit ni i bucket
668 | testNodeBit ni i = [zeros , ones ]
669 | otherwise = [ones , zeros ]
670 where
671 (ones, zeros) = split testNodeBit i bucket
672
673-- | Used in each query.
674--
675-- TODO: Kademlia non-empty subtrees should split if they have fewer than
676-- k nodes in them. The subtrees meant here are illustrated in Fig. 1 of the Kademlia
677-- paper. The rule requiring additional splits is in section 2.4.
678modifyBucket
679 :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) =>
680 forall ni nid xs.
681 KademliaSpace nid ni
682 -> nid -> (forall s. Reifies s (Compare ni) => Bucket s ni -> Maybe (xs, Bucket s ni)) -> BucketList ni -> Maybe (xs,BucketList ni)
683modifyBucket space nid f (BucketList self bkts)
684 = second (BucketList self) <$> go (0 :: BitIx) bkts
685 where
686 d = kademliaXor space nid (kademliaLocation space self)
687
688 -- go :: BitIx -> [Bucket s ni] -> Maybe (xs, [Bucket s ni])
689
690 go !i (bucket : buckets@(_:_))
691 | kademliaTestBit space d i = second (: buckets) <$> f bucket
692 | otherwise = second (bucket :) <$> go (succ i) buckets
693
694 go !i [bucket] = second (: []) <$> f bucket <|> gosplit
695 where
696 gosplit | i < defaultMaxBucketCount = go i (splitTip ( kademliaTestBit space
697 . kademliaLocation space )
698 self
699 i
700 bucket)
701 | otherwise = Nothing -- Limit the number of buckets.
702
703
704bktCount :: BucketList ni -> Int
705bktCount (BucketList _ bkts) = L.length bkts
706
707-- | Triggering event for atomic table update
708data Event ni = TryInsert { foreignNode :: ni }
709 | PingResult { foreignNode :: ni , ponged :: Bool }
710
711#if 0
712deriving instance Eq (NodeId) => Eq (Event)
713deriving instance ( Show ip
714 , Show (NodeId)
715 , Show u
716 ) => Show (Event)
717
718#endif
719
720eventId :: (ni -> nid) -> Event ni -> nid
721eventId nodeId (TryInsert ni) = nodeId ni
722eventId nodeId (PingResult ni _) = nodeId ni
723
724
725-- | Actions requested by atomic table update
726data CheckPing ni = CheckPing [ni]
727
728#if 0
729
730deriving instance Eq (NodeId) => Eq (CheckPing)
731deriving instance ( Show ip
732 , Show (NodeId)
733 , Show u
734 ) => Show (CheckPing)
735
736#endif
737
738
739-- | Call on every inbound packet (including requested ping results).
740-- Returns a triple (was_inserted, to_ping, tbl') where
741--
742-- [ /was_inserted/ ] True if the node was added to the routing table.
743--
744-- [ /to_ping/ ] A list of nodes to ping and then run 'updateForPingResult'.
745-- This will be empty if /was_inserted/, but a non-inserted node
746-- may be added to a replacement queue and will be inserted if
747-- one of the items in this list times out.
748--
749-- [ /tbl'/ ] The updated routing 'BucketList'.
750--
751updateForInbound ::
752 KademliaSpace nid ni
753 -> Timestamp -> ni -> BucketList ni -> (Bool, [ni], BucketList ni)
754updateForInbound space tm ni tbl@(BucketList _ bkts) =
755 maybe (False, [],tbl) (\(ps,tbl') -> (True, ps, tbl'))
756 $ modifyBucket space
757 (kademliaLocation space ni)
758 (updateBucketForInbound tm ni)
759 tbl
760
761-- | Update the routing table with the results of a ping.
762--
763-- Each @(a, Just (tm,b))@ in the returned list indicates that the node /a/ was deleted from the
764-- routing table and the node /b/, with timestamp /tm/, has taken its place; @(a, Nothing)@ means /a/ was dropped from the pending queue without a replacement.
765updateForPingResult ::
766 KademliaSpace nid ni
767 -> ni -- ^ The pinged node.
768 -> Bool -- ^ True if we got a reply, False if it timed out.
769 -> BucketList ni -- ^ The routing table.
770 -> ( [(ni,Maybe (Timestamp, ni))], BucketList ni )
771updateForPingResult space ni got_reply tbl =
772 fromMaybe ([],tbl)
773 $ modifyBucket space
774 (kademliaLocation space ni)
775 (updateBucketForPingResult ni got_reply)
776 tbl
777
778
779{-----------------------------------------------------------------------
780-- Conversion
781-----------------------------------------------------------------------}
782
783type TableEntry ni = (ni, Timestamp)
784
785tableEntry :: NodeEntry ni -> TableEntry ni
786tableEntry (a :-> b) = (a, b)
787
788toList :: BucketList ni -> [[TableEntry ni]]
789toList (BucketList _ bkts) = coerce $ L.map (L.map tableEntry . PSQ.toList . bktNodes) bkts
790
791data KademliaSpace nid ni = KademliaSpace
792 { -- | Given a node record (probably including IP address), yields a
793 -- kademlia xor-metric location.
794 kademliaLocation :: ni -> nid
795 -- | Used when comparing locations. This is similar to
796 -- 'Data.Bits.testBit' except that the ordering of bits is reversed, so
797 -- that 0 is the most significant bit.
798 , kademliaTestBit :: nid -> Word -> Bool
799 -- | The Kademlia xor-metric.
800 , kademliaXor :: nid -> nid -> nid
801
802 , kademliaSample :: forall m. Applicative m => (Int -> m BS.ByteString) -> nid -> (Int,Word8,Word8) -> m nid
803 }
804
805instance Contravariant (KademliaSpace nid) where
806 contramap f ks = ks
807 { kademliaLocation = kademliaLocation ks . f
808 }
809
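-- Editorial sketch, not part of this patch: a toy 'KademliaSpace' whose node
-- records are bare 'Word64' ids.  'toySpace' is a hypothetical name, and it
-- assumes Data.Bits and Data.Word are in scope, which this module may not import.
toySpace :: KademliaSpace Word64 Word64
toySpace = KademliaSpace
    { kademliaLocation = id
      -- Bit 0 is the most significant bit, per the 'kademliaTestBit' convention.
    , kademliaTestBit  = \nid i -> testBit nid (63 - fromIntegral i)
    , kademliaXor      = xor
      -- No real sampling here; just return the id unchanged.
    , kademliaSample   = \_rndBytes nid _range -> pure nid
    }

-- The 'Contravariant' instance then adapts it to richer node records, e.g.
--   contramap fst toySpace :: KademliaSpace Word64 (Word64, payload)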
diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs
new file mode 100644
index 00000000..1be1afc1
--- /dev/null
+++ b/kad/src/Network/Kademlia/Search.hs
@@ -0,0 +1,236 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE PatternSynonyms #-}
3{-# LANGUAGE RecordWildCards #-}
4{-# LANGUAGE ScopedTypeVariables #-}
5{-# LANGUAGE FlexibleContexts #-}
6{-# LANGUAGE LambdaCase #-}
7module Network.Kademlia.Search where
8
9import Control.Concurrent.Tasks
10import Control.Concurrent.STM
11import Control.Monad
12import Data.Function
13import Data.Maybe
14import qualified Data.Set as Set
15 ;import Data.Set (Set)
16import Data.Hashable (Hashable(..)) -- for type sigs
17import System.IO.Error
18
19import qualified Data.MinMaxPSQ as MM
20 ;import Data.MinMaxPSQ (MinMaxPSQ, MinMaxPSQ')
21import qualified Data.Wrapper.PSQ as PSQ
22 ;import Data.Wrapper.PSQ (pattern (:->), Binding, pattern Binding, Binding', PSQKey)
23import Network.Kademlia.Routing as R
24#ifdef THREAD_DEBUG
25import Control.Concurrent.Lifted.Instrument
26#else
27import Control.Concurrent.Lifted
28import GHC.Conc (labelThread)
29#endif
30
31data Search nid addr tok ni r = Search
32 { searchSpace :: KademliaSpace nid ni
33 , searchNodeAddress :: ni -> addr
34 , searchQuery :: Either (nid -> ni -> IO (Maybe ([ni], [r], Maybe tok)))
35 (nid -> ni -> (Maybe ([ni],[r],Maybe tok) -> IO ()) -> IO ())
36 , searchAlpha :: Int -- α = 8
37 -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on
38 -- how fast the queries are. For Tox's much slower onion-routed queries, we
39 -- need to ensure that closer non-responding queries don't completely push out
40 -- farther away queries.
41 --
42 -- For BitTorrent, setting them both to 8 was not an issue, but that is no
43 -- longer supported because the number of remembered informants is now the
44 -- difference between these two numbers. So, if searchK = 16 and searchAlpha =
45 -- 4, then the number of remembered query responses is 12.
46 , searchK :: Int -- K = 16
47 }
48
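-- Editorial sketch (hypothetical 'mkToySearch'): wiring a blocking query into
-- a 'Search'.  With searchK = 16 and searchAlpha = 4 the search state below
-- remembers 16 - 4 = 12 informants.
mkToySearch :: KademliaSpace nid ni
            -> (ni -> addr)
            -> (nid -> ni -> IO (Maybe ([ni], [r], Maybe tok)))
            -> Search nid addr tok ni r
mkToySearch space addrOf blockingQuery = Search
    { searchSpace       = space
    , searchNodeAddress = addrOf
    , searchQuery       = Left blockingQuery   -- 'Right' would take a callback-style query.
    , searchAlpha       = 4                    -- at most 4 queries in flight
    , searchK           = 16
    }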
49data SearchState nid addr tok ni r = SearchState
50 { -- | The number of pending queries. Incremented before any query is sent
51 -- and decremented when we get a reply.
52 searchPendingCount :: TVar Int
53 -- | Nodes scheduled to be queried (roughly at most K).
54 , searchQueued :: TVar (MinMaxPSQ ni nid)
55 -- | The nearest (K - α) nodes that issued a reply.
56 --
57 -- α is the maximum number of simultaneous queries.
58 , searchInformant :: TVar (MinMaxPSQ' ni nid (Maybe tok))
59 -- | This tracks already-queried addresses so we avoid bothering them
60 -- again. XXX: We could probably keep only the pending queries in this
61 -- set. It could also be a bounded 'MinMaxPSQ', although searchAlpha
62 -- should limit the number of outstanding queries.
63 , searchVisited :: TVar (Set addr)
64 , searchSpec :: Search nid addr tok ni r
65 }
66
67
68newSearch :: ( Ord addr
69 , PSQKey nid
70 , PSQKey ni
71 ) =>
72 {-
73 KademliaSpace nid ni
74 -> (ni -> addr)
75 -> (ni -> IO ([ni], [r])) -- the query action.
76 -> (r -> STM Bool) -- receives search results.
77 -> nid -- target of search
78 -}
79 Search nid addr tok ni r
80 -> nid
81 -> [ni] -- Initial nodes to query.
82 -> STM (SearchState nid addr tok ni r)
83newSearch s@(Search space nAddr qry _ _) target ns = do
84 c <- newTVar 0
85 q <- newTVar $ MM.fromList
86 $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n))
87 $ ns
88 i <- newTVar MM.empty
89 v <- newTVar Set.empty
90 return -- (Search space nAddr qry) , r , target
91 ( SearchState c q i v s )
92
93-- | Discard a value from a key-priority-value tuple. This is useful for
94-- moving items from a "MinMaxPSQ'" to a "MinMaxPSQ".
95stripValue :: Binding' k p v -> Binding k p
96stripValue (Binding ni _ nid) = (ni :-> nid)
97
98-- | Reset a 'SearchState' object to ready it for a repeated search.
99reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) =>
100 (nid -> STM [ni])
101 -> Search nid addr1 tok1 ni r1
102 -> nid
103 -> SearchState nid addr tok ni r
104 -> STM (SearchState nid addr tok ni r)
105reset nearestNodes qsearch target st = do
106 searchIsFinished st >>= check -- Wait for a search to finish before resetting.
107 bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni)
108 <$> nearestNodes target
109 priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st)
110 writeTVar (searchQueued st) $ MM.fromList $ priorInformants ++ bktNodes
111 writeTVar (searchInformant st) MM.empty
112 writeTVar (searchVisited st) Set.empty
113 writeTVar (searchPendingCount st) 0
114 return st
115
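-- Editorial sketch (hypothetical 'rerun' and 'tblVar'); its approximate type is
--   TVar (BucketList ni) -> Search nid addr tok ni r -> nid
--   -> (r -> STM Bool) -> SearchState nid addr tok ni r -> IO ()
-- Once the previous pass has finished, refill the queue from the current
-- routing table and run 'searchLoop' again over the same state.
rerun tblVar qsearch target keep st = do
    _ <- atomically $ reset nearest qsearch target st
    searchLoop qsearch target keep st
  where
    nearest nid = R.kclosest (searchSpace qsearch) (searchK qsearch) nid
                    <$> readTVar tblVar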
116sendAsyncQuery :: forall addr nid tok ni r.
117 ( Ord addr
118 , PSQKey nid
119 , PSQKey ni
120 , Show nid
121 ) =>
122 Search nid addr tok ni r
123 -> nid
124 -> (r -> STM Bool) -- ^ return False to terminate the search.
125 -> SearchState nid addr tok ni r
126 -> Binding ni nid
127 -> TaskGroup
128 -> IO ()
129sendAsyncQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) g =
130 case searchQuery of
131 Left blockingQuery ->
132 forkTask g "searchQuery" $ do
133 myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget)
134 reply <- blockingQuery searchTarget ni `catchIOError` const (return Nothing)
135 atomically $ do
136 modifyTVar searchPendingCount pred
137 maybe (return ()) go reply
138 Right nonblockingQuery -> do
139 nonblockingQuery searchTarget ni $ \reply ->
140 atomically $ do
141 modifyTVar searchPendingCount pred
142 maybe (return ()) go reply
143 where
144 go (ns,rs,tok) = do
145 vs <- readTVar searchVisited
146 -- We only queue a node if it is not yet visited
147 let insertFoundNode :: Int
148 -> ni
149 -> MinMaxPSQ ni nid
150 -> MinMaxPSQ ni nid
151 insertFoundNode k n q
152 | searchNodeAddress n `Set.member` vs
153 = q
154 | otherwise = MM.insertTake k n ( kademliaXor searchSpace searchTarget
155 $ kademliaLocation searchSpace n )
156 q
157
158 qsize0 <- MM.size <$> readTVar searchQueued
159 let qsize = if qsize0 < searchK then searchK else qsize0 -- Allow searchQueued to grow
160 -- only when there are fewer than
161 -- K elements.
162 modifyTVar searchQueued $ \q -> foldr (insertFoundNode qsize) q ns
163 modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d
164 flip fix rs $ \loop -> \case
165 r:rs' -> do
166 wanting <- searchResult r
167 if wanting then loop rs'
168 else searchCancel sch
169 [] -> return ()
170
171
172searchIsFinished :: ( PSQKey nid
173 , PSQKey ni
174 ) => SearchState nid addr tok ni r -> STM Bool
175searchIsFinished SearchState{..} = do
176 q <- readTVar searchQueued
177 cnt <- readTVar searchPendingCount
178 informants <- readTVar searchInformant
179 return $ cnt == 0
180 && ( MM.null q
181 || ( MM.size informants >= (searchK searchSpec - searchAlpha searchSpec)
182 && ( PSQ.prio (fromJust $ MM.findMax informants)
183 <= PSQ.prio (fromJust $ MM.findMin q))))
184
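-- Editorial sketch (hypothetical 'awaitInformants'): block until the search
-- settles, then return the informant nodes.  Its type is roughly
--   SearchState nid addr tok ni r -> IO [ni].
awaitInformants st = atomically $ do
    searchIsFinished st >>= check
    map (\(ni :-> _) -> ni) . map stripValue . MM.toList
        <$> readTVar (searchInformant st)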
185searchCancel :: SearchState nid addr tok ni r -> STM ()
186searchCancel SearchState{..} = do
187 writeTVar searchPendingCount 0
188 writeTVar searchQueued MM.empty
189
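-- Editorial sketch (hypothetical 'cancelAfter'): cancel an in-flight search
-- from another thread after a delay; 'searchLoop' then sees an empty queue
-- with no pending queries and returns.  This assumes 'threadDelay' is
-- re-exported by the lifted-concurrent import above.
cancelAfter :: Int -> SearchState nid addr tok ni r -> IO ()
cancelAfter micros st = do
    threadDelay micros
    atomically $ searchCancel st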
190search ::
191 ( Ord r
192 , Ord addr
193 , PSQKey nid
194 , PSQKey ni
195 , Show nid
196 ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r)
197search sch buckets target result = do
198 let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets
199 st <- atomically $ newSearch sch target ns
200 forkIO $ searchLoop sch target result st
201 return st
202
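-- Editorial usage sketch (hypothetical 'collectSome'): start a search, gather
-- up to 20 results into a TVar, and let the result callback stop the search
-- once that many have been seen.
collectSome qsearch buckets target = do
    resultsVar <- newTVarIO Set.empty
    st <- search qsearch buckets target $ \r -> do
        rs <- Set.insert r <$> readTVar resultsVar
        writeTVar resultsVar rs
        return (Set.size rs < 20)          -- False terminates the search.
    atomically $ searchIsFinished st >>= check
    atomically $ readTVar resultsVar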
203searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni )
204 => Search nid addr tok ni r -- ^ Query and distance methods.
205 -> nid -- ^ The target we are searching for.
206 -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching.
207 -> SearchState nid addr tok ni r -- ^ Search-related state.
208 -> IO ()
209searchLoop sch@Search{..} target result s@SearchState{..} = do
210 myThreadId >>= flip labelThread ("search."++show target)
211 withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do
212 join $ atomically $ do
213 cnt <- readTVar $ searchPendingCount
214 check (cnt <= 8) -- Wait while more than 8 queries are pending.
215 informants <- readTVar searchInformant
216 found <- MM.minView <$> readTVar searchQueued
217 case found of
218 Just (ni :-> d, q)
219 | -- If there are fewer than /k - α/ informants and there is any
220 -- node we haven't yet got a response from.
221 (MM.size informants < searchK - searchAlpha) && (cnt > 0 || not (MM.null q))
222 -- Or there are no informants yet at all.
223 || MM.null informants
224 -- Or if the closest scheduled node is nearer than the
225 -- farthest informant seen so far.
226 || (d < PSQ.prio (fromJust $ MM.findMax informants))
227 -> -- Then the search continues, send a query.
228 do writeTVar searchQueued q
229 modifyTVar searchVisited $ Set.insert (searchNodeAddress ni)
230 modifyTVar searchPendingCount succ
231 return $ do
232 sendAsyncQuery sch target result s (ni :-> d) g
233 again
234 _ -> -- Otherwise, we are finished.
235 do check (cnt == 0)
236 return $ return ()