summaryrefslogtreecommitdiff
path: root/kad/src/Network/Kademlia/Bootstrap.hs
diff options
context:
space:
mode:
Diffstat (limited to 'kad/src/Network/Kademlia/Bootstrap.hs')
-rw-r--r--kad/src/Network/Kademlia/Bootstrap.hs439
1 files changed, 439 insertions, 0 deletions
diff --git a/kad/src/Network/Kademlia/Bootstrap.hs b/kad/src/Network/Kademlia/Bootstrap.hs
new file mode 100644
index 00000000..08ba3318
--- /dev/null
+++ b/kad/src/Network/Kademlia/Bootstrap.hs
@@ -0,0 +1,439 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE ConstraintKinds #-}
3{-# LANGUAGE DeriveFunctor #-}
4{-# LANGUAGE DeriveTraversable #-}
5{-# LANGUAGE FlexibleContexts #-}
6{-# LANGUAGE GADTs #-}
7{-# LANGUAGE KindSignatures #-}
8{-# LANGUAGE LambdaCase #-}
9{-# LANGUAGE NamedFieldPuns #-}
10{-# LANGUAGE RecordWildCards #-}
11{-# LANGUAGE NondecreasingIndentation #-}
12{-# LANGUAGE PartialTypeSignatures #-}
13{-# LANGUAGE PatternSynonyms #-}
14{-# LANGUAGE RankNTypes #-}
15{-# LANGUAGE ScopedTypeVariables #-}
16module Network.Kademlia.Bootstrap where
17
18import Data.Function
19import Data.Maybe
20import qualified Data.Set as Set
21import Data.Time.Clock.POSIX (getPOSIXTime)
22import Network.Kademlia.Routing as R
23#ifdef THREAD_DEBUG
24import Control.Concurrent.Lifted.Instrument
25#else
26import Control.Concurrent.Lifted
27import GHC.Conc (labelThread)
28#endif
29import Control.Concurrent.STM
30import Control.Monad
31import Data.Hashable
32import Data.Time.Clock.POSIX (POSIXTime)
33import Data.Ord
34import System.Entropy
35import System.Timeout
36import DPut
37import DebugTag
38
39import qualified Data.Wrapper.PSQInt as Int
40 ;import Data.Wrapper.PSQInt (pattern (:->))
41import Network.Address (bucketRange)
42import Network.Kademlia.Search
43import Control.Concurrent.Tasks
44import Network.Kademlia
45
-- | The class constraints this module places on a node identifier type
-- (@nid@) and a node-info type (@ni@): both must be orderable and hashable,
-- and node identifiers must additionally be showable.
type SensibleNodeId nid ni =
    (Show nid, Ord nid, Hashable nid, Ord ni, Hashable ni)
52
-- | Shared state and callbacks used by the bucket-refresh loop and the
-- bootstrap logic.  The address and token types of the underlying search
-- are existentially hidden.
data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher
    { -- | Staleness threshold: if a bucket goes this long without being
      -- touched, a refresh is triggered.
      refreshInterval :: POSIXTime
      -- | The time-to-refresh schedule for each bucket.
      --
      -- To "touch" a bucket and keep it from being refreshed, move its
      -- refresh time further into the future by changing its priority in
      -- this priority search queue.
    , refreshQueue :: TVar (Int.PSQ POSIXTime)
      -- | The kademlia node search specification.
    , refreshSearch :: Search nid addr tok ni ni
      -- | The current kademlia routing table buckets.
    , refreshBuckets :: TVar (R.BucketList ni)
      -- | Action to ping a node.  This is used only during initial
      -- bootstrap to get some nodes into our table.  A 'True' result is
      -- interpreted as a pong, whereas 'False' is a non-response.
    , refreshPing :: ni -> IO Bool
      -- | Timestamp of the last bucket event.
    , refreshLastTouch :: TVar POSIXTime
      -- | Whether or not we are currently in bootstrapping mode.
    , bootstrapMode :: TVar Bool
      -- | When this countdown reaches 0, we exit bootstrap mode.  It is
      -- decremented on every finished refresh.
    , bootstrapCountdown :: TVar (Maybe Int)
    }
79
-- | Build a 'BucketRefresher' around an existing routing table, a node
-- search and a ping action.
--
-- The new refresher has an empty refresh schedule, a 15 minute refresh
-- interval, starts out in bootstrapping mode and has no bootstrap countdown
-- running yet.
newBucketRefresher :: ( Ord addr, Hashable addr
                      , SensibleNodeId nid ni )
    => TVar (R.BucketList ni)       -- ^ The routing table to maintain.
    -> Search nid addr tok ni ni    -- ^ The node search used for refreshes.
    -> (ni -> IO Bool)              -- ^ Ping action; 'True' means a pong was received.
    -> STM (BucketRefresher nid ni)
newBucketRefresher bkts sch ping =
    assemble <$> newTVar Int.empty  -- No refresh scheduled yet.
             <*> newTVar 0          -- Would use getPOSIXTime here, or minBound,
                                    -- but alas...  'touchBucket' treats 0 as
                                    -- "never touched".
             <*> newTVar True       -- Start in bootstrapping mode.
             <*> newTVar Nothing    -- Countdown has not started.
  where
    assemble sched lasttouch mode countdown = BucketRefresher
        { refreshInterval    = 15 * 60
        , refreshQueue       = sched
        , refreshSearch      = sch
        , refreshBuckets     = bkts
        , refreshPing        = ping
        , refreshLastTouch   = lasttouch
        , bootstrapMode      = mode
        , bootstrapCountdown = countdown
        }
104
-- | Replace the search and the ping action of a 'BucketRefresher', sharing
-- every other field (including all of its 'TVar's) with the original.
--
-- A dedicated function is needed because plain record-update syntax on the
-- existentially quantified field 'refreshSearch' is rejected with the
-- compile error "Record update for insufficiently polymorphic field".
updateRefresherIO :: Ord addr
    => Search nid addr tok ni ni    -- ^ The replacement search.
    -> (ni -> IO Bool)              -- ^ The replacement ping action.
    -> BucketRefresher nid ni -> BucketRefresher nid ni
updateRefresherIO sch ping
                  BucketRefresher{ refreshInterval
                                 , refreshQueue
                                 , refreshBuckets
                                 , refreshLastTouch
                                 , bootstrapMode
                                 , bootstrapCountdown } =
    BucketRefresher
        { refreshSearch = sch
        , refreshPing   = ping
          -- Everything below is carried over unchanged.
        , refreshInterval
        , refreshQueue
        , refreshBuckets
        , refreshLastTouch
        , bootstrapMode
        , bootstrapCountdown
        }
122
-- | Fork a thread that polls the refresh queue forever and launches a bucket
-- refresh whenever the earliest scheduled entry comes due.  Kill the
-- returned thread to terminate the loop.
--
-- NOTE(review): while waiting for a refresh time that lies in the future the
-- loop sleeps with 'threadDelay', so an entry that is rescheduled to an
-- earlier time during that sleep is not noticed until the sleep ends.
forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId
forkPollForRefresh r@BucketRefresher{ refreshQueue } = fork $ do
    myThreadId >>= flip labelThread "pollForRefresh"
    pollLoop
  where
    -- Block (via STM 'retry') until the queue is non-empty, deal with its
    -- minimum entry, then start over.
    pollLoop = do
        soonest <- atomically $
            readTVar refreshQueue >>= maybe retry return . Int.findMin
        handleEntry soonest
        pollLoop

    handleEntry (bktnum :-> refreshTime) = do
        now <- getPOSIXTime
        -- The time difference is converted with 'fromEnum' and treated as a
        -- picosecond count.
        let picoseconds = fromEnum (refreshTime - now)
        if picoseconds <= 0
            then do
                -- Refresh time!  Move the bucket to the back of the queue.
                atomically $ do
                    interval <- effectiveRefreshInterval r bktnum
                    modifyTVar' refreshQueue $ Int.insert bktnum (now + interval)
                -- Now fork the refresh operation.
                -- TODO: We should probably propagate the kill signal to this thread.
                void $ fork $ do
                    myThreadId >>= flip labelThread ("refresh."++show bktnum)
                    void $ refreshBucket r bktnum
            else
                -- Not due yet: sleep (in microseconds) until it is.
                threadDelay (picoseconds `div` 10^6)
159
160
-- | This is a helper to 'refreshBucket' which does the book keeping needed
-- to decide whether or not a bucket is sufficiently refreshed.  It returns
-- 'False' when the node search can be terminated and 'True' when it should
-- keep going.
--
-- The search keeps going while either the bucket of interest holds fewer
-- than 'R.defaultBucketSize' nodes, or fewer than that many in-range nodes
-- have been found so far.
--
-- Bucket numbers are 0-based indices into 'R.shape', matching how
-- 'onFinishedRefresh' pairs @[0..]@ with 'R.shape'.
checkBucketFull :: Ord ni => KademliaSpace nid ni -- ^ Obtain a node id from a node.
                          -> TVar (BucketList ni) -- ^ The current routing table.
                          -> TVar (Set.Set ni)    -- ^ In-range nodes found so far.
                          -> TVar Bool            -- ^ Set to 'True' whenever the result is 'True'; never reset here.
                          -> Int                  -- ^ The bucket number of interest.
                          -> ni                   -- ^ A newly found node.
                          -> STM Bool
checkBucketFull space var resultCounter fin n found_node = do
    tbl <- readTVar var
    let nid = kademliaLocation space found_node
    -- Update the result set with every found node that is in the
    -- bucket of interest.
    when (n == R.bucketNumber space nid tbl)
        $ modifyTVar' resultCounter (Set.insert found_node)
    resultCount <- Set.size <$> readTVar resultCounter
    let fullcount = R.defaultBucketSize
        -- Size of bucket @n@ itself (previously @drop (n - 1)@, which looked
        -- at the preceding bucket for every @n >= 1@).
        bucketNotFull = case drop n (R.shape tbl) of
            (cnt:_) -> cnt < fullcount
            []      -> False
        keepGoing = bucketNotFull             -- bucket not full, keep going
                 || resultCount < fullcount   -- we haven't got many results, keep going
    -- Record the "keep going" verdict for the caller; otherwise: okay, good
    -- enough, let's quit.
    when keepGoing $ writeTVar fin True
    return keepGoing
187
-- | Called from 'refreshBucket' with the current time when a refresh of the
-- supplied bucket number finishes.
--
-- Outside of bootstrap mode this does nothing.  In bootstrap mode it updates
-- 'bootstrapCountdown' and leaves bootstrap mode once the countdown reaches
-- zero.  The returned 'IO' action only emits a progress log message.
onFinishedRefresh :: BucketRefresher nid ni -> Int -> POSIXTime -> STM (IO ())
onFinishedRefresh BucketRefresher{ bootstrapCountdown
                                 , bootstrapMode
                                 , refreshQueue
                                 , refreshBuckets } num now = do
    bootstrapping <- readTVar bootstrapMode
    if bootstrapping
        then bookkeeping
        else return (return ())
  where
    bookkeeping = do
        tbl <- readTVar refreshBuckets
        if num /= R.bktCount tbl - 1
            then modifyTVar' bootstrapCountdown (fmap pred)
            else
                -- The last bucket finished.
                readTVar bootstrapCountdown >>= \case
                    Nothing -> startCountdown tbl
                    Just n  -> writeTVar bootstrapCountdown $! Just $! pred n
        cnt <- readTVar bootstrapCountdown
        if cnt == Just 0
            then do
                -- Bootstrap finished!
                writeTVar bootstrapMode False
                writeTVar bootstrapCountdown Nothing
                return $ dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")."
            else return $ dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt)

    -- No countdown is running yet: schedule an immediate refresh for every
    -- unfull bucket other than this one and start the countdown at the
    -- number of refreshes scheduled.
    startCountdown tbl = do
        let fullsize = R.defaultBucketSize
            candidates = [ n | (n,len) <- zip [0..] (R.shape tbl)
                             , n /= num
                             , len < fullsize ]
            unfull | null candidates = [0] -- Schedule at least 1 more refresh.
                   | otherwise       = candidates
        forM_ unfull $ \n ->
            modifyTVar' refreshQueue $ Int.insert n (now - 1)
        writeTVar bootstrapCountdown $! Just $! length unfull
230
-- | Refresh one bucket by running a node search and waiting (for at most 15
-- minutes) for it to finish.
--
-- For the last bucket the search target is our own node id; for any other
-- bucket it is a random id sampled from that bucket's range.  When the wait
-- is over the search is cancelled and 'onFinishedRefresh' is run.
--
-- The result is 1 if 'checkBucketFull' set its flag during the search, and
-- otherwise the number of found nodes that fell into the bucket.
refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) =>
    BucketRefresher nid ni -> Int -> IO Int
refreshBucket r@BucketRefresher{ refreshSearch = sch
                               , refreshBuckets = var }
              n = do
    tbl <- atomically (readTVar var)
    let space  = searchSpace sch
        count  = bktCount tbl
        nid    = kademliaLocation space (thisNode tbl)
        isLast = n + 1 >= count
    sample <- if isLast
                then return nid                 -- Last bucket? Search our own id.
                else kademliaSample space       -- Otherwise generate a random id.
                                    getEntropy
                                    nid
                                    (bucketRange n (not isLast))
    fin <- newTVarIO False
    resultCounter <- newTVarIO Set.empty

    dput XRefresh $ "Start refresh " ++ show (n,sample)

    -- NOTE(review): the short-circuit test compares against
    -- 'R.defaultBucketCount' while the sample above uses the table's actual
    -- bucket count -- confirm this difference is intended.
    let visit | n + 1 == R.defaultBucketCount = const $ return True -- Never short-circuit the last bucket.
              | otherwise = checkBucketFull space var resultCounter fin n
    s <- search sch tbl sample visit

    -- Set 15 minute timeout in order to avoid overlapping refreshes.
    _ <- timeout (15*60*1000000) $ atomically $ searchIsFinished s >>= check
    -- Cancel unconditionally, so that a search which hit the timeout does
    -- not keep running alongside the next refresh.
    atomically $ searchCancel s
    dput XDHT $ "Finish refresh " ++ show (n,sample)
    now <- getPOSIXTime
    join $ atomically $ onFinishedRefresh r n now
    atomically $ do
        flagged <- readTVar fin
        found   <- Set.size <$> readTVar resultCounter
        return $ if flagged then 1 else found
265
-- | Schedule an immediate refresh of the last bucket by giving it a refresh
-- time one second in the past.  This only edits the refresh queue; the
-- refresh itself is left to whoever consumes that queue.
refreshLastBucket :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ()
refreshLastBucket BucketRefresher{ refreshBuckets, refreshQueue } = do
    now <- getPOSIXTime
    atomically $ do
        tbl <- readTVar refreshBuckets
        let lastBucket = bktCount tbl - 1
        -- Schedule immediate refresh.
        modifyTVar' refreshQueue (Int.insert lastBucket (now - 1))
275
-- | Switch to bootstrap mode and clear the bootstrap countdown.
--
-- The returned 'IO' action logs what happened; if we were not already
-- bootstrapping it also schedules an immediate refresh of the last bucket.
restartBootstrap :: (Hashable ni, Hashable nid, Ord ni, Ord nid, Show nid) =>
    BucketRefresher nid ni -> STM (IO ())
restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do
    alreadyBootstrapping <- readTVar bootstrapMode
    writeTVar bootstrapMode True
    writeTVar bootstrapCountdown Nothing
    return $ if alreadyBootstrapping
        then dput XRefresh "BOOTSTRAP already bootstrapping"
        else do
            dput XRefresh "BOOTSTRAP entered bootstrap mode"
            refreshLastBucket r
286
-- | Ping the given nodes to seed the routing table, then (re-)enter
-- bootstrap mode so that our own id gets searched.
bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) =>
    BucketRefresher nid ni
    -> t1 ni -- ^ Nodes to bootstrap from.
    -> t ni  -- ^ Fallback nodes; used only if the others are unresponsive.
    -> IO ()
bootstrap r@BucketRefresher{ refreshSearch = sch
                           , refreshPing = ping
                           , bootstrapMode } ns ns0 = do
    gotPing <- newTVarIO False
    -- Each ping task is labelled with the id of the node it pings.
    let taskLabel = show . kademliaLocation (searchSpace sch)

    -- First, ping the given nodes so that they are added to
    -- our routing table.
    withTaskGroup "bootstrap.resume" 20 $ \g ->
        forM_ ns $ \n ->
            forkTask g (taskLabel n) $ do
                ponged <- ping n
                when ponged $ atomically $ writeTVar gotPing True

    -- We resort to the hardcoded fallback nodes only when we got no
    -- responses.  This is to lessen the burden on well-known bootstrap
    -- nodes.
    anyResponse <- readTVarIO gotPing
    unless anyResponse $
        withTaskGroup "bootstrap.ping" 20 $ \g ->
            forM_ ns0 $ \n ->
                forkTask g (taskLabel n) (void $ ping n)
    dput XDHT "Finished bootstrap pings."

    -- Now search our own id by entering bootstrap mode from non-bootstrap
    -- mode.
    join $ atomically $ do
        writeTVar bootstrapMode False
        restartBootstrap r
    --
    -- Hopefully 'forkPollForRefresh' was invoked and can take over
    -- maintenance.
323
324
-- | The delay to use when rescheduling the refresh of the given bucket.
--
-- Normally this is 'refreshInterval'.  While bootstrapping, a bucket whose
-- size differs from 'R.defaultBucketSize' is rescheduled after only 15
-- seconds.  A bucket number beyond the end of 'R.shape' is treated as full.
--
-- Bucket numbers are 0-based indices into 'R.shape', matching how
-- 'onFinishedRefresh' pairs @[0..]@ with 'R.shape'.
effectiveRefreshInterval :: BucketRefresher nid ni -> Int -> STM POSIXTime
effectiveRefreshInterval BucketRefresher{ refreshInterval
                                        , refreshBuckets
                                        , bootstrapMode } num = do
    tbl <- readTVar refreshBuckets
    bootstrapping <- readTVar bootstrapMode
    let fullcount = R.defaultBucketSize
        -- Size of bucket @num@ itself (previously @drop (num - 1)@, which
        -- looked at the preceding bucket for every @num >= 1@).
        count = fromMaybe fullcount $ listToMaybe $ drop num $ R.shape tbl
    return $ if bootstrapping && count /= fullcount
        then 15 -- seconds
        else refreshInterval
340
341
342
-- | Reschedule a bucket's refresh-time.  It should be called whenever a bucket
-- changes.  This will typically be invoked from 'tblTransition'.
--
-- From BEP 05:
--
-- > Each bucket should maintain a "last changed" property to indicate how
-- > "fresh" the contents are.
--
-- We will use a "time to next refresh" property instead and store it in
-- a priority search queue.
--
-- In detail using an expository (not actually implemented) type
-- 'BucketTouchEvent'...
--
-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
-- >>> bucketEvents =
-- >>>    [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
-- >>>
-- >>>    , Stranger  :--> Accepted -- or a node is added to a bucket,
-- >>>
-- >>>    , Accepted  :--> Stranger -- or a node in a bucket is replaced
-- >>>    , Applicant :--> Accepted -- with another node,
-- >>>    ]
--
-- the bucket's last changed property should be updated.  Buckets that have not
-- been changed in 15 minutes (see 'refreshInterval') should be "refreshed."
-- This is done by picking a random ID in the range of the bucket and
-- performing a find_nodes search on it.
--
-- The only other possible BucketTouchEvents are as follows:
--
-- >>> not_handled =
-- >>>    [ Stranger  :--> Applicant -- A ping is pending, its result is covered:
-- >>>                               --   (Applicant :--> Stranger)
-- >>>                               --   (Applicant :--> Accepted)
-- >>>    , Accepted  :--> Applicant -- Never happens
-- >>>    ]
--
-- Because this BucketTouchEvent type is not actually implemented and we only
-- receive notifications of a node's new state, it suffices to reschedule the
-- bucket refresh 'touchBucket' on every transition to a state other than
-- 'Applicant'.
--
-- XXX: Unfortunately, this means redundantly triggering twice upon every node
-- replacement because we do not currently distinguish between standalone
-- insertion/deletion events and an insertion/deletion pair constituting
-- replacement.
--
-- It might also be better to pass the timestamp of the transition here and
-- keep the refresh queue in better sync with the routing table by updating it
-- within the STM monad.
--
-- We embed the result in the STM monad but currently, no STM state changes
-- occur until the returned IO action is invoked.  TODO: simplify?
touchBucket :: SensibleNodeId nid ni
    => BucketRefresher nid ni
    -> RoutingTransition ni -- ^ What happened to the bucket?
    -> STM (IO ())
touchBucket r@BucketRefresher{ refreshSearch
                             , refreshBuckets
                             , refreshQueue
                             , refreshLastTouch }
            RoutingTransition{ transitionedTo
                             , transitioningNode }
    = return $ case transitionedTo of
        Applicant -> return () -- Ignore transition to applicant.
        _         -> reschedule -- Reschedule for any other transition.
  where
    reschedule = do
        now <- getPOSIXTime
        join $ atomically $ do
            tbl   <- readTVar refreshBuckets
            stamp <- readTVar refreshLastTouch
            let space = searchSpace refreshSearch
                num   = R.bucketNumber space (kademliaLocation space transitioningNode) tbl
                -- A stamp of 0 means no bucket has ever been touched.
                idleTooLong = stamp /= 0 && now - stamp > 60
            -- It's been one minute since any bucket has been touched?
            -- Then re-enter bootstrap mode.
            action <- if idleTooLong
                        then restartBootstrap r
                        else return (return ())
            -- Must come after the possible restart above, because the
            -- interval depends on the bootstrap mode.
            interval <- effectiveRefreshInterval r num
            modifyTVar' refreshQueue $ Int.insert num (now + interval)
            writeTVar refreshLastTouch now
            return action
429
-- | Build a 'Kademlia' value from the refresher's search space, routing table
-- and ping action, with 'touchBucket' installed as its 'tblTransition'
-- handler.
refreshKademlia :: SensibleNodeId nid ni => BucketRefresher nid ni -> Kademlia nid ni
refreshKademlia r@BucketRefresher{ refreshSearch  = sch
                                 , refreshPing    = ping
                                 , refreshBuckets = bkts } =
    Kademlia quietInsertions (searchSpace sch) tableIO
  where
    -- The vanilla table IO, except that every routing transition also
    -- reschedules the affected bucket's refresh.
    tableIO = (vanillaIO bkts ping) { tblTransition = touchBucket r }