diff options
author | James Crayne <jim.crayne@gmail.com> | 2019-09-28 13:43:29 -0400 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-01 19:27:53 -0500 |
commit | 11987749fc6e6d3e53ea737d46d5ab13a16faeb8 (patch) | |
tree | 5716463275c2d3e902889db619908ded2a73971c /src/Network/Kademlia | |
parent | add2c76bced51fde5e9917e7449ef52be70faf87 (diff) |
Factor out some new libraries
word64-map:
Data.Word64Map
network-addr:
Network.Address
tox-crypto:
Crypto.Tox
lifted-concurrent:
Control.Concurrent.Lifted.Instrument
Control.Concurrent.Async.Lifted.Instrument
psq-wrap:
Data.Wrapper.PSQInt
Data.Wrapper.PSQ
minmax-psq:
Data.MinMaxPSQ
tasks:
Control.Concurrent.Tasks
kad:
Network.Kademlia
Network.Kademlia.Bootstrap
Network.Kademlia.Routing
Network.Kademlia.CommonAPI
Network.Kademlia.Persistence
Network.Kademlia.Search
Diffstat (limited to 'src/Network/Kademlia')
-rw-r--r-- | src/Network/Kademlia/Bootstrap.hs | 437 | ||||
-rw-r--r-- | src/Network/Kademlia/CommonAPI.hs | 84 | ||||
-rw-r--r-- | src/Network/Kademlia/Persistence.hs | 51 | ||||
-rw-r--r-- | src/Network/Kademlia/Routing.hs | 808 | ||||
-rw-r--r-- | src/Network/Kademlia/Search.hs | 236 |
5 files changed, 0 insertions, 1616 deletions
diff --git a/src/Network/Kademlia/Bootstrap.hs b/src/Network/Kademlia/Bootstrap.hs deleted file mode 100644 index 1324ae77..00000000 --- a/src/Network/Kademlia/Bootstrap.hs +++ /dev/null | |||
@@ -1,437 +0,0 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE ConstraintKinds #-} | ||
3 | {-# LANGUAGE DeriveFunctor #-} | ||
4 | {-# LANGUAGE DeriveTraversable #-} | ||
5 | {-# LANGUAGE FlexibleContexts #-} | ||
6 | {-# LANGUAGE GADTs #-} | ||
7 | {-# LANGUAGE KindSignatures #-} | ||
8 | {-# LANGUAGE LambdaCase #-} | ||
9 | {-# LANGUAGE NamedFieldPuns #-} | ||
10 | {-# LANGUAGE PartialTypeSignatures #-} | ||
11 | {-# LANGUAGE PatternSynonyms #-} | ||
12 | {-# LANGUAGE RankNTypes #-} | ||
13 | {-# LANGUAGE ScopedTypeVariables #-} | ||
14 | module Network.Kademlia.Bootstrap where | ||
15 | |||
16 | import Data.Function | ||
17 | import Data.Maybe | ||
18 | import qualified Data.Set as Set | ||
19 | import Data.Time.Clock.POSIX (getPOSIXTime) | ||
20 | import Network.Kademlia.Routing as R | ||
21 | #ifdef THREAD_DEBUG | ||
22 | import Control.Concurrent.Lifted.Instrument | ||
23 | #else | ||
24 | import Control.Concurrent.Lifted | ||
25 | import GHC.Conc (labelThread) | ||
26 | #endif | ||
27 | import Control.Concurrent.STM | ||
28 | import Control.Monad | ||
29 | import Data.Hashable | ||
30 | import Data.Time.Clock.POSIX (POSIXTime) | ||
31 | import Data.Ord | ||
32 | import System.Entropy | ||
33 | import System.Timeout | ||
34 | import DPut | ||
35 | import DebugTag | ||
36 | |||
37 | import qualified Data.Wrapper.PSQInt as Int | ||
38 | ;import Data.Wrapper.PSQInt (pattern (:->)) | ||
39 | import Network.Address (bucketRange) | ||
40 | import Network.Kademlia.Search | ||
41 | import Control.Concurrent.Tasks | ||
42 | import Network.Kademlia | ||
43 | |||
44 | type SensibleNodeId nid ni = | ||
45 | ( Show nid | ||
46 | , Ord nid | ||
47 | , Ord ni | ||
48 | , Hashable nid | ||
49 | , Hashable ni ) | ||
50 | |||
51 | data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher | ||
52 | { -- | A staleness threshold (if a bucket goes this long without being | ||
53 | -- touched, a refresh will be triggered). | ||
54 | refreshInterval :: POSIXTime | ||
55 | -- | A TVar with the time-to-refresh schedule for each bucket. | ||
56 | -- | ||
57 | -- To "touch" a bucket and prevent it from being refreshed, reschedule | ||
58 | -- its refresh time to some time into the future by modifying its | ||
59 | -- priority in this priority search queue. | ||
60 | , refreshQueue :: TVar (Int.PSQ POSIXTime) | ||
61 | -- | This is the kademlia node search specification. | ||
62 | , refreshSearch :: Search nid addr tok ni ni | ||
63 | -- | The current kademlia routing table buckets. | ||
64 | , refreshBuckets :: TVar (R.BucketList ni) | ||
65 | -- | Action to ping a node. This is used only during initial bootstrap | ||
66 | -- to get some nodes in our table. A 'True' result is interpreted as a a | ||
67 | -- pong, where 'False' is a non-response. | ||
68 | , refreshPing :: ni -> IO Bool | ||
69 | , -- | Timestamp of last bucket event. | ||
70 | refreshLastTouch :: TVar POSIXTime | ||
71 | , -- | This variable indicates whether or not we are in bootstrapping mode. | ||
72 | bootstrapMode :: TVar Bool | ||
73 | , -- | When this countdown reaches 0, we exit bootstrap mode. It is decremented on | ||
74 | -- every finished refresh. | ||
75 | bootstrapCountdown :: TVar (Maybe Int) | ||
76 | } | ||
77 | |||
78 | newBucketRefresher :: ( Ord addr, Hashable addr | ||
79 | , SensibleNodeId nid ni ) | ||
80 | => TVar (R.BucketList ni) | ||
81 | -> Search nid addr tok ni ni | ||
82 | -> (ni -> IO Bool) | ||
83 | -> STM (BucketRefresher nid ni) | ||
84 | newBucketRefresher bkts sch ping = do | ||
85 | let spc = searchSpace sch | ||
86 | nodeId = kademliaLocation spc | ||
87 | -- bkts <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) template_ni R.defaultBucketCount | ||
88 | sched <- newTVar Int.empty | ||
89 | lasttouch <- newTVar 0 -- Would use getPOSIXTime here, or minBound, but alas... | ||
90 | bootstrapVar <- newTVar True -- Start in bootstrapping mode. | ||
91 | bootstrapCnt <- newTVar Nothing | ||
92 | return BucketRefresher | ||
93 | { refreshInterval = 15 * 60 | ||
94 | , refreshQueue = sched | ||
95 | , refreshSearch = sch | ||
96 | , refreshBuckets = bkts | ||
97 | , refreshPing = ping | ||
98 | , refreshLastTouch = lasttouch | ||
99 | , bootstrapMode = bootstrapVar | ||
100 | , bootstrapCountdown = bootstrapCnt | ||
101 | } | ||
102 | |||
103 | -- | This was added to avoid the compile error "Record update for | ||
104 | -- insufficiently polymorphic field" when trying to update the existentially | ||
105 | -- quantified field 'refreshSearch'. | ||
106 | updateRefresherIO :: Ord addr | ||
107 | => Search nid addr tok ni ni | ||
108 | -> (ni -> IO Bool) | ||
109 | -> BucketRefresher nid ni -> BucketRefresher nid ni | ||
110 | updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher | ||
111 | { refreshSearch = sch | ||
112 | , refreshPing = ping | ||
113 | , refreshInterval = refreshInterval | ||
114 | , refreshBuckets = refreshBuckets | ||
115 | , refreshQueue = refreshQueue | ||
116 | , refreshLastTouch = refreshLastTouch | ||
117 | , bootstrapMode = bootstrapMode | ||
118 | , bootstrapCountdown = bootstrapCountdown | ||
119 | } | ||
120 | |||
121 | -- | Fork a refresh loop. Kill the returned thread to terminate it. | ||
122 | forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId | ||
123 | forkPollForRefresh r@BucketRefresher{ refreshInterval | ||
124 | , refreshQueue | ||
125 | , refreshBuckets | ||
126 | , refreshSearch } = fork $ do | ||
127 | myThreadId >>= flip labelThread "pollForRefresh" | ||
128 | fix $ \again -> do | ||
129 | join $ atomically $ do | ||
130 | nextup <- Int.findMin <$> readTVar refreshQueue | ||
131 | maybe retry (return . go again) nextup | ||
132 | where | ||
133 | refresh :: Int -> IO Int | ||
134 | refresh n = do | ||
135 | -- dput XRefresh $ "Refresh time! "++ show n | ||
136 | refreshBucket r n | ||
137 | |||
138 | go again ( bktnum :-> refresh_time ) = do | ||
139 | now <- getPOSIXTime | ||
140 | case fromEnum (refresh_time - now) of | ||
141 | x | x <= 0 -> do -- Refresh time! | ||
142 | -- Move it to the back of the refresh queue. | ||
143 | atomically $ do | ||
144 | interval <- effectiveRefreshInterval r bktnum | ||
145 | modifyTVar' refreshQueue | ||
146 | $ Int.insert bktnum (now + interval) | ||
147 | -- Now fork the refresh operation. | ||
148 | -- TODO: We should probably propogate the kill signal to this thread. | ||
149 | fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum) | ||
150 | _ <- refresh bktnum | ||
151 | return () | ||
152 | return () | ||
153 | picoseconds -> do | ||
154 | -- dput XRefresh $ show (picoseconds `div` 10^12) ++ " seconds until refresh " ++ show bktnum | ||
155 | threadDelay ( picoseconds `div` 10^6 ) | ||
156 | again | ||
157 | |||
158 | |||
159 | -- | This is a helper to 'refreshBucket' which does some book keeping to decide | ||
160 | -- whether or not a bucket is sufficiently refreshed or not. It will return | ||
161 | -- false when we can terminate a node search. | ||
162 | checkBucketFull :: Ord ni => KademliaSpace nid ni -- ^ Obtain a node id from a node. | ||
163 | -> TVar (BucketList ni) -- ^ The current routing table. | ||
164 | -> TVar (Set.Set ni) -- ^ In-range nodes found so far. | ||
165 | -> TVar Bool -- ^ The result will also be written here. | ||
166 | -> Int -- ^ The bucket number of interest. | ||
167 | -> ni -- ^ A newly found node. | ||
168 | -> STM Bool | ||
169 | checkBucketFull space var resultCounter fin n found_node = do | ||
170 | let fullcount = R.defaultBucketSize | ||
171 | saveit True = writeTVar fin True >> return True | ||
172 | saveit _ = return False | ||
173 | tbl <- readTVar var | ||
174 | let counts = R.shape tbl | ||
175 | nid = kademliaLocation space found_node | ||
176 | -- Update the result set with every found node that is in the | ||
177 | -- bucket of interest. | ||
178 | when (n == R.bucketNumber space nid tbl) | ||
179 | $ modifyTVar' resultCounter (Set.insert found_node) | ||
180 | resultCount <- readTVar resultCounter | ||
181 | saveit $ case drop (n - 1) counts of | ||
182 | (cnt:_) | cnt < fullcount -> True -- bucket not full, keep going | ||
183 | _ | Set.size resultCount < fullcount -> True -- we haven't got many results, keep going | ||
184 | _ -> False -- okay, good enough, let's quit. | ||
185 | |||
186 | -- | Called from 'refreshBucket' with the current time when a refresh of the | ||
187 | -- supplied bucket number finishes. | ||
188 | onFinishedRefresh :: BucketRefresher nid ni -> Int -> POSIXTime -> STM (IO ()) | ||
189 | onFinishedRefresh BucketRefresher { bootstrapCountdown | ||
190 | , bootstrapMode | ||
191 | , refreshQueue | ||
192 | , refreshBuckets } num now = do | ||
193 | bootstrapping <- readTVar bootstrapMode | ||
194 | if not bootstrapping then return $ return () -- dput XRefresh $ "Finished non-boostrapping refresh: "++show num | ||
195 | else do | ||
196 | tbl <- readTVar refreshBuckets | ||
197 | action <- | ||
198 | if num /= R.bktCount tbl - 1 | ||
199 | then do modifyTVar' bootstrapCountdown (fmap pred) | ||
200 | return $ return () -- dput XRefresh $ "BOOTSTRAP decrement" | ||
201 | else do | ||
202 | -- The last bucket finished. | ||
203 | cnt <- readTVar bootstrapCountdown | ||
204 | case cnt of | ||
205 | Nothing -> do | ||
206 | let fullsize = R.defaultBucketSize | ||
207 | notfull (n,len) | n==num = False | ||
208 | | len>=fullsize = False | ||
209 | | otherwise = True | ||
210 | unfull = case filter notfull $ zip [0..] (R.shape tbl) of | ||
211 | [] -> [(0,0)] -- Schedule at least 1 more refresh. | ||
212 | xs -> xs | ||
213 | forM_ unfull $ \(n,_) -> do | ||
214 | -- Schedule immediate refresh for unfull buckets (other than this one). | ||
215 | modifyTVar' refreshQueue $ Int.insert n (now - 1) | ||
216 | writeTVar bootstrapCountdown $! Just $! length unfull | ||
217 | return $ return () -- dput XRefresh $ "BOOTSTRAP scheduling: "++show unfull | ||
218 | Just n -> do writeTVar bootstrapCountdown $! Just $! pred n | ||
219 | return $ return () -- dput XRefresh "BOOTSTRAP decrement (last bucket)" | ||
220 | cnt <- readTVar bootstrapCountdown | ||
221 | if (cnt == Just 0) | ||
222 | then do | ||
223 | -- Boostrap finished! | ||
224 | writeTVar bootstrapMode False | ||
225 | writeTVar bootstrapCountdown Nothing | ||
226 | return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")." | ||
227 | else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt) | ||
228 | |||
229 | refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) => | ||
230 | BucketRefresher nid ni -> Int -> IO Int | ||
231 | refreshBucket r@BucketRefresher{ refreshSearch = sch | ||
232 | , refreshBuckets = var } | ||
233 | n = do | ||
234 | tbl <- atomically (readTVar var) | ||
235 | let count = bktCount tbl | ||
236 | nid = kademliaLocation (searchSpace sch) (thisNode tbl) | ||
237 | sample <- if n+1 >= count -- Is this the last bucket? | ||
238 | then return nid -- Yes? Search our own id. | ||
239 | else kademliaSample (searchSpace sch) -- No? Generate a random id. | ||
240 | getEntropy | ||
241 | nid | ||
242 | (bucketRange n (n + 1 < count)) | ||
243 | fin <- atomically $ newTVar False | ||
244 | resultCounter <- atomically $ newTVar Set.empty | ||
245 | |||
246 | dput XRefresh $ "Start refresh " ++ show (n,sample) | ||
247 | |||
248 | -- Set 15 minute timeout in order to avoid overlapping refreshes. | ||
249 | s <- search sch tbl sample $ if n+1 == R.defaultBucketCount | ||
250 | then const $ return True -- Never short-circuit the last bucket. | ||
251 | else checkBucketFull (searchSpace sch) var resultCounter fin n | ||
252 | _ <- timeout (15*60*1000000) $ do | ||
253 | atomically $ searchIsFinished s >>= check | ||
254 | atomically $ searchCancel s | ||
255 | dput XDHT $ "Finish refresh " ++ show (n,sample) | ||
256 | now <- getPOSIXTime | ||
257 | join $ atomically $ onFinishedRefresh r n now | ||
258 | rcount <- atomically $ do | ||
259 | c <- Set.size <$> readTVar resultCounter | ||
260 | b <- readTVar fin | ||
261 | return $ if b then 1 else c | ||
262 | return rcount | ||
263 | |||
264 | refreshLastBucket :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO () | ||
265 | refreshLastBucket r@BucketRefresher { refreshBuckets | ||
266 | , refreshQueue } = do | ||
267 | |||
268 | now <- getPOSIXTime | ||
269 | atomically $ do | ||
270 | cnt <- bktCount <$> readTVar refreshBuckets | ||
271 | -- Schedule immediate refresh. | ||
272 | modifyTVar' refreshQueue $ Int.insert (cnt-1) (now - 1) | ||
273 | |||
274 | restartBootstrap :: (Hashable ni, Hashable nid, Ord ni, Ord nid, Show nid) => | ||
275 | BucketRefresher nid ni -> STM (IO ()) | ||
276 | restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do | ||
277 | unchanged <- readTVar bootstrapMode | ||
278 | writeTVar bootstrapMode True | ||
279 | writeTVar bootstrapCountdown Nothing | ||
280 | if not unchanged then return $ do | ||
281 | dput XRefresh "BOOTSTRAP entered bootstrap mode" | ||
282 | refreshLastBucket r | ||
283 | else return $ dput XRefresh "BOOTSTRAP already bootstrapping" | ||
284 | |||
285 | bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) => | ||
286 | BucketRefresher nid ni | ||
287 | -> t1 ni -- ^ Nodes to bootstrap from. | ||
288 | -> t ni -- ^ Fallback nodes; used only if the others are unresponsive. | ||
289 | -> IO () | ||
290 | bootstrap r@BucketRefresher { refreshSearch = sch | ||
291 | , refreshBuckets = var | ||
292 | , refreshPing = ping | ||
293 | , bootstrapMode } ns ns0 = do | ||
294 | gotPing <- atomically $ newTVar False | ||
295 | |||
296 | -- First, ping the given nodes so that they are added to | ||
297 | -- our routing table. | ||
298 | withTaskGroup "bootstrap.resume" 20 $ \g -> do | ||
299 | forM_ ns $ \n -> do | ||
300 | let lbl = show $ kademliaLocation (searchSpace sch) n | ||
301 | forkTask g lbl $ do | ||
302 | b <- ping n | ||
303 | when b $ atomically $ writeTVar gotPing True | ||
304 | |||
305 | -- We resort to the hardcoded fallback nodes only when we got no | ||
306 | -- responses. This is to lesson the burden on well-known boostrap | ||
307 | -- nodes. | ||
308 | fallback <- atomically (readTVar gotPing) >>= return . when . not | ||
309 | fallback $ withTaskGroup "bootstrap.ping" 20 $ \g -> do | ||
310 | forM_ ns0 $ \n -> do | ||
311 | forkTask g (show $ kademliaLocation (searchSpace sch) n) | ||
312 | (void $ ping n) | ||
313 | dput XDHT "Finished bootstrap pings." | ||
314 | -- Now search our own Id by entering bootstrap mode from non-bootstrap mode. | ||
315 | join $ atomically $ do | ||
316 | writeTVar bootstrapMode False | ||
317 | restartBootstrap r | ||
318 | -- | ||
319 | -- Hopefully 'forkPollForRefresh' was invoked and can take over | ||
320 | -- maintenance. | ||
321 | |||
322 | |||
323 | effectiveRefreshInterval :: BucketRefresher nid ni -> Int -> STM POSIXTime | ||
324 | effectiveRefreshInterval BucketRefresher{ refreshInterval | ||
325 | , refreshBuckets | ||
326 | , bootstrapMode } num = do | ||
327 | tbl <- readTVar refreshBuckets | ||
328 | bootstrapping <- readTVar bootstrapMode | ||
329 | case bootstrapping of | ||
330 | False -> return refreshInterval | ||
331 | True -> do | ||
332 | -- When bootstrapping, refresh interval for non-full buckets is only 15 seconds. | ||
333 | let fullcount = R.defaultBucketSize | ||
334 | count = fromMaybe fullcount $ listToMaybe $ drop (num - 1) $ R.shape tbl | ||
335 | if count == fullcount | ||
336 | then return refreshInterval | ||
337 | else return 15 -- seconds | ||
338 | |||
339 | |||
340 | |||
341 | -- | Reschedule a bucket's refresh-time. It should be called whenever a bucket | ||
342 | -- changes. This will typically be invoked from 'tblTransition'. | ||
343 | -- | ||
344 | -- From BEP 05: | ||
345 | -- | ||
346 | -- > Each bucket should maintain a "last changed" property to indicate how | ||
347 | -- > "fresh" the contents are. | ||
348 | -- | ||
349 | -- We will use a "time to next refresh" property instead and store it in | ||
350 | -- a priority search queue. | ||
351 | -- | ||
352 | -- In detail using an expository (not actually implemented) type | ||
353 | -- 'BucketTouchEvent'... | ||
354 | -- | ||
355 | -- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus | ||
356 | -- >>> bucketEvents = | ||
357 | -- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds, | ||
358 | -- >>> | ||
359 | -- >>> , Stranger :--> Accepted -- or a node is added to a bucket, | ||
360 | -- >>> | ||
361 | -- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced | ||
362 | -- >>> , Applicant :--> Accepted -- with another node, | ||
363 | -- >>> ] | ||
364 | -- | ||
365 | -- the bucket's last changed property should be updated. Buckets that have not | ||
366 | -- been changed in 15 minutes (see 'refreshInterval') should be "refreshed." | ||
367 | -- This is done by picking a random ID in the range of the bucket and | ||
368 | -- performing a find_nodes search on it. | ||
369 | -- | ||
370 | -- The only other possible BucketTouchEvents are as follows: | ||
371 | -- | ||
372 | -- >>> not_handled = | ||
373 | -- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered: | ||
374 | -- >>> -- (Applicant :--> Stranger) | ||
375 | -- >>> -- (Applicant :--> Accepted) | ||
376 | -- >>> , Accepted :--> Applicant -- Never happens | ||
377 | -- >>> ] | ||
378 | -- | ||
379 | -- Because this BucketTouchEvent type is not actually implemented and we only | ||
380 | -- receive notifications of a node's new state, it suffices to reschedule the | ||
381 | -- bucket refresh 'touchBucket' on every transition to a state other than | ||
382 | -- 'Applicant'. | ||
383 | -- | ||
384 | -- XXX: Unfortunately, this means redundantly triggering twice upon every node | ||
385 | -- replacement because we do not currently distinguish between standalone | ||
386 | -- insertion/deletion events and an insertion/deletion pair constituting | ||
387 | -- replacement. | ||
388 | -- | ||
389 | -- It might also be better to pass the timestamp of the transition here and | ||
390 | -- keep the refresh queue in better sync with the routing table by updating it | ||
391 | -- within the STM monad. | ||
392 | -- | ||
393 | -- We embed the result in the STM monad but currently, no STM state changes | ||
394 | -- occur until the returned IO action is invoked. TODO: simplify? | ||
395 | touchBucket :: SensibleNodeId nid ni | ||
396 | => BucketRefresher nid ni | ||
397 | -> RoutingTransition ni -- ^ What happened to the bucket? | ||
398 | -> STM (IO ()) | ||
399 | touchBucket r@BucketRefresher{ refreshSearch | ||
400 | , refreshInterval | ||
401 | , refreshBuckets | ||
402 | , refreshQueue | ||
403 | , refreshLastTouch | ||
404 | , bootstrapMode | ||
405 | , bootstrapCountdown } | ||
406 | RoutingTransition{ transitionedTo | ||
407 | , transitioningNode } | ||
408 | = case transitionedTo of | ||
409 | Applicant -> return $ return () -- Ignore transition to applicant. | ||
410 | _ -> return $ do -- Reschedule for any other transition. | ||
411 | now <- getPOSIXTime | ||
412 | join $ atomically $ do | ||
413 | let space = searchSpace refreshSearch | ||
414 | nid = kademliaLocation space transitioningNode | ||
415 | tbl <- readTVar refreshBuckets | ||
416 | let num = R.bucketNumber space nid tbl | ||
417 | stamp <- readTVar refreshLastTouch | ||
418 | action <- case stamp /= 0 && (now - stamp > 60) of | ||
419 | True -> do | ||
420 | -- It's been one minute since any bucket has been touched, re-enter bootstrap mode. | ||
421 | restartBootstrap r | ||
422 | False -> return $ return () | ||
423 | interval <- effectiveRefreshInterval r num | ||
424 | modifyTVar' refreshQueue $ Int.insert num (now + interval) | ||
425 | writeTVar refreshLastTouch now | ||
426 | return action | ||
427 | |||
428 | refreshKademlia :: SensibleNodeId nid ni => BucketRefresher nid ni -> Kademlia nid ni | ||
429 | refreshKademlia r@BucketRefresher { refreshSearch = sch | ||
430 | , refreshPing = ping | ||
431 | , refreshBuckets = bkts | ||
432 | } | ||
433 | = Kademlia quietInsertions (searchSpace sch) (vanillaIO bkts ping) | ||
434 | { tblTransition = \tr -> do | ||
435 | io <- touchBucket r tr | ||
436 | return io | ||
437 | } | ||
diff --git a/src/Network/Kademlia/CommonAPI.hs b/src/Network/Kademlia/CommonAPI.hs deleted file mode 100644 index 601be5d8..00000000 --- a/src/Network/Kademlia/CommonAPI.hs +++ /dev/null | |||
@@ -1,84 +0,0 @@ | |||
1 | {-# LANGUAGE ExistentialQuantification #-} | ||
2 | module Network.Kademlia.CommonAPI where | ||
3 | |||
4 | |||
5 | import Control.Concurrent | ||
6 | import Control.Concurrent.STM | ||
7 | import Data.Aeson as J (FromJSON, ToJSON) | ||
8 | import Data.Hashable | ||
9 | import qualified Data.Map as Map | ||
10 | import Data.Serialize as S | ||
11 | import qualified Data.Set as Set | ||
12 | import Data.Time.Clock.POSIX | ||
13 | import Data.Typeable | ||
14 | |||
15 | import Network.Kademlia.Search | ||
16 | import Network.Kademlia.Routing as R | ||
17 | import Crypto.Tox (SecretKey,PublicKey) | ||
18 | |||
19 | data DHT = forall nid ni. ( Show ni | ||
20 | , Read ni | ||
21 | , ToJSON ni | ||
22 | , FromJSON ni | ||
23 | , Ord ni | ||
24 | , Hashable ni | ||
25 | , Show nid | ||
26 | , Ord nid | ||
27 | , Hashable nid | ||
28 | , Typeable ni | ||
29 | , S.Serialize nid | ||
30 | ) => | ||
31 | DHT | ||
32 | { dhtBuckets :: TVar (BucketList ni) | ||
33 | , dhtSecretKey :: STM (Maybe SecretKey) | ||
34 | , dhtPing :: Map.Map String (DHTPing ni) | ||
35 | , dhtQuery :: Map.Map String (DHTQuery nid ni) | ||
36 | , dhtAnnouncables :: Map.Map String (DHTAnnouncable nid) | ||
37 | , dhtParseId :: String -> Either String nid | ||
38 | , dhtSearches :: TVar (Map.Map (String,nid) (DHTSearch nid ni)) | ||
39 | , dhtFallbackNodes :: IO [ni] | ||
40 | , dhtBootstrap :: [ni] -> [ni] -> IO () | ||
41 | } | ||
42 | |||
43 | data DHTQuery nid ni = forall addr r tok. | ||
44 | ( Ord addr | ||
45 | , Typeable r | ||
46 | , Typeable tok | ||
47 | , Typeable ni | ||
48 | ) => DHTQuery | ||
49 | { qsearch :: Search nid addr tok ni r | ||
50 | , qhandler :: ni -> nid -> IO ([ni], [r], Maybe tok) -- ^ Invoked on local node, when there is no query destination. | ||
51 | , qshowR :: r -> String | ||
52 | , qshowTok :: tok -> Maybe String | ||
53 | } | ||
54 | |||
55 | data DHTAnnouncable nid = forall dta tok ni r. | ||
56 | ( Show r | ||
57 | , Typeable dta -- information being announced | ||
58 | , Typeable tok -- token | ||
59 | , Typeable r -- search result | ||
60 | , Typeable ni -- node | ||
61 | ) => DHTAnnouncable | ||
62 | { announceParseData :: String -> Either String dta | ||
63 | , announceParseToken :: dta -> String -> Either String tok | ||
64 | , announceParseAddress :: String -> Either String ni | ||
65 | , announceSendData :: Either ( String {- search name -} | ||
66 | , String -> Either String r | ||
67 | , PublicKey {- me -} -> dta -> r -> IO ()) | ||
68 | (dta -> tok -> Maybe ni -> IO (Maybe r)) | ||
69 | , announceInterval :: POSIXTime | ||
70 | , announceTarget :: dta -> nid | ||
71 | } | ||
72 | |||
73 | data DHTSearch nid ni = forall addr tok r. DHTSearch | ||
74 | { searchThread :: ThreadId | ||
75 | , searchState :: SearchState nid addr tok ni r | ||
76 | , searchShowTok :: tok -> Maybe String | ||
77 | , searchResults :: TVar (Set.Set String) | ||
78 | } | ||
79 | |||
80 | data DHTPing ni = forall r. DHTPing | ||
81 | { pingQuery :: [String] -> ni -> IO (Maybe r) | ||
82 | , pingShowResult :: r -> String | ||
83 | } | ||
84 | |||
diff --git a/src/Network/Kademlia/Persistence.hs b/src/Network/Kademlia/Persistence.hs deleted file mode 100644 index d7431671..00000000 --- a/src/Network/Kademlia/Persistence.hs +++ /dev/null | |||
@@ -1,51 +0,0 @@ | |||
1 | {-# LANGUAGE NamedFieldPuns #-} | ||
2 | module Network.Kademlia.Persistence where | ||
3 | |||
4 | import Network.Kademlia.CommonAPI | ||
5 | import Network.Kademlia.Routing as R | ||
6 | |||
7 | import Control.Concurrent.STM | ||
8 | import qualified Data.Aeson as J | ||
9 | ;import Data.Aeson as J (FromJSON) | ||
10 | import qualified Data.ByteString.Lazy as L | ||
11 | import qualified Data.HashMap.Strict as HashMap | ||
12 | import Data.List | ||
13 | import qualified Data.Vector as V | ||
14 | import System.IO.Error | ||
15 | |||
16 | saveNodes :: String -> DHT -> IO () | ||
17 | saveNodes netname DHT{dhtBuckets} = do | ||
18 | bkts <- atomically $ readTVar dhtBuckets | ||
19 | let ns = map fst $ concat $ R.toList bkts | ||
20 | bs = J.encode ns | ||
21 | fname = nodesFileName netname | ||
22 | L.writeFile fname bs | ||
23 | |||
24 | loadNodes :: FromJSON ni => String -> IO [ni] | ||
25 | loadNodes netname = do | ||
26 | let fname = nodesFileName netname | ||
27 | attempt <- tryIOError $ do | ||
28 | J.decode <$> L.readFile fname | ||
29 | >>= maybe (ioError $ userError "Nothing") return | ||
30 | either (const $ fallbackLoad fname) return attempt | ||
31 | |||
32 | nodesFileName :: String -> String | ||
33 | nodesFileName netname = netname ++ "-nodes.json" | ||
34 | |||
35 | fallbackLoad :: FromJSON t => FilePath -> IO [t] | ||
36 | fallbackLoad fname = do | ||
37 | attempt <- tryIOError $ do | ||
38 | J.decode <$> L.readFile fname | ||
39 | >>= maybe (ioError $ userError "Nothing") return | ||
40 | let go r = do | ||
41 | let m = HashMap.lookup "nodes" (r :: J.Object) | ||
42 | ns0 = case m of Just (J.Array v) -> V.toList v | ||
43 | Nothing -> [] | ||
44 | ns1 = zip (map J.fromJSON ns0) ns0 | ||
45 | issuc (J.Error _,_) = False | ||
46 | issuc _ = True | ||
47 | (ss,fs) = partition issuc ns1 | ||
48 | ns = map (\(J.Success n,_) -> n) ss | ||
49 | mapM_ print (map snd fs) >> return ns | ||
50 | either (const $ return []) go attempt | ||
51 | |||
diff --git a/src/Network/Kademlia/Routing.hs b/src/Network/Kademlia/Routing.hs deleted file mode 100644 index a52cca73..00000000 --- a/src/Network/Kademlia/Routing.hs +++ /dev/null | |||
@@ -1,808 +0,0 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013 | ||
3 | -- (c) Joe Crayne 2017 | ||
4 | -- License : BSD3 | ||
5 | -- Maintainer : pxqr.sta@gmail.com | ||
6 | -- Stability : experimental | ||
7 | -- Portability : portable | ||
8 | -- | ||
9 | -- Every node maintains a routing table of known good nodes. The | ||
10 | -- nodes in the routing table are used as starting points for | ||
11 | -- queries in the DHT. Nodes from the routing table are returned in | ||
12 | -- response to queries from other nodes. | ||
13 | -- | ||
14 | -- For more info see: | ||
15 | -- <http://www.bittorrent.org/beps/bep_0005.html#routing-table> | ||
16 | -- | ||
17 | {-# LANGUAGE CPP #-} | ||
18 | {-# LANGUAGE RecordWildCards #-} | ||
19 | {-# LANGUAGE BangPatterns #-} | ||
20 | {-# LANGUAGE RankNTypes #-} | ||
21 | {-# LANGUAGE ViewPatterns #-} | ||
22 | {-# LANGUAGE TypeOperators #-} | ||
23 | {-# LANGUAGE DeriveGeneric #-} | ||
24 | {-# LANGUAGE DeriveFunctor #-} | ||
25 | {-# LANGUAGE GADTs #-} | ||
26 | {-# LANGUAGE ScopedTypeVariables #-} | ||
27 | {-# LANGUAGE TupleSections #-} | ||
28 | {-# LANGUAGE StandaloneDeriving, FlexibleContexts, MultiParamTypeClasses, FlexibleInstances #-} | ||
29 | {-# OPTIONS_GHC -fno-warn-orphans #-} | ||
30 | module Network.Kademlia.Routing | ||
31 | {- | ||
32 | ( -- * BucketList | ||
33 | BucketList | ||
34 | , Info(..) | ||
35 | |||
36 | -- * Attributes | ||
37 | , BucketCount | ||
38 | , defaultBucketCount | ||
39 | , BucketSize | ||
40 | , defaultBucketSize | ||
41 | , NodeCount | ||
42 | |||
43 | -- * Query | ||
44 | , Network.Kademlia.Routing.null | ||
45 | , Network.Kademlia.Routing.full | ||
46 | , thisId | ||
47 | , shape | ||
48 | , Network.Kademlia.Routing.size | ||
49 | , Network.Kademlia.Routing.depth | ||
50 | , compatibleNodeId | ||
51 | |||
52 | -- * Lookup | ||
53 | , K | ||
54 | , defaultK | ||
55 | , TableKey (..) | ||
56 | , kclosest | ||
57 | |||
58 | -- * Construction | ||
59 | , Network.Kademlia.Routing.nullTable | ||
60 | , Event(..) | ||
61 | , CheckPing(..) | ||
62 | , Network.Kademlia.Routing.insert | ||
63 | |||
64 | -- * Conversion | ||
65 | , Network.Kademlia.Routing.TableEntry | ||
66 | , Network.Kademlia.Routing.toList | ||
67 | |||
68 | -- * Routing | ||
69 | , Timestamp | ||
70 | , getTimestamp | ||
71 | ) -} where | ||
72 | |||
73 | import Control.Applicative as A | ||
74 | import Control.Arrow | ||
75 | import Control.Monad | ||
76 | import Data.Function | ||
77 | import Data.Functor.Contravariant | ||
78 | import Data.Functor.Identity | ||
79 | import Data.List as L hiding (insert) | ||
80 | import Data.Maybe | ||
81 | import Data.Monoid | ||
82 | import Data.Wrapper.PSQ as PSQ | ||
83 | import Data.Serialize as S hiding (Result, Done) | ||
84 | import qualified Data.Sequence as Seq | ||
85 | import Data.Time | ||
86 | import Data.Time.Clock.POSIX | ||
87 | import Data.Word | ||
88 | import GHC.Generics | ||
89 | import Text.PrettyPrint as PP hiding ((<>)) | ||
90 | import Text.PrettyPrint.HughesPJClass (pPrint,Pretty) | ||
91 | import qualified Data.ByteString as BS | ||
92 | import Data.Bits | ||
93 | import Data.Ord | ||
94 | import Data.Reflection | ||
95 | import Network.Address | ||
96 | import Data.Typeable | ||
97 | import Data.Coerce | ||
98 | import Data.Hashable | ||
99 | |||
100 | |||
101 | -- | Last time the node was responding to our queries. | ||
102 | -- | ||
103 | -- Not all nodes that we learn about are equal. Some are \"good\" and | ||
104 | -- some are not. Many nodes using the DHT are able to send queries | ||
105 | -- and receive responses, but are not able to respond to queries | ||
106 | -- from other nodes. It is important that each node's routing table | ||
107 | -- must contain only known good nodes. A good node is a node has | ||
108 | -- responded to one of our queries within the last 15 minutes. A | ||
109 | -- node is also good if it has ever responded to one of our queries | ||
110 | -- and has sent us a query within the last 15 minutes. After 15 | ||
111 | -- minutes of inactivity, a node becomes questionable. Nodes become | ||
112 | -- bad when they fail to respond to multiple queries in a row. Nodes | ||
113 | -- that we know are good are given priority over nodes with unknown | ||
114 | -- status. | ||
115 | -- | ||
116 | type Timestamp = POSIXTime | ||
117 | |||
118 | getTimestamp :: IO Timestamp | ||
119 | getTimestamp = do | ||
120 | utcTime <- getCurrentTime | ||
121 | return $ utcTimeToPOSIXSeconds utcTime | ||
122 | |||
123 | |||
124 | |||
125 | {----------------------------------------------------------------------- | ||
126 | Bucket | ||
127 | -----------------------------------------------------------------------} | ||
128 | -- | ||
129 | -- When a k-bucket is full and a new node is discovered for that | ||
130 | -- k-bucket, the least recently seen node in the k-bucket is | ||
131 | -- PINGed. If the node is found to be still alive, the new node is | ||
132 | -- place in a secondary list, a replacement cache. The replacement | ||
133 | -- cache is used only if a node in the k-bucket stops responding. In | ||
134 | -- other words: new nodes are used only when older nodes disappear. | ||
135 | |||
136 | -- | Timestamp - last time this node is pinged. | ||
137 | type NodeEntry ni = Binding ni Timestamp | ||
138 | |||
139 | |||
140 | -- | Maximum number of 'NodeInfo's stored in a bucket. Most clients | ||
141 | -- use this value. | ||
142 | defaultBucketSize :: Int | ||
143 | defaultBucketSize = 8 | ||
144 | |||
145 | data QueueMethods m elem fifo = QueueMethods | ||
146 | { pushBack :: elem -> fifo -> m fifo | ||
147 | , popFront :: fifo -> m (Maybe elem, fifo) | ||
148 | , emptyQueue :: m fifo | ||
149 | } | ||
150 | |||
151 | {- | ||
152 | fromQ :: Functor m => | ||
153 | ( a -> b ) | ||
154 | -> ( b -> a ) | ||
155 | -> QueueMethods m elem a | ||
156 | -> QueueMethods m elem b | ||
157 | fromQ embed project QueueMethods{..} = | ||
158 | QueueMethods { pushBack = \e -> fmap embed . pushBack e . project | ||
159 | , popFront = fmap (second embed) . popFront . project | ||
160 | , emptyQueue = fmap embed emptyQueue | ||
161 | } | ||
162 | -} | ||
163 | |||
164 | seqQ :: QueueMethods Identity ni (Seq.Seq ni) | ||
165 | seqQ = QueueMethods | ||
166 | { pushBack = \e fifo -> pure (fifo Seq.|> e) | ||
167 | , popFront = \fifo -> case Seq.viewl fifo of | ||
168 | e Seq.:< fifo' -> pure (Just e, fifo') | ||
169 | Seq.EmptyL -> pure (Nothing, Seq.empty) | ||
170 | , emptyQueue = pure Seq.empty | ||
171 | } | ||
172 | |||
173 | type BucketQueue ni = Seq.Seq ni | ||
174 | |||
175 | bucketQ :: QueueMethods Identity ni (BucketQueue ni) | ||
176 | bucketQ = seqQ | ||
177 | |||
178 | |||
179 | data Compare a = Compare (a -> a -> Ordering) (Int -> a -> Int) | ||
180 | |||
181 | contramapC :: (b -> a) -> Compare a -> Compare b | ||
182 | contramapC f (Compare cmp hsh) = Compare (\a b -> cmp (f a) (f b)) | ||
183 | (\s x -> hsh s (f x)) | ||
184 | |||
185 | newtype Ordered' s a = Ordered a | ||
186 | deriving (Show) | ||
187 | |||
188 | -- | Hack to avoid UndecidableInstances | ||
189 | newtype Shrink a = Shrink a | ||
190 | deriving (Show) | ||
191 | |||
192 | type Ordered s a = Ordered' s (Shrink a) | ||
193 | |||
194 | instance Reifies s (Compare a) => Eq (Ordered' s (Shrink a)) where | ||
195 | a == b = (compare a b == EQ) | ||
196 | |||
197 | instance Reifies s (Compare a) => Ord (Ordered' s (Shrink a)) where | ||
198 | compare a b = cmp (coerce a) (coerce b) | ||
199 | where Compare cmp _ = reflect (Proxy :: Proxy s) | ||
200 | |||
201 | instance Reifies s (Compare a) => Hashable (Ordered' s (Shrink a)) where | ||
202 | hashWithSalt salt x = hash salt (coerce x) | ||
203 | where Compare _ hash = reflect (Proxy :: Proxy s) | ||
204 | |||
205 | -- | Bucket is also limited in its length — thus it's called k-bucket. | ||
206 | -- When bucket becomes full, we should split it in two lists by | ||
207 | -- current span bit. Span bit is defined by depth in the routing | ||
208 | -- table tree. Size of the bucket should be choosen such that it's | ||
209 | -- very unlikely that all nodes in bucket fail within an hour of | ||
210 | -- each other. | ||
211 | data Bucket s ni = Bucket | ||
212 | { bktNodes :: !(PSQ (Ordered s ni) Timestamp) -- current routing nodes | ||
213 | , bktQ :: !(BucketQueue (Timestamp,ni)) -- replacements pending time-outs | ||
214 | } deriving (Generic) | ||
215 | |||
216 | #define CAN_SHOW_BUCKET 0 | ||
217 | |||
218 | #if CAN_SHOW_BUCKET | ||
219 | deriving instance Show ni => Show (Bucket s ni) | ||
220 | #endif | ||
221 | |||
222 | bucketCompare :: forall p ni s. Reifies s (Compare ni) => p (Bucket s ni) -> Compare ni | ||
223 | bucketCompare _ = reflect (Proxy :: Proxy s) | ||
224 | |||
225 | mapBucket :: ( Reifies s (Compare a) | ||
226 | , Reifies t (Compare ni) | ||
227 | ) => (a -> ni) -> Bucket s a -> Bucket t ni | ||
228 | mapBucket f (Bucket ns q) = Bucket (PSQ.fromList $ map (\(ni :-> tm) -> (f' ni :-> tm)) $ PSQ.toList ns) | ||
229 | (fmap (second f) q) | ||
230 | where f' = coerce . f . coerce | ||
231 | |||
232 | |||
233 | #if 0 | ||
234 | |||
235 | {- | ||
236 | getGenericNode :: ( Serialize (NodeId) | ||
237 | , Serialize ip | ||
238 | , Serialize u | ||
239 | ) => Get (NodeInfo) | ||
240 | getGenericNode = do | ||
241 | nid <- get | ||
242 | naddr <- get | ||
243 | u <- get | ||
244 | return NodeInfo | ||
245 | { nodeId = nid | ||
246 | , nodeAddr = naddr | ||
247 | , nodeAnnotation = u | ||
248 | } | ||
249 | |||
250 | putGenericNode :: ( Serialize (NodeId) | ||
251 | , Serialize ip | ||
252 | , Serialize u | ||
253 | ) => NodeInfo -> Put | ||
254 | putGenericNode (NodeInfo nid naddr u) = do | ||
255 | put nid | ||
256 | put naddr | ||
257 | put u | ||
258 | |||
259 | instance (Eq ip, Ord (NodeId), Serialize (NodeId), Serialize ip, Serialize u) => Serialize (Bucket) where | ||
260 | get = Bucket . psqFromPairList <$> getListOf ( (,) <$> getGenericNode <*> get ) <*> pure (runIdentity $ emptyQueue bucketQ) | ||
261 | put = putListOf (\(ni,stamp) -> putGenericNode ni >> put stamp) . psqToPairList . bktNodes | ||
262 | -} | ||
263 | |||
264 | #endif | ||
265 | |||
266 | psqFromPairList :: (Ord p, PSQKey k) => [(k, p)] -> PSQ k p | ||
267 | psqFromPairList xs = PSQ.fromList $ map (\(a,b) -> a :-> b) xs | ||
268 | |||
269 | psqToPairList :: ( PSQKey t, Ord t1 ) => PSQ t t1 -> [(t, t1)] | ||
270 | psqToPairList psq = map (\(a :-> b) -> (a,b)) $ PSQ.toList psq | ||
271 | |||
272 | -- | Update interval, in seconds. | ||
273 | delta :: NominalDiffTime | ||
274 | delta = 15 * 60 | ||
275 | |||
276 | -- | Should maintain a set of stable long running nodes. | ||
277 | -- | ||
278 | -- Note: pings are triggerd only when a bucket is full. | ||
279 | updateBucketForInbound :: ( Coercible t1 t | ||
280 | , Alternative f | ||
281 | , Reifies s (Compare t1) | ||
282 | ) => NominalDiffTime -> t1 -> Bucket s t1 -> f ([t], Bucket s t1) | ||
283 | updateBucketForInbound curTime info bucket | ||
284 | -- Just update timestamp if a node is already in bucket. | ||
285 | -- | ||
286 | -- Note PingResult events should only occur for nodes we requested a ping for, | ||
287 | -- and those will always already be in the routing queue and will get their | ||
288 | -- timestamp updated here, since 'TryInsert' is called on every inbound packet, | ||
289 | -- including ping results. | ||
290 | | already_have | ||
291 | = pure ( [], map_ns $ PSQ.insertWith max (coerce info) curTime ) | ||
292 | -- bucket is good, but not full => we can insert a new node | ||
293 | | PSQ.size (bktNodes bucket) < defaultBucketSize | ||
294 | = pure ( [], map_ns $ PSQ.insert (coerce info) curTime ) | ||
295 | -- If there are any questionable nodes in the bucket have not been | ||
296 | -- seen in the last 15 minutes, the least recently seen node is | ||
297 | -- pinged. If any nodes in the bucket are known to have become bad, | ||
298 | -- then one is replaced by the new node in the next insertBucket | ||
299 | -- iteration. | ||
300 | | not (L.null stales) | ||
301 | = pure ( stales | ||
302 | , bucket { -- Update timestamps so that we don't redundantly ping. | ||
303 | bktNodes = updateStamps curTime (coerce stales) $ bktNodes bucket | ||
304 | -- Update queue with the pending NodeInfo in case of ping fail. | ||
305 | , bktQ = runIdentity $ pushBack bucketQ (curTime,info) $ bktQ bucket } ) | ||
306 | -- When the bucket is full of good nodes, the new node is simply discarded. | ||
307 | -- We must return 'A.empty' here to ensure that bucket splitting happens | ||
308 | -- inside 'modifyBucket'. | ||
309 | | otherwise = A.empty | ||
310 | where | ||
311 | -- We (take 1) to keep a 1-to-1 correspondence between pending pings and | ||
312 | -- waiting nodes in the bktQ. This way, we don't have to worry about what | ||
313 | -- to do with failed pings for which there is no ready replacements. | ||
314 | stales = -- One stale: | ||
315 | do (n :-> t) <- maybeToList $ PSQ.findMin (bktNodes bucket) | ||
316 | guard (t < curTime - delta) | ||
317 | return $ coerce n | ||
318 | -- All stale: | ||
319 | -- map key \$ PSQ.atMost (curTime - delta) $ bktNodes bucket | ||
320 | |||
321 | already_have = maybe False (const True) $ PSQ.lookup (coerce info) (bktNodes bucket) | ||
322 | |||
323 | map_ns f = bucket { bktNodes = f (bktNodes bucket) } | ||
324 | -- map_q f = bucket { bktQ = runIdentity \$ f (bktQ bucket) } | ||
325 | |||
326 | updateBucketForPingResult :: (Applicative f, Reifies s (Compare a)) => | ||
327 | a -> Bool -> Bucket s a -> f ([(a, Maybe (Timestamp, a))], Bucket s a) | ||
328 | updateBucketForPingResult bad_node got_response bucket | ||
329 | = pure ( map (,Nothing) forgotten | ||
330 | ++ map (second Just) replacements | ||
331 | , Bucket (foldr replace | ||
332 | (bktNodes bucket) | ||
333 | replacements) | ||
334 | popped | ||
335 | ) | ||
336 | where | ||
337 | (top, popped) = runIdentity $ popFront bucketQ (bktQ bucket) | ||
338 | |||
339 | -- Dropped from accepted, replaced by pending. | ||
340 | replacements | got_response = [] -- Timestamp was already updated by TryInsert. | ||
341 | | Just info <- top = do | ||
342 | -- Insert only if there's a removal. | ||
343 | _ <- maybeToList $ PSQ.lookup (coerce bad_node) (bktNodes bucket) | ||
344 | return (bad_node, info) | ||
345 | | otherwise = [] | ||
346 | |||
347 | -- Dropped from the pending queue without replacing. | ||
348 | forgotten | got_response = maybeToList $ fmap snd top | ||
349 | | otherwise = [] | ||
350 | |||
351 | |||
352 | replace (bad_node, (tm, info)) = | ||
353 | PSQ.insert (coerce info) tm | ||
354 | . PSQ.delete (coerce bad_node) | ||
355 | |||
356 | |||
357 | updateStamps :: PSQKey ni => Timestamp -> [ni] -> PSQ ni Timestamp -> PSQ ni Timestamp | ||
358 | updateStamps curTime stales nodes = foldl' (\q n -> PSQ.insert n curTime q) nodes stales | ||
359 | |||
360 | type BitIx = Word | ||
361 | |||
362 | partitionQ :: Monad f => QueueMethods f elem b -> (elem -> Bool) -> b -> f (b, b) | ||
363 | partitionQ imp test q0 = do | ||
364 | pass0 <- emptyQueue imp | ||
365 | fail0 <- emptyQueue imp | ||
366 | let flipfix a b f = fix f a b | ||
367 | flipfix q0 (pass0,fail0) $ \rec q qs -> do | ||
368 | (mb,q') <- popFront imp q | ||
369 | case mb of | ||
370 | Nothing -> return qs | ||
371 | Just e -> do qs' <- select (pushBack imp e) qs | ||
372 | rec q' qs' | ||
373 | where | ||
374 | select :: Functor f => (b -> f b) -> (b, b) -> f (b, b) | ||
375 | select f = if test e then \(a,b) -> flip (,) b <$> f a | ||
376 | else \(a,b) -> (,) a <$> f b | ||
377 | |||
378 | |||
379 | |||
380 | split :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) => | ||
381 | forall ni s. ( Reifies s (Compare ni) ) => | ||
382 | (ni -> Word -> Bool) | ||
383 | -> BitIx -> Bucket s ni -> (Bucket s ni, Bucket s ni) | ||
384 | split testNodeIdBit i b = (Bucket ns qs, Bucket ms rs) | ||
385 | where | ||
386 | (ns,ms) = (PSQ.fromList *** PSQ.fromList) . partition (spanBit . coerce . key) . PSQ.toList $ bktNodes b | ||
387 | (qs,rs) = runIdentity $ partitionQ bucketQ (spanBit . snd) $ bktQ b | ||
388 | |||
389 | spanBit :: ni -> Bool | ||
390 | spanBit entry = testNodeIdBit entry i | ||
391 | |||
392 | |||
393 | {----------------------------------------------------------------------- | ||
394 | -- BucketList | ||
395 | -----------------------------------------------------------------------} | ||
396 | |||
397 | defaultBucketCount :: Int | ||
398 | defaultBucketCount = 20 | ||
399 | |||
400 | defaultMaxBucketCount :: Word | ||
401 | defaultMaxBucketCount = 24 | ||
402 | |||
403 | data Info ni nid = Info | ||
404 | { myBuckets :: BucketList ni | ||
405 | , myNodeId :: nid | ||
406 | , myAddress :: SockAddr | ||
407 | } | ||
408 | deriving Generic | ||
409 | |||
410 | deriving instance (Eq ni, Eq nid) => Eq (Info ni nid) | ||
411 | deriving instance (Show ni, Show nid) => Show (Info ni nid) | ||
412 | |||
413 | -- instance (Eq ip, Serialize ip) => Serialize (Info ip) | ||
414 | |||
415 | -- | The routing table covers the entire 'NodeId' space from 0 to 2 ^ | ||
416 | -- 160. The routing table is subdivided into 'Bucket's that each cover | ||
417 | -- a portion of the space. An empty table has one bucket with an ID | ||
418 | -- space range of @min = 0, max = 2 ^ 160@. When a node with ID \"N\" | ||
419 | -- is inserted into the table, it is placed within the bucket that has | ||
420 | -- @min <= N < max@. An empty table has only one bucket so any node | ||
421 | -- must fit within it. Each bucket can only hold 'K' nodes, currently | ||
422 | -- eight, before becoming 'Full'. When a bucket is full of known good | ||
423 | -- nodes, no more nodes may be added unless our own 'NodeId' falls | ||
424 | -- within the range of the 'Bucket'. In that case, the bucket is | ||
425 | -- replaced by two new buckets each with half the range of the old | ||
426 | -- bucket and the nodes from the old bucket are distributed among the | ||
427 | -- two new ones. For a new table with only one bucket, the full bucket | ||
428 | -- is always split into two new buckets covering the ranges @0..2 ^ | ||
429 | -- 159@ and @2 ^ 159..2 ^ 160@. | ||
430 | -- | ||
431 | data BucketList ni = forall s. Reifies s (Compare ni) => | ||
432 | BucketList { thisNode :: !ni | ||
433 | -- | Non-empty list of buckets. | ||
434 | , buckets :: [Bucket s ni] | ||
435 | } | ||
436 | |||
437 | mapTable :: (b -> t) -> (t -> b) -> BucketList t -> BucketList b | ||
438 | mapTable g f tbl@(BucketList self bkts) = reify (contramapC g $ bucketCompare bkts) | ||
439 | $ \p -> BucketList | ||
440 | { thisNode = f self | ||
441 | , buckets = map (resolve p . mapBucket f) bkts | ||
442 | } | ||
443 | where | ||
444 | resolve :: Proxy s -> Bucket s ni -> Bucket s ni | ||
445 | resolve = const id | ||
446 | |||
447 | instance (Eq ni) => Eq (BucketList ni) where | ||
448 | (==) = (==) `on` Network.Kademlia.Routing.toList | ||
449 | |||
450 | #if 0 | ||
451 | |||
452 | instance Serialize NominalDiffTime where | ||
453 | put = putWord32be . fromIntegral . fromEnum | ||
454 | get = (toEnum . fromIntegral) <$> getWord32be | ||
455 | |||
456 | #endif | ||
457 | |||
458 | #if CAN_SHOW_BUCKET | ||
459 | deriving instance (Show ni) => Show (BucketList ni) | ||
460 | #else | ||
461 | instance Show ni => Show (BucketList ni) where | ||
462 | showsPrec d (BucketList self bkts) = | ||
463 | mappend "BucketList " | ||
464 | . showsPrec (d+1) self | ||
465 | . mappend " (fromList " | ||
466 | . showsPrec (d+1) (L.map (L.map tableEntry . PSQ.toList . bktNodes) $ bkts) | ||
467 | . mappend ") " | ||
468 | #endif | ||
469 | |||
470 | #if 0 | ||
471 | |||
472 | -- | Normally, routing table should be saved between invocations of | ||
473 | -- the client software. Note that you don't need to store /this/ | ||
474 | -- 'NodeId' since it is already included in routing table. | ||
475 | instance (Eq ip, Serialize ip, Ord (NodeId), Serialize (NodeId), Serialize u) => Serialize (BucketList) | ||
476 | |||
477 | #endif | ||
478 | |||
479 | -- | Shape of the table. | ||
480 | instance Pretty (BucketList ni) where | ||
481 | pPrint t | ||
482 | | bucketCount < 6 = hcat $ punctuate ", " $ L.map PP.int ss | ||
483 | | otherwise = brackets $ | ||
484 | PP.int (L.sum ss) <> " nodes, " <> | ||
485 | PP.int bucketCount <> " buckets" | ||
486 | where | ||
487 | bucketCount = L.length ss | ||
488 | ss = shape t | ||
489 | |||
490 | -- | Empty table with specified /spine/ node id. | ||
491 | -- | ||
492 | -- XXX: The comparison function argument is awkward here. | ||
493 | nullTable :: (ni -> ni -> Ordering) -> (Int -> ni -> Int) -> ni -> Int -> BucketList ni | ||
494 | nullTable cmp hsh ni n = | ||
495 | reify (Compare cmp hsh) | ||
496 | $ \p -> BucketList | ||
497 | ni | ||
498 | [Bucket (empty p) (runIdentity $ emptyQueue bucketQ)] | ||
499 | where | ||
500 | empty :: Reifies s (Compare ni) => Proxy s -> PSQ (Ordered s ni) Timestamp | ||
501 | empty = const $ PSQ.empty | ||
502 | |||
503 | #if 0 | ||
504 | |||
505 | -- | Test if table is empty. In this case DHT should start | ||
506 | -- bootstrapping process until table becomes 'full'. | ||
507 | null :: BucketList -> Bool | ||
508 | null (Tip _ _ b) = PSQ.null $ bktNodes b | ||
509 | null _ = False | ||
510 | |||
511 | -- | Test if table have maximum number of nodes. No more nodes can be | ||
512 | -- 'insert'ed, except old ones becomes bad. | ||
513 | full :: BucketList -> Bool | ||
514 | full (Tip _ n _) = n == 0 | ||
515 | full (Zero t b) = PSQ.size (bktNodes b) == defaultBucketSize && full t | ||
516 | full (One b t) = PSQ.size (bktNodes b) == defaultBucketSize && full t | ||
517 | |||
518 | -- | Get the /spine/ node id. | ||
519 | thisId :: BucketList -> NodeId | ||
520 | thisId (Tip nid _ _) = nid | ||
521 | thisId (Zero table _) = thisId table | ||
522 | thisId (One _ table) = thisId table | ||
523 | |||
524 | -- | Number of nodes in a bucket or a table. | ||
525 | type NodeCount = Int | ||
526 | |||
527 | #endif | ||
528 | |||
529 | -- | Internally, routing table is similar to list of buckets or a | ||
530 | -- /matrix/ of nodes. This function returns the shape of the matrix. | ||
531 | shape :: BucketList ni -> [Int] | ||
532 | shape (BucketList _ tbl) = map (PSQ.size . bktNodes) tbl | ||
533 | |||
534 | #if 0 | ||
535 | |||
536 | -- | Get number of nodes in the table. | ||
537 | size :: BucketList -> NodeCount | ||
538 | size = L.sum . shape | ||
539 | |||
540 | -- | Get number of buckets in the table. | ||
541 | depth :: BucketList -> BucketCount | ||
542 | depth = L.length . shape | ||
543 | |||
544 | #endif | ||
545 | |||
546 | lookupBucket :: forall ni nid x. | ||
547 | ( -- FiniteBits nid | ||
548 | Ord nid | ||
549 | ) => KademliaSpace nid ni -> nid -> (forall s. Reifies s (Compare ni) => [Bucket s ni] -> x) -> BucketList ni -> x | ||
550 | lookupBucket space nid kont (BucketList self bkts) = kont $ go 0 [] bkts | ||
551 | where | ||
552 | d = kademliaXor space nid (kademliaLocation space self) | ||
553 | |||
554 | go :: Word -> [Bucket s ni] -> [Bucket s ni] -> [Bucket s ni] | ||
555 | go i bs (bucket : buckets) | ||
556 | | kademliaTestBit space d i = bucket : buckets ++ bs | ||
557 | | otherwise = go (succ i) (bucket:bs) buckets | ||
558 | go _ bs [] = bs | ||
559 | |||
560 | bucketNumber :: forall ni nid. | ||
561 | KademliaSpace nid ni -> nid -> BucketList ni -> Int | ||
562 | bucketNumber space nid (BucketList self bkts) = fromIntegral $ go 0 bkts | ||
563 | where | ||
564 | d = kademliaXor space nid (kademliaLocation space self) | ||
565 | |||
566 | go :: Word -> [Bucket s ni] -> Word | ||
567 | go i (bucket : buckets) | ||
568 | | kademliaTestBit space d i = i | ||
569 | | otherwise = go (succ i) buckets | ||
570 | go i [] = i | ||
571 | |||
572 | |||
573 | compatibleNodeId :: forall ni nid. | ||
574 | ( Serialize nid, FiniteBits nid) => | ||
575 | (ni -> nid) -> BucketList ni -> IO nid | ||
576 | compatibleNodeId nodeId tbl = genBucketSample prefix br | ||
577 | where | ||
578 | br = bucketRange (L.length (shape tbl) - 1) True | ||
579 | nodeIdSize = finiteBitSize (undefined :: nid) `div` 8 | ||
580 | bs = BS.pack $ take nodeIdSize $ tablePrefix (testIdBit . nodeId) tbl ++ repeat 0 | ||
581 | prefix = either error id $ S.decode bs | ||
582 | |||
583 | tablePrefix :: (ni -> Word -> Bool) -> BucketList ni -> [Word8] | ||
584 | tablePrefix testbit = map (packByte . take 8 . (++repeat False)) | ||
585 | . chunksOf 8 | ||
586 | . tableBits testbit | ||
587 | where | ||
588 | packByte = foldl1' (.|.) . zipWith bitmask [7,6 .. 0] | ||
589 | bitmask ix True = bit ix | ||
590 | bitmask _ _ = 0 | ||
591 | |||
592 | tableBits :: (ni -> Word -> Bool) -> BucketList ni -> [Bool] | ||
593 | tableBits testbit (BucketList self bkts) = | ||
594 | zipWith const (map (testbit self) [0..]) | ||
595 | bkts | ||
596 | |||
597 | selfNode :: BucketList ni -> ni | ||
598 | selfNode (BucketList self _) = self | ||
599 | |||
600 | chunksOf :: Int -> [e] -> [[e]] | ||
601 | chunksOf i ls = map (take i) (build (splitter ls)) where | ||
602 | splitter :: [e] -> ([e] -> a -> a) -> a -> a | ||
603 | splitter [] _ n = n | ||
604 | splitter l c n = l `c` splitter (drop i l) c n | ||
605 | |||
606 | build :: ((a -> [a] -> [a]) -> [a] -> [a]) -> [a] | ||
607 | build g = g (:) [] | ||
608 | |||
609 | |||
610 | |||
611 | -- | Count of closest nodes in find_node reply. | ||
612 | type K = Int | ||
613 | |||
614 | -- | Default 'K' is equal to 'defaultBucketSize'. | ||
615 | defaultK :: K | ||
616 | defaultK = 8 | ||
617 | |||
618 | #if 0 | ||
619 | class TableKey dht k where | ||
620 | toNodeId :: k -> NodeId | ||
621 | |||
622 | instance TableKey dht (NodeId) where | ||
623 | toNodeId = id | ||
624 | |||
625 | #endif | ||
626 | |||
627 | -- | In Kademlia, the distance metric is XOR and the result is | ||
628 | -- interpreted as an unsigned integer. | ||
629 | newtype NodeDistance nodeid = NodeDistance nodeid | ||
630 | deriving (Eq, Ord) | ||
631 | |||
632 | -- | distance(A,B) = |A xor B| Smaller values are closer. | ||
633 | distance :: Bits nid => nid -> nid -> NodeDistance nid | ||
634 | distance a b = NodeDistance $ xor a b | ||
635 | |||
636 | -- | Order by closeness: nearest nodes first. | ||
637 | rank :: ( Ord nid | ||
638 | ) => KademliaSpace nid ni -> nid -> [ni] -> [ni] | ||
639 | rank space nid = L.sortBy (comparing (kademliaXor space nid . kademliaLocation space)) | ||
640 | |||
641 | |||
642 | -- | Get a list of /K/ closest nodes using XOR metric. Used in | ||
643 | -- 'find_node' and 'get_peers' queries. | ||
644 | kclosest :: ( -- FiniteBits nid | ||
645 | Ord nid | ||
646 | ) => | ||
647 | KademliaSpace nid ni -> Int -> nid -> BucketList ni -> [ni] | ||
648 | kclosest space k nid tbl = take k $ rank space nid (L.concat bucket) | ||
649 | ++ rank space nid (L.concat everyone) | ||
650 | where | ||
651 | (bucket,everyone) = | ||
652 | L.splitAt 1 | ||
653 | . lookupBucket space nid (L.map (coerce . L.map PSQ.key . PSQ.toList . bktNodes)) | ||
654 | $ tbl | ||
655 | |||
656 | |||
657 | |||
658 | {----------------------------------------------------------------------- | ||
659 | -- Routing | ||
660 | -----------------------------------------------------------------------} | ||
661 | |||
662 | splitTip :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) => | ||
663 | ( Reifies s (Compare ni) ) => | ||
664 | (ni -> Word -> Bool) | ||
665 | -> ni -> BitIx -> Bucket s ni -> [ Bucket s ni ] | ||
666 | splitTip testNodeBit ni i bucket | ||
667 | | testNodeBit ni i = [zeros , ones ] | ||
668 | | otherwise = [ones , zeros ] | ||
669 | where | ||
670 | (ones, zeros) = split testNodeBit i bucket | ||
671 | |||
672 | -- | Used in each query. | ||
673 | -- | ||
674 | -- TODO: Kademlia non-empty subtrees should should split if they have less than | ||
675 | -- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia | ||
676 | -- paper. The rule requiring additional splits is in section 2.4. | ||
677 | modifyBucket | ||
678 | :: -- ( Eq ip , Ord (NodeId) , FiniteBits (NodeId)) => | ||
679 | forall ni nid xs. | ||
680 | KademliaSpace nid ni | ||
681 | -> nid -> (forall s. Reifies s (Compare ni) => Bucket s ni -> Maybe (xs, Bucket s ni)) -> BucketList ni -> Maybe (xs,BucketList ni) | ||
682 | modifyBucket space nid f (BucketList self bkts) | ||
683 | = second (BucketList self) <$> go (0 :: BitIx) bkts | ||
684 | where | ||
685 | d = kademliaXor space nid (kademliaLocation space self) | ||
686 | |||
687 | -- go :: BitIx -> [Bucket s ni] -> Maybe (xs, [Bucket s ni]) | ||
688 | |||
689 | go !i (bucket : buckets@(_:_)) | ||
690 | | kademliaTestBit space d i = second (: buckets) <$> f bucket | ||
691 | | otherwise = second (bucket :) <$> go (succ i) buckets | ||
692 | |||
693 | go !i [bucket] = second (: []) <$> f bucket <|> gosplit | ||
694 | where | ||
695 | gosplit | i < defaultMaxBucketCount = go i (splitTip ( kademliaTestBit space | ||
696 | . kademliaLocation space ) | ||
697 | self | ||
698 | i | ||
699 | bucket) | ||
700 | | otherwise = Nothing -- Limit the number of buckets. | ||
701 | |||
702 | |||
703 | bktCount :: BucketList ni -> Int | ||
704 | bktCount (BucketList _ bkts) = L.length bkts | ||
705 | |||
706 | -- | Triggering event for atomic table update | ||
707 | data Event ni = TryInsert { foreignNode :: ni } | ||
708 | | PingResult { foreignNode :: ni , ponged :: Bool } | ||
709 | |||
710 | #if 0 | ||
711 | deriving instance Eq (NodeId) => Eq (Event) | ||
712 | deriving instance ( Show ip | ||
713 | , Show (NodeId) | ||
714 | , Show u | ||
715 | ) => Show (Event) | ||
716 | |||
717 | #endif | ||
718 | |||
719 | eventId :: (ni -> nid) -> Event ni -> nid | ||
720 | eventId nodeId (TryInsert ni) = nodeId ni | ||
721 | eventId nodeId (PingResult ni _) = nodeId ni | ||
722 | |||
723 | |||
724 | -- | Actions requested by atomic table update | ||
725 | data CheckPing ni = CheckPing [ni] | ||
726 | |||
727 | #if 0 | ||
728 | |||
729 | deriving instance Eq (NodeId) => Eq (CheckPing) | ||
730 | deriving instance ( Show ip | ||
731 | , Show (NodeId) | ||
732 | , Show u | ||
733 | ) => Show (CheckPing) | ||
734 | |||
735 | #endif | ||
736 | |||
737 | |||
738 | -- | Call on every inbound packet (including requested ping results). | ||
739 | -- Returns a triple (was_inserted, to_ping, tbl') where | ||
740 | -- | ||
741 | -- [ /was_inserted/ ] True if the node was added to the routing table. | ||
742 | -- | ||
743 | -- [ /to_ping/ ] A list of nodes to ping and then run 'updateForPingResult'. | ||
744 | -- This will be empty if /was_inserted/, but a non-inserted node | ||
745 | -- may be added to a replacement queue and will be inserted if | ||
746 | -- one of the items in this list time out. | ||
747 | -- | ||
748 | -- [ /tbl'/ ] The updated routing 'BucketList'. | ||
749 | -- | ||
750 | updateForInbound :: | ||
751 | KademliaSpace nid ni | ||
752 | -> Timestamp -> ni -> BucketList ni -> (Bool, [ni], BucketList ni) | ||
753 | updateForInbound space tm ni tbl@(BucketList _ bkts) = | ||
754 | maybe (False, [],tbl) (\(ps,tbl') -> (True, ps, tbl')) | ||
755 | $ modifyBucket space | ||
756 | (kademliaLocation space ni) | ||
757 | (updateBucketForInbound tm ni) | ||
758 | tbl | ||
759 | |||
760 | -- | Update the routing table with the results of a ping. | ||
761 | -- | ||
762 | -- Each (a,(tm,b)) in the returned list indicates that the node /a/ was deleted from the | ||
763 | -- routing table and the node /b/, with timestamp /tm/, has taken its place. | ||
764 | updateForPingResult :: | ||
765 | KademliaSpace nid ni | ||
766 | -> ni -- ^ The pinged node. | ||
767 | -> Bool -- ^ True if we got a reply, False if it timed out. | ||
768 | -> BucketList ni -- ^ The routing table. | ||
769 | -> ( [(ni,Maybe (Timestamp, ni))], BucketList ni ) | ||
770 | updateForPingResult space ni got_reply tbl = | ||
771 | fromMaybe ([],tbl) | ||
772 | $ modifyBucket space | ||
773 | (kademliaLocation space ni) | ||
774 | (updateBucketForPingResult ni got_reply) | ||
775 | tbl | ||
776 | |||
777 | |||
778 | {----------------------------------------------------------------------- | ||
779 | -- Conversion | ||
780 | -----------------------------------------------------------------------} | ||
781 | |||
782 | type TableEntry ni = (ni, Timestamp) | ||
783 | |||
784 | tableEntry :: NodeEntry ni -> TableEntry ni | ||
785 | tableEntry (a :-> b) = (a, b) | ||
786 | |||
787 | toList :: BucketList ni -> [[TableEntry ni]] | ||
788 | toList (BucketList _ bkts) = coerce $ L.map (L.map tableEntry . PSQ.toList . bktNodes) bkts | ||
789 | |||
790 | data KademliaSpace nid ni = KademliaSpace | ||
791 | { -- | Given a node record (probably including IP address), yields a | ||
792 | -- kademlia xor-metric location. | ||
793 | kademliaLocation :: ni -> nid | ||
794 | -- | Used when comparing locations. This is similar to | ||
795 | -- 'Data.Bits.testBit' except that the ordering of bits is reversed, so | ||
796 | -- that 0 is the most significant bit. | ||
797 | , kademliaTestBit :: nid -> Word -> Bool | ||
798 | -- | The Kademlia xor-metric. | ||
799 | , kademliaXor :: nid -> nid -> nid | ||
800 | |||
801 | , kademliaSample :: forall m. Applicative m => (Int -> m BS.ByteString) -> nid -> (Int,Word8,Word8) -> m nid | ||
802 | } | ||
803 | |||
804 | instance Contravariant (KademliaSpace nid) where | ||
805 | contramap f ks = ks | ||
806 | { kademliaLocation = kademliaLocation ks . f | ||
807 | } | ||
808 | |||
diff --git a/src/Network/Kademlia/Search.hs b/src/Network/Kademlia/Search.hs deleted file mode 100644 index 1be1afc1..00000000 --- a/src/Network/Kademlia/Search.hs +++ /dev/null | |||
@@ -1,236 +0,0 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE PatternSynonyms #-} | ||
3 | {-# LANGUAGE RecordWildCards #-} | ||
4 | {-# LANGUAGE ScopedTypeVariables #-} | ||
5 | {-# LANGUAGE FlexibleContexts #-} | ||
6 | {-# LANGUAGE LambdaCase #-} | ||
7 | module Network.Kademlia.Search where | ||
8 | |||
9 | import Control.Concurrent.Tasks | ||
10 | import Control.Concurrent.STM | ||
11 | import Control.Monad | ||
12 | import Data.Function | ||
13 | import Data.Maybe | ||
14 | import qualified Data.Set as Set | ||
15 | ;import Data.Set (Set) | ||
16 | import Data.Hashable (Hashable(..)) -- for type sigs | ||
17 | import System.IO.Error | ||
18 | |||
19 | import qualified Data.MinMaxPSQ as MM | ||
20 | ;import Data.MinMaxPSQ (MinMaxPSQ, MinMaxPSQ') | ||
21 | import qualified Data.Wrapper.PSQ as PSQ | ||
22 | ;import Data.Wrapper.PSQ (pattern (:->), Binding, pattern Binding, Binding', PSQKey) | ||
23 | import Network.Kademlia.Routing as R | ||
24 | #ifdef THREAD_DEBUG | ||
25 | import Control.Concurrent.Lifted.Instrument | ||
26 | #else | ||
27 | import Control.Concurrent.Lifted | ||
28 | import GHC.Conc (labelThread) | ||
29 | #endif | ||
30 | |||
31 | data Search nid addr tok ni r = Search | ||
32 | { searchSpace :: KademliaSpace nid ni | ||
33 | , searchNodeAddress :: ni -> addr | ||
34 | , searchQuery :: Either (nid -> ni -> IO (Maybe ([ni], [r], Maybe tok))) | ||
35 | (nid -> ni -> (Maybe ([ni],[r],Maybe tok) -> IO ()) -> IO ()) | ||
36 | , searchAlpha :: Int -- α = 8 | ||
37 | -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on | ||
38 | -- how fast the queries are. For Tox's much slower onion-routed queries, we | ||
39 | -- need to ensure that closer non-responding queries don't completely push out | ||
40 | -- farther away queries. | ||
41 | -- | ||
42 | -- For BitTorrent, setting them both 8 was not an issue, but that is no longer | ||
43 | -- supported because now the number of remembered informants is now the | ||
44 | -- difference between these two numbers. So, if searchK = 16 and searchAlpha = | ||
45 | -- 4, then the number of remembered query responses is 12. | ||
46 | , searchK :: Int -- K = 16 | ||
47 | } | ||
48 | |||
49 | data SearchState nid addr tok ni r = SearchState | ||
50 | { -- | The number of pending queries. Incremented before any query is sent | ||
51 | -- and decremented when we get a reply. | ||
52 | searchPendingCount :: TVar Int | ||
53 | -- | Nodes scheduled to be queried (roughly at most K). | ||
54 | , searchQueued :: TVar (MinMaxPSQ ni nid) | ||
55 | -- | The nearest (K - α) nodes that issued a reply. | ||
56 | -- | ||
57 | -- α is the maximum number of simultaneous queries. | ||
58 | , searchInformant :: TVar (MinMaxPSQ' ni nid (Maybe tok)) | ||
59 | -- | This tracks already-queried addresses so we avoid bothering them | ||
60 | -- again. XXX: We could probably keep only the pending queries in this | ||
61 | -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha | ||
62 | -- should limit the number of outstanding queries. | ||
63 | , searchVisited :: TVar (Set addr) | ||
64 | , searchSpec :: Search nid addr tok ni r | ||
65 | } | ||
66 | |||
67 | |||
68 | newSearch :: ( Ord addr | ||
69 | , PSQKey nid | ||
70 | , PSQKey ni | ||
71 | ) => | ||
72 | {- | ||
73 | KademliaSpace nid ni | ||
74 | -> (ni -> addr) | ||
75 | -> (ni -> IO ([ni], [r])) -- the query action. | ||
76 | -> (r -> STM Bool) -- receives search results. | ||
77 | -> nid -- target of search | ||
78 | -} | ||
79 | Search nid addr tok ni r | ||
80 | -> nid | ||
81 | -> [ni] -- Initial nodes to query. | ||
82 | -> STM (SearchState nid addr tok ni r) | ||
83 | newSearch s@(Search space nAddr qry _ _) target ns = do | ||
84 | c <- newTVar 0 | ||
85 | q <- newTVar $ MM.fromList | ||
86 | $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) | ||
87 | $ ns | ||
88 | i <- newTVar MM.empty | ||
89 | v <- newTVar Set.empty | ||
90 | return -- (Search space nAddr qry) , r , target | ||
91 | ( SearchState c q i v s ) | ||
92 | |||
93 | -- | Discard a value from a key-priority-value tuple. This is useful for | ||
94 | -- swaping items from a "MinMaxPSQ'" to a "MinMaxPSQ". | ||
95 | stripValue :: Binding' k p v -> Binding k p | ||
96 | stripValue (Binding ni _ nid) = (ni :-> nid) | ||
97 | |||
98 | -- | Reset a 'SearchState' object to ready it for a repeated search. | ||
99 | reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) => | ||
100 | (nid -> STM [ni]) | ||
101 | -> Search nid addr1 tok1 ni r1 | ||
102 | -> nid | ||
103 | -> SearchState nid addr tok ni r | ||
104 | -> STM (SearchState nid addr tok ni r) | ||
105 | reset nearestNodes qsearch target st = do | ||
106 | searchIsFinished st >>= check -- Wait for a search to finish before resetting. | ||
107 | bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) | ||
108 | <$> nearestNodes target | ||
109 | priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st) | ||
110 | writeTVar (searchQueued st) $ MM.fromList $ priorInformants ++ bktNodes | ||
111 | writeTVar (searchInformant st) MM.empty | ||
112 | writeTVar (searchVisited st) Set.empty | ||
113 | writeTVar (searchPendingCount st) 0 | ||
114 | return st | ||
115 | |||
116 | sendAsyncQuery :: forall addr nid tok ni r. | ||
117 | ( Ord addr | ||
118 | , PSQKey nid | ||
119 | , PSQKey ni | ||
120 | , Show nid | ||
121 | ) => | ||
122 | Search nid addr tok ni r | ||
123 | -> nid | ||
124 | -> (r -> STM Bool) -- ^ return False to terminate the search. | ||
125 | -> SearchState nid addr tok ni r | ||
126 | -> Binding ni nid | ||
127 | -> TaskGroup | ||
128 | -> IO () | ||
129 | sendAsyncQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) g = | ||
130 | case searchQuery of | ||
131 | Left blockingQuery -> | ||
132 | forkTask g "searchQuery" $ do | ||
133 | myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) | ||
134 | reply <- blockingQuery searchTarget ni `catchIOError` const (return Nothing) | ||
135 | atomically $ do | ||
136 | modifyTVar searchPendingCount pred | ||
137 | maybe (return ()) go reply | ||
138 | Right nonblockingQuery -> do | ||
139 | nonblockingQuery searchTarget ni $ \reply -> | ||
140 | atomically $ do | ||
141 | modifyTVar searchPendingCount pred | ||
142 | maybe (return ()) go reply | ||
143 | where | ||
144 | go (ns,rs,tok) = do | ||
145 | vs <- readTVar searchVisited | ||
146 | -- We only queue a node if it is not yet visited | ||
147 | let insertFoundNode :: Int | ||
148 | -> ni | ||
149 | -> MinMaxPSQ ni nid | ||
150 | -> MinMaxPSQ ni nid | ||
151 | insertFoundNode k n q | ||
152 | | searchNodeAddress n `Set.member` vs | ||
153 | = q | ||
154 | | otherwise = MM.insertTake k n ( kademliaXor searchSpace searchTarget | ||
155 | $ kademliaLocation searchSpace n ) | ||
156 | q | ||
157 | |||
158 | qsize0 <- MM.size <$> readTVar searchQueued | ||
159 | let qsize = if qsize0 < searchK then searchK else qsize0 -- Allow searchQueued to grow | ||
160 | -- only when there's fewer than | ||
161 | -- K elements. | ||
162 | modifyTVar searchQueued $ \q -> foldr (insertFoundNode qsize) q ns | ||
163 | modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d | ||
164 | flip fix rs $ \loop -> \case | ||
165 | r:rs' -> do | ||
166 | wanting <- searchResult r | ||
167 | if wanting then loop rs' | ||
168 | else searchCancel sch | ||
169 | [] -> return () | ||
170 | |||
171 | |||
172 | searchIsFinished :: ( PSQKey nid | ||
173 | , PSQKey ni | ||
174 | ) => SearchState nid addr tok ni r -> STM Bool | ||
175 | searchIsFinished SearchState{..} = do | ||
176 | q <- readTVar searchQueued | ||
177 | cnt <- readTVar searchPendingCount | ||
178 | informants <- readTVar searchInformant | ||
179 | return $ cnt == 0 | ||
180 | && ( MM.null q | ||
181 | || ( MM.size informants >= (searchK searchSpec - searchAlpha searchSpec) | ||
182 | && ( PSQ.prio (fromJust $ MM.findMax informants) | ||
183 | <= PSQ.prio (fromJust $ MM.findMin q)))) | ||
184 | |||
185 | searchCancel :: SearchState nid addr tok ni r -> STM () | ||
186 | searchCancel SearchState{..} = do | ||
187 | writeTVar searchPendingCount 0 | ||
188 | writeTVar searchQueued MM.empty | ||
189 | |||
190 | search :: | ||
191 | ( Ord r | ||
192 | , Ord addr | ||
193 | , PSQKey nid | ||
194 | , PSQKey ni | ||
195 | , Show nid | ||
196 | ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r) | ||
197 | search sch buckets target result = do | ||
198 | let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets | ||
199 | st <- atomically $ newSearch sch target ns | ||
200 | forkIO $ searchLoop sch target result st | ||
201 | return st | ||
202 | |||
203 | searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni ) | ||
204 | => Search nid addr tok ni r -- ^ Query and distance methods. | ||
205 | -> nid -- ^ The target we are searching for. | ||
206 | -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching. | ||
207 | -> SearchState nid addr tok ni r -- ^ Search-related state. | ||
208 | -> IO () | ||
209 | searchLoop sch@Search{..} target result s@SearchState{..} = do | ||
210 | myThreadId >>= flip labelThread ("search."++show target) | ||
211 | withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do | ||
212 | join $ atomically $ do | ||
213 | cnt <- readTVar $ searchPendingCount | ||
214 | check (cnt <= 8) -- Only 8 pending queries at a time. | ||
215 | informants <- readTVar searchInformant | ||
216 | found <- MM.minView <$> readTVar searchQueued | ||
217 | case found of | ||
218 | Just (ni :-> d, q) | ||
219 | | -- If there's fewer than /k - α/ informants and there's any | ||
220 | -- node we haven't yet got a response from. | ||
221 | (MM.size informants < searchK - searchAlpha) && (cnt > 0 || not (MM.null q)) | ||
222 | -- Or there's no informants yet at all. | ||
223 | || MM.null informants | ||
224 | -- Or if the closest scheduled node is nearer than the | ||
225 | -- nearest /k/ informants. | ||
226 | || (d < PSQ.prio (fromJust $ MM.findMax informants)) | ||
227 | -> -- Then the search continues, send a query. | ||
228 | do writeTVar searchQueued q | ||
229 | modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) | ||
230 | modifyTVar searchPendingCount succ | ||
231 | return $ do | ||
232 | sendAsyncQuery sch target result s (ni :-> d) g | ||
233 | again | ||
234 | _ -> -- Otherwise, we are finished. | ||
235 | do check (cnt == 0) | ||
236 | return $ return () | ||