summaryrefslogtreecommitdiff
path: root/src/Network/Kademlia/Bootstrap.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/Kademlia/Bootstrap.hs')
-rw-r--r--src/Network/Kademlia/Bootstrap.hs437
1 files changed, 0 insertions, 437 deletions
diff --git a/src/Network/Kademlia/Bootstrap.hs b/src/Network/Kademlia/Bootstrap.hs
deleted file mode 100644
index 1324ae77..00000000
--- a/src/Network/Kademlia/Bootstrap.hs
+++ /dev/null
@@ -1,437 +0,0 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE ConstraintKinds #-}
3{-# LANGUAGE DeriveFunctor #-}
4{-# LANGUAGE DeriveTraversable #-}
5{-# LANGUAGE FlexibleContexts #-}
6{-# LANGUAGE GADTs #-}
7{-# LANGUAGE KindSignatures #-}
8{-# LANGUAGE LambdaCase #-}
9{-# LANGUAGE NamedFieldPuns #-}
10{-# LANGUAGE PartialTypeSignatures #-}
11{-# LANGUAGE PatternSynonyms #-}
12{-# LANGUAGE RankNTypes #-}
13{-# LANGUAGE ScopedTypeVariables #-}
14module Network.Kademlia.Bootstrap where
15
16import Data.Function
17import Data.Maybe
18import qualified Data.Set as Set
19import Data.Time.Clock.POSIX (getPOSIXTime)
20import Network.Kademlia.Routing as R
21#ifdef THREAD_DEBUG
22import Control.Concurrent.Lifted.Instrument
23#else
24import Control.Concurrent.Lifted
25import GHC.Conc (labelThread)
26#endif
27import Control.Concurrent.STM
28import Control.Monad
29import Data.Hashable
30import Data.Time.Clock.POSIX (POSIXTime)
31import Data.Ord
32import System.Entropy
33import System.Timeout
34import DPut
35import DebugTag
36
37import qualified Data.Wrapper.PSQInt as Int
38 ;import Data.Wrapper.PSQInt (pattern (:->))
39import Network.Address (bucketRange)
40import Network.Kademlia.Search
41import Control.Concurrent.Tasks
42import Network.Kademlia
43
44type SensibleNodeId nid ni =
45 ( Show nid
46 , Ord nid
47 , Ord ni
48 , Hashable nid
49 , Hashable ni )
50
51data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher
52 { -- | A staleness threshold (if a bucket goes this long without being
53 -- touched, a refresh will be triggered).
54 refreshInterval :: POSIXTime
55 -- | A TVar with the time-to-refresh schedule for each bucket.
56 --
57 -- To "touch" a bucket and prevent it from being refreshed, reschedule
58 -- its refresh time to some time into the future by modifying its
59 -- priority in this priority search queue.
60 , refreshQueue :: TVar (Int.PSQ POSIXTime)
61 -- | This is the kademlia node search specification.
62 , refreshSearch :: Search nid addr tok ni ni
63 -- | The current kademlia routing table buckets.
64 , refreshBuckets :: TVar (R.BucketList ni)
65 -- | Action to ping a node. This is used only during initial bootstrap
66 -- to get some nodes in our table. A 'True' result is interpreted as a a
67 -- pong, where 'False' is a non-response.
68 , refreshPing :: ni -> IO Bool
69 , -- | Timestamp of last bucket event.
70 refreshLastTouch :: TVar POSIXTime
71 , -- | This variable indicates whether or not we are in bootstrapping mode.
72 bootstrapMode :: TVar Bool
73 , -- | When this countdown reaches 0, we exit bootstrap mode. It is decremented on
74 -- every finished refresh.
75 bootstrapCountdown :: TVar (Maybe Int)
76 }
77
78newBucketRefresher :: ( Ord addr, Hashable addr
79 , SensibleNodeId nid ni )
80 => TVar (R.BucketList ni)
81 -> Search nid addr tok ni ni
82 -> (ni -> IO Bool)
83 -> STM (BucketRefresher nid ni)
84newBucketRefresher bkts sch ping = do
85 let spc = searchSpace sch
86 nodeId = kademliaLocation spc
87 -- bkts <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) template_ni R.defaultBucketCount
88 sched <- newTVar Int.empty
89 lasttouch <- newTVar 0 -- Would use getPOSIXTime here, or minBound, but alas...
90 bootstrapVar <- newTVar True -- Start in bootstrapping mode.
91 bootstrapCnt <- newTVar Nothing
92 return BucketRefresher
93 { refreshInterval = 15 * 60
94 , refreshQueue = sched
95 , refreshSearch = sch
96 , refreshBuckets = bkts
97 , refreshPing = ping
98 , refreshLastTouch = lasttouch
99 , bootstrapMode = bootstrapVar
100 , bootstrapCountdown = bootstrapCnt
101 }
102
103-- | This was added to avoid the compile error "Record update for
104-- insufficiently polymorphic field" when trying to update the existentially
105-- quantified field 'refreshSearch'.
106updateRefresherIO :: Ord addr
107 => Search nid addr tok ni ni
108 -> (ni -> IO Bool)
109 -> BucketRefresher nid ni -> BucketRefresher nid ni
110updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher
111 { refreshSearch = sch
112 , refreshPing = ping
113 , refreshInterval = refreshInterval
114 , refreshBuckets = refreshBuckets
115 , refreshQueue = refreshQueue
116 , refreshLastTouch = refreshLastTouch
117 , bootstrapMode = bootstrapMode
118 , bootstrapCountdown = bootstrapCountdown
119 }
120
121-- | Fork a refresh loop. Kill the returned thread to terminate it.
122forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId
123forkPollForRefresh r@BucketRefresher{ refreshInterval
124 , refreshQueue
125 , refreshBuckets
126 , refreshSearch } = fork $ do
127 myThreadId >>= flip labelThread "pollForRefresh"
128 fix $ \again -> do
129 join $ atomically $ do
130 nextup <- Int.findMin <$> readTVar refreshQueue
131 maybe retry (return . go again) nextup
132 where
133 refresh :: Int -> IO Int
134 refresh n = do
135 -- dput XRefresh $ "Refresh time! "++ show n
136 refreshBucket r n
137
138 go again ( bktnum :-> refresh_time ) = do
139 now <- getPOSIXTime
140 case fromEnum (refresh_time - now) of
141 x | x <= 0 -> do -- Refresh time!
142 -- Move it to the back of the refresh queue.
143 atomically $ do
144 interval <- effectiveRefreshInterval r bktnum
145 modifyTVar' refreshQueue
146 $ Int.insert bktnum (now + interval)
147 -- Now fork the refresh operation.
148 -- TODO: We should probably propogate the kill signal to this thread.
149 fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum)
150 _ <- refresh bktnum
151 return ()
152 return ()
153 picoseconds -> do
154 -- dput XRefresh $ show (picoseconds `div` 10^12) ++ " seconds until refresh " ++ show bktnum
155 threadDelay ( picoseconds `div` 10^6 )
156 again
157
158
159-- | This is a helper to 'refreshBucket' which does some book keeping to decide
160-- whether or not a bucket is sufficiently refreshed or not. It will return
161-- false when we can terminate a node search.
162checkBucketFull :: Ord ni => KademliaSpace nid ni -- ^ Obtain a node id from a node.
163 -> TVar (BucketList ni) -- ^ The current routing table.
164 -> TVar (Set.Set ni) -- ^ In-range nodes found so far.
165 -> TVar Bool -- ^ The result will also be written here.
166 -> Int -- ^ The bucket number of interest.
167 -> ni -- ^ A newly found node.
168 -> STM Bool
169checkBucketFull space var resultCounter fin n found_node = do
170 let fullcount = R.defaultBucketSize
171 saveit True = writeTVar fin True >> return True
172 saveit _ = return False
173 tbl <- readTVar var
174 let counts = R.shape tbl
175 nid = kademliaLocation space found_node
176 -- Update the result set with every found node that is in the
177 -- bucket of interest.
178 when (n == R.bucketNumber space nid tbl)
179 $ modifyTVar' resultCounter (Set.insert found_node)
180 resultCount <- readTVar resultCounter
181 saveit $ case drop (n - 1) counts of
182 (cnt:_) | cnt < fullcount -> True -- bucket not full, keep going
183 _ | Set.size resultCount < fullcount -> True -- we haven't got many results, keep going
184 _ -> False -- okay, good enough, let's quit.
185
186-- | Called from 'refreshBucket' with the current time when a refresh of the
187-- supplied bucket number finishes.
188onFinishedRefresh :: BucketRefresher nid ni -> Int -> POSIXTime -> STM (IO ())
189onFinishedRefresh BucketRefresher { bootstrapCountdown
190 , bootstrapMode
191 , refreshQueue
192 , refreshBuckets } num now = do
193 bootstrapping <- readTVar bootstrapMode
194 if not bootstrapping then return $ return () -- dput XRefresh $ "Finished non-boostrapping refresh: "++show num
195 else do
196 tbl <- readTVar refreshBuckets
197 action <-
198 if num /= R.bktCount tbl - 1
199 then do modifyTVar' bootstrapCountdown (fmap pred)
200 return $ return () -- dput XRefresh $ "BOOTSTRAP decrement"
201 else do
202 -- The last bucket finished.
203 cnt <- readTVar bootstrapCountdown
204 case cnt of
205 Nothing -> do
206 let fullsize = R.defaultBucketSize
207 notfull (n,len) | n==num = False
208 | len>=fullsize = False
209 | otherwise = True
210 unfull = case filter notfull $ zip [0..] (R.shape tbl) of
211 [] -> [(0,0)] -- Schedule at least 1 more refresh.
212 xs -> xs
213 forM_ unfull $ \(n,_) -> do
214 -- Schedule immediate refresh for unfull buckets (other than this one).
215 modifyTVar' refreshQueue $ Int.insert n (now - 1)
216 writeTVar bootstrapCountdown $! Just $! length unfull
217 return $ return () -- dput XRefresh $ "BOOTSTRAP scheduling: "++show unfull
218 Just n -> do writeTVar bootstrapCountdown $! Just $! pred n
219 return $ return () -- dput XRefresh "BOOTSTRAP decrement (last bucket)"
220 cnt <- readTVar bootstrapCountdown
221 if (cnt == Just 0)
222 then do
223 -- Boostrap finished!
224 writeTVar bootstrapMode False
225 writeTVar bootstrapCountdown Nothing
226 return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")."
227 else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt)
228
229refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) =>
230 BucketRefresher nid ni -> Int -> IO Int
231refreshBucket r@BucketRefresher{ refreshSearch = sch
232 , refreshBuckets = var }
233 n = do
234 tbl <- atomically (readTVar var)
235 let count = bktCount tbl
236 nid = kademliaLocation (searchSpace sch) (thisNode tbl)
237 sample <- if n+1 >= count -- Is this the last bucket?
238 then return nid -- Yes? Search our own id.
239 else kademliaSample (searchSpace sch) -- No? Generate a random id.
240 getEntropy
241 nid
242 (bucketRange n (n + 1 < count))
243 fin <- atomically $ newTVar False
244 resultCounter <- atomically $ newTVar Set.empty
245
246 dput XRefresh $ "Start refresh " ++ show (n,sample)
247
248 -- Set 15 minute timeout in order to avoid overlapping refreshes.
249 s <- search sch tbl sample $ if n+1 == R.defaultBucketCount
250 then const $ return True -- Never short-circuit the last bucket.
251 else checkBucketFull (searchSpace sch) var resultCounter fin n
252 _ <- timeout (15*60*1000000) $ do
253 atomically $ searchIsFinished s >>= check
254 atomically $ searchCancel s
255 dput XDHT $ "Finish refresh " ++ show (n,sample)
256 now <- getPOSIXTime
257 join $ atomically $ onFinishedRefresh r n now
258 rcount <- atomically $ do
259 c <- Set.size <$> readTVar resultCounter
260 b <- readTVar fin
261 return $ if b then 1 else c
262 return rcount
263
264refreshLastBucket :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ()
265refreshLastBucket r@BucketRefresher { refreshBuckets
266 , refreshQueue } = do
267
268 now <- getPOSIXTime
269 atomically $ do
270 cnt <- bktCount <$> readTVar refreshBuckets
271 -- Schedule immediate refresh.
272 modifyTVar' refreshQueue $ Int.insert (cnt-1) (now - 1)
273
274restartBootstrap :: (Hashable ni, Hashable nid, Ord ni, Ord nid, Show nid) =>
275 BucketRefresher nid ni -> STM (IO ())
276restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do
277 unchanged <- readTVar bootstrapMode
278 writeTVar bootstrapMode True
279 writeTVar bootstrapCountdown Nothing
280 if not unchanged then return $ do
281 dput XRefresh "BOOTSTRAP entered bootstrap mode"
282 refreshLastBucket r
283 else return $ dput XRefresh "BOOTSTRAP already bootstrapping"
284
285bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) =>
286 BucketRefresher nid ni
287 -> t1 ni -- ^ Nodes to bootstrap from.
288 -> t ni -- ^ Fallback nodes; used only if the others are unresponsive.
289 -> IO ()
290bootstrap r@BucketRefresher { refreshSearch = sch
291 , refreshBuckets = var
292 , refreshPing = ping
293 , bootstrapMode } ns ns0 = do
294 gotPing <- atomically $ newTVar False
295
296 -- First, ping the given nodes so that they are added to
297 -- our routing table.
298 withTaskGroup "bootstrap.resume" 20 $ \g -> do
299 forM_ ns $ \n -> do
300 let lbl = show $ kademliaLocation (searchSpace sch) n
301 forkTask g lbl $ do
302 b <- ping n
303 when b $ atomically $ writeTVar gotPing True
304
305 -- We resort to the hardcoded fallback nodes only when we got no
306 -- responses. This is to lesson the burden on well-known boostrap
307 -- nodes.
308 fallback <- atomically (readTVar gotPing) >>= return . when . not
309 fallback $ withTaskGroup "bootstrap.ping" 20 $ \g -> do
310 forM_ ns0 $ \n -> do
311 forkTask g (show $ kademliaLocation (searchSpace sch) n)
312 (void $ ping n)
313 dput XDHT "Finished bootstrap pings."
314 -- Now search our own Id by entering bootstrap mode from non-bootstrap mode.
315 join $ atomically $ do
316 writeTVar bootstrapMode False
317 restartBootstrap r
318 --
319 -- Hopefully 'forkPollForRefresh' was invoked and can take over
320 -- maintenance.
321
322
323effectiveRefreshInterval :: BucketRefresher nid ni -> Int -> STM POSIXTime
324effectiveRefreshInterval BucketRefresher{ refreshInterval
325 , refreshBuckets
326 , bootstrapMode } num = do
327 tbl <- readTVar refreshBuckets
328 bootstrapping <- readTVar bootstrapMode
329 case bootstrapping of
330 False -> return refreshInterval
331 True -> do
332 -- When bootstrapping, refresh interval for non-full buckets is only 15 seconds.
333 let fullcount = R.defaultBucketSize
334 count = fromMaybe fullcount $ listToMaybe $ drop (num - 1) $ R.shape tbl
335 if count == fullcount
336 then return refreshInterval
337 else return 15 -- seconds
338
339
340
341-- | Reschedule a bucket's refresh-time. It should be called whenever a bucket
342-- changes. This will typically be invoked from 'tblTransition'.
343--
344-- From BEP 05:
345--
346-- > Each bucket should maintain a "last changed" property to indicate how
347-- > "fresh" the contents are.
348--
349-- We will use a "time to next refresh" property instead and store it in
350-- a priority search queue.
351--
352-- In detail using an expository (not actually implemented) type
353-- 'BucketTouchEvent'...
354--
355-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
356-- >>> bucketEvents =
357-- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
358-- >>>
359-- >>> , Stranger :--> Accepted -- or a node is added to a bucket,
360-- >>>
361-- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced
362-- >>> , Applicant :--> Accepted -- with another node,
363-- >>> ]
364--
365-- the bucket's last changed property should be updated. Buckets that have not
366-- been changed in 15 minutes (see 'refreshInterval') should be "refreshed."
367-- This is done by picking a random ID in the range of the bucket and
368-- performing a find_nodes search on it.
369--
370-- The only other possible BucketTouchEvents are as follows:
371--
372-- >>> not_handled =
373-- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered:
374-- >>> -- (Applicant :--> Stranger)
375-- >>> -- (Applicant :--> Accepted)
376-- >>> , Accepted :--> Applicant -- Never happens
377-- >>> ]
378--
379-- Because this BucketTouchEvent type is not actually implemented and we only
380-- receive notifications of a node's new state, it suffices to reschedule the
381-- bucket refresh 'touchBucket' on every transition to a state other than
382-- 'Applicant'.
383--
384-- XXX: Unfortunately, this means redundantly triggering twice upon every node
385-- replacement because we do not currently distinguish between standalone
386-- insertion/deletion events and an insertion/deletion pair constituting
387-- replacement.
388--
389-- It might also be better to pass the timestamp of the transition here and
390-- keep the refresh queue in better sync with the routing table by updating it
391-- within the STM monad.
392--
393-- We embed the result in the STM monad but currently, no STM state changes
394-- occur until the returned IO action is invoked. TODO: simplify?
395touchBucket :: SensibleNodeId nid ni
396 => BucketRefresher nid ni
397 -> RoutingTransition ni -- ^ What happened to the bucket?
398 -> STM (IO ())
399touchBucket r@BucketRefresher{ refreshSearch
400 , refreshInterval
401 , refreshBuckets
402 , refreshQueue
403 , refreshLastTouch
404 , bootstrapMode
405 , bootstrapCountdown }
406 RoutingTransition{ transitionedTo
407 , transitioningNode }
408 = case transitionedTo of
409 Applicant -> return $ return () -- Ignore transition to applicant.
410 _ -> return $ do -- Reschedule for any other transition.
411 now <- getPOSIXTime
412 join $ atomically $ do
413 let space = searchSpace refreshSearch
414 nid = kademliaLocation space transitioningNode
415 tbl <- readTVar refreshBuckets
416 let num = R.bucketNumber space nid tbl
417 stamp <- readTVar refreshLastTouch
418 action <- case stamp /= 0 && (now - stamp > 60) of
419 True -> do
420 -- It's been one minute since any bucket has been touched, re-enter bootstrap mode.
421 restartBootstrap r
422 False -> return $ return ()
423 interval <- effectiveRefreshInterval r num
424 modifyTVar' refreshQueue $ Int.insert num (now + interval)
425 writeTVar refreshLastTouch now
426 return action
427
428refreshKademlia :: SensibleNodeId nid ni => BucketRefresher nid ni -> Kademlia nid ni
429refreshKademlia r@BucketRefresher { refreshSearch = sch
430 , refreshPing = ping
431 , refreshBuckets = bkts
432 }
433 = Kademlia quietInsertions (searchSpace sch) (vanillaIO bkts ping)
434 { tblTransition = \tr -> do
435 io <- touchBucket r tr
436 return io
437 }