path: root/src/Network/Kademlia/Bootstrap.hs
diff options
Diffstat (limited to 'src/Network/Kademlia/Bootstrap.hs')
1 files changed, 0 insertions, 437 deletions
diff --git a/src/Network/Kademlia/Bootstrap.hs b/src/Network/Kademlia/Bootstrap.hs
deleted file mode 100644
index 1324ae77..00000000
--- a/src/Network/Kademlia/Bootstrap.hs
+++ /dev/null
@@ -1,437 +0,0 @@
2{-# LANGUAGE ConstraintKinds #-}
3{-# LANGUAGE DeriveFunctor #-}
4{-# LANGUAGE DeriveTraversable #-}
5{-# LANGUAGE FlexibleContexts #-}
7{-# LANGUAGE KindSignatures #-}
8{-# LANGUAGE LambdaCase #-}
9{-# LANGUAGE NamedFieldPuns #-}
10{-# LANGUAGE PartialTypeSignatures #-}
11{-# LANGUAGE PatternSynonyms #-}
12{-# LANGUAGE RankNTypes #-}
13{-# LANGUAGE ScopedTypeVariables #-}
14module Network.Kademlia.Bootstrap where
16import Data.Function
17import Data.Maybe
18import qualified Data.Set as Set
19import Data.Time.Clock.POSIX (getPOSIXTime)
20import Network.Kademlia.Routing as R
22import Control.Concurrent.Lifted.Instrument
24import Control.Concurrent.Lifted
25import GHC.Conc (labelThread)
27import Control.Concurrent.STM
28import Control.Monad
29import Data.Hashable
30import Data.Time.Clock.POSIX (POSIXTime)
31import Data.Ord
32import System.Entropy
33import System.Timeout
34import DPut
35import DebugTag
37import qualified Data.Wrapper.PSQInt as Int
38 ;import Data.Wrapper.PSQInt (pattern (:->))
39import Network.Address (bucketRange)
40import Network.Kademlia.Search
41import Control.Concurrent.Tasks
42import Network.Kademlia
44type SensibleNodeId nid ni =
45 ( Show nid
46 , Ord nid
47 , Ord ni
48 , Hashable nid
49 , Hashable ni )
51data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher
52 { -- | A staleness threshold (if a bucket goes this long without being
53 -- touched, a refresh will be triggered).
54 refreshInterval :: POSIXTime
55 -- | A TVar with the time-to-refresh schedule for each bucket.
56 --
57 -- To "touch" a bucket and prevent it from being refreshed, reschedule
58 -- its refresh time to some time into the future by modifying its
59 -- priority in this priority search queue.
60 , refreshQueue :: TVar (Int.PSQ POSIXTime)
61 -- | This is the kademlia node search specification.
62 , refreshSearch :: Search nid addr tok ni ni
63 -- | The current kademlia routing table buckets.
64 , refreshBuckets :: TVar (R.BucketList ni)
65 -- | Action to ping a node. This is used only during initial bootstrap
66 -- to get some nodes in our table. A 'True' result is interpreted as a a
67 -- pong, where 'False' is a non-response.
68 , refreshPing :: ni -> IO Bool
69 , -- | Timestamp of last bucket event.
70 refreshLastTouch :: TVar POSIXTime
71 , -- | This variable indicates whether or not we are in bootstrapping mode.
72 bootstrapMode :: TVar Bool
73 , -- | When this countdown reaches 0, we exit bootstrap mode. It is decremented on
74 -- every finished refresh.
75 bootstrapCountdown :: TVar (Maybe Int)
76 }
78newBucketRefresher :: ( Ord addr, Hashable addr
79 , SensibleNodeId nid ni )
80 => TVar (R.BucketList ni)
81 -> Search nid addr tok ni ni
82 -> (ni -> IO Bool)
83 -> STM (BucketRefresher nid ni)
84newBucketRefresher bkts sch ping = do
85 let spc = searchSpace sch
86 nodeId = kademliaLocation spc
87 -- bkts <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) template_ni R.defaultBucketCount
88 sched <- newTVar Int.empty
89 lasttouch <- newTVar 0 -- Would use getPOSIXTime here, or minBound, but alas...
90 bootstrapVar <- newTVar True -- Start in bootstrapping mode.
91 bootstrapCnt <- newTVar Nothing
92 return BucketRefresher
93 { refreshInterval = 15 * 60
94 , refreshQueue = sched
95 , refreshSearch = sch
96 , refreshBuckets = bkts
97 , refreshPing = ping
98 , refreshLastTouch = lasttouch
99 , bootstrapMode = bootstrapVar
100 , bootstrapCountdown = bootstrapCnt
101 }
103-- | This was added to avoid the compile error "Record update for
104-- insufficiently polymorphic field" when trying to update the existentially
105-- quantified field 'refreshSearch'.
106updateRefresherIO :: Ord addr
107 => Search nid addr tok ni ni
108 -> (ni -> IO Bool)
109 -> BucketRefresher nid ni -> BucketRefresher nid ni
110updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher
111 { refreshSearch = sch
112 , refreshPing = ping
113 , refreshInterval = refreshInterval
114 , refreshBuckets = refreshBuckets
115 , refreshQueue = refreshQueue
116 , refreshLastTouch = refreshLastTouch
117 , bootstrapMode = bootstrapMode
118 , bootstrapCountdown = bootstrapCountdown
119 }
121-- | Fork a refresh loop. Kill the returned thread to terminate it.
122forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId
123forkPollForRefresh r@BucketRefresher{ refreshInterval
124 , refreshQueue
125 , refreshBuckets
126 , refreshSearch } = fork $ do
127 myThreadId >>= flip labelThread "pollForRefresh"
128 fix $ \again -> do
129 join $ atomically $ do
130 nextup <- Int.findMin <$> readTVar refreshQueue
131 maybe retry (return . go again) nextup
132 where
133 refresh :: Int -> IO Int
134 refresh n = do
135 -- dput XRefresh $ "Refresh time! "++ show n
136 refreshBucket r n
138 go again ( bktnum :-> refresh_time ) = do
139 now <- getPOSIXTime
140 case fromEnum (refresh_time - now) of
141 x | x <= 0 -> do -- Refresh time!
142 -- Move it to the back of the refresh queue.
143 atomically $ do
144 interval <- effectiveRefreshInterval r bktnum
145 modifyTVar' refreshQueue
146 $ Int.insert bktnum (now + interval)
147 -- Now fork the refresh operation.
148 -- TODO: We should probably propogate the kill signal to this thread.
149 fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum)
150 _ <- refresh bktnum
151 return ()
152 return ()
153 picoseconds -> do
154 -- dput XRefresh $ show (picoseconds `div` 10^12) ++ " seconds until refresh " ++ show bktnum
155 threadDelay ( picoseconds `div` 10^6 )
156 again
159-- | This is a helper to 'refreshBucket' which does some book keeping to decide
160-- whether or not a bucket is sufficiently refreshed or not. It will return
161-- false when we can terminate a node search.
162checkBucketFull :: Ord ni => KademliaSpace nid ni -- ^ Obtain a node id from a node.
163 -> TVar (BucketList ni) -- ^ The current routing table.
164 -> TVar (Set.Set ni) -- ^ In-range nodes found so far.
165 -> TVar Bool -- ^ The result will also be written here.
166 -> Int -- ^ The bucket number of interest.
167 -> ni -- ^ A newly found node.
168 -> STM Bool
169checkBucketFull space var resultCounter fin n found_node = do
170 let fullcount = R.defaultBucketSize
171 saveit True = writeTVar fin True >> return True
172 saveit _ = return False
173 tbl <- readTVar var
174 let counts = R.shape tbl
175 nid = kademliaLocation space found_node
176 -- Update the result set with every found node that is in the
177 -- bucket of interest.
178 when (n == R.bucketNumber space nid tbl)
179 $ modifyTVar' resultCounter (Set.insert found_node)
180 resultCount <- readTVar resultCounter
181 saveit $ case drop (n - 1) counts of
182 (cnt:_) | cnt < fullcount -> True -- bucket not full, keep going
183 _ | Set.size resultCount < fullcount -> True -- we haven't got many results, keep going
184 _ -> False -- okay, good enough, let's quit.
186-- | Called from 'refreshBucket' with the current time when a refresh of the
187-- supplied bucket number finishes.
188onFinishedRefresh :: BucketRefresher nid ni -> Int -> POSIXTime -> STM (IO ())
189onFinishedRefresh BucketRefresher { bootstrapCountdown
190 , bootstrapMode
191 , refreshQueue
192 , refreshBuckets } num now = do
193 bootstrapping <- readTVar bootstrapMode
194 if not bootstrapping then return $ return () -- dput XRefresh $ "Finished non-boostrapping refresh: "++show num
195 else do
196 tbl <- readTVar refreshBuckets
197 action <-
198 if num /= R.bktCount tbl - 1
199 then do modifyTVar' bootstrapCountdown (fmap pred)
200 return $ return () -- dput XRefresh $ "BOOTSTRAP decrement"
201 else do
202 -- The last bucket finished.
203 cnt <- readTVar bootstrapCountdown
204 case cnt of
205 Nothing -> do
206 let fullsize = R.defaultBucketSize
207 notfull (n,len) | n==num = False
208 | len>=fullsize = False
209 | otherwise = True
210 unfull = case filter notfull $ zip [0..] (R.shape tbl) of
211 [] -> [(0,0)] -- Schedule at least 1 more refresh.
212 xs -> xs
213 forM_ unfull $ \(n,_) -> do
214 -- Schedule immediate refresh for unfull buckets (other than this one).
215 modifyTVar' refreshQueue $ Int.insert n (now - 1)
216 writeTVar bootstrapCountdown $! Just $! length unfull
217 return $ return () -- dput XRefresh $ "BOOTSTRAP scheduling: "++show unfull
218 Just n -> do writeTVar bootstrapCountdown $! Just $! pred n
219 return $ return () -- dput XRefresh "BOOTSTRAP decrement (last bucket)"
220 cnt <- readTVar bootstrapCountdown
221 if (cnt == Just 0)
222 then do
223 -- Boostrap finished!
224 writeTVar bootstrapMode False
225 writeTVar bootstrapCountdown Nothing
226 return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")."
227 else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt)
229refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) =>
230 BucketRefresher nid ni -> Int -> IO Int
231refreshBucket r@BucketRefresher{ refreshSearch = sch
232 , refreshBuckets = var }
233 n = do
234 tbl <- atomically (readTVar var)
235 let count = bktCount tbl
236 nid = kademliaLocation (searchSpace sch) (thisNode tbl)
237 sample <- if n+1 >= count -- Is this the last bucket?
238 then return nid -- Yes? Search our own id.
239 else kademliaSample (searchSpace sch) -- No? Generate a random id.
240 getEntropy
241 nid
242 (bucketRange n (n + 1 < count))
243 fin <- atomically $ newTVar False
244 resultCounter <- atomically $ newTVar Set.empty
246 dput XRefresh $ "Start refresh " ++ show (n,sample)
248 -- Set 15 minute timeout in order to avoid overlapping refreshes.
249 s <- search sch tbl sample $ if n+1 == R.defaultBucketCount
250 then const $ return True -- Never short-circuit the last bucket.
251 else checkBucketFull (searchSpace sch) var resultCounter fin n
252 _ <- timeout (15*60*1000000) $ do
253 atomically $ searchIsFinished s >>= check
254 atomically $ searchCancel s
255 dput XDHT $ "Finish refresh " ++ show (n,sample)
256 now <- getPOSIXTime
257 join $ atomically $ onFinishedRefresh r n now
258 rcount <- atomically $ do
259 c <- Set.size <$> readTVar resultCounter
260 b <- readTVar fin
261 return $ if b then 1 else c
262 return rcount
264refreshLastBucket :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ()
265refreshLastBucket r@BucketRefresher { refreshBuckets
266 , refreshQueue } = do
268 now <- getPOSIXTime
269 atomically $ do
270 cnt <- bktCount <$> readTVar refreshBuckets
271 -- Schedule immediate refresh.
272 modifyTVar' refreshQueue $ Int.insert (cnt-1) (now - 1)
274restartBootstrap :: (Hashable ni, Hashable nid, Ord ni, Ord nid, Show nid) =>
275 BucketRefresher nid ni -> STM (IO ())
276restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do
277 unchanged <- readTVar bootstrapMode
278 writeTVar bootstrapMode True
279 writeTVar bootstrapCountdown Nothing
280 if not unchanged then return $ do
281 dput XRefresh "BOOTSTRAP entered bootstrap mode"
282 refreshLastBucket r
283 else return $ dput XRefresh "BOOTSTRAP already bootstrapping"
285bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) =>
286 BucketRefresher nid ni
287 -> t1 ni -- ^ Nodes to bootstrap from.
288 -> t ni -- ^ Fallback nodes; used only if the others are unresponsive.
289 -> IO ()
290bootstrap r@BucketRefresher { refreshSearch = sch
291 , refreshBuckets = var
292 , refreshPing = ping
293 , bootstrapMode } ns ns0 = do
294 gotPing <- atomically $ newTVar False
296 -- First, ping the given nodes so that they are added to
297 -- our routing table.
298 withTaskGroup "bootstrap.resume" 20 $ \g -> do
299 forM_ ns $ \n -> do
300 let lbl = show $ kademliaLocation (searchSpace sch) n
301 forkTask g lbl $ do
302 b <- ping n
303 when b $ atomically $ writeTVar gotPing True
305 -- We resort to the hardcoded fallback nodes only when we got no
306 -- responses. This is to lesson the burden on well-known boostrap
307 -- nodes.
308 fallback <- atomically (readTVar gotPing) >>= return . when . not
309 fallback $ withTaskGroup "" 20 $ \g -> do
310 forM_ ns0 $ \n -> do
311 forkTask g (show $ kademliaLocation (searchSpace sch) n)
312 (void $ ping n)
313 dput XDHT "Finished bootstrap pings."
314 -- Now search our own Id by entering bootstrap mode from non-bootstrap mode.
315 join $ atomically $ do
316 writeTVar bootstrapMode False
317 restartBootstrap r
318 --
319 -- Hopefully 'forkPollForRefresh' was invoked and can take over
320 -- maintenance.
323effectiveRefreshInterval :: BucketRefresher nid ni -> Int -> STM POSIXTime
324effectiveRefreshInterval BucketRefresher{ refreshInterval
325 , refreshBuckets
326 , bootstrapMode } num = do
327 tbl <- readTVar refreshBuckets
328 bootstrapping <- readTVar bootstrapMode
329 case bootstrapping of
330 False -> return refreshInterval
331 True -> do
332 -- When bootstrapping, refresh interval for non-full buckets is only 15 seconds.
333 let fullcount = R.defaultBucketSize
334 count = fromMaybe fullcount $ listToMaybe $ drop (num - 1) $ R.shape tbl
335 if count == fullcount
336 then return refreshInterval
337 else return 15 -- seconds
341-- | Reschedule a bucket's refresh-time. It should be called whenever a bucket
342-- changes. This will typically be invoked from 'tblTransition'.
344-- From BEP 05:
346-- > Each bucket should maintain a "last changed" property to indicate how
347-- > "fresh" the contents are.
349-- We will use a "time to next refresh" property instead and store it in
350-- a priority search queue.
352-- In detail using an expository (not actually implemented) type
353-- 'BucketTouchEvent'...
355-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
356-- >>> bucketEvents =
357-- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
358-- >>>
359-- >>> , Stranger :--> Accepted -- or a node is added to a bucket,
360-- >>>
361-- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced
362-- >>> , Applicant :--> Accepted -- with another node,
363-- >>> ]
365-- the bucket's last changed property should be updated. Buckets that have not
366-- been changed in 15 minutes (see 'refreshInterval') should be "refreshed."
367-- This is done by picking a random ID in the range of the bucket and
368-- performing a find_nodes search on it.
370-- The only other possible BucketTouchEvents are as follows:
372-- >>> not_handled =
373-- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered:
374-- >>> -- (Applicant :--> Stranger)
375-- >>> -- (Applicant :--> Accepted)
376-- >>> , Accepted :--> Applicant -- Never happens
377-- >>> ]
379-- Because this BucketTouchEvent type is not actually implemented and we only
380-- receive notifications of a node's new state, it suffices to reschedule the
381-- bucket refresh 'touchBucket' on every transition to a state other than
382-- 'Applicant'.
384-- XXX: Unfortunately, this means redundantly triggering twice upon every node
385-- replacement because we do not currently distinguish between standalone
386-- insertion/deletion events and an insertion/deletion pair constituting
387-- replacement.
389-- It might also be better to pass the timestamp of the transition here and
390-- keep the refresh queue in better sync with the routing table by updating it
391-- within the STM monad.
393-- We embed the result in the STM monad but currently, no STM state changes
394-- occur until the returned IO action is invoked. TODO: simplify?
395touchBucket :: SensibleNodeId nid ni
396 => BucketRefresher nid ni
397 -> RoutingTransition ni -- ^ What happened to the bucket?
398 -> STM (IO ())
399touchBucket r@BucketRefresher{ refreshSearch
400 , refreshInterval
401 , refreshBuckets
402 , refreshQueue
403 , refreshLastTouch
404 , bootstrapMode
405 , bootstrapCountdown }
406 RoutingTransition{ transitionedTo
407 , transitioningNode }
408 = case transitionedTo of
409 Applicant -> return $ return () -- Ignore transition to applicant.
410 _ -> return $ do -- Reschedule for any other transition.
411 now <- getPOSIXTime
412 join $ atomically $ do
413 let space = searchSpace refreshSearch
414 nid = kademliaLocation space transitioningNode
415 tbl <- readTVar refreshBuckets
416 let num = R.bucketNumber space nid tbl
417 stamp <- readTVar refreshLastTouch
418 action <- case stamp /= 0 && (now - stamp > 60) of
419 True -> do
420 -- It's been one minute since any bucket has been touched, re-enter bootstrap mode.
421 restartBootstrap r
422 False -> return $ return ()
423 interval <- effectiveRefreshInterval r num
424 modifyTVar' refreshQueue $ Int.insert num (now + interval)
425 writeTVar refreshLastTouch now
426 return action
428refreshKademlia :: SensibleNodeId nid ni => BucketRefresher nid ni -> Kademlia nid ni
429refreshKademlia r@BucketRefresher { refreshSearch = sch
430 , refreshPing = ping
431 , refreshBuckets = bkts
432 }
433 = Kademlia quietInsertions (searchSpace sch) (vanillaIO bkts ping)
434 { tblTransition = \tr -> do
435 io <- touchBucket r tr
436 return io
437 }