From 8c94bb53cc2eb09a5e1c550c3430935701c6f090 Mon Sep 17 00:00:00 2001 From: joe Date: Tue, 7 Nov 2017 17:12:09 -0500 Subject: Minor refactoring. --- src/Network/Kademlia/Bootstrap.hs | 134 ++++++++++++++++++-------------------- 1 file changed, 63 insertions(+), 71 deletions(-) (limited to 'src') diff --git a/src/Network/Kademlia/Bootstrap.hs b/src/Network/Kademlia/Bootstrap.hs index 42bff665..92a20ca5 100644 --- a/src/Network/Kademlia/Bootstrap.hs +++ b/src/Network/Kademlia/Bootstrap.hs @@ -45,41 +45,6 @@ import Network.Kademlia.Search import Control.Concurrent.Tasks import Network.Kademlia --- From BEP 05: --- --- Each bucket should maintain a "last changed" property to indicate how --- "fresh" the contents are. --- --- Note: We will use a "time to next refresh" property instead and store it in --- a priority search queue. --- --- When... --- --- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus --- >>> bucketEvents = --- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds, --- >>> --- >>> , Stranger :--> Accepted -- or a node is added to a bucket, --- >>> --- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced --- >>> , Applicant :--> Accepted -- with another node, --- >>> ] --- --- the bucket's last changed property should be updated. Buckets --- that have not been changed in 15 minutes should be "refreshed." This is done --- by picking a random ID in the range of the bucket and performing a --- find_nodes search on it. --- --- The only other possible BucketTouchEvents are as follows: --- --- >>> not_handled = --- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered: --- >>> -- (Applicant :--> Stranger) --- >>> -- (Applicant :--> Accepted) --- >>> , Accepted :--> Applicant -- Never happens --- >>> ] --- - type SensibleNodeId nid ni = ( Show nid , Ord nid @@ -199,7 +164,10 @@ refreshBucket sch var n = do return rcount bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) => - BucketRefresher nid ni -> t1 ni -> t ni -> IO () + BucketRefresher nid ni + -> t1 ni -- ^ Nodes to bootstrap from. + -> t ni -- ^ Fallback nodes; used only if the others are unresponsive. + -> IO () bootstrap BucketRefresher { refreshSearch = sch , refreshBuckets = var , refreshPing = ping } ns ns0 = do @@ -223,60 +191,84 @@ bootstrap BucketRefresher { refreshSearch = sch forkTask g (show $ kademliaLocation (searchSpace sch) n) (void $ ping n) hPutStrLn stderr "Finished bootstrap pings." - - -- Now run searches until all the buckets are full. On a small network, - -- this may never quit. + -- Now search our own Id by refreshing the last bucket. + last <- atomically $ bktCount <$> readTVar var + void $ refreshBucket sch var last + -- That's it. -- - -- TODO: For small networks, we should give up on filling a nearby bucket - -- at some point and move on to one farther away. - flip fix 1 $ \again cnt -> do - when (cnt==0) $ do - -- Force a delay in case the search returns too quickly - hPutStrLn stderr $ "Zero results, forcing 1 minute delay" - threadDelay (60 * 1000000) - tbl <- atomically $ readTVar var - let shp = zip (R.shape tbl) [0 .. ] - unfull = filter ( (< R.defaultBucketSize) . fst ) shp - case dropWhile ((> R.defaultBucketCount - 1) . snd) unfull of - [] -> do - when (length shp < R.defaultBucketCount) $ do - -- Not enough buckets, keep trying. - hPutStrLn stderr - $ "Not enough buckets, refresh " ++ show (R.defaultBucketCount - 1) - cnt <- refreshBucket sch var - (R.defaultBucketCount - 1) - again cnt - (size,num):_ -> do - hPutStrLn stderr $ "Bucket too small, refresh "++ show (size,num,shp) - cnt <- refreshBucket sch var num - again cnt + -- Hopefully 'forkPollForRefresh' was invoked and can take over + -- maintenance. + -- | Reschedule a bucket's refresh-time. It should be called whenever a bucket -- changes. This will typically be invoked from 'tblTransition'. -- --- XXX: This will be redundantly triggered twice upon every node replacement --- because we do not currently distinguish between standalone +-- From BEP 05: +-- +-- > Each bucket should maintain a "last changed" property to indicate how +-- > "fresh" the contents are. +-- +-- We will use a "time to next refresh" property instead and store it in +-- a priority search queue. +-- +-- In detail using an expository (not actually implemented) type +-- 'BucketTouchEvent'... +-- +-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus +-- >>> bucketEvents = +-- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds, +-- >>> +-- >>> , Stranger :--> Accepted -- or a node is added to a bucket, +-- >>> +-- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced +-- >>> , Applicant :--> Accepted -- with another node, +-- >>> ] +-- +-- the bucket's last changed property should be updated. Buckets that have not +-- been changed in 15 minutes (see 'refreshInterval') should be "refreshed." +-- This is done by picking a random ID in the range of the bucket and +-- performing a find_nodes search on it. +-- +-- The only other possible BucketTouchEvents are as follows: +-- +-- >>> not_handled = +-- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered: +-- >>> -- (Applicant :--> Stranger) +-- >>> -- (Applicant :--> Accepted) +-- >>> , Accepted :--> Applicant -- Never happens +-- >>> ] +-- +-- Because this BucketTouchEvent type is not actually implemented and we only +-- receive notifications of a node's new state, it suffices to reschedule the +-- bucket refresh 'touchBucket' on every transition to a state other than +-- 'Applicant'. +-- +-- XXX: Unfortunately, this means redundantly triggering twice upon every node +-- replacement because we do not currently distinguish between standalone -- insertion/deletion events and an insertion/deletion pair constituting -- replacement. -- -- It might also be better to pass the timestamp of the transition here and -- keep the refresh queue in better sync with the routing table by updating it -- within the STM monad. +-- +-- We embed the result in the STM monad but currently, no STM state changes +-- occur until the returned IO action is invoked. TODO: simplify? touchBucket :: BucketRefresher nid ni -> RoutingTransition ni -- ^ What happened to the bucket? -> STM (IO ()) touchBucket BucketRefresher{ refreshSearch , refreshInterval , refreshBuckets - , refreshQueue - } - tr - | (transitionedTo tr == Applicant) - = return $ return () - | otherwise = return $ do + , refreshQueue } + RoutingTransition{ transitionedTo + , transitioningNode } + = case transitionedTo of + Applicant -> return $ return () -- Ignore transition to applicant. + _ -> return $ do -- Reschedule for any other transition. now <- getPOSIXTime atomically $ do let space = searchSpace refreshSearch - nid = kademliaLocation space (transitioningNode tr) + nid = kademliaLocation space transitioningNode num <- R.bucketNumber space nid <$> readTVar refreshBuckets modifyTVar' refreshQueue $ Int.insert num (now + refreshInterval) -- cgit v1.2.3