diff options
Diffstat (limited to 'src/Network/Kademlia/Bootstrap.hs')
-rw-r--r-- | src/Network/Kademlia/Bootstrap.hs | 437 |
1 files changed, 0 insertions, 437 deletions
diff --git a/src/Network/Kademlia/Bootstrap.hs b/src/Network/Kademlia/Bootstrap.hs deleted file mode 100644 index 1324ae77..00000000 --- a/src/Network/Kademlia/Bootstrap.hs +++ /dev/null | |||
@@ -1,437 +0,0 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE ConstraintKinds #-} | ||
3 | {-# LANGUAGE DeriveFunctor #-} | ||
4 | {-# LANGUAGE DeriveTraversable #-} | ||
5 | {-# LANGUAGE FlexibleContexts #-} | ||
6 | {-# LANGUAGE GADTs #-} | ||
7 | {-# LANGUAGE KindSignatures #-} | ||
8 | {-# LANGUAGE LambdaCase #-} | ||
9 | {-# LANGUAGE NamedFieldPuns #-} | ||
10 | {-# LANGUAGE PartialTypeSignatures #-} | ||
11 | {-# LANGUAGE PatternSynonyms #-} | ||
12 | {-# LANGUAGE RankNTypes #-} | ||
13 | {-# LANGUAGE ScopedTypeVariables #-} | ||
14 | module Network.Kademlia.Bootstrap where | ||
15 | |||
16 | import Data.Function | ||
17 | import Data.Maybe | ||
18 | import qualified Data.Set as Set | ||
19 | import Data.Time.Clock.POSIX (getPOSIXTime) | ||
20 | import Network.Kademlia.Routing as R | ||
21 | #ifdef THREAD_DEBUG | ||
22 | import Control.Concurrent.Lifted.Instrument | ||
23 | #else | ||
24 | import Control.Concurrent.Lifted | ||
25 | import GHC.Conc (labelThread) | ||
26 | #endif | ||
27 | import Control.Concurrent.STM | ||
28 | import Control.Monad | ||
29 | import Data.Hashable | ||
30 | import Data.Time.Clock.POSIX (POSIXTime) | ||
31 | import Data.Ord | ||
32 | import System.Entropy | ||
33 | import System.Timeout | ||
34 | import DPut | ||
35 | import DebugTag | ||
36 | |||
37 | import qualified Data.Wrapper.PSQInt as Int | ||
38 | ;import Data.Wrapper.PSQInt (pattern (:->)) | ||
39 | import Network.Address (bucketRange) | ||
40 | import Network.Kademlia.Search | ||
41 | import Control.Concurrent.Tasks | ||
42 | import Network.Kademlia | ||
43 | |||
44 | type SensibleNodeId nid ni = | ||
45 | ( Show nid | ||
46 | , Ord nid | ||
47 | , Ord ni | ||
48 | , Hashable nid | ||
49 | , Hashable ni ) | ||
50 | |||
51 | data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher | ||
52 | { -- | A staleness threshold (if a bucket goes this long without being | ||
53 | -- touched, a refresh will be triggered). | ||
54 | refreshInterval :: POSIXTime | ||
55 | -- | A TVar with the time-to-refresh schedule for each bucket. | ||
56 | -- | ||
57 | -- To "touch" a bucket and prevent it from being refreshed, reschedule | ||
58 | -- its refresh time to some time into the future by modifying its | ||
59 | -- priority in this priority search queue. | ||
60 | , refreshQueue :: TVar (Int.PSQ POSIXTime) | ||
61 | -- | This is the kademlia node search specification. | ||
62 | , refreshSearch :: Search nid addr tok ni ni | ||
63 | -- | The current kademlia routing table buckets. | ||
64 | , refreshBuckets :: TVar (R.BucketList ni) | ||
65 | -- | Action to ping a node. This is used only during initial bootstrap | ||
66 | -- to get some nodes in our table. A 'True' result is interpreted as a a | ||
67 | -- pong, where 'False' is a non-response. | ||
68 | , refreshPing :: ni -> IO Bool | ||
69 | , -- | Timestamp of last bucket event. | ||
70 | refreshLastTouch :: TVar POSIXTime | ||
71 | , -- | This variable indicates whether or not we are in bootstrapping mode. | ||
72 | bootstrapMode :: TVar Bool | ||
73 | , -- | When this countdown reaches 0, we exit bootstrap mode. It is decremented on | ||
74 | -- every finished refresh. | ||
75 | bootstrapCountdown :: TVar (Maybe Int) | ||
76 | } | ||
77 | |||
78 | newBucketRefresher :: ( Ord addr, Hashable addr | ||
79 | , SensibleNodeId nid ni ) | ||
80 | => TVar (R.BucketList ni) | ||
81 | -> Search nid addr tok ni ni | ||
82 | -> (ni -> IO Bool) | ||
83 | -> STM (BucketRefresher nid ni) | ||
84 | newBucketRefresher bkts sch ping = do | ||
85 | let spc = searchSpace sch | ||
86 | nodeId = kademliaLocation spc | ||
87 | -- bkts <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) template_ni R.defaultBucketCount | ||
88 | sched <- newTVar Int.empty | ||
89 | lasttouch <- newTVar 0 -- Would use getPOSIXTime here, or minBound, but alas... | ||
90 | bootstrapVar <- newTVar True -- Start in bootstrapping mode. | ||
91 | bootstrapCnt <- newTVar Nothing | ||
92 | return BucketRefresher | ||
93 | { refreshInterval = 15 * 60 | ||
94 | , refreshQueue = sched | ||
95 | , refreshSearch = sch | ||
96 | , refreshBuckets = bkts | ||
97 | , refreshPing = ping | ||
98 | , refreshLastTouch = lasttouch | ||
99 | , bootstrapMode = bootstrapVar | ||
100 | , bootstrapCountdown = bootstrapCnt | ||
101 | } | ||
102 | |||
103 | -- | This was added to avoid the compile error "Record update for | ||
104 | -- insufficiently polymorphic field" when trying to update the existentially | ||
105 | -- quantified field 'refreshSearch'. | ||
106 | updateRefresherIO :: Ord addr | ||
107 | => Search nid addr tok ni ni | ||
108 | -> (ni -> IO Bool) | ||
109 | -> BucketRefresher nid ni -> BucketRefresher nid ni | ||
110 | updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher | ||
111 | { refreshSearch = sch | ||
112 | , refreshPing = ping | ||
113 | , refreshInterval = refreshInterval | ||
114 | , refreshBuckets = refreshBuckets | ||
115 | , refreshQueue = refreshQueue | ||
116 | , refreshLastTouch = refreshLastTouch | ||
117 | , bootstrapMode = bootstrapMode | ||
118 | , bootstrapCountdown = bootstrapCountdown | ||
119 | } | ||
120 | |||
121 | -- | Fork a refresh loop. Kill the returned thread to terminate it. | ||
122 | forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId | ||
123 | forkPollForRefresh r@BucketRefresher{ refreshInterval | ||
124 | , refreshQueue | ||
125 | , refreshBuckets | ||
126 | , refreshSearch } = fork $ do | ||
127 | myThreadId >>= flip labelThread "pollForRefresh" | ||
128 | fix $ \again -> do | ||
129 | join $ atomically $ do | ||
130 | nextup <- Int.findMin <$> readTVar refreshQueue | ||
131 | maybe retry (return . go again) nextup | ||
132 | where | ||
133 | refresh :: Int -> IO Int | ||
134 | refresh n = do | ||
135 | -- dput XRefresh $ "Refresh time! "++ show n | ||
136 | refreshBucket r n | ||
137 | |||
138 | go again ( bktnum :-> refresh_time ) = do | ||
139 | now <- getPOSIXTime | ||
140 | case fromEnum (refresh_time - now) of | ||
141 | x | x <= 0 -> do -- Refresh time! | ||
142 | -- Move it to the back of the refresh queue. | ||
143 | atomically $ do | ||
144 | interval <- effectiveRefreshInterval r bktnum | ||
145 | modifyTVar' refreshQueue | ||
146 | $ Int.insert bktnum (now + interval) | ||
147 | -- Now fork the refresh operation. | ||
148 | -- TODO: We should probably propogate the kill signal to this thread. | ||
149 | fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum) | ||
150 | _ <- refresh bktnum | ||
151 | return () | ||
152 | return () | ||
153 | picoseconds -> do | ||
154 | -- dput XRefresh $ show (picoseconds `div` 10^12) ++ " seconds until refresh " ++ show bktnum | ||
155 | threadDelay ( picoseconds `div` 10^6 ) | ||
156 | again | ||
157 | |||
158 | |||
159 | -- | This is a helper to 'refreshBucket' which does some book keeping to decide | ||
160 | -- whether or not a bucket is sufficiently refreshed or not. It will return | ||
161 | -- false when we can terminate a node search. | ||
162 | checkBucketFull :: Ord ni => KademliaSpace nid ni -- ^ Obtain a node id from a node. | ||
163 | -> TVar (BucketList ni) -- ^ The current routing table. | ||
164 | -> TVar (Set.Set ni) -- ^ In-range nodes found so far. | ||
165 | -> TVar Bool -- ^ The result will also be written here. | ||
166 | -> Int -- ^ The bucket number of interest. | ||
167 | -> ni -- ^ A newly found node. | ||
168 | -> STM Bool | ||
169 | checkBucketFull space var resultCounter fin n found_node = do | ||
170 | let fullcount = R.defaultBucketSize | ||
171 | saveit True = writeTVar fin True >> return True | ||
172 | saveit _ = return False | ||
173 | tbl <- readTVar var | ||
174 | let counts = R.shape tbl | ||
175 | nid = kademliaLocation space found_node | ||
176 | -- Update the result set with every found node that is in the | ||
177 | -- bucket of interest. | ||
178 | when (n == R.bucketNumber space nid tbl) | ||
179 | $ modifyTVar' resultCounter (Set.insert found_node) | ||
180 | resultCount <- readTVar resultCounter | ||
181 | saveit $ case drop (n - 1) counts of | ||
182 | (cnt:_) | cnt < fullcount -> True -- bucket not full, keep going | ||
183 | _ | Set.size resultCount < fullcount -> True -- we haven't got many results, keep going | ||
184 | _ -> False -- okay, good enough, let's quit. | ||
185 | |||
186 | -- | Called from 'refreshBucket' with the current time when a refresh of the | ||
187 | -- supplied bucket number finishes. | ||
188 | onFinishedRefresh :: BucketRefresher nid ni -> Int -> POSIXTime -> STM (IO ()) | ||
189 | onFinishedRefresh BucketRefresher { bootstrapCountdown | ||
190 | , bootstrapMode | ||
191 | , refreshQueue | ||
192 | , refreshBuckets } num now = do | ||
193 | bootstrapping <- readTVar bootstrapMode | ||
194 | if not bootstrapping then return $ return () -- dput XRefresh $ "Finished non-boostrapping refresh: "++show num | ||
195 | else do | ||
196 | tbl <- readTVar refreshBuckets | ||
197 | action <- | ||
198 | if num /= R.bktCount tbl - 1 | ||
199 | then do modifyTVar' bootstrapCountdown (fmap pred) | ||
200 | return $ return () -- dput XRefresh $ "BOOTSTRAP decrement" | ||
201 | else do | ||
202 | -- The last bucket finished. | ||
203 | cnt <- readTVar bootstrapCountdown | ||
204 | case cnt of | ||
205 | Nothing -> do | ||
206 | let fullsize = R.defaultBucketSize | ||
207 | notfull (n,len) | n==num = False | ||
208 | | len>=fullsize = False | ||
209 | | otherwise = True | ||
210 | unfull = case filter notfull $ zip [0..] (R.shape tbl) of | ||
211 | [] -> [(0,0)] -- Schedule at least 1 more refresh. | ||
212 | xs -> xs | ||
213 | forM_ unfull $ \(n,_) -> do | ||
214 | -- Schedule immediate refresh for unfull buckets (other than this one). | ||
215 | modifyTVar' refreshQueue $ Int.insert n (now - 1) | ||
216 | writeTVar bootstrapCountdown $! Just $! length unfull | ||
217 | return $ return () -- dput XRefresh $ "BOOTSTRAP scheduling: "++show unfull | ||
218 | Just n -> do writeTVar bootstrapCountdown $! Just $! pred n | ||
219 | return $ return () -- dput XRefresh "BOOTSTRAP decrement (last bucket)" | ||
220 | cnt <- readTVar bootstrapCountdown | ||
221 | if (cnt == Just 0) | ||
222 | then do | ||
223 | -- Boostrap finished! | ||
224 | writeTVar bootstrapMode False | ||
225 | writeTVar bootstrapCountdown Nothing | ||
226 | return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")." | ||
227 | else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt) | ||
228 | |||
229 | refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) => | ||
230 | BucketRefresher nid ni -> Int -> IO Int | ||
231 | refreshBucket r@BucketRefresher{ refreshSearch = sch | ||
232 | , refreshBuckets = var } | ||
233 | n = do | ||
234 | tbl <- atomically (readTVar var) | ||
235 | let count = bktCount tbl | ||
236 | nid = kademliaLocation (searchSpace sch) (thisNode tbl) | ||
237 | sample <- if n+1 >= count -- Is this the last bucket? | ||
238 | then return nid -- Yes? Search our own id. | ||
239 | else kademliaSample (searchSpace sch) -- No? Generate a random id. | ||
240 | getEntropy | ||
241 | nid | ||
242 | (bucketRange n (n + 1 < count)) | ||
243 | fin <- atomically $ newTVar False | ||
244 | resultCounter <- atomically $ newTVar Set.empty | ||
245 | |||
246 | dput XRefresh $ "Start refresh " ++ show (n,sample) | ||
247 | |||
248 | -- Set 15 minute timeout in order to avoid overlapping refreshes. | ||
249 | s <- search sch tbl sample $ if n+1 == R.defaultBucketCount | ||
250 | then const $ return True -- Never short-circuit the last bucket. | ||
251 | else checkBucketFull (searchSpace sch) var resultCounter fin n | ||
252 | _ <- timeout (15*60*1000000) $ do | ||
253 | atomically $ searchIsFinished s >>= check | ||
254 | atomically $ searchCancel s | ||
255 | dput XDHT $ "Finish refresh " ++ show (n,sample) | ||
256 | now <- getPOSIXTime | ||
257 | join $ atomically $ onFinishedRefresh r n now | ||
258 | rcount <- atomically $ do | ||
259 | c <- Set.size <$> readTVar resultCounter | ||
260 | b <- readTVar fin | ||
261 | return $ if b then 1 else c | ||
262 | return rcount | ||
263 | |||
264 | refreshLastBucket :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO () | ||
265 | refreshLastBucket r@BucketRefresher { refreshBuckets | ||
266 | , refreshQueue } = do | ||
267 | |||
268 | now <- getPOSIXTime | ||
269 | atomically $ do | ||
270 | cnt <- bktCount <$> readTVar refreshBuckets | ||
271 | -- Schedule immediate refresh. | ||
272 | modifyTVar' refreshQueue $ Int.insert (cnt-1) (now - 1) | ||
273 | |||
274 | restartBootstrap :: (Hashable ni, Hashable nid, Ord ni, Ord nid, Show nid) => | ||
275 | BucketRefresher nid ni -> STM (IO ()) | ||
276 | restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do | ||
277 | unchanged <- readTVar bootstrapMode | ||
278 | writeTVar bootstrapMode True | ||
279 | writeTVar bootstrapCountdown Nothing | ||
280 | if not unchanged then return $ do | ||
281 | dput XRefresh "BOOTSTRAP entered bootstrap mode" | ||
282 | refreshLastBucket r | ||
283 | else return $ dput XRefresh "BOOTSTRAP already bootstrapping" | ||
284 | |||
285 | bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) => | ||
286 | BucketRefresher nid ni | ||
287 | -> t1 ni -- ^ Nodes to bootstrap from. | ||
288 | -> t ni -- ^ Fallback nodes; used only if the others are unresponsive. | ||
289 | -> IO () | ||
290 | bootstrap r@BucketRefresher { refreshSearch = sch | ||
291 | , refreshBuckets = var | ||
292 | , refreshPing = ping | ||
293 | , bootstrapMode } ns ns0 = do | ||
294 | gotPing <- atomically $ newTVar False | ||
295 | |||
296 | -- First, ping the given nodes so that they are added to | ||
297 | -- our routing table. | ||
298 | withTaskGroup "bootstrap.resume" 20 $ \g -> do | ||
299 | forM_ ns $ \n -> do | ||
300 | let lbl = show $ kademliaLocation (searchSpace sch) n | ||
301 | forkTask g lbl $ do | ||
302 | b <- ping n | ||
303 | when b $ atomically $ writeTVar gotPing True | ||
304 | |||
305 | -- We resort to the hardcoded fallback nodes only when we got no | ||
306 | -- responses. This is to lesson the burden on well-known boostrap | ||
307 | -- nodes. | ||
308 | fallback <- atomically (readTVar gotPing) >>= return . when . not | ||
309 | fallback $ withTaskGroup "bootstrap.ping" 20 $ \g -> do | ||
310 | forM_ ns0 $ \n -> do | ||
311 | forkTask g (show $ kademliaLocation (searchSpace sch) n) | ||
312 | (void $ ping n) | ||
313 | dput XDHT "Finished bootstrap pings." | ||
314 | -- Now search our own Id by entering bootstrap mode from non-bootstrap mode. | ||
315 | join $ atomically $ do | ||
316 | writeTVar bootstrapMode False | ||
317 | restartBootstrap r | ||
318 | -- | ||
319 | -- Hopefully 'forkPollForRefresh' was invoked and can take over | ||
320 | -- maintenance. | ||
321 | |||
322 | |||
323 | effectiveRefreshInterval :: BucketRefresher nid ni -> Int -> STM POSIXTime | ||
324 | effectiveRefreshInterval BucketRefresher{ refreshInterval | ||
325 | , refreshBuckets | ||
326 | , bootstrapMode } num = do | ||
327 | tbl <- readTVar refreshBuckets | ||
328 | bootstrapping <- readTVar bootstrapMode | ||
329 | case bootstrapping of | ||
330 | False -> return refreshInterval | ||
331 | True -> do | ||
332 | -- When bootstrapping, refresh interval for non-full buckets is only 15 seconds. | ||
333 | let fullcount = R.defaultBucketSize | ||
334 | count = fromMaybe fullcount $ listToMaybe $ drop (num - 1) $ R.shape tbl | ||
335 | if count == fullcount | ||
336 | then return refreshInterval | ||
337 | else return 15 -- seconds | ||
338 | |||
339 | |||
340 | |||
341 | -- | Reschedule a bucket's refresh-time. It should be called whenever a bucket | ||
342 | -- changes. This will typically be invoked from 'tblTransition'. | ||
343 | -- | ||
344 | -- From BEP 05: | ||
345 | -- | ||
346 | -- > Each bucket should maintain a "last changed" property to indicate how | ||
347 | -- > "fresh" the contents are. | ||
348 | -- | ||
349 | -- We will use a "time to next refresh" property instead and store it in | ||
350 | -- a priority search queue. | ||
351 | -- | ||
352 | -- In detail using an expository (not actually implemented) type | ||
353 | -- 'BucketTouchEvent'... | ||
354 | -- | ||
355 | -- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus | ||
356 | -- >>> bucketEvents = | ||
357 | -- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds, | ||
358 | -- >>> | ||
359 | -- >>> , Stranger :--> Accepted -- or a node is added to a bucket, | ||
360 | -- >>> | ||
361 | -- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced | ||
362 | -- >>> , Applicant :--> Accepted -- with another node, | ||
363 | -- >>> ] | ||
364 | -- | ||
365 | -- the bucket's last changed property should be updated. Buckets that have not | ||
366 | -- been changed in 15 minutes (see 'refreshInterval') should be "refreshed." | ||
367 | -- This is done by picking a random ID in the range of the bucket and | ||
368 | -- performing a find_nodes search on it. | ||
369 | -- | ||
370 | -- The only other possible BucketTouchEvents are as follows: | ||
371 | -- | ||
372 | -- >>> not_handled = | ||
373 | -- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered: | ||
374 | -- >>> -- (Applicant :--> Stranger) | ||
375 | -- >>> -- (Applicant :--> Accepted) | ||
376 | -- >>> , Accepted :--> Applicant -- Never happens | ||
377 | -- >>> ] | ||
378 | -- | ||
379 | -- Because this BucketTouchEvent type is not actually implemented and we only | ||
380 | -- receive notifications of a node's new state, it suffices to reschedule the | ||
381 | -- bucket refresh 'touchBucket' on every transition to a state other than | ||
382 | -- 'Applicant'. | ||
383 | -- | ||
384 | -- XXX: Unfortunately, this means redundantly triggering twice upon every node | ||
385 | -- replacement because we do not currently distinguish between standalone | ||
386 | -- insertion/deletion events and an insertion/deletion pair constituting | ||
387 | -- replacement. | ||
388 | -- | ||
389 | -- It might also be better to pass the timestamp of the transition here and | ||
390 | -- keep the refresh queue in better sync with the routing table by updating it | ||
391 | -- within the STM monad. | ||
392 | -- | ||
393 | -- We embed the result in the STM monad but currently, no STM state changes | ||
394 | -- occur until the returned IO action is invoked. TODO: simplify? | ||
395 | touchBucket :: SensibleNodeId nid ni | ||
396 | => BucketRefresher nid ni | ||
397 | -> RoutingTransition ni -- ^ What happened to the bucket? | ||
398 | -> STM (IO ()) | ||
399 | touchBucket r@BucketRefresher{ refreshSearch | ||
400 | , refreshInterval | ||
401 | , refreshBuckets | ||
402 | , refreshQueue | ||
403 | , refreshLastTouch | ||
404 | , bootstrapMode | ||
405 | , bootstrapCountdown } | ||
406 | RoutingTransition{ transitionedTo | ||
407 | , transitioningNode } | ||
408 | = case transitionedTo of | ||
409 | Applicant -> return $ return () -- Ignore transition to applicant. | ||
410 | _ -> return $ do -- Reschedule for any other transition. | ||
411 | now <- getPOSIXTime | ||
412 | join $ atomically $ do | ||
413 | let space = searchSpace refreshSearch | ||
414 | nid = kademliaLocation space transitioningNode | ||
415 | tbl <- readTVar refreshBuckets | ||
416 | let num = R.bucketNumber space nid tbl | ||
417 | stamp <- readTVar refreshLastTouch | ||
418 | action <- case stamp /= 0 && (now - stamp > 60) of | ||
419 | True -> do | ||
420 | -- It's been one minute since any bucket has been touched, re-enter bootstrap mode. | ||
421 | restartBootstrap r | ||
422 | False -> return $ return () | ||
423 | interval <- effectiveRefreshInterval r num | ||
424 | modifyTVar' refreshQueue $ Int.insert num (now + interval) | ||
425 | writeTVar refreshLastTouch now | ||
426 | return action | ||
427 | |||
428 | refreshKademlia :: SensibleNodeId nid ni => BucketRefresher nid ni -> Kademlia nid ni | ||
429 | refreshKademlia r@BucketRefresher { refreshSearch = sch | ||
430 | , refreshPing = ping | ||
431 | , refreshBuckets = bkts | ||
432 | } | ||
433 | = Kademlia quietInsertions (searchSpace sch) (vanillaIO bkts ping) | ||
434 | { tblTransition = \tr -> do | ||
435 | io <- touchBucket r tr | ||
436 | return io | ||
437 | } | ||