summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-11-07 17:12:09 -0500
committerjoe <joe@jerkface.net>2017-11-08 02:30:43 -0500
commit8c94bb53cc2eb09a5e1c550c3430935701c6f090 (patch)
treef4be72efbc2eefbd2ab40ebbec8cfe191df8ccc5 /src
parent70a96073db817b19e98d058702b1a8aa3d4b8445 (diff)
Minor refactoring.
Diffstat (limited to 'src')
-rw-r--r--src/Network/Kademlia/Bootstrap.hs134
1 files changed, 63 insertions, 71 deletions
diff --git a/src/Network/Kademlia/Bootstrap.hs b/src/Network/Kademlia/Bootstrap.hs
index 42bff665..92a20ca5 100644
--- a/src/Network/Kademlia/Bootstrap.hs
+++ b/src/Network/Kademlia/Bootstrap.hs
@@ -45,41 +45,6 @@ import Network.Kademlia.Search
45import Control.Concurrent.Tasks 45import Control.Concurrent.Tasks
46import Network.Kademlia 46import Network.Kademlia
47 47
48-- From BEP 05:
49--
50-- Each bucket should maintain a "last changed" property to indicate how
51-- "fresh" the contents are.
52--
53-- Note: We will use a "time to next refresh" property instead and store it in
54-- a priority search queue.
55--
56-- When...
57--
58-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
59-- >>> bucketEvents =
60-- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
61-- >>>
62-- >>> , Stranger :--> Accepted -- or a node is added to a bucket,
63-- >>>
64-- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced
65-- >>> , Applicant :--> Accepted -- with another node,
66-- >>> ]
67--
68-- the bucket's last changed property should be updated. Buckets
69-- that have not been changed in 15 minutes should be "refreshed." This is done
70-- by picking a random ID in the range of the bucket and performing a
71-- find_nodes search on it.
72--
73-- The only other possible BucketTouchEvents are as follows:
74--
75-- >>> not_handled =
76-- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered:
77-- >>> -- (Applicant :--> Stranger)
78-- >>> -- (Applicant :--> Accepted)
79-- >>> , Accepted :--> Applicant -- Never happens
80-- >>> ]
81--
82
83type SensibleNodeId nid ni = 48type SensibleNodeId nid ni =
84 ( Show nid 49 ( Show nid
85 , Ord nid 50 , Ord nid
@@ -199,7 +164,10 @@ refreshBucket sch var n = do
199 return rcount 164 return rcount
200 165
201bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) => 166bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) =>
202 BucketRefresher nid ni -> t1 ni -> t ni -> IO () 167 BucketRefresher nid ni
168 -> t1 ni -- ^ Nodes to bootstrap from.
169 -> t ni -- ^ Fallback nodes; used only if the others are unresponsive.
170 -> IO ()
203bootstrap BucketRefresher { refreshSearch = sch 171bootstrap BucketRefresher { refreshSearch = sch
204 , refreshBuckets = var 172 , refreshBuckets = var
205 , refreshPing = ping } ns ns0 = do 173 , refreshPing = ping } ns ns0 = do
@@ -223,60 +191,84 @@ bootstrap BucketRefresher { refreshSearch = sch
223 forkTask g (show $ kademliaLocation (searchSpace sch) n) 191 forkTask g (show $ kademliaLocation (searchSpace sch) n)
224 (void $ ping n) 192 (void $ ping n)
225 hPutStrLn stderr "Finished bootstrap pings." 193 hPutStrLn stderr "Finished bootstrap pings."
226 194 -- Now search our own Id by refreshing the last bucket.
227 -- Now run searches until all the buckets are full. On a small network, 195 last <- atomically $ bktCount <$> readTVar var
228 -- this may never quit. 196 void $ refreshBucket sch var last
197 -- That's it.
229 -- 198 --
230 -- TODO: For small networks, we should give up on filling a nearby bucket 199 -- Hopefully 'forkPollForRefresh' was invoked and can take over
231 -- at some point and move on to one farther away. 200 -- maintenance.
232 flip fix 1 $ \again cnt -> do 201
233 when (cnt==0) $ do
234 -- Force a delay in case the search returns too quickly
235 hPutStrLn stderr $ "Zero results, forcing 1 minute delay"
236 threadDelay (60 * 1000000)
237 tbl <- atomically $ readTVar var
238 let shp = zip (R.shape tbl) [0 .. ]
239 unfull = filter ( (< R.defaultBucketSize) . fst ) shp
240 case dropWhile ((> R.defaultBucketCount - 1) . snd) unfull of
241 [] -> do
242 when (length shp < R.defaultBucketCount) $ do
243 -- Not enough buckets, keep trying.
244 hPutStrLn stderr
245 $ "Not enough buckets, refresh " ++ show (R.defaultBucketCount - 1)
246 cnt <- refreshBucket sch var
247 (R.defaultBucketCount - 1)
248 again cnt
249 (size,num):_ -> do
250 hPutStrLn stderr $ "Bucket too small, refresh "++ show (size,num,shp)
251 cnt <- refreshBucket sch var num
252 again cnt
253 202
254-- | Reschedule a bucket's refresh-time. It should be called whenever a bucket 203-- | Reschedule a bucket's refresh-time. It should be called whenever a bucket
255-- changes. This will typically be invoked from 'tblTransition'. 204-- changes. This will typically be invoked from 'tblTransition'.
256-- 205--
257-- XXX: This will be redundantly triggered twice upon every node replacement 206-- From BEP 05:
258-- because we do not currently distinguish between standalone 207--
208-- > Each bucket should maintain a "last changed" property to indicate how
209-- > "fresh" the contents are.
210--
211-- We will use a "time to next refresh" property instead and store it in
212-- a priority search queue.
213--
214-- In detail using an expository (not actually implemented) type
215-- 'BucketTouchEvent'...
216--
217-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
218-- >>> bucketEvents =
219-- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
220-- >>>
221-- >>> , Stranger :--> Accepted -- or a node is added to a bucket,
222-- >>>
223-- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced
224-- >>> , Applicant :--> Accepted -- with another node,
225-- >>> ]
226--
227-- the bucket's last changed property should be updated. Buckets that have not
228-- been changed in 15 minutes (see 'refreshInterval') should be "refreshed."
229-- This is done by picking a random ID in the range of the bucket and
230-- performing a find_nodes search on it.
231--
232-- The only other possible BucketTouchEvents are as follows:
233--
234-- >>> not_handled =
235-- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered:
236-- >>> -- (Applicant :--> Stranger)
237-- >>> -- (Applicant :--> Accepted)
238-- >>> , Accepted :--> Applicant -- Never happens
239-- >>> ]
240--
241-- Because this BucketTouchEvent type is not actually implemented and we only
242-- receive notifications of a node's new state, it suffices to reschedule the
243-- bucket refresh 'touchBucket' on every transition to a state other than
244-- 'Applicant'.
245--
246-- XXX: Unfortunately, this means redundantly triggering twice upon every node
247-- replacement because we do not currently distinguish between standalone
259-- insertion/deletion events and an insertion/deletion pair constituting 248-- insertion/deletion events and an insertion/deletion pair constituting
260-- replacement. 249-- replacement.
261-- 250--
262-- It might also be better to pass the timestamp of the transition here and 251-- It might also be better to pass the timestamp of the transition here and
263-- keep the refresh queue in better sync with the routing table by updating it 252-- keep the refresh queue in better sync with the routing table by updating it
264-- within the STM monad. 253-- within the STM monad.
254--
255-- We embed the result in the STM monad but currently, no STM state changes
256-- occur until the returned IO action is invoked. TODO: simplify?
265touchBucket :: BucketRefresher nid ni 257touchBucket :: BucketRefresher nid ni
266 -> RoutingTransition ni -- ^ What happened to the bucket? 258 -> RoutingTransition ni -- ^ What happened to the bucket?
267 -> STM (IO ()) 259 -> STM (IO ())
268touchBucket BucketRefresher{ refreshSearch 260touchBucket BucketRefresher{ refreshSearch
269 , refreshInterval 261 , refreshInterval
270 , refreshBuckets 262 , refreshBuckets
271 , refreshQueue 263 , refreshQueue }
272 } 264 RoutingTransition{ transitionedTo
273 tr 265 , transitioningNode }
274 | (transitionedTo tr == Applicant) 266 = case transitionedTo of
275 = return $ return () 267 Applicant -> return $ return () -- Ignore transition to applicant.
276 | otherwise = return $ do 268 _ -> return $ do -- Reschedule for any other transition.
277 now <- getPOSIXTime 269 now <- getPOSIXTime
278 atomically $ do 270 atomically $ do
279 let space = searchSpace refreshSearch 271 let space = searchSpace refreshSearch
280 nid = kademliaLocation space (transitioningNode tr) 272 nid = kademliaLocation space transitioningNode
281 num <- R.bucketNumber space nid <$> readTVar refreshBuckets 273 num <- R.bucketNumber space nid <$> readTVar refreshBuckets
282 modifyTVar' refreshQueue $ Int.insert num (now + refreshInterval) 274 modifyTVar' refreshQueue $ Int.insert num (now + refreshInterval)