diff options
author | James Crayne <jim.crayne@gmail.com> | 2019-09-28 13:43:29 -0400 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-01 19:27:53 -0500 |
commit | 11987749fc6e6d3e53ea737d46d5ab13a16faeb8 (patch) | |
tree | 5716463275c2d3e902889db619908ded2a73971c /kad/src/Network/Kademlia | |
parent | add2c76bced51fde5e9917e7449ef52be70faf87 (diff) |
Factor out some new libraries
word64-map:
Data.Word64Map
network-addr:
Network.Address
tox-crypto:
Crypto.Tox
lifted-concurrent:
Control.Concurrent.Lifted.Instrument
Control.Concurrent.Async.Lifted.Instrument
psq-wrap:
Data.Wrapper.PSQInt
Data.Wrapper.PSQ
minmax-psq:
Data.MinMaxPSQ
tasks:
Control.Concurrent.Tasks
kad:
Network.Kademlia
Network.Kademlia.Bootstrap
Network.Kademlia.Routing
Network.Kademlia.CommonAPI
Network.Kademlia.Persistence
Network.Kademlia.Search
Diffstat (limited to 'kad/src/Network/Kademlia')
-rw-r--r-- | kad/src/Network/Kademlia/Bootstrap.hs | 439 | ||||
-rw-r--r-- | kad/src/Network/Kademlia/CommonAPI.hs | 84 | ||||
-rw-r--r-- | kad/src/Network/Kademlia/Persistence.hs | 52 | ||||
-rw-r--r-- | kad/src/Network/Kademlia/Routing.hs | 809 | ||||
-rw-r--r-- | kad/src/Network/Kademlia/Search.hs | 236 |
5 files changed, 1620 insertions, 0 deletions
diff --git a/kad/src/Network/Kademlia/Bootstrap.hs b/kad/src/Network/Kademlia/Bootstrap.hs new file mode 100644 index 00000000..08ba3318 --- /dev/null +++ b/kad/src/Network/Kademlia/Bootstrap.hs | |||
@@ -0,0 +1,439 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE ConstraintKinds #-} | ||
3 | {-# LANGUAGE DeriveFunctor #-} | ||
4 | {-# LANGUAGE DeriveTraversable #-} | ||
5 | {-# LANGUAGE FlexibleContexts #-} | ||
6 | {-# LANGUAGE GADTs #-} | ||
7 | {-# LANGUAGE KindSignatures #-} | ||
8 | {-# LANGUAGE LambdaCase #-} | ||
9 | {-# LANGUAGE NamedFieldPuns #-} | ||
10 | {-# LANGUAGE RecordWildCards #-} | ||
11 | {-# LANGUAGE NondecreasingIndentation #-} | ||
12 | {-# LANGUAGE PartialTypeSignatures #-} | ||
13 | {-# LANGUAGE PatternSynonyms #-} | ||
14 | {-# LANGUAGE RankNTypes #-} | ||
15 | {-# LANGUAGE ScopedTypeVariables #-} | ||
16 | module Network.Kademlia.Bootstrap where | ||
17 | |||
18 | import Data.Function | ||
19 | import Data.Maybe | ||
20 | import qualified Data.Set as Set | ||
21 | import Data.Time.Clock.POSIX (getPOSIXTime) | ||
22 | import Network.Kademlia.Routing as R | ||
23 | #ifdef THREAD_DEBUG | ||
24 | import Control.Concurrent.Lifted.Instrument | ||
25 | #else | ||
26 | import Control.Concurrent.Lifted | ||
27 | import GHC.Conc (labelThread) | ||
28 | #endif | ||
29 | import Control.Concurrent.STM | ||
30 | import Control.Monad | ||
31 | import Data.Hashable | ||
32 | import Data.Time.Clock.POSIX (POSIXTime) | ||
33 | import Data.Ord | ||
34 | import System.Entropy | ||
35 | import System.Timeout | ||
36 | import DPut | ||
37 | import DebugTag | ||
38 | |||
39 | import qualified Data.Wrapper.PSQInt as Int | ||
40 | ;import Data.Wrapper.PSQInt (pattern (:->)) | ||
41 | import Network.Address (bucketRange) | ||
42 | import Network.Kademlia.Search | ||
43 | import Control.Concurrent.Tasks | ||
44 | import Network.Kademlia | ||
45 | |||
46 | type SensibleNodeId nid ni = | ||
47 | ( Show nid | ||
48 | , Ord nid | ||
49 | , Ord ni | ||
50 | , Hashable nid | ||
51 | , Hashable ni ) | ||
52 | |||
53 | data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher | ||
54 | { -- | A staleness threshold (if a bucket goes this long without being | ||
55 | -- touched, a refresh will be triggered). | ||
56 | refreshInterval :: POSIXTime | ||
57 | -- | A TVar with the time-to-refresh schedule for each bucket. | ||
58 | -- | ||
59 | -- To "touch" a bucket and prevent it from being refreshed, reschedule | ||
60 | -- its refresh time to some time into the future by modifying its | ||
61 | -- priority in this priority search queue. | ||
62 | , refreshQueue :: TVar (Int.PSQ POSIXTime) | ||
63 | -- | This is the kademlia node search specification. | ||
64 | , refreshSearch :: Search nid addr tok ni ni | ||
65 | -- | The current kademlia routing table buckets. | ||
66 | , refreshBuckets :: TVar (R.BucketList ni) | ||
67 | -- | Action to ping a node. This is used only during initial bootstrap | ||
68 | -- to get some nodes in our table. A 'True' result is interpreted as a a | ||
69 | -- pong, where 'False' is a non-response. | ||
70 | , refreshPing :: ni -> IO Bool | ||
71 | , -- | Timestamp of last bucket event. | ||
72 | refreshLastTouch :: TVar POSIXTime | ||
73 | , -- | This variable indicates whether or not we are in bootstrapping mode. | ||
74 | bootstrapMode :: TVar Bool | ||
75 | , -- | When this countdown reaches 0, we exit bootstrap mode. It is decremented on | ||
76 | -- every finished refresh. | ||
77 | bootstrapCountdown :: TVar (Maybe Int) | ||
78 | } | ||
79 | |||
80 | newBucketRefresher :: ( Ord addr, Hashable addr | ||
81 | , SensibleNodeId nid ni ) | ||
82 | => TVar (R.BucketList ni) | ||
83 | -> Search nid addr tok ni ni | ||
84 | -> (ni -> IO Bool) | ||
85 | -> STM (BucketRefresher nid ni) | ||
86 | newBucketRefresher bkts sch ping = do | ||
87 | let spc = searchSpace sch | ||
88 | nodeId = kademliaLocation spc | ||
89 | -- bkts <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) template_ni R.defaultBucketCount | ||
90 | sched <- newTVar Int.empty | ||
91 | lasttouch <- newTVar 0 -- Would use getPOSIXTime here, or minBound, but alas... | ||
92 | bootstrapVar <- newTVar True -- Start in bootstrapping mode. | ||
93 | bootstrapCnt <- newTVar Nothing | ||
94 | return BucketRefresher | ||
95 | { refreshInterval = 15 * 60 | ||
96 | , refreshQueue = sched | ||
97 | , refreshSearch = sch | ||
98 | , refreshBuckets = bkts | ||
99 | , refreshPing = ping | ||
100 | , refreshLastTouch = lasttouch | ||
101 | , bootstrapMode = bootstrapVar | ||
102 | , bootstrapCountdown = bootstrapCnt | ||
103 | } | ||
104 | |||
105 | -- | This was added to avoid the compile error "Record update for | ||
106 | -- insufficiently polymorphic field" when trying to update the existentially | ||
107 | -- quantified field 'refreshSearch'. | ||
108 | updateRefresherIO :: Ord addr | ||
109 | => Search nid addr tok ni ni | ||
110 | -> (ni -> IO Bool) | ||
111 | -> BucketRefresher nid ni -> BucketRefresher nid ni | ||
112 | updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher | ||
113 | { refreshSearch = sch | ||
114 | , refreshPing = ping | ||
115 | , refreshInterval = refreshInterval | ||
116 | , refreshBuckets = refreshBuckets | ||
117 | , refreshQueue = refreshQueue | ||
118 | , refreshLastTouch = refreshLastTouch | ||
119 | , bootstrapMode = bootstrapMode | ||
120 | , bootstrapCountdown = bootstrapCountdown | ||
121 | } | ||
122 | |||
123 | -- | Fork a refresh loop. Kill the returned thread to terminate it. | ||
124 | forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId | ||
125 | forkPollForRefresh r@BucketRefresher{ refreshInterval | ||
126 | , refreshQueue | ||
127 | , refreshBuckets | ||
128 | , refreshSearch } = fork $ do | ||
129 | myThreadId >>= flip labelThread "pollForRefresh" | ||
130 | fix $ \again -> do | ||
131 | join $ atomically $ do | ||
132 | nextup <- Int.findMin <$> readTVar refreshQueue | ||
133 | maybe retry (return . go again) nextup | ||
134 | where | ||
135 | refresh :: Int -> IO Int | ||
136 | refresh n = do | ||
137 | -- dput XRefresh $ "Refresh time! "++ show n | ||
138 | refreshBucket r n | ||
139 | |||
140 | go again ( bktnum :-> refresh_time ) = do | ||
141 | now <- getPOSIXTime | ||
142 | case fromEnum (refresh_time - now) of | ||
143 | x | x <= 0 -> do -- Refresh time! | ||
144 | -- Move it to the back of the refresh queue. | ||
145 | atomically $ do | ||
146 | interval <- effectiveRefreshInterval r bktnum | ||
147 | modifyTVar' refreshQueue | ||
148 | $ Int.insert bktnum (now + interval) | ||
149 | -- Now fork the refresh operation. | ||
150 | -- TODO: We should probably propogate the kill signal to this thread. | ||
151 | fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum) | ||
152 | _ <- refresh bktnum | ||
153 | return () | ||
154 | return () | ||
155 | picoseconds -> do | ||
156 | -- dput XRefresh $ show (picoseconds `div` 10^12) ++ " seconds until refresh " ++ show bktnum | ||
157 | threadDelay ( picoseconds `div` 10^6 ) | ||
158 | again | ||
159 | |||
160 | |||
161 | -- | This is a helper to 'refreshBucket' which does some book keeping to decide | ||
162 | -- whether or not a bucket is sufficiently refreshed or not. It will return | ||
163 | -- false when we can terminate a node search. | ||
164 | checkBucketFull :: Ord ni => KademliaSpace nid ni -- ^ Obtain a node id from a node. | ||
165 | -> TVar (BucketList ni) -- ^ The current routing table. | ||
166 | -> TVar (Set.Set ni) -- ^ In-range nodes found so far. | ||
167 | -> TVar Bool -- ^ The result will also be written here. | ||
168 | -> Int -- ^ The bucket number of interest. | ||
169 | -> ni -- ^ A newly found node. | ||
170 | -> STM Bool | ||
171 | checkBucketFull space var resultCounter fin n found_node = do | ||
172 | let fullcount = R.defaultBucketSize | ||
173 | saveit True = writeTVar fin True >> return True | ||
174 | saveit _ = return False | ||
175 | tbl <- readTVar var | ||
176 | let counts = R.shape tbl | ||
177 | nid = kademliaLocation space found_node | ||
178 | -- Update the result set with every found node that is in the | ||
179 | -- bucket of interest. | ||
180 | when (n == R.bucketNumber space nid tbl) | ||
181 | $ modifyTVar' resultCounter (Set.insert found_node) | ||
182 | resultCount <- readTVar resultCounter | ||
183 | saveit $ case drop (n - 1) counts of | ||
184 | (cnt:_) | cnt < fullcount -> True -- bucket not full, keep going | ||
185 | _ | Set.size resultCount < fullcount -> True -- we haven't got many results, keep going | ||
186 | _ -> False -- okay, good enough, let's quit. | ||
187 | |||
188 | -- | Called from 'refreshBucket' with the current time when a refresh of the | ||
189 | -- supplied bucket number finishes. | ||
190 | onFinishedRefresh :: BucketRefresher nid ni -> Int -> POSIXTime -> STM (IO ()) | ||
191 | onFinishedRefresh BucketRefresher { bootstrapCountdown | ||
192 | , bootstrapMode | ||
193 | , refreshQueue | ||
194 | , refreshBuckets } num now = do | ||
195 | bootstrapping <- readTVar bootstrapMode | ||
196 | if not bootstrapping then return $ return () -- dput XRefresh $ "Finished non-boostrapping refresh: "++show num | ||
197 | else do | ||
198 | tbl <- readTVar refreshBuckets | ||
199 | action <- | ||
200 | if num /= R.bktCount tbl - 1 | ||
201 | then do modifyTVar' bootstrapCountdown (fmap pred) | ||
202 | return $ return () -- dput XRefresh $ "BOOTSTRAP decrement" | ||
203 | else do | ||
204 | -- The last bucket finished. | ||
205 | cnt <- readTVar bootstrapCountdown | ||
206 | case cnt of | ||
207 | Nothing -> do | ||
208 | let fullsize = R.defaultBucketSize | ||
209 | notfull (n,len) | n==num = False | ||
210 | | len>=fullsize = False | ||
211 | | otherwise = True | ||
212 | unfull = case filter notfull $ zip [0..] (R.shape tbl) of | ||
213 | [] -> [(0,0)] -- Schedule at least 1 more refresh. | ||
214 | xs -> xs | ||
215 | forM_ unfull $ \(n,_) -> do | ||
216 | -- Schedule immediate refresh for unfull buckets (other than this one). | ||
217 | modifyTVar' refreshQueue $ Int.insert n (now - 1) | ||
218 | writeTVar bootstrapCountdown $! Just $! length unfull | ||
219 | return $ return () -- dput XRefresh $ "BOOTSTRAP scheduling: "++show unfull | ||
220 | Just n -> do writeTVar bootstrapCountdown $! Just $! pred n | ||
221 | return $ return () -- dput XRefresh "BOOTSTRAP decrement (last bucket)" | ||
222 | cnt <- readTVar bootstrapCountdown | ||
223 | if (cnt == Just 0) | ||
224 | then do | ||
225 | -- Boostrap finished! | ||
226 | writeTVar bootstrapMode False | ||
227 | writeTVar bootstrapCountdown Nothing | ||
228 | return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")." | ||
229 | else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt) | ||
230 | |||
231 | refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) => | ||
232 | BucketRefresher nid ni -> Int -> IO Int | ||
233 | refreshBucket r@BucketRefresher{ refreshSearch = sch | ||
234 | , refreshBuckets = var } | ||
235 | n = do | ||
236 | tbl <- atomically (readTVar var) | ||
237 | let count = bktCount tbl | ||
238 | nid = kademliaLocation (searchSpace sch) (thisNode tbl) | ||
239 | sample <- if n+1 >= count -- Is this the last bucket? | ||
240 | then return nid -- Yes? Search our own id. | ||
241 | else kademliaSample (searchSpace sch) -- No? Generate a random id. | ||
242 | getEntropy | ||
243 | nid | ||
244 | (bucketRange n (n + 1 < count)) | ||
245 | fin <- atomically $ newTVar False | ||
246 | resultCounter <- atomically $ newTVar Set.empty | ||
247 | |||
248 | dput XRefresh $ "Start refresh " ++ show (n,sample) | ||
249 | |||
250 | -- Set 15 minute timeout in order to avoid overlapping refreshes. | ||
251 | s <- search sch tbl sample $ if n+1 == R.defaultBucketCount | ||
252 | then const $ return True -- Never short-circuit the last bucket. | ||
253 | else checkBucketFull (searchSpace sch) var resultCounter fin n | ||
254 | _ <- timeout (15*60*1000000) $ do | ||
255 | atomically $ searchIsFinished s >>= check | ||
256 | atomically $ searchCancel s | ||
257 | dput XDHT $ "Finish refresh " ++ show (n,sample) | ||
258 | now <- getPOSIXTime | ||
259 | join $ atomically $ onFinishedRefresh r n now | ||
260 | rcount <- atomically $ do | ||
261 | c <- Set.size <$> readTVar resultCounter | ||
262 | b <- readTVar fin | ||
263 | return $ if b then 1 else c | ||
264 | return rcount | ||
265 | |||
266 | refreshLastBucket :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO () | ||
267 | refreshLastBucket r@BucketRefresher { refreshBuckets | ||
268 | , refreshQueue } = do | ||
269 | |||
270 | now <- getPOSIXTime | ||
271 | atomically $ do | ||
272 | cnt <- bktCount <$> readTVar refreshBuckets | ||
273 | -- Schedule immediate refresh. | ||
274 | modifyTVar' refreshQueue $ Int.insert (cnt-1) (now - 1) | ||
275 | |||
276 | restartBootstrap :: (Hashable ni, Hashable nid, Ord ni, Ord nid, Show nid) => | ||
277 | BucketRefresher nid ni -> STM (IO ()) | ||
278 | restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do | ||
279 | unchanged <- readTVar bootstrapMode | ||
280 | writeTVar bootstrapMode True | ||
281 | writeTVar bootstrapCountdown Nothing | ||
282 | if not unchanged then return $ do | ||
283 | dput XRefresh "BOOTSTRAP entered bootstrap mode" | ||
284 | refreshLastBucket r | ||
285 | else return $ dput XRefresh "BOOTSTRAP already bootstrapping" | ||
286 | |||
287 | bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) => | ||
288 | BucketRefresher nid ni | ||
289 | -> t1 ni -- ^ Nodes to bootstrap from. | ||
290 | -> t ni -- ^ Fallback nodes; used only if the others are unresponsive. | ||
291 | -> IO () | ||
292 | bootstrap r@BucketRefresher { refreshSearch = sch | ||
293 | , refreshBuckets = var | ||
294 | , refreshPing = ping | ||
295 | , bootstrapMode } ns ns0 = do | ||
296 | gotPing <- atomically $ newTVar False | ||
297 | |||
298 | -- First, ping the given nodes so that they are added to | ||
299 | -- our routing table. | ||
300 | withTaskGroup "bootstrap.resume" 20 $ \g -> do | ||
301 | forM_ ns $ \n -> do | ||
302 | let lbl = show $ kademliaLocation (searchSpace sch) n | ||
303 | forkTask g lbl $ do | ||
304 | b <- ping n | ||
305 | when b $ atomically $ writeTVar gotPing True | ||
306 | |||
307 | -- We resort to the hardcoded fallback nodes only when we got no | ||
308 | -- responses. This is to lesson the burden on well-known boostrap | ||
309 | -- nodes. | ||
310 | fallback <- atomically (readTVar gotPing) >>= return . when . not | ||
311 | fallback $ withTaskGroup "bootstrap.ping" 20 $ \g -> do | ||
312 | forM_ ns0 $ \n -> do | ||
313 | forkTask g (show $ kademliaLocation (searchSpace sch) n) | ||
314 | (void $ ping n) | ||
315 | dput XDHT "Finished bootstrap pings." | ||
316 | -- Now search our own Id by entering bootstrap mode from non-bootstrap mode. | ||
317 | join $ atomically $ do | ||
318 | writeTVar bootstrapMode False | ||
319 | restartBootstrap r | ||
320 | -- | ||
321 | -- Hopefully 'forkPollForRefresh' was invoked and can take over | ||
322 | -- maintenance. | ||
323 | |||
324 | |||
325 | effectiveRefreshInterval :: BucketRefresher nid ni -> Int -> STM POSIXTime | ||
326 | effectiveRefreshInterval BucketRefresher{ refreshInterval | ||
327 | , refreshBuckets | ||
328 | , bootstrapMode } num = do | ||
329 | tbl <- readTVar refreshBuckets | ||
330 | bootstrapping <- readTVar bootstrapMode | ||
331 | case bootstrapping of | ||
332 | False -> return refreshInterval | ||
333 | True -> do | ||
334 | -- When bootstrapping, refresh interval for non-full buckets is only 15 seconds. | ||
335 | let fullcount = R.defaultBucketSize | ||
336 | count = fromMaybe fullcount $ listToMaybe $ drop (num - 1) $ R.shape tbl | ||
337 | if count == fullcount | ||
338 | then return refreshInterval | ||
339 | else return 15 -- seconds | ||
340 | |||
341 | |||
342 | |||
343 | -- | Reschedule a bucket's refresh-time. It should be called whenever a bucket | ||
344 | -- changes. This will typically be invoked from 'tblTransition'. | ||
345 | -- | ||
346 | -- From BEP 05: | ||
347 | -- | ||
348 | -- > Each bucket should maintain a "last changed" property to indicate how | ||
349 | -- > "fresh" the contents are. | ||
350 | -- | ||
351 | -- We will use a "time to next refresh" property instead and store it in | ||
352 | -- a priority search queue. | ||
353 | -- | ||
354 | -- In detail using an expository (not actually implemented) type | ||
355 | -- 'BucketTouchEvent'... | ||
356 | -- | ||
357 | -- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus | ||
358 | -- >>> bucketEvents = | ||
359 | -- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds, | ||
360 | -- >>> | ||
361 | -- >>> , Stranger :--> Accepted -- or a node is added to a bucket, | ||
362 | -- >>> | ||
363 | -- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced | ||
364 | -- >>> , Applicant :--> Accepted -- with another node, | ||
365 | -- >>> ] | ||
366 | -- | ||
367 | -- the bucket's last changed property should be updated. Buckets that have not | ||
368 | -- been changed in 15 minutes (see 'refreshInterval') should be "refreshed." | ||
369 | -- This is done by picking a random ID in the range of the bucket and | ||
370 | -- performing a find_nodes search on it. | ||
371 | -- | ||
372 | -- The only other possible BucketTouchEvents are as follows: | ||
373 | -- | ||
374 | -- >>> not_handled = | ||
375 | -- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered: | ||
376 | -- >>> -- (Applicant :--> Stranger) | ||
377 | -- >>> -- (Applicant :--> Accepted) | ||
378 | -- >>> , Accepted :--> Applicant -- Never happens | ||
379 | -- >>> ] | ||
380 | -- | ||
381 | -- Because this BucketTouchEvent type is not actually implemented and we only | ||
382 | -- receive notifications of a node's new state, it suffices to reschedule the | ||
383 | -- bucket refresh 'touchBucket' on every transition to a state other than | ||
384 | -- 'Applicant'. | ||
385 | -- | ||
386 | -- XXX: Unfortunately, this means redundantly triggering twice upon every node | ||
387 | -- replacement because we do not currently distinguish between standalone | ||
388 | -- insertion/deletion events and an insertion/deletion pair constituting | ||
389 | -- replacement. | ||
390 | -- | ||
391 | -- It might also be better to pass the timestamp of the transition here and | ||
392 | -- keep the refresh queue in better sync with the routing table by updating it | ||
393 | -- within the STM monad. | ||
394 | -- | ||
395 | -- We embed the result in the STM monad but currently, no STM state changes | ||
396 | -- occur until the returned IO action is invoked. TODO: simplify? | ||
397 | touchBucket :: SensibleNodeId nid ni | ||
398 | => BucketRefresher nid ni | ||
399 | -> RoutingTransition ni -- ^ What happened to the bucket? | ||
400 | -> STM (IO ()) | ||
401 | touchBucket r@BucketRefresher{ refreshSearch | ||
402 | , refreshInterval | ||
403 | , refreshBuckets | ||
404 | , refreshQueue | ||
405 | , refreshLastTouch | ||
406 | , bootstrapMode | ||
407 | , bootstrapCountdown } | ||
408 | RoutingTransition{ transitionedTo | ||
409 | , transitioningNode } | ||
410 | = case transitionedTo of | ||
411 | Applicant -> return $ return () -- Ignore transition to applicant. | ||
412 | _ -> return $ do -- Reschedule for any other transition. | ||
413 | now <- getPOSIXTime | ||
414 | join $ atomically $ do | ||
415 | let space = searchSpace refreshSearch | ||
416 | nid = kademliaLocation space transitioningNode | ||
417 | tbl <- readTVar refreshBuckets | ||
418 | let num = R.bucketNumber space nid tbl | ||
419 | stamp <- readTVar refreshLastTouch | ||
420 | action <- case stamp /= 0 && (now - stamp > 60) of | ||
421 | True -> do | ||
422 | -- It's been one minute since any bucket has been touched, re-enter bootstrap mode. | ||
423 | restartBootstrap r | ||
424 | False -> return $ return () | ||
425 | interval <- effectiveRefreshInterval r num | ||
426 | modifyTVar' refreshQueue $ Int.insert num (now + interval) | ||
427 | writeTVar refreshLastTouch now | ||
428 | return action | ||
429 | |||
430 | refreshKademlia :: SensibleNodeId nid ni => BucketRefresher nid ni -> Kademlia nid ni | ||
431 | refreshKademlia r@BucketRefresher { refreshSearch = sch | ||
432 | , refreshPing = ping | ||
433 | , refreshBuckets = bkts | ||
434 | } | ||
435 | = Kademlia quietInsertions (searchSpace sch) (vanillaIO bkts ping) | ||
436 | { tblTransition = \tr -> do | ||
437 | io <- touchBucket r tr | ||
438 | return io | ||
439 | } | ||
diff --git a/kad/src/Network/Kademlia/CommonAPI.hs b/kad/src/Network/Kademlia/CommonAPI.hs new file mode 100644 index 00000000..601be5d8 --- /dev/null +++ b/kad/src/Network/Kademlia/CommonAPI.hs | |||
@@ -0,0 +1,84 @@ | |||
1 | {-# LANGUAGE ExistentialQuantification #-} | ||
2 | module Network.Kademlia.CommonAPI where | ||
3 | |||
4 | |||
5 | import Control.Concurrent | ||
6 | import Control.Concurrent.STM | ||
7 | import Data.Aeson as J (FromJSON, ToJSON) | ||
8 | import Data.Hashable | ||
9 | import qualified Data.Map as Map | ||
10 | import Data.Serialize as S | ||
11 | import qualified Data.Set as Set | ||
12 | import Data.Time.Clock.POSIX | ||
13 | import Data.Typeable | ||
14 | |||
15 | import Network.Kademlia.Search | ||
16 | import Network.Kademlia.Routing as R | ||
17 | import Crypto.Tox (SecretKey,PublicKey) | ||
18 | |||
19 | data DHT = forall nid ni. ( Show ni | ||
20 | , Read ni | ||
21 | , ToJSON ni | ||
22 | , FromJSON ni | ||
23 | , Ord ni | ||
24 | , Hashable ni | ||
25 | , Show nid | ||
26 | , Ord nid | ||
27 | , Hashable nid | ||
28 | , Typeable ni | ||
29 | , S.Serialize nid | ||
30 | ) => | ||
31 | DHT | ||
32 | { dhtBuckets :: TVar (BucketList ni) | ||
33 | , dhtSecretKey :: STM (Maybe SecretKey) | ||
34 | , dhtPing :: Map.Map String (DHTPing ni) | ||
35 | , dhtQuery :: Map.Map String (DHTQuery nid ni) | ||
36 | , dhtAnnouncables :: Map.Map String (DHTAnnouncable nid) | ||
37 | , dhtParseId :: String -> Either String nid | ||
38 | , dhtSearches :: TVar (Map.Map (String,nid) (DHTSearch nid ni)) | ||
39 | , dhtFallbackNodes :: IO [ni] | ||
40 | , dhtBootstrap :: [ni] -> [ni] -> IO () | ||
41 | } | ||
42 | |||
43 | data DHTQuery nid ni = forall addr r tok. | ||
44 | ( Ord addr | ||
45 | , Typeable r | ||
46 | , Typeable tok | ||
47 | , Typeable ni | ||
48 | ) => DHTQuery | ||
49 | { qsearch :: Search nid addr tok ni r | ||
50 | , qhandler :: ni -> nid -> IO ([ni], [r], Maybe tok) -- ^ Invoked on local node, when there is no query destination. | ||
51 | , qshowR :: r -> String | ||
52 | , qshowTok :: tok -> Maybe String | ||
53 | } | ||
54 | |||
55 | data DHTAnnouncable nid = forall dta tok ni r. | ||
56 | ( Show r | ||
57 | , Typeable dta -- information being announced | ||
58 | , Typeable tok -- token | ||
59 | , Typeable r -- search result | ||
60 | , Typeable ni -- node | ||
61 | ) => DHTAnnouncable | ||
62 | { announceParseData :: String -> Either String dta | ||
63 | , announceParseToken :: dta -> String -> Either String tok | ||
64 | , announceParseAddress :: String -> Either String ni | ||
65 | , announceSendData :: Either ( String {- search name -} | ||
66 | , String -> Either String r | ||
67 | , PublicKey {- me -} -> dta -> r -> IO ()) | ||
68 | (dta -> tok -> Maybe ni -> IO (Maybe r)) | ||
69 | , announceInterval :: POSIXTime | ||
70 | , announceTarget :: dta -> nid | ||
71 | } | ||
72 | |||
73 | data DHTSearch nid ni = forall addr tok r. DHTSearch | ||
74 | { searchThread :: ThreadId | ||
75 | , searchState :: SearchState nid addr tok ni r | ||
76 | , searchShowTok :: tok -> Maybe String | ||
77 | , searchResults :: TVar (Set.Set String) | ||
78 | } | ||
79 | |||
80 | data DHTPing ni = forall r. DHTPing | ||
81 | { pingQuery :: [String] -> ni -> IO (Maybe r) | ||
82 | , pingShowResult :: r -> String | ||
83 | } | ||
84 | |||
diff --git a/kad/src/Network/Kademlia/Persistence.hs b/kad/src/Network/Kademlia/Persistence.hs new file mode 100644 index 00000000..32ec169d --- /dev/null +++ b/kad/src/Network/Kademlia/Persistence.hs | |||
@@ -0,0 +1,52 @@ | |||
1 | {-# LANGUAGE NamedFieldPuns #-} | ||
2 | {-# LANGUAGE OverloadedStrings #-} | ||
3 | module Network.Kademlia.Persistence where | ||
4 | |||
5 | import Network.Kademlia.CommonAPI | ||
6 | import Network.Kademlia.Routing as R | ||
7 | |||
8 | import Control.Concurrent.STM | ||
9 | import qualified Data.Aeson as J | ||
10 | ;import Data.Aeson as J (FromJSON) | ||
11 | import qualified Data.ByteString.Lazy as L | ||
12 | import qualified Data.HashMap.Strict as HashMap | ||
13 | import Data.List | ||
14 | import qualified Data.Vector as V | ||
15 | import System.IO.Error | ||
16 | |||
17 | saveNodes :: String -> DHT -> IO () | ||
18 | saveNodes netname DHT{dhtBuckets} = do | ||
19 | bkts <- atomically $ readTVar dhtBuckets | ||
20 | let ns = map fst $ concat $ R.toList bkts | ||
21 | bs = J.encode ns | ||
22 | fname = nodesFileName netname | ||
23 | L.writeFile fname bs | ||
24 | |||
25 | loadNodes :: FromJSON ni => String -> IO [ni] | ||
26 | loadNodes netname = do | ||
27 | let fname = nodesFileName netname | ||
28 | attempt <- tryIOError $ do | ||
29 | J.decode <$> L.readFile fname | ||
30 | >>= maybe (ioError $ userError "Nothing") return | ||
31 | either (const $ fallbackLoad fname) return attempt | ||
32 | |||
33 | nodesFileName :: String -> String | ||
34 | nodesFileName netname = netname ++ "-nodes.json" | ||
35 | |||
36 | fallbackLoad :: FromJSON t => FilePath -> IO [t] | ||
37 | fallbackLoad fname = do | ||
38 | attempt <- tryIOError $ do | ||
39 | J.decode <$> L.readFile fname | ||
40 | >>= maybe (ioError $ userError "Nothing") return | ||
41 | let go r = do | ||
42 | let m = HashMap.lookup "nodes" (r :: J.Object) | ||
43 | ns0 = case m of Just (J.Array v) -> V.toList v | ||
44 | Nothing -> [] | ||
45 | ns1 = zip (map J.fromJSON ns0) ns0 | ||
46 | issuc (J.Error _,_) = False | ||
47 | issuc _ = True | ||
48 | (ss,fs) = partition issuc ns1 | ||
49 | ns = map (\(J.Success n,_) -> n) ss | ||
50 | mapM_ print (map snd fs) >> return ns | ||
51 | either (const $ return []) go attempt | ||
52 | |||
diff --git a/kad/src/Network/Kademlia/Routing.hs b/kad/src/Network/Kademlia/Routing.hs new file mode 100644 index 00000000..c7fdf028 --- /dev/null +++ b/kad/src/Network/Kademlia/Routing.hs | |||
@@ -0,0 +1,809 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013 | ||
3 | -- (c) Joe Crayne 2017 | ||
4 | -- License : BSD3 | ||
5 | -- Maintainer : pxqr.sta@gmail.com | ||
6 | -- Stability : experimental | ||
7 | -- Portability : portable | ||
8 | -- | ||
9 | -- Every node maintains a routing table of known good nodes. The | ||
10 | -- nodes in the routing table are used as starting points for | ||
11 | -- queries in the DHT. Nodes from the routing table are returned in | ||
12 | -- response to queries from other nodes. | ||
13 | -- | ||
14 | -- For more info see: | ||
15 | -- <http://www.bittorrent.org/beps/bep_0005.html#routing-table> | ||
16 | -- | ||
17 | {-# LANGUAGE CPP #-} | ||
18 | {-# LANGUAGE RecordWildCards #-} | ||
19 | {-# LANGUAGE BangPatterns #-} | ||
20 | {-# LANGUAGE RankNTypes #-} | ||
21 | {-# LANGUAGE ViewPatterns #-} | ||
22 | {-# LANGUAGE TypeOperators #-} | ||
23 | {-# LANGUAGE DeriveGeneric #-} | ||
24 | {-# LANGUAGE DeriveFunctor #-} | ||
25 | {-# LANGUAGE GADTs #-} | ||
26 | {-# LANGUAGE ScopedTypeVariables #-} | ||
27 | {-# LANGUAGE TupleSections #-} | ||
28 | {-# LANGUAGE OverloadedStrings #-} | ||
29 | {-# LANGUAGE StandaloneDeriving, FlexibleContexts, MultiParamTypeClasses, FlexibleInstances #-} | ||
30 | {-# OPTIONS_GHC -fno-warn-orphans #-} | ||
31 | module Network.Kademlia.Routing | ||
32 | {- | ||
33 | ( -- * BucketList | ||
34 | BucketList | ||
35 | , Info(..) | ||
36 | |||
37 | -- * Attributes | ||
38 | , BucketCount | ||
39 | , defaultBucketCount | ||
40 | , BucketSize | ||
41 | , defaultBucketSize | ||
42 | , NodeCount | ||
43 | |||
44 | -- * Query | ||
45 | , Network.Kademlia.Routing.null | ||
46 | , Network.Kademlia.Routing.full | ||
47 | , thisId | ||
48 | , shape | ||
49 | , Network.Kademlia.Routing.size | ||
50 | , Network.Kademlia.Routing.depth | ||
51 | , compatibleNodeId | ||
52 | |||
53 | -- * Lookup | ||
54 | , K | ||
55 | , defaultK | ||
56 | , TableKey (..) | ||
57 | , kclosest | ||
58 | |||
59 | -- * Construction | ||
60 | , Network.Kademlia.Routing.nullTable | ||
61 | , Event(..) | ||
62 | , CheckPing(..) | ||
63 | , Network.Kademlia.Routing.insert | ||
64 | |||
65 | -- * Conversion | ||
66 | , Network.Kademlia.Routing.TableEntry | ||
67 | , Network.Kademlia.Routing.toList | ||
68 | |||
69 | -- * Routing | ||
70 | , Timestamp | ||
71 | , getTimestamp | ||
72 | ) -} where | ||
73 | |||
74 | import Control.Applicative as A | ||
75 | import Control.Arrow | ||
76 | import Control.Monad | ||
77 | import Data.Function | ||
78 | import Data.Functor.Contravariant | ||
79 | import Data.Functor.Identity | ||
80 | import Data.List as L hiding (insert) | ||
81 | import Data.Maybe | ||
82 | import Data.Monoid | ||
83 | import Data.Wrapper.PSQ as PSQ | ||
84 | import Data.Serialize as S hiding (Result, Done) | ||
85 | import qualified Data.Sequence as Seq | ||
86 | import Data.Time | ||
87 | import Data.Time.Clock.POSIX | ||
88 | import Data.Word | ||
89 | import GHC.Generics | ||
90 | import Text.PrettyPrint as PP hiding ((<>)) | ||
91 | import Text.PrettyPrint.HughesPJClass (pPrint,Pretty) | ||
92 | import qualified Data.ByteString as BS | ||
93 | import Data.Bits | ||
94 | import Data.Ord | ||
95 | import Data.Reflection | ||
96 | import Network.Address | ||
97 | import Data.Typeable | ||
98 | import Data.Coerce | ||
99 | import Data.Hashable | ||
100 | |||
101 | |||
102 | -- | Last time the node was responding to our queries. | ||
103 | -- | ||
104 | -- Not all nodes that we learn about are equal. Some are \"good\" and | ||
105 | -- some are not. Many nodes using the DHT are able to send queries | ||
106 | -- and receive responses, but are not able to respond to queries | ||
107 | -- from other nodes. It is important that each node's routing table | ||
108 | -- must contain only known good nodes. A good node is a node has | ||
109 | -- responded to one of our queries within the last 15 minutes. A | ||
110 | -- node is also good if it has ever responded to one of our queries | ||
111 | -- and has sent us a query within the last 15 minutes. After 15 | ||
112 | -- minutes of inactivity, a node becomes questionable. Nodes become | ||
113 | -- bad when they fail to respond to multiple queries in a row. Nodes | ||
114 | -- that we know are good are given priority over nodes with unknown | ||
115 | -- status. | ||
116 | -- | ||
117 | type Timestamp = POSIXTime | ||
118 | |||
119 | getTimestamp :: IO Timestamp | ||
120 | getTimestamp = do | ||
121 | utcTime <- getCurrentTime | ||
122 | return $ utcTimeToPOSIXSeconds utcTime | ||
123 | |||
124 | |||
125 | |||
126 | {----------------------------------------------------------------------- | ||
127 | Bucket | ||
128 | -----------------------------------------------------------------------} | ||
129 | -- | ||
130 | -- When a k-bucket is full and a new node is discovered for that | ||
131 | -- k-bucket, the least recently seen node in the k-bucket is | ||
132 | -- PINGed. If the node is found to be still alive, the new node is | ||
133 | -- place in a secondary list, a replacement cache. The replacement | ||
134 | -- cache is used only if a node in the k-bucket stops responding. In | ||
135 | -- other words: new nodes are used only when older nodes disappear. | ||
136 | |||
137 | -- | Timestamp - last time this node is pinged. | ||
138 | type NodeEntry ni = Binding ni Timestamp | ||
139 | |||
140 | |||
141 | -- | Maximum number of 'NodeInfo's stored in a bucket. Most clients | ||
142 | -- use this value. | ||
143 | defaultBucketSize :: Int | ||
144 | defaultBucketSize = 8 | ||
145 | |||
146 | data QueueMethods m elem fifo = QueueMethods | ||
147 | { pushBack :: elem -> fifo -> m fifo | ||
148 | , popFront :: fifo -> m (Maybe elem, fifo) | ||
149 | , emptyQueue :: m fifo | ||
150 | } | ||
151 | |||
152 | {- | ||
153 | fromQ :: Functor m => | ||
154 | ( a -> b ) | ||
155 | -> ( b -> a ) | ||
156 | -> QueueMethods m elem a | ||
157 | -> QueueMethods m elem b | ||
158 | fromQ embed project QueueMethods{..} = | ||
159 | QueueMethods { pushBack = \e -> fmap embed . pushBack e . project | ||
160 | , popFront = fmap (second embed) . popFront . project | ||
161 | , emptyQueue = fmap embed emptyQueue | ||
162 | } | ||
163 | -} | ||
164 | |||
165 | seqQ :: QueueMethods Identity ni (Seq.Seq ni) | ||
166 | seqQ = QueueMethods | ||
167 | { pushBack = \e fifo -> pure (fifo Seq.|> e) | ||
168 | , popFront = \fifo -> case Seq.viewl fifo of | ||
169 | e Seq.:< fifo' -> pure (Just e, fifo') | ||
170 | Seq.EmptyL -> pure (Nothing, Seq.empty) | ||
171 | , emptyQueue = pure Seq.empty | ||
172 | } | ||
173 | |||
174 | type BucketQueue ni = Seq.Seq ni | ||
175 | |||
176 | bucketQ :: QueueMethods Identity ni (BucketQueue ni) | ||
177 | bucketQ = seqQ | ||
178 | |||
179 | |||
180 | data Compare a = Compare (a -> a -> Ordering) (Int -> a -> Int) | ||
181 | |||
182 | contramapC :: (b -> a) -> Compare a -> Compare b | ||
183 | contramapC f (Compare cmp hsh) = Compare (\a b -> cmp (f a) (f b)) | ||
184 | (\s x -> hsh s (f x)) | ||
185 | |||
186 | newtype Ordered' s a = Ordered a | ||
187 | deriving (Show) | ||
188 | |||
189 | -- | Hack to avoid UndecidableInstances | ||
190 | newtype Shrink a = Shrink a | ||
191 | deriving (Show) | ||
192 | |||
193 | type Ordered s a = Ordered' s (Shrink a) | ||
194 | |||
195 | instance Reifies s (Compare a) => Eq (Ordered' s (Shrink a)) where | ||
196 | a == b = (compare a b == EQ) | ||
197 | |||
198 | instance Reifies s (Compare a) => Ord (Ordered' s (Shrink a)) where | ||
199 | compare a b = cmp (coerce a) (coerce b) | ||
200 | where Compare cmp _ = reflect (Proxy :: Proxy s) | ||
201 | |||
202 | instance Reifies s (Compare a) => Hashable (Ordered' s (Shrink a)) where | ||
203 | hashWithSalt salt x = hash salt (coerce x) | ||
204 | where Compare _ hash = reflect (Proxy :: Proxy s) | ||
205 | |||
206 | -- | Bucket is also limited in its length — thus it's called k-bucket. | ||
207 | -- When bucket becomes full, we should split it in two lists by | ||
208 | -- current span bit. Span bit is defined by depth in the routing | ||
209 | -- table tree. Size of the bucket should be choosen such that it's | ||
210 | -- very unlikely that all nodes in bucket fail within an hour of | ||
211 | -- each other. | ||
212 | data Bucket s ni = Bucket | ||
213 | { bktNodes :: !(PSQ (Ordered s ni) Timestamp) -- current routing nodes | ||
214 | , bktQ :: !(BucketQueue (Timestamp,ni)) -- replacements pending time-outs | ||
215 | } deriving (Generic) | ||
216 | |||
217 | #define CAN_SHOW_BUCKET 0 | ||
218 | |||
219 | #if CAN_SHOW_BUCKET | ||
220 | deriving instance Show ni => Show (Bucket s ni) | ||
221 | #endif | ||
222 | |||
223 | bucketCompare :: forall p ni s. Reifies s (Compare ni) => p (Bucket s ni) -> Compare ni | ||
224 | bucketCompare _ = reflect (Proxy :: Proxy s) | ||
225 | |||
226 | mapBucket :: ( Reifies s (Compare a) | ||
227 | , Reifies t (Compare ni) | ||
228 | ) => (a -> ni) -> Bucket s a -> Bucket t ni | ||
229 | mapBucket f (Bucket ns q) = Bucket (PSQ.fromList $ map (\(ni :-> tm) -> (f' ni :-> tm)) $ PSQ.toList ns) | ||
230 | (fmap (second f) q) | ||
231 | where f' = coerce . f . coerce | ||
232 | |||
233 | |||
234 | #if 0 | ||
235 | |||
236 | {- | ||
237 | getGenericNode :: ( Serialize (NodeId) | ||
238 | , Serialize ip | ||
239 | , Serialize u | ||
240 | ) => Get (NodeInfo) | ||
241 | getGenericNode = do | ||
242 | nid <- get | ||
243 | naddr <- get | ||
244 | u <- get | ||
245 | return NodeInfo | ||
246 | { nodeId = nid | ||
247 | , nodeAddr = naddr | ||
248 | , nodeAnnotation = u | ||
249 | } | ||
250 | |||
251 | putGenericNode :: ( Serialize (NodeId) | ||
252 | , Serialize ip | ||
253 | , Serialize u | ||
254 | ) => NodeInfo -> Put | ||
255 | putGenericNode (NodeInfo nid naddr u) = do | ||
256 | put nid | ||
257 | put naddr | ||
258 | put u | ||
259 | |||
260 | instance (Eq ip, Ord (NodeId), Serialize (NodeId), Serialize ip, Serialize u) => Serialize (Bucket) where | ||
261 | get = Bucket . psqFromPairList <$> getListOf ( (,) <$> getGenericNode <*> get ) <*> pure (runIdentity $ emptyQueue bucketQ) | ||
262 | put = putListOf (\(ni,stamp) -> putGenericNode ni >> put stamp) . psqToPairList . bktNodes | ||
263 | -} | ||
264 | |||
265 | #endif | ||
266 | |||
267 | psqFromPairList :: (Ord p, PSQKey k) => [(k, p)] -> PSQ k p | ||
268 | psqFromPairList xs = PSQ.fromList $ map (\(a,b) -> a :-> b) xs | ||
269 | |||
270 | psqToPairList :: ( PSQKey t, Ord t1 ) => PSQ t t1 -> [(t, t1)] | ||
271 | psqToPairList psq = map (\(a :-> b) -> (a,b)) $ PSQ.toList psq | ||
272 | |||
273 | -- | Update interval, in seconds. | ||
274 | delta :: NominalDiffTime | ||
275 | delta = 15 * 60 | ||
276 | |||
277 | -- | Should maintain a set of stable long running nodes. | ||
278 | -- | ||
279 | -- Note: pings are triggerd only when a bucket is full. | ||
280 | updateBucketForInbound :: ( Coercible t1 t | ||
281 | , Alternative f | ||
282 | , Reifies s (Compare t1) | ||
283 | ) => NominalDiffTime -> t1 -> Bucket s t1 -> f ([t], Bucket s t1) | ||
284 | updateBucketForInbound curTime info bucket | ||
285 | -- Just update timestamp if a node is already in bucket. | ||
286 | -- | ||
287 | -- Note PingResult events should only occur for nodes we requested a ping for, | ||
288 | -- and those will always already be in the routing queue and will get their | ||
289 | -- timestamp updated here, since 'TryInsert' is called on every inbound packet, | ||
290 | -- including ping results. | ||
291 | | already_have | ||
292 | = pure ( [], map_ns $ PSQ.insertWith max (coerce info) curTime ) | ||
293 | -- bucket is good, but not full => we can insert a new node | ||
294 | | PSQ.size (bktNodes bucket) < defaultBucketSize | ||
295 | = pure ( [], map_ns $ PSQ.insert (coerce info) curTime ) | ||
296 | -- If there are any questionable nodes in the bucket have not been | ||
297 | -- seen in the last 15 minutes, the least recently seen node is | ||
298 | -- pinged. If any nodes in the bucket are known to have become bad, | ||
299 | -- then one is replaced by the new node in the next insertBucket | ||
300 | -- iteration. | ||
301 | | not (L.null stales) | ||
302 | = pure ( stales | ||
303 | , bucket { -- Update timestamps so that we don't redundantly ping. | ||
304 | bktNodes = updateStamps curTime (coerce stales) $ bktNodes bucket | ||
305 | -- Update queue with the pending NodeInfo in case of ping fail. | ||
306 | , bktQ = runIdentity $ pushBack bucketQ (curTime,info) $ bktQ bucket } ) | ||
307 | -- When the bucket is full of good nodes, the new node is simply discarded. | ||
308 | -- We must return 'A.empty' here to ensure that bucket splitting happens | ||
309 | -- inside 'modifyBucket'. | ||
310 | | otherwise = A.empty | ||
311 | where | ||
312 | -- We (take 1) to keep a 1-to-1 correspondence between pending pings and | ||
313 | -- waiting nodes in the bktQ. This way, we don't have to worry about what | ||
314 | -- to do with failed pings for which there is no ready replacements. | ||
315 | stales = -- One stale: | ||
316 | do (n :-> t) <- maybeToList $ PSQ.findMin (bktNodes bucket) | ||
317 | guard (t < curTime - delta) | ||
318 | return $ coerce n | ||
319 | -- All stale: | ||
320 | -- map key \$ PSQ.atMost (curTime - delta) $ bktNodes bucket | ||
321 | |||
322 | already_have = maybe False (const True) $ PSQ.lookup (coerce info) (bktNodes bucket) | ||
323 | |||
324 | map_ns f = bucket { bktNodes = f (bktNodes bucket) } | ||
325 | -- map_q f = bucket { bktQ = runIdentity \$ f (bktQ bucket) } | ||
326 | |||
327 | updateBucketForPingResult :: (Applicative f, Reifies s (Compare a)) => | ||
328 | a -> Bool -> Bucket s a -> f ([(a, Maybe (Timestamp, a))], Bucket s a) | ||
329 | updateBucketForPingResult bad_node got_response bucket | ||
330 | = pure ( map (,Nothing) forgotten | ||
331 | ++ map (second Just) replacements | ||
332 | , Bucket (foldr replace | ||
333 | (bktNodes bucket) | ||
334 | replacements) | ||
335 | popped | ||
336 | ) | ||
337 | where | ||
338 | (top, popped) = runIdentity $ popFront bucketQ (bktQ bucket) | ||
339 | |||
340 | -- Dropped from accepted, replaced by pending. | ||
341 | replacements | got_response = [] -- Timestamp was already updated by TryInsert. | ||
342 | | Just info <- top = do | ||
343 | -- Insert only if there's a removal. | ||
344 | _ <- maybeToList $ PSQ.lookup (coerce bad_node) (bktNodes bucket) | ||
345 | return (bad_node, info) | ||
346 | | otherwise = [] | ||
347 | |||
348 | -- Dropped from the pending queue without replacing. | ||
349 | forgotten | got_response = maybeToList $ fmap snd top | ||
350 | | otherwise = [] | ||
351 | |||
352 | |||
353 | replace (bad_node, (tm, info)) = | ||
354 | PSQ.insert (coerce info) tm | ||
355 | . PSQ.delete (coerce bad_node) | ||
356 | |||
357 | |||
358 | updateStamps :: PSQKey ni => Timestamp -> [ni] -> PSQ ni Timestamp -> PSQ ni Timestamp | ||
359 | updateStamps curTime stales nodes = foldl' (\q n -> PSQ.insert n curTime q) nodes stales | ||
360 | |||
361 | type BitIx = Word | ||
362 | |||
363 | partitionQ :: Monad f => QueueMethods f elem b -> (elem -> Bool) -> b -> f (b, b) | ||
364 | partitionQ imp test q0 = do | ||
365 | pass0 <- emptyQueue imp | ||
366 | fail0 <- emptyQueue imp | ||
367 | let flipfix a b f = fix f a b | ||
368 | flipfix q0 (pass0,fail0) $ \rec q qs -> do | ||
369 | (mb,q') <- popFront imp q | ||
370 | case mb of | ||
371 | Nothing -> return qs | ||
372 | Just e -> do qs' <- select (pushBack imp e) qs | ||
373 | rec q' qs' | ||
374 | where | ||
375 | select :: Functor f => (b -> f b) -> (b, b) -> f (b, b) | ||
376 | select f = if test e then \(a,b) -> flip (,) b <$> f a | ||
377 | else \(a,b) -> (,) a <$> f b | ||
378 | |||
379 | |||
380 | |||
381 | split :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) => | ||
382 | forall ni s. ( Reifies s (Compare ni) ) => | ||
383 | (ni -> Word -> Bool) | ||
384 | -> BitIx -> Bucket s ni -> (Bucket s ni, Bucket s ni) | ||
385 | split testNodeIdBit i b = (Bucket ns qs, Bucket ms rs) | ||
386 | where | ||
387 | (ns,ms) = (PSQ.fromList *** PSQ.fromList) . partition (spanBit . coerce . key) . PSQ.toList $ bktNodes b | ||
388 | (qs,rs) = runIdentity $ partitionQ bucketQ (spanBit . snd) $ bktQ b | ||
389 | |||
390 | spanBit :: ni -> Bool | ||
391 | spanBit entry = testNodeIdBit entry i | ||
392 | |||
393 | |||
394 | {----------------------------------------------------------------------- | ||
395 | -- BucketList | ||
396 | -----------------------------------------------------------------------} | ||
397 | |||
398 | defaultBucketCount :: Int | ||
399 | defaultBucketCount = 20 | ||
400 | |||
401 | defaultMaxBucketCount :: Word | ||
402 | defaultMaxBucketCount = 24 | ||
403 | |||
404 | data Info ni nid = Info | ||
405 | { myBuckets :: BucketList ni | ||
406 | , myNodeId :: nid | ||
407 | , myAddress :: SockAddr | ||
408 | } | ||
409 | deriving Generic | ||
410 | |||
411 | deriving instance (Eq ni, Eq nid) => Eq (Info ni nid) | ||
412 | deriving instance (Show ni, Show nid) => Show (Info ni nid) | ||
413 | |||
414 | -- instance (Eq ip, Serialize ip) => Serialize (Info ip) | ||
415 | |||
416 | -- | The routing table covers the entire 'NodeId' space from 0 to 2 ^ | ||
417 | -- 160. The routing table is subdivided into 'Bucket's that each cover | ||
418 | -- a portion of the space. An empty table has one bucket with an ID | ||
419 | -- space range of @min = 0, max = 2 ^ 160@. When a node with ID \"N\" | ||
420 | -- is inserted into the table, it is placed within the bucket that has | ||
421 | -- @min <= N < max@. An empty table has only one bucket so any node | ||
422 | -- must fit within it. Each bucket can only hold 'K' nodes, currently | ||
423 | -- eight, before becoming 'Full'. When a bucket is full of known good | ||
424 | -- nodes, no more nodes may be added unless our own 'NodeId' falls | ||
425 | -- within the range of the 'Bucket'. In that case, the bucket is | ||
426 | -- replaced by two new buckets each with half the range of the old | ||
427 | -- bucket and the nodes from the old bucket are distributed among the | ||
428 | -- two new ones. For a new table with only one bucket, the full bucket | ||
429 | -- is always split into two new buckets covering the ranges @0..2 ^ | ||
430 | -- 159@ and @2 ^ 159..2 ^ 160@. | ||
431 | -- | ||
432 | data BucketList ni = forall s. Reifies s (Compare ni) => | ||
433 | BucketList { thisNode :: !ni | ||
434 | -- | Non-empty list of buckets. | ||
435 | , buckets :: [Bucket s ni] | ||
436 | } | ||
437 | |||
438 | mapTable :: (b -> t) -> (t -> b) -> BucketList t -> BucketList b | ||
439 | mapTable g f tbl@(BucketList self bkts) = reify (contramapC g $ bucketCompare bkts) | ||
440 | $ \p -> BucketList | ||
441 | { thisNode = f self | ||
442 | , buckets = map (resolve p . mapBucket f) bkts | ||
443 | } | ||
444 | where | ||
445 | resolve :: Proxy s -> Bucket s ni -> Bucket s ni | ||
446 | resolve = const id | ||
447 | |||
448 | instance (Eq ni) => Eq (BucketList ni) where | ||
449 | (==) = (==) `on` Network.Kademlia.Routing.toList | ||
450 | |||
451 | #if 0 | ||
452 | |||
453 | instance Serialize NominalDiffTime where | ||
454 | put = putWord32be . fromIntegral . fromEnum | ||
455 | get = (toEnum . fromIntegral) <$> getWord32be | ||
456 | |||
457 | #endif | ||
458 | |||
459 | #if CAN_SHOW_BUCKET | ||
460 | deriving instance (Show ni) => Show (BucketList ni) | ||
461 | #else | ||
462 | instance Show ni => Show (BucketList ni) where | ||
463 | showsPrec d (BucketList self bkts) = | ||
464 | mappend "BucketList " | ||
465 | . showsPrec (d+1) self | ||
466 | . mappend " (fromList " | ||
467 | . showsPrec (d+1) (L.map (L.map tableEntry . PSQ.toList . bktNodes) $ bkts) | ||
468 | . mappend ") " | ||
469 | #endif | ||
470 | |||
471 | #if 0 | ||
472 | |||
473 | -- | Normally, routing table should be saved between invocations of | ||
474 | -- the client software. Note that you don't need to store /this/ | ||
475 | -- 'NodeId' since it is already included in routing table. | ||
476 | instance (Eq ip, Serialize ip, Ord (NodeId), Serialize (NodeId), Serialize u) => Serialize (BucketList) | ||
477 | |||
478 | #endif | ||
479 | |||
480 | -- | Shape of the table. | ||
481 | instance Pretty (BucketList ni) where | ||
482 | pPrint t | ||
483 | | bucketCount < 6 = hcat $ punctuate ", " $ L.map PP.int ss | ||
484 | | otherwise = brackets $ | ||
485 | PP.int (L.sum ss) <> " nodes, " <> | ||
486 | PP.int bucketCount <> " buckets" | ||
487 | where | ||
488 | bucketCount = L.length ss | ||
489 | ss = shape t | ||
490 | |||
491 | -- | Empty table with specified /spine/ node id. | ||
492 | -- | ||
493 | -- XXX: The comparison function argument is awkward here. | ||
494 | nullTable :: (ni -> ni -> Ordering) -> (Int -> ni -> Int) -> ni -> Int -> BucketList ni | ||
495 | nullTable cmp hsh ni n = | ||
496 | reify (Compare cmp hsh) | ||
497 | $ \p -> BucketList | ||
498 | ni | ||
499 | [Bucket (empty p) (runIdentity $ emptyQueue bucketQ)] | ||
500 | where | ||
501 | empty :: Reifies s (Compare ni) => Proxy s -> PSQ (Ordered s ni) Timestamp | ||
502 | empty = const $ PSQ.empty | ||
503 | |||
504 | #if 0 | ||
505 | |||
506 | -- | Test if table is empty. In this case DHT should start | ||
507 | -- bootstrapping process until table becomes 'full'. | ||
508 | null :: BucketList -> Bool | ||
509 | null (Tip _ _ b) = PSQ.null $ bktNodes b | ||
510 | null _ = False | ||
511 | |||
512 | -- | Test if table have maximum number of nodes. No more nodes can be | ||
513 | -- 'insert'ed, except old ones becomes bad. | ||
514 | full :: BucketList -> Bool | ||
515 | full (Tip _ n _) = n == 0 | ||
516 | full (Zero t b) = PSQ.size (bktNodes b) == defaultBucketSize && full t | ||
517 | full (One b t) = PSQ.size (bktNodes b) == defaultBucketSize && full t | ||
518 | |||
519 | -- | Get the /spine/ node id. | ||
520 | thisId :: BucketList -> NodeId | ||
521 | thisId (Tip nid _ _) = nid | ||
522 | thisId (Zero table _) = thisId table | ||
523 | thisId (One _ table) = thisId table | ||
524 | |||
525 | -- | Number of nodes in a bucket or a table. | ||
526 | type NodeCount = Int | ||
527 | |||
528 | #endif | ||
529 | |||
530 | -- | Internally, routing table is similar to list of buckets or a | ||
531 | -- /matrix/ of nodes. This function returns the shape of the matrix. | ||
532 | shape :: BucketList ni -> [Int] | ||
533 | shape (BucketList _ tbl) = map (PSQ.size . bktNodes) tbl | ||
534 | |||
535 | #if 0 | ||
536 | |||
537 | -- | Get number of nodes in the table. | ||
538 | size :: BucketList -> NodeCount | ||
539 | size = L.sum . shape | ||
540 | |||
541 | -- | Get number of buckets in the table. | ||
542 | depth :: BucketList -> BucketCount | ||
543 | depth = L.length . shape | ||
544 | |||
545 | #endif | ||
546 | |||
547 | lookupBucket :: forall ni nid x. | ||
548 | ( -- FiniteBits nid | ||
549 | Ord nid | ||
550 | ) => KademliaSpace nid ni -> nid -> (forall s. Reifies s (Compare ni) => [Bucket s ni] -> x) -> BucketList ni -> x | ||
551 | lookupBucket space nid kont (BucketList self bkts) = kont $ go 0 [] bkts | ||
552 | where | ||
553 | d = kademliaXor space nid (kademliaLocation space self) | ||
554 | |||
555 | go :: Word -> [Bucket s ni] -> [Bucket s ni] -> [Bucket s ni] | ||
556 | go i bs (bucket : buckets) | ||
557 | | kademliaTestBit space d i = bucket : buckets ++ bs | ||
558 | | otherwise = go (succ i) (bucket:bs) buckets | ||
559 | go _ bs [] = bs | ||
560 | |||
561 | bucketNumber :: forall ni nid. | ||
562 | KademliaSpace nid ni -> nid -> BucketList ni -> Int | ||
563 | bucketNumber space nid (BucketList self bkts) = fromIntegral $ go 0 bkts | ||
564 | where | ||
565 | d = kademliaXor space nid (kademliaLocation space self) | ||
566 | |||
567 | go :: Word -> [Bucket s ni] -> Word | ||
568 | go i (bucket : buckets) | ||
569 | | kademliaTestBit space d i = i | ||
570 | | otherwise = go (succ i) buckets | ||
571 | go i [] = i | ||
572 | |||
573 | |||
574 | compatibleNodeId :: forall ni nid. | ||
575 | ( Serialize nid, FiniteBits nid) => | ||
576 | (ni -> nid) -> BucketList ni -> IO nid | ||
577 | compatibleNodeId nodeId tbl = genBucketSample prefix br | ||
578 | where | ||
579 | br = bucketRange (L.length (shape tbl) - 1) True | ||
580 | nodeIdSize = finiteBitSize (undefined :: nid) `div` 8 | ||
581 | bs = BS.pack $ take nodeIdSize $ tablePrefix (testIdBit . nodeId) tbl ++ repeat 0 | ||
582 | prefix = either error id $ S.decode bs | ||
583 | |||
584 | tablePrefix :: (ni -> Word -> Bool) -> BucketList ni -> [Word8] | ||
585 | tablePrefix testbit = map (packByte . take 8 . (++repeat False)) | ||
586 | . chunksOf 8 | ||
587 | . tableBits testbit | ||
588 | where | ||
589 | packByte = foldl1' (.|.) . zipWith bitmask [7,6 .. 0] | ||
590 | bitmask ix True = bit ix | ||
591 | bitmask _ _ = 0 | ||
592 | |||
593 | tableBits :: (ni -> Word -> Bool) -> BucketList ni -> [Bool] | ||
594 | tableBits testbit (BucketList self bkts) = | ||
595 | zipWith const (map (testbit self) [0..]) | ||
596 | bkts | ||
597 | |||
598 | selfNode :: BucketList ni -> ni | ||
599 | selfNode (BucketList self _) = self | ||
600 | |||
601 | chunksOf :: Int -> [e] -> [[e]] | ||
602 | chunksOf i ls = map (take i) (build (splitter ls)) where | ||
603 | splitter :: [e] -> ([e] -> a -> a) -> a -> a | ||
604 | splitter [] _ n = n | ||
605 | splitter l c n = l `c` splitter (drop i l) c n | ||
606 | |||
607 | build :: ((a -> [a] -> [a]) -> [a] -> [a]) -> [a] | ||
608 | build g = g (:) [] | ||
609 | |||
610 | |||
611 | |||
612 | -- | Count of closest nodes in find_node reply. | ||
613 | type K = Int | ||
614 | |||
615 | -- | Default 'K' is equal to 'defaultBucketSize'. | ||
616 | defaultK :: K | ||
617 | defaultK = 8 | ||
618 | |||
619 | #if 0 | ||
620 | class TableKey dht k where | ||
621 | toNodeId :: k -> NodeId | ||
622 | |||
623 | instance TableKey dht (NodeId) where | ||
624 | toNodeId = id | ||
625 | |||
626 | #endif | ||
627 | |||
628 | -- | In Kademlia, the distance metric is XOR and the result is | ||
629 | -- interpreted as an unsigned integer. | ||
630 | newtype NodeDistance nodeid = NodeDistance nodeid | ||
631 | deriving (Eq, Ord) | ||
632 | |||
633 | -- | distance(A,B) = |A xor B| Smaller values are closer. | ||
634 | distance :: Bits nid => nid -> nid -> NodeDistance nid | ||
635 | distance a b = NodeDistance $ xor a b | ||
636 | |||
637 | -- | Order by closeness: nearest nodes first. | ||
638 | rank :: ( Ord nid | ||
639 | ) => KademliaSpace nid ni -> nid -> [ni] -> [ni] | ||
640 | rank space nid = L.sortBy (comparing (kademliaXor space nid . kademliaLocation space)) | ||
641 | |||
642 | |||
643 | -- | Get a list of /K/ closest nodes using XOR metric. Used in | ||
644 | -- 'find_node' and 'get_peers' queries. | ||
645 | kclosest :: ( -- FiniteBits nid | ||
646 | Ord nid | ||
647 | ) => | ||
648 | KademliaSpace nid ni -> Int -> nid -> BucketList ni -> [ni] | ||
649 | kclosest space k nid tbl = take k $ rank space nid (L.concat bucket) | ||
650 | ++ rank space nid (L.concat everyone) | ||
651 | where | ||
652 | (bucket,everyone) = | ||
653 | L.splitAt 1 | ||
654 | . lookupBucket space nid (L.map (coerce . L.map PSQ.key . PSQ.toList . bktNodes)) | ||
655 | $ tbl | ||
656 | |||
657 | |||
658 | |||
659 | {----------------------------------------------------------------------- | ||
660 | -- Routing | ||
661 | -----------------------------------------------------------------------} | ||
662 | |||
663 | splitTip :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) => | ||
664 | ( Reifies s (Compare ni) ) => | ||
665 | (ni -> Word -> Bool) | ||
666 | -> ni -> BitIx -> Bucket s ni -> [ Bucket s ni ] | ||
667 | splitTip testNodeBit ni i bucket | ||
668 | | testNodeBit ni i = [zeros , ones ] | ||
669 | | otherwise = [ones , zeros ] | ||
670 | where | ||
671 | (ones, zeros) = split testNodeBit i bucket | ||
672 | |||
673 | -- | Used in each query. | ||
674 | -- | ||
675 | -- TODO: Kademlia non-empty subtrees should should split if they have less than | ||
676 | -- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia | ||
677 | -- paper. The rule requiring additional splits is in section 2.4. | ||
678 | modifyBucket | ||
679 | :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) => | ||
680 | forall ni nid xs. | ||
681 | KademliaSpace nid ni | ||
682 | -> nid -> (forall s. Reifies s (Compare ni) => Bucket s ni -> Maybe (xs, Bucket s ni)) -> BucketList ni -> Maybe (xs,BucketList ni) | ||
683 | modifyBucket space nid f (BucketList self bkts) | ||
684 | = second (BucketList self) <$> go (0 :: BitIx) bkts | ||
685 | where | ||
686 | d = kademliaXor space nid (kademliaLocation space self) | ||
687 | |||
688 | -- go :: BitIx -> [Bucket s ni] -> Maybe (xs, [Bucket s ni]) | ||
689 | |||
690 | go !i (bucket : buckets@(_:_)) | ||
691 | | kademliaTestBit space d i = second (: buckets) <$> f bucket | ||
692 | | otherwise = second (bucket :) <$> go (succ i) buckets | ||
693 | |||
694 | go !i [bucket] = second (: []) <$> f bucket <|> gosplit | ||
695 | where | ||
696 | gosplit | i < defaultMaxBucketCount = go i (splitTip ( kademliaTestBit space | ||
697 | . kademliaLocation space ) | ||
698 | self | ||
699 | i | ||
700 | bucket) | ||
701 | | otherwise = Nothing -- Limit the number of buckets. | ||
702 | |||
703 | |||
704 | bktCount :: BucketList ni -> Int | ||
705 | bktCount (BucketList _ bkts) = L.length bkts | ||
706 | |||
707 | -- | Triggering event for atomic table update | ||
708 | data Event ni = TryInsert { foreignNode :: ni } | ||
709 | | PingResult { foreignNode :: ni , ponged :: Bool } | ||
710 | |||
711 | #if 0 | ||
712 | deriving instance Eq (NodeId) => Eq (Event) | ||
713 | deriving instance ( Show ip | ||
714 | , Show (NodeId) | ||
715 | , Show u | ||
716 | ) => Show (Event) | ||
717 | |||
718 | #endif | ||
719 | |||
720 | eventId :: (ni -> nid) -> Event ni -> nid | ||
721 | eventId nodeId (TryInsert ni) = nodeId ni | ||
722 | eventId nodeId (PingResult ni _) = nodeId ni | ||
723 | |||
724 | |||
725 | -- | Actions requested by atomic table update | ||
726 | data CheckPing ni = CheckPing [ni] | ||
727 | |||
728 | #if 0 | ||
729 | |||
730 | deriving instance Eq (NodeId) => Eq (CheckPing) | ||
731 | deriving instance ( Show ip | ||
732 | , Show (NodeId) | ||
733 | , Show u | ||
734 | ) => Show (CheckPing) | ||
735 | |||
736 | #endif | ||
737 | |||
738 | |||
739 | -- | Call on every inbound packet (including requested ping results). | ||
740 | -- Returns a triple (was_inserted, to_ping, tbl') where | ||
741 | -- | ||
742 | -- [ /was_inserted/ ] True if the node was added to the routing table. | ||
743 | -- | ||
744 | -- [ /to_ping/ ] A list of nodes to ping and then run 'updateForPingResult'. | ||
745 | -- This will be empty if /was_inserted/, but a non-inserted node | ||
746 | -- may be added to a replacement queue and will be inserted if | ||
747 | -- one of the items in this list time out. | ||
748 | -- | ||
749 | -- [ /tbl'/ ] The updated routing 'BucketList'. | ||
750 | -- | ||
751 | updateForInbound :: | ||
752 | KademliaSpace nid ni | ||
753 | -> Timestamp -> ni -> BucketList ni -> (Bool, [ni], BucketList ni) | ||
754 | updateForInbound space tm ni tbl@(BucketList _ bkts) = | ||
755 | maybe (False, [],tbl) (\(ps,tbl') -> (True, ps, tbl')) | ||
756 | $ modifyBucket space | ||
757 | (kademliaLocation space ni) | ||
758 | (updateBucketForInbound tm ni) | ||
759 | tbl | ||
760 | |||
761 | -- | Update the routing table with the results of a ping. | ||
762 | -- | ||
763 | -- Each (a,(tm,b)) in the returned list indicates that the node /a/ was deleted from the | ||
764 | -- routing table and the node /b/, with timestamp /tm/, has taken its place. | ||
765 | updateForPingResult :: | ||
766 | KademliaSpace nid ni | ||
767 | -> ni -- ^ The pinged node. | ||
768 | -> Bool -- ^ True if we got a reply, False if it timed out. | ||
769 | -> BucketList ni -- ^ The routing table. | ||
770 | -> ( [(ni,Maybe (Timestamp, ni))], BucketList ni ) | ||
771 | updateForPingResult space ni got_reply tbl = | ||
772 | fromMaybe ([],tbl) | ||
773 | $ modifyBucket space | ||
774 | (kademliaLocation space ni) | ||
775 | (updateBucketForPingResult ni got_reply) | ||
776 | tbl | ||
777 | |||
778 | |||
779 | {----------------------------------------------------------------------- | ||
780 | -- Conversion | ||
781 | -----------------------------------------------------------------------} | ||
782 | |||
783 | type TableEntry ni = (ni, Timestamp) | ||
784 | |||
785 | tableEntry :: NodeEntry ni -> TableEntry ni | ||
786 | tableEntry (a :-> b) = (a, b) | ||
787 | |||
788 | toList :: BucketList ni -> [[TableEntry ni]] | ||
789 | toList (BucketList _ bkts) = coerce $ L.map (L.map tableEntry . PSQ.toList . bktNodes) bkts | ||
790 | |||
791 | data KademliaSpace nid ni = KademliaSpace | ||
792 | { -- | Given a node record (probably including IP address), yields a | ||
793 | -- kademlia xor-metric location. | ||
794 | kademliaLocation :: ni -> nid | ||
795 | -- | Used when comparing locations. This is similar to | ||
796 | -- 'Data.Bits.testBit' except that the ordering of bits is reversed, so | ||
797 | -- that 0 is the most significant bit. | ||
798 | , kademliaTestBit :: nid -> Word -> Bool | ||
799 | -- | The Kademlia xor-metric. | ||
800 | , kademliaXor :: nid -> nid -> nid | ||
801 | |||
802 | , kademliaSample :: forall m. Applicative m => (Int -> m BS.ByteString) -> nid -> (Int,Word8,Word8) -> m nid | ||
803 | } | ||
804 | |||
805 | instance Contravariant (KademliaSpace nid) where | ||
806 | contramap f ks = ks | ||
807 | { kademliaLocation = kademliaLocation ks . f | ||
808 | } | ||
809 | |||
diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs new file mode 100644 index 00000000..1be1afc1 --- /dev/null +++ b/kad/src/Network/Kademlia/Search.hs | |||
@@ -0,0 +1,236 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE PatternSynonyms #-} | ||
3 | {-# LANGUAGE RecordWildCards #-} | ||
4 | {-# LANGUAGE ScopedTypeVariables #-} | ||
5 | {-# LANGUAGE FlexibleContexts #-} | ||
6 | {-# LANGUAGE LambdaCase #-} | ||
7 | module Network.Kademlia.Search where | ||
8 | |||
9 | import Control.Concurrent.Tasks | ||
10 | import Control.Concurrent.STM | ||
11 | import Control.Monad | ||
12 | import Data.Function | ||
13 | import Data.Maybe | ||
14 | import qualified Data.Set as Set | ||
15 | ;import Data.Set (Set) | ||
16 | import Data.Hashable (Hashable(..)) -- for type sigs | ||
17 | import System.IO.Error | ||
18 | |||
19 | import qualified Data.MinMaxPSQ as MM | ||
20 | ;import Data.MinMaxPSQ (MinMaxPSQ, MinMaxPSQ') | ||
21 | import qualified Data.Wrapper.PSQ as PSQ | ||
22 | ;import Data.Wrapper.PSQ (pattern (:->), Binding, pattern Binding, Binding', PSQKey) | ||
23 | import Network.Kademlia.Routing as R | ||
24 | #ifdef THREAD_DEBUG | ||
25 | import Control.Concurrent.Lifted.Instrument | ||
26 | #else | ||
27 | import Control.Concurrent.Lifted | ||
28 | import GHC.Conc (labelThread) | ||
29 | #endif | ||
30 | |||
31 | data Search nid addr tok ni r = Search | ||
32 | { searchSpace :: KademliaSpace nid ni | ||
33 | , searchNodeAddress :: ni -> addr | ||
34 | , searchQuery :: Either (nid -> ni -> IO (Maybe ([ni], [r], Maybe tok))) | ||
35 | (nid -> ni -> (Maybe ([ni],[r],Maybe tok) -> IO ()) -> IO ()) | ||
36 | , searchAlpha :: Int -- α = 8 | ||
37 | -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on | ||
38 | -- how fast the queries are. For Tox's much slower onion-routed queries, we | ||
39 | -- need to ensure that closer non-responding queries don't completely push out | ||
40 | -- farther away queries. | ||
41 | -- | ||
42 | -- For BitTorrent, setting them both 8 was not an issue, but that is no longer | ||
43 | -- supported because now the number of remembered informants is now the | ||
44 | -- difference between these two numbers. So, if searchK = 16 and searchAlpha = | ||
45 | -- 4, then the number of remembered query responses is 12. | ||
46 | , searchK :: Int -- K = 16 | ||
47 | } | ||
48 | |||
49 | data SearchState nid addr tok ni r = SearchState | ||
50 | { -- | The number of pending queries. Incremented before any query is sent | ||
51 | -- and decremented when we get a reply. | ||
52 | searchPendingCount :: TVar Int | ||
53 | -- | Nodes scheduled to be queried (roughly at most K). | ||
54 | , searchQueued :: TVar (MinMaxPSQ ni nid) | ||
55 | -- | The nearest (K - α) nodes that issued a reply. | ||
56 | -- | ||
57 | -- α is the maximum number of simultaneous queries. | ||
58 | , searchInformant :: TVar (MinMaxPSQ' ni nid (Maybe tok)) | ||
59 | -- | This tracks already-queried addresses so we avoid bothering them | ||
60 | -- again. XXX: We could probably keep only the pending queries in this | ||
61 | -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha | ||
62 | -- should limit the number of outstanding queries. | ||
63 | , searchVisited :: TVar (Set addr) | ||
64 | , searchSpec :: Search nid addr tok ni r | ||
65 | } | ||
66 | |||
67 | |||
68 | newSearch :: ( Ord addr | ||
69 | , PSQKey nid | ||
70 | , PSQKey ni | ||
71 | ) => | ||
72 | {- | ||
73 | KademliaSpace nid ni | ||
74 | -> (ni -> addr) | ||
75 | -> (ni -> IO ([ni], [r])) -- the query action. | ||
76 | -> (r -> STM Bool) -- receives search results. | ||
77 | -> nid -- target of search | ||
78 | -} | ||
79 | Search nid addr tok ni r | ||
80 | -> nid | ||
81 | -> [ni] -- Initial nodes to query. | ||
82 | -> STM (SearchState nid addr tok ni r) | ||
83 | newSearch s@(Search space nAddr qry _ _) target ns = do | ||
84 | c <- newTVar 0 | ||
85 | q <- newTVar $ MM.fromList | ||
86 | $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) | ||
87 | $ ns | ||
88 | i <- newTVar MM.empty | ||
89 | v <- newTVar Set.empty | ||
90 | return -- (Search space nAddr qry) , r , target | ||
91 | ( SearchState c q i v s ) | ||
92 | |||
93 | -- | Discard a value from a key-priority-value tuple. This is useful for | ||
94 | -- swaping items from a "MinMaxPSQ'" to a "MinMaxPSQ". | ||
95 | stripValue :: Binding' k p v -> Binding k p | ||
96 | stripValue (Binding ni _ nid) = (ni :-> nid) | ||
97 | |||
98 | -- | Reset a 'SearchState' object to ready it for a repeated search. | ||
99 | reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) => | ||
100 | (nid -> STM [ni]) | ||
101 | -> Search nid addr1 tok1 ni r1 | ||
102 | -> nid | ||
103 | -> SearchState nid addr tok ni r | ||
104 | -> STM (SearchState nid addr tok ni r) | ||
105 | reset nearestNodes qsearch target st = do | ||
106 | searchIsFinished st >>= check -- Wait for a search to finish before resetting. | ||
107 | bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) | ||
108 | <$> nearestNodes target | ||
109 | priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st) | ||
110 | writeTVar (searchQueued st) $ MM.fromList $ priorInformants ++ bktNodes | ||
111 | writeTVar (searchInformant st) MM.empty | ||
112 | writeTVar (searchVisited st) Set.empty | ||
113 | writeTVar (searchPendingCount st) 0 | ||
114 | return st | ||
115 | |||
116 | sendAsyncQuery :: forall addr nid tok ni r. | ||
117 | ( Ord addr | ||
118 | , PSQKey nid | ||
119 | , PSQKey ni | ||
120 | , Show nid | ||
121 | ) => | ||
122 | Search nid addr tok ni r | ||
123 | -> nid | ||
124 | -> (r -> STM Bool) -- ^ return False to terminate the search. | ||
125 | -> SearchState nid addr tok ni r | ||
126 | -> Binding ni nid | ||
127 | -> TaskGroup | ||
128 | -> IO () | ||
129 | sendAsyncQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) g = | ||
130 | case searchQuery of | ||
131 | Left blockingQuery -> | ||
132 | forkTask g "searchQuery" $ do | ||
133 | myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) | ||
134 | reply <- blockingQuery searchTarget ni `catchIOError` const (return Nothing) | ||
135 | atomically $ do | ||
136 | modifyTVar searchPendingCount pred | ||
137 | maybe (return ()) go reply | ||
138 | Right nonblockingQuery -> do | ||
139 | nonblockingQuery searchTarget ni $ \reply -> | ||
140 | atomically $ do | ||
141 | modifyTVar searchPendingCount pred | ||
142 | maybe (return ()) go reply | ||
143 | where | ||
144 | go (ns,rs,tok) = do | ||
145 | vs <- readTVar searchVisited | ||
146 | -- We only queue a node if it is not yet visited | ||
147 | let insertFoundNode :: Int | ||
148 | -> ni | ||
149 | -> MinMaxPSQ ni nid | ||
150 | -> MinMaxPSQ ni nid | ||
151 | insertFoundNode k n q | ||
152 | | searchNodeAddress n `Set.member` vs | ||
153 | = q | ||
154 | | otherwise = MM.insertTake k n ( kademliaXor searchSpace searchTarget | ||
155 | $ kademliaLocation searchSpace n ) | ||
156 | q | ||
157 | |||
158 | qsize0 <- MM.size <$> readTVar searchQueued | ||
159 | let qsize = if qsize0 < searchK then searchK else qsize0 -- Allow searchQueued to grow | ||
160 | -- only when there's fewer than | ||
161 | -- K elements. | ||
162 | modifyTVar searchQueued $ \q -> foldr (insertFoundNode qsize) q ns | ||
163 | modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d | ||
164 | flip fix rs $ \loop -> \case | ||
165 | r:rs' -> do | ||
166 | wanting <- searchResult r | ||
167 | if wanting then loop rs' | ||
168 | else searchCancel sch | ||
169 | [] -> return () | ||
170 | |||
171 | |||
172 | searchIsFinished :: ( PSQKey nid | ||
173 | , PSQKey ni | ||
174 | ) => SearchState nid addr tok ni r -> STM Bool | ||
175 | searchIsFinished SearchState{..} = do | ||
176 | q <- readTVar searchQueued | ||
177 | cnt <- readTVar searchPendingCount | ||
178 | informants <- readTVar searchInformant | ||
179 | return $ cnt == 0 | ||
180 | && ( MM.null q | ||
181 | || ( MM.size informants >= (searchK searchSpec - searchAlpha searchSpec) | ||
182 | && ( PSQ.prio (fromJust $ MM.findMax informants) | ||
183 | <= PSQ.prio (fromJust $ MM.findMin q)))) | ||
184 | |||
185 | searchCancel :: SearchState nid addr tok ni r -> STM () | ||
186 | searchCancel SearchState{..} = do | ||
187 | writeTVar searchPendingCount 0 | ||
188 | writeTVar searchQueued MM.empty | ||
189 | |||
190 | search :: | ||
191 | ( Ord r | ||
192 | , Ord addr | ||
193 | , PSQKey nid | ||
194 | , PSQKey ni | ||
195 | , Show nid | ||
196 | ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r) | ||
197 | search sch buckets target result = do | ||
198 | let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets | ||
199 | st <- atomically $ newSearch sch target ns | ||
200 | forkIO $ searchLoop sch target result st | ||
201 | return st | ||
202 | |||
203 | searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni ) | ||
204 | => Search nid addr tok ni r -- ^ Query and distance methods. | ||
205 | -> nid -- ^ The target we are searching for. | ||
206 | -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching. | ||
207 | -> SearchState nid addr tok ni r -- ^ Search-related state. | ||
208 | -> IO () | ||
209 | searchLoop sch@Search{..} target result s@SearchState{..} = do | ||
210 | myThreadId >>= flip labelThread ("search."++show target) | ||
211 | withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do | ||
212 | join $ atomically $ do | ||
213 | cnt <- readTVar $ searchPendingCount | ||
214 | check (cnt <= 8) -- Only 8 pending queries at a time. | ||
215 | informants <- readTVar searchInformant | ||
216 | found <- MM.minView <$> readTVar searchQueued | ||
217 | case found of | ||
218 | Just (ni :-> d, q) | ||
219 | | -- If there's fewer than /k - α/ informants and there's any | ||
220 | -- node we haven't yet got a response from. | ||
221 | (MM.size informants < searchK - searchAlpha) && (cnt > 0 || not (MM.null q)) | ||
222 | -- Or there's no informants yet at all. | ||
223 | || MM.null informants | ||
224 | -- Or if the closest scheduled node is nearer than the | ||
225 | -- nearest /k/ informants. | ||
226 | || (d < PSQ.prio (fromJust $ MM.findMax informants)) | ||
227 | -> -- Then the search continues, send a query. | ||
228 | do writeTVar searchQueued q | ||
229 | modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) | ||
230 | modifyTVar searchPendingCount succ | ||
231 | return $ do | ||
232 | sendAsyncQuery sch target result s (ni :-> d) g | ||
233 | again | ||
234 | _ -> -- Otherwise, we are finished. | ||
235 | do check (cnt == 0) | ||
236 | return $ return () | ||