diff options
author | joe <joe@jerkface.net> | 2017-11-06 00:16:36 -0500 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-11-06 00:16:36 -0500 |
commit | 159f60689ab70413d963d8103b561dd587c448d6 (patch) | |
tree | d7d2a36839f7d30424731b0b7c4d46bd129a553a /src/Network/Kademlia.hs | |
parent | e3efc5455185bc984b2e49f0d54c9bded1b4f269 (diff) |
Factored out Network.Kademlia.Bootstrap.
Diffstat (limited to 'src/Network/Kademlia.hs')
-rw-r--r-- | src/Network/Kademlia.hs | 210 |
1 files changed, 0 insertions, 210 deletions
diff --git a/src/Network/Kademlia.hs b/src/Network/Kademlia.hs index f1ffda19..8956df2c 100644 --- a/src/Network/Kademlia.hs +++ b/src/Network/Kademlia.hs | |||
@@ -178,213 +178,3 @@ insertNode (Kademlia reporter space io) node = do | |||
178 | 178 | ||
179 | return () | 179 | return () |
180 | 180 | ||
181 | |||
182 | -- TODO: Bootstrap/Refresh | ||
183 | -- | ||
184 | -- From BEP 05: | ||
185 | -- | ||
186 | -- Each bucket should maintain a "last changed" property to indicate how | ||
187 | -- "fresh" the contents are. | ||
188 | -- | ||
189 | -- Note: We will use a "time to next refresh" property instead and store it in | ||
190 | -- a priority search queue. | ||
191 | -- | ||
192 | -- When... | ||
193 | -- | ||
194 | -- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus | ||
195 | -- >>> bucketEvents = | ||
196 | -- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds, | ||
197 | -- >>> | ||
198 | -- >>> , Stranger :--> Accepted -- or a node is added to a bucket, | ||
199 | -- >>> | ||
200 | -- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced | ||
201 | -- >>> , Applicant :--> Accepted -- with another node, | ||
202 | -- >>> ] | ||
203 | -- | ||
204 | -- the bucket's last changed property should be updated. Buckets | ||
205 | -- that have not been changed in 15 minutes should be "refreshed." This is done | ||
206 | -- by picking a random ID in the range of the bucket and performing a | ||
207 | -- find_nodes search on it. | ||
208 | -- | ||
209 | -- The only other possible BucketTouchEvents are as follows: | ||
210 | -- | ||
211 | -- >>> not_handled = | ||
212 | -- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered: | ||
213 | -- >>> -- (Applicant :--> Stranger) | ||
214 | -- >>> -- (Applicant :--> Accepted) | ||
215 | -- >>> , Accepted :--> Applicant -- Never happens | ||
216 | -- >>> ] | ||
217 | -- | ||
218 | |||
219 | -- XXX: This will be redundantly triggered twice upon every node replacement | ||
220 | -- because we do not currently distinguish between standalone | ||
221 | -- insertion/deletion events and an insertion/deletion pair constituting | ||
222 | -- replacement. | ||
223 | -- | ||
224 | -- It might also be better to pass the timestamp of the transition here and | ||
225 | -- keep the refresh queue in better sync with the routing table by updating it | ||
226 | -- within the STM monad. | ||
227 | touchBucket :: KademliaSpace nid ni | ||
228 | -> POSIXTime | ||
229 | -> TVar (BucketList ni) | ||
230 | -> TVar (Int.PSQ POSIXTime) | ||
231 | -> RoutingTransition ni | ||
232 | -> STM (IO ()) | ||
233 | touchBucket space interval bkts psq tr | ||
234 | | (transitionedTo tr == Applicant) | ||
235 | = return $ return () | ||
236 | | otherwise = return $ do | ||
237 | now <- getPOSIXTime | ||
238 | atomically $ do | ||
239 | let nid = kademliaLocation space (transitioningNode tr) | ||
240 | num <- R.bucketNumber space nid <$> readTVar bkts | ||
241 | modifyTVar' psq $ Int.insert num (now + interval) | ||
242 | |||
243 | -- | > pollForRefresh interval queue refresh | ||
244 | -- | ||
245 | -- Fork a refresh loop. Kill the returned thread to terminate it. The | ||
246 | -- arguments are: a staleness threshold (if a bucket goes this long without | ||
247 | -- being touched, a refresh will be triggered), a TVar with the time-to-refresh | ||
248 | -- schedule for each bucket, and a refresh action to be forked when a bucket | ||
249 | -- excedes the staleness threshold. | ||
250 | -- | ||
251 | -- TO "touch" a bucket and prevent it from being refreshed, reschedule it's | ||
252 | -- refresh time to some time into the future by modifying the 'Int.PSQ' in the | ||
253 | -- TVar. | ||
254 | forkPollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO a) -> IO ThreadId | ||
255 | forkPollForRefresh interval psq refresh = do | ||
256 | fork $ do | ||
257 | myThreadId >>= flip labelThread "pollForRefresh" | ||
258 | fix $ \again -> do | ||
259 | join $ atomically $ do | ||
260 | nextup <- Int.findMin <$> readTVar psq | ||
261 | maybe retry (return . go again) nextup | ||
262 | where | ||
263 | go again ( bktnum :-> refresh_time ) = do | ||
264 | now <- getPOSIXTime | ||
265 | case fromEnum (refresh_time - now) of | ||
266 | x | x <= 0 -> do -- Refresh time! | ||
267 | -- Move it to the back of the refresh queue. | ||
268 | atomically $ modifyTVar' psq | ||
269 | $ Int.insert bktnum (now + interval) | ||
270 | -- Now fork the refresh operation. | ||
271 | -- TODO: We should probably propogate the kill signal to this thread. | ||
272 | fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum) | ||
273 | _ <- refresh bktnum | ||
274 | return () | ||
275 | return () | ||
276 | seconds -> threadDelay ( seconds * 1000000 ) | ||
277 | again | ||
278 | |||
279 | refreshBucket :: forall nid tok ni addr. | ||
280 | ( Show nid | ||
281 | , Serialize nid | ||
282 | , Ord nid | ||
283 | , Ord ni | ||
284 | , Ord addr | ||
285 | , Hashable nid | ||
286 | , Hashable ni ) => | ||
287 | Search nid addr tok ni ni -> TVar (BucketList ni) -> Int -> IO Int | ||
288 | refreshBucket sch var n = do | ||
289 | tbl <- atomically (readTVar var) | ||
290 | let count = bktCount tbl | ||
291 | nid = kademliaLocation (searchSpace sch) (thisNode tbl) | ||
292 | sample <- if n+1 >= count -- Is this the last bucket? | ||
293 | then return nid -- Yes? Search our own id. | ||
294 | else kademliaSample (searchSpace sch) -- No? Generate a random id. | ||
295 | getEntropy | ||
296 | nid | ||
297 | (bucketRange n (n + 1 < count)) | ||
298 | fin <- atomically $ newTVar False | ||
299 | resultCounter <- atomically $ newTVar Set.empty | ||
300 | let fullcount = R.defaultBucketSize | ||
301 | saveit True = writeTVar fin True >> return True | ||
302 | saveit _ = return False | ||
303 | checkBucketFull :: ni -> STM Bool | ||
304 | checkBucketFull found_node = do | ||
305 | tbl <- readTVar var | ||
306 | let counts = R.shape tbl | ||
307 | when (n == R.bucketNumber (searchSpace sch) (kademliaLocation (searchSpace sch) found_node) tbl) | ||
308 | $ modifyTVar' resultCounter (Set.insert found_node) | ||
309 | resultCount <- readTVar resultCounter | ||
310 | saveit $ case drop (n - 1) counts of | ||
311 | (cnt:_) | cnt < fullcount -> True | ||
312 | _ | Set.size resultCount < fullcount -> True | ||
313 | _ -> False | ||
314 | |||
315 | hPutStrLn stderr $ "Start refresh " ++ show (n,sample) | ||
316 | |||
317 | -- Set 15 minute timeout in order to avoid overlapping refreshes. | ||
318 | s <- search sch tbl sample $ if n+1 == R.defaultBucketCount | ||
319 | then const $ return True -- Never short-circuit the last bucket. | ||
320 | else checkBucketFull | ||
321 | _ <- timeout (15*60*1000000) $ do | ||
322 | atomically $ searchIsFinished s >>= check | ||
323 | atomically $ searchCancel s | ||
324 | hPutStrLn stderr $ "Finish refresh " ++ show (n,sample) | ||
325 | rcount <- atomically $ do | ||
326 | c <- Set.size <$> readTVar resultCounter | ||
327 | b <- readTVar fin | ||
328 | return $ if b then 1 else c | ||
329 | return rcount | ||
330 | |||
331 | bootstrap :: | ||
332 | ( Show nid | ||
333 | , Serialize nid | ||
334 | -- , FiniteBits nid | ||
335 | , Hashable ni | ||
336 | , Hashable nid | ||
337 | , Ord ni | ||
338 | , Ord addr | ||
339 | , Ord nid | ||
340 | , Traversable t1 | ||
341 | , Traversable t | ||
342 | ) => Search nid addr tok ni ni -> TVar (BucketList ni) -> (ni -> IO Bool) -> t ni -> t1 ni -> IO () | ||
343 | bootstrap sch var ping ns ns0 = do | ||
344 | gotPing <- atomically $ newTVar False | ||
345 | |||
346 | -- First, ping the given nodes so that they are added to | ||
347 | -- our routing table. | ||
348 | withTaskGroup "bootstrap.resume" 20 $ \g -> do | ||
349 | forM_ ns $ \n -> do | ||
350 | let lbl = show $ kademliaLocation (searchSpace sch) n | ||
351 | forkTask g lbl $ do | ||
352 | b <- ping n | ||
353 | when b $ atomically $ writeTVar gotPing True | ||
354 | |||
355 | -- We resort to the hardcoded fallback nodes only when we got no | ||
356 | -- responses. This is to lesson the burden on well-known boostrap | ||
357 | -- nodes. | ||
358 | fallback <- atomically (readTVar gotPing) >>= return . when . not | ||
359 | fallback $ withTaskGroup "bootstrap.ping" 20 $ \g -> do | ||
360 | forM_ ns0 $ \n -> do | ||
361 | forkTask g (show $ kademliaLocation (searchSpace sch) n) | ||
362 | (void $ ping n) | ||
363 | hPutStrLn stderr "Finished bootstrap pings." | ||
364 | |||
365 | -- Now run searches until all the buckets are full. On a small network, | ||
366 | -- this may never quit. | ||
367 | -- | ||
368 | -- TODO: For small networks, we should give up on filling a nearby bucket | ||
369 | -- at some point and move on to one farther away. | ||
370 | flip fix 1 $ \again cnt -> do | ||
371 | when (cnt==0) $ do | ||
372 | -- Force a delay in case the search returns too quickly | ||
373 | hPutStrLn stderr $ "Zero results, forcing 1 minute delay" | ||
374 | threadDelay (60 * 1000000) | ||
375 | tbl <- atomically $ readTVar var | ||
376 | let shp = zip (R.shape tbl) [0 .. ] | ||
377 | unfull = filter ( (< R.defaultBucketSize) . fst ) shp | ||
378 | case dropWhile ((> R.defaultBucketCount - 1) . snd) unfull of | ||
379 | [] -> do | ||
380 | when (length shp < R.defaultBucketCount) $ do | ||
381 | -- Not enough buckets, keep trying. | ||
382 | hPutStrLn stderr | ||
383 | $ "Not enough buckets, refresh " ++ show (R.defaultBucketCount - 1) | ||
384 | cnt <- refreshBucket sch var | ||
385 | (R.defaultBucketCount - 1) | ||
386 | again cnt | ||
387 | (size,num):_ -> do | ||
388 | hPutStrLn stderr $ "Bucket too small, refresh "++ show (size,num,shp) | ||
389 | cnt <- refreshBucket sch var num | ||
390 | again cnt | ||