path: root/src/Network/Kademlia.hs
author    joe <joe@jerkface.net>  2017-11-06 00:16:36 -0500
committer joe <joe@jerkface.net>  2017-11-06 00:16:36 -0500
commit    159f60689ab70413d963d8103b561dd587c448d6 (patch)
tree      d7d2a36839f7d30424731b0b7c4d46bd129a553a /src/Network/Kademlia.hs
parent    e3efc5455185bc984b2e49f0d54c9bded1b4f269 (diff)
Factored out Network.Kademlia.Bootstrap.
Diffstat (limited to 'src/Network/Kademlia.hs')
-rw-r--r--  src/Network/Kademlia.hs  210
1 file changed, 0 insertions(+), 210 deletions(-)
diff --git a/src/Network/Kademlia.hs b/src/Network/Kademlia.hs
index f1ffda19..8956df2c 100644
--- a/src/Network/Kademlia.hs
+++ b/src/Network/Kademlia.hs
@@ -178,213 +178,3 @@ insertNode (Kademlia reporter space io) node = do

    return ()

-- TODO: Bootstrap/Refresh
--
-- From BEP 05:
--
-- Each bucket should maintain a "last changed" property to indicate how
-- "fresh" the contents are.
--
-- Note: We will use a "time to next refresh" property instead and store it in
-- a priority search queue.
--
-- When...
--
-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
-- >>> bucketEvents =
-- >>>     [ Applicant :--> Stranger  -- a node in a bucket is pinged and it responds,
-- >>>
-- >>>     , Stranger  :--> Accepted  -- or a node is added to a bucket,
-- >>>
-- >>>     , Accepted  :--> Stranger  -- or a node in a bucket is replaced
-- >>>     , Applicant :--> Accepted  -- with another node,
-- >>>     ]
--
-- the bucket's "last changed" property should be updated. Buckets
-- that have not been changed in 15 minutes should be "refreshed." This is done
-- by picking a random ID in the range of the bucket and performing a
-- find_nodes search on it.
--
-- The only other possible BucketTouchEvents are as follows:
--
-- >>> not_handled =
-- >>>     [ Stranger :--> Applicant  -- A ping is pending; its result is covered by
-- >>>                                --   (Applicant :--> Stranger) and
-- >>>                                --   (Applicant :--> Accepted).
-- >>>     , Accepted :--> Applicant  -- Never happens.
-- >>>     ]
--

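-- For illustration, a minimal model of the "time to next refresh" queue
-- described above. This sketch assumes the Data.PSQueue API from the
-- PSQueue package in place of the Int.PSQ wrapper used below, and the
-- deadline values are arbitrary. Bucket numbers are the keys and refresh
-- deadlines are the priorities, so the bucket that is due next is always
-- at the minimum:
--
-- >>> import qualified Data.PSQueue as PSQ
-- >>> let q = PSQ.insert 3 1800 (PSQ.insert 1 900 PSQ.empty) :: PSQ.PSQ Int POSIXTime
-- >>> PSQ.findMin q                      -- Just (1 :-> 900s)
-- >>> -- "Touching" bucket 1 pushes its deadline back, so bucket 3 is due first:
-- >>> PSQ.findMin (PSQ.insert 1 2700 q)  -- Just (3 :-> 1800s)
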
-- XXX: This will be redundantly triggered twice upon every node replacement
-- because we do not currently distinguish between standalone
-- insertion/deletion events and an insertion/deletion pair constituting
-- replacement.
--
-- It might also be better to pass the timestamp of the transition here and
-- keep the refresh queue in better sync with the routing table by updating it
-- within the STM monad.
touchBucket :: KademliaSpace nid ni
            -> POSIXTime
            -> TVar (BucketList ni)
            -> TVar (Int.PSQ POSIXTime)
            -> RoutingTransition ni
            -> STM (IO ())
touchBucket space interval bkts psq tr
    | transitionedTo tr == Applicant = return $ return ()
    | otherwise                      = return $ do
        now <- getPOSIXTime
        atomically $ do
            let nid = kademliaLocation space (transitioningNode tr)
            num <- R.bucketNumber space nid <$> readTVar bkts
            modifyTVar' psq $ Int.insert num (now + interval)

-- | > pollForRefresh interval queue refresh
--
-- Fork a refresh loop. Kill the returned thread to terminate it. The
-- arguments are: a staleness threshold (if a bucket goes this long without
-- being touched, a refresh will be triggered), a TVar with the time-to-refresh
-- schedule for each bucket, and a refresh action to be forked when a bucket
-- exceeds the staleness threshold.
--
-- To "touch" a bucket and prevent it from being refreshed, reschedule its
-- refresh time to some point in the future by modifying the 'Int.PSQ' in the
-- TVar.
forkPollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO a) -> IO ThreadId
forkPollForRefresh interval psq refresh = do
    fork $ do
        myThreadId >>= flip labelThread "pollForRefresh"
        fix $ \again -> do
            join $ atomically $ do
                nextup <- Int.findMin <$> readTVar psq
                maybe retry (return . go again) nextup
  where
    go again (bktnum :-> refresh_time) = do
        now <- getPOSIXTime
        case fromEnum (refresh_time - now) of
            x | x <= 0 -> do -- Refresh time!
                -- Move it to the back of the refresh queue.
                atomically $ modifyTVar' psq
                           $ Int.insert bktnum (now + interval)
                -- Now fork the refresh operation.
                -- TODO: We should probably propagate the kill signal to this thread.
                fork $ do myThreadId >>= flip labelThread ("refresh." ++ show bktnum)
                          _ <- refresh bktnum
                          return ()
                return ()
            seconds -> threadDelay (seconds * 1000000)
        again

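-- A usage sketch (illustrative, not from the original module): with a
-- fifteen-minute staleness threshold, a schedule TVar 'refreshQueue', and
-- the 'refreshBucket' action defined below,
--
-- >>> tid <- forkPollForRefresh (15 * 60) refreshQueue (refreshBucket sch bkts)
-- >>> killThread tid   -- later: terminate the refresh loop
--
-- where 'sch' and 'bkts' are the caller's Search and routing-table TVar.
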
refreshBucket :: forall nid tok ni addr.
                 ( Show nid
                 , Serialize nid
                 , Ord nid
                 , Ord ni
                 , Ord addr
                 , Hashable nid
                 , Hashable ni ) =>
                 Search nid addr tok ni ni -> TVar (BucketList ni) -> Int -> IO Int
refreshBucket sch var n = do
    tbl <- atomically (readTVar var)
    let count = bktCount tbl
        nid   = kademliaLocation (searchSpace sch) (thisNode tbl)
    sample <- if n + 1 >= count -- Is this the last bucket?
                then return nid -- Yes? Search our own id.
                else kademliaSample (searchSpace sch) -- No? Generate a random id.
                                    getEntropy
                                    nid
                                    (bucketRange n (n + 1 < count))
    fin <- atomically $ newTVar False
    resultCounter <- atomically $ newTVar Set.empty
    let fullcount = R.defaultBucketSize
        saveit True = writeTVar fin True >> return True
        saveit _    = return False
        checkBucketFull :: ni -> STM Bool
        checkBucketFull found_node = do
            tbl <- readTVar var
            let counts = R.shape tbl
            when (n == R.bucketNumber (searchSpace sch) (kademliaLocation (searchSpace sch) found_node) tbl)
                $ modifyTVar' resultCounter (Set.insert found_node)
            resultCount <- readTVar resultCounter
            saveit $ case drop (n - 1) counts of
                (cnt : _) | cnt < fullcount           -> True
                _ | Set.size resultCount < fullcount  -> True
                _                                     -> False

    hPutStrLn stderr $ "Start refresh " ++ show (n, sample)

    -- Set a 15 minute timeout in order to avoid overlapping refreshes.
    s <- search sch tbl sample $ if n + 1 == R.defaultBucketCount
            then const $ return True -- Never short-circuit the last bucket.
            else checkBucketFull
    _ <- timeout (15 * 60 * 1000000) $ do
        atomically $ searchIsFinished s >>= check
    atomically $ searchCancel s
    hPutStrLn stderr $ "Finish refresh " ++ show (n, sample)
    rcount <- atomically $ do
        c <- Set.size <$> readTVar resultCounter
        b <- readTVar fin
        return $ if b then 1 else c
    return rcount

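-- A usage sketch (illustrative): refresh a single bucket by hand and
-- report how many nodes the search yielded; 'sch' and 'bkts' as above.
--
-- >>> rc <- refreshBucket sch bkts 0
-- >>> hPutStrLn stderr $ "refresh of bucket 0 yielded " ++ show rc
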
bootstrap ::
    ( Show nid
    , Serialize nid
    -- , FiniteBits nid
    , Hashable ni
    , Hashable nid
    , Ord ni
    , Ord addr
    , Ord nid
    , Traversable t1
    , Traversable t
    ) => Search nid addr tok ni ni -> TVar (BucketList ni) -> (ni -> IO Bool) -> t ni -> t1 ni -> IO ()
bootstrap sch var ping ns ns0 = do
    gotPing <- atomically $ newTVar False

    -- First, ping the given nodes so that they are added to
    -- our routing table.
    withTaskGroup "bootstrap.resume" 20 $ \g -> do
        forM_ ns $ \n -> do
            let lbl = show $ kademliaLocation (searchSpace sch) n
            forkTask g lbl $ do
                b <- ping n
                when b $ atomically $ writeTVar gotPing True

    -- We resort to the hardcoded fallback nodes only when we got no
    -- responses. This is to lessen the burden on well-known bootstrap
    -- nodes.
    fallback <- atomically (readTVar gotPing) >>= return . when . not
    fallback $ withTaskGroup "bootstrap.ping" 20 $ \g -> do
        forM_ ns0 $ \n -> do
            forkTask g (show $ kademliaLocation (searchSpace sch) n)
                       (void $ ping n)
    hPutStrLn stderr "Finished bootstrap pings."

    -- Now run searches until all the buckets are full. On a small network,
    -- this may never quit.
    --
    -- TODO: For small networks, we should give up on filling a nearby bucket
    -- at some point and move on to one farther away.
    flip fix 1 $ \again cnt -> do
        when (cnt == 0) $ do
            -- Force a delay in case the search returns too quickly.
            hPutStrLn stderr "Zero results, forcing 1 minute delay"
            threadDelay (60 * 1000000)
        tbl <- atomically $ readTVar var
        let shp    = zip (R.shape tbl) [0 ..]
            unfull = filter ((< R.defaultBucketSize) . fst) shp
        case dropWhile ((> R.defaultBucketCount - 1) . snd) unfull of
            [] -> do
                when (length shp < R.defaultBucketCount) $ do
                    -- Not enough buckets, keep trying.
                    hPutStrLn stderr
                        $ "Not enough buckets, refresh " ++ show (R.defaultBucketCount - 1)
                    cnt <- refreshBucket sch var
                                         (R.defaultBucketCount - 1)
                    again cnt
            (size, num) : _ -> do
                hPutStrLn stderr $ "Bucket too small, refresh " ++ show (size, num, shp)
                cnt <- refreshBucket sch var num
                again cnt
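
-- A usage sketch (illustrative; the helpers are hypothetical): resume from
-- peers saved during a previous run, falling back to the well-known
-- bootstrap nodes only if none of the saved peers respond:
--
-- >>> savedNodes     <- loadPersistedNodes          -- hypothetical helper
-- >>> wellKnownNodes <- resolveWellKnownBootstraps  -- hypothetical helper
-- >>> bootstrap sch bkts pingNode savedNodes wellKnownNodes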