diff options
Diffstat (limited to 'src/Network/Kademlia')
-rw-r--r-- | src/Network/Kademlia/Bootstrap.hs | 134 |
1 files changed, 63 insertions, 71 deletions
diff --git a/src/Network/Kademlia/Bootstrap.hs b/src/Network/Kademlia/Bootstrap.hs index 42bff665..92a20ca5 100644 --- a/src/Network/Kademlia/Bootstrap.hs +++ b/src/Network/Kademlia/Bootstrap.hs | |||
@@ -45,41 +45,6 @@ import Network.Kademlia.Search | |||
45 | import Control.Concurrent.Tasks | 45 | import Control.Concurrent.Tasks |
46 | import Network.Kademlia | 46 | import Network.Kademlia |
47 | 47 | ||
48 | -- From BEP 05: | ||
49 | -- | ||
50 | -- Each bucket should maintain a "last changed" property to indicate how | ||
51 | -- "fresh" the contents are. | ||
52 | -- | ||
53 | -- Note: We will use a "time to next refresh" property instead and store it in | ||
54 | -- a priority search queue. | ||
55 | -- | ||
56 | -- When... | ||
57 | -- | ||
58 | -- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus | ||
59 | -- >>> bucketEvents = | ||
60 | -- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds, | ||
61 | -- >>> | ||
62 | -- >>> , Stranger :--> Accepted -- or a node is added to a bucket, | ||
63 | -- >>> | ||
64 | -- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced | ||
65 | -- >>> , Applicant :--> Accepted -- with another node, | ||
66 | -- >>> ] | ||
67 | -- | ||
68 | -- the bucket's last changed property should be updated. Buckets | ||
69 | -- that have not been changed in 15 minutes should be "refreshed." This is done | ||
70 | -- by picking a random ID in the range of the bucket and performing a | ||
71 | -- find_nodes search on it. | ||
72 | -- | ||
73 | -- The only other possible BucketTouchEvents are as follows: | ||
74 | -- | ||
75 | -- >>> not_handled = | ||
76 | -- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered: | ||
77 | -- >>> -- (Applicant :--> Stranger) | ||
78 | -- >>> -- (Applicant :--> Accepted) | ||
79 | -- >>> , Accepted :--> Applicant -- Never happens | ||
80 | -- >>> ] | ||
81 | -- | ||
82 | |||
83 | type SensibleNodeId nid ni = | 48 | type SensibleNodeId nid ni = |
84 | ( Show nid | 49 | ( Show nid |
85 | , Ord nid | 50 | , Ord nid |
@@ -199,7 +164,10 @@ refreshBucket sch var n = do | |||
199 | return rcount | 164 | return rcount |
200 | 165 | ||
201 | bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) => | 166 | bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) => |
202 | BucketRefresher nid ni -> t1 ni -> t ni -> IO () | 167 | BucketRefresher nid ni |
168 | -> t1 ni -- ^ Nodes to bootstrap from. | ||
169 | -> t ni -- ^ Fallback nodes; used only if the others are unresponsive. | ||
170 | -> IO () | ||
203 | bootstrap BucketRefresher { refreshSearch = sch | 171 | bootstrap BucketRefresher { refreshSearch = sch |
204 | , refreshBuckets = var | 172 | , refreshBuckets = var |
205 | , refreshPing = ping } ns ns0 = do | 173 | , refreshPing = ping } ns ns0 = do |
@@ -223,60 +191,84 @@ bootstrap BucketRefresher { refreshSearch = sch | |||
223 | forkTask g (show $ kademliaLocation (searchSpace sch) n) | 191 | forkTask g (show $ kademliaLocation (searchSpace sch) n) |
224 | (void $ ping n) | 192 | (void $ ping n) |
225 | hPutStrLn stderr "Finished bootstrap pings." | 193 | hPutStrLn stderr "Finished bootstrap pings." |
226 | 194 | -- Now search our own Id by refreshing the last bucket. | |
227 | -- Now run searches until all the buckets are full. On a small network, | 195 | last <- atomically $ bktCount <$> readTVar var |
228 | -- this may never quit. | 196 | void $ refreshBucket sch var last |
197 | -- That's it. | ||
229 | -- | 198 | -- |
230 | -- TODO: For small networks, we should give up on filling a nearby bucket | 199 | -- Hopefully 'forkPollForRefresh' was invoked and can take over |
231 | -- at some point and move on to one farther away. | 200 | -- maintenance. |
232 | flip fix 1 $ \again cnt -> do | 201 | |
233 | when (cnt==0) $ do | ||
234 | -- Force a delay in case the search returns too quickly | ||
235 | hPutStrLn stderr $ "Zero results, forcing 1 minute delay" | ||
236 | threadDelay (60 * 1000000) | ||
237 | tbl <- atomically $ readTVar var | ||
238 | let shp = zip (R.shape tbl) [0 .. ] | ||
239 | unfull = filter ( (< R.defaultBucketSize) . fst ) shp | ||
240 | case dropWhile ((> R.defaultBucketCount - 1) . snd) unfull of | ||
241 | [] -> do | ||
242 | when (length shp < R.defaultBucketCount) $ do | ||
243 | -- Not enough buckets, keep trying. | ||
244 | hPutStrLn stderr | ||
245 | $ "Not enough buckets, refresh " ++ show (R.defaultBucketCount - 1) | ||
246 | cnt <- refreshBucket sch var | ||
247 | (R.defaultBucketCount - 1) | ||
248 | again cnt | ||
249 | (size,num):_ -> do | ||
250 | hPutStrLn stderr $ "Bucket too small, refresh "++ show (size,num,shp) | ||
251 | cnt <- refreshBucket sch var num | ||
252 | again cnt | ||
253 | 202 | ||
254 | -- | Reschedule a bucket's refresh-time. It should be called whenever a bucket | 203 | -- | Reschedule a bucket's refresh-time. It should be called whenever a bucket |
255 | -- changes. This will typically be invoked from 'tblTransition'. | 204 | -- changes. This will typically be invoked from 'tblTransition'. |
256 | -- | 205 | -- |
257 | -- XXX: This will be redundantly triggered twice upon every node replacement | 206 | -- From BEP 05: |
258 | -- because we do not currently distinguish between standalone | 207 | -- |
208 | -- > Each bucket should maintain a "last changed" property to indicate how | ||
209 | -- > "fresh" the contents are. | ||
210 | -- | ||
211 | -- We will use a "time to next refresh" property instead and store it in | ||
212 | -- a priority search queue. | ||
213 | -- | ||
214 | -- In detail using an expository (not actually implemented) type | ||
215 | -- 'BucketTouchEvent'... | ||
216 | -- | ||
217 | -- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus | ||
218 | -- >>> bucketEvents = | ||
219 | -- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds, | ||
220 | -- >>> | ||
221 | -- >>> , Stranger :--> Accepted -- or a node is added to a bucket, | ||
222 | -- >>> | ||
223 | -- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced | ||
224 | -- >>> , Applicant :--> Accepted -- with another node, | ||
225 | -- >>> ] | ||
226 | -- | ||
227 | -- the bucket's last changed property should be updated. Buckets that have not | ||
228 | -- been changed in 15 minutes (see 'refreshInterval') should be "refreshed." | ||
229 | -- This is done by picking a random ID in the range of the bucket and | ||
230 | -- performing a find_nodes search on it. | ||
231 | -- | ||
232 | -- The only other possible BucketTouchEvents are as follows: | ||
233 | -- | ||
234 | -- >>> not_handled = | ||
235 | -- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered: | ||
236 | -- >>> -- (Applicant :--> Stranger) | ||
237 | -- >>> -- (Applicant :--> Accepted) | ||
238 | -- >>> , Accepted :--> Applicant -- Never happens | ||
239 | -- >>> ] | ||
240 | -- | ||
241 | -- Because this BucketTouchEvent type is not actually implemented and we only | ||
242 | -- receive notifications of a node's new state, it suffices to reschedule the | ||
243 | -- bucket refresh 'touchBucket' on every transition to a state other than | ||
244 | -- 'Applicant'. | ||
245 | -- | ||
246 | -- XXX: Unfortunately, this means redundantly triggering twice upon every node | ||
247 | -- replacement because we do not currently distinguish between standalone | ||
259 | -- insertion/deletion events and an insertion/deletion pair constituting | 248 | -- insertion/deletion events and an insertion/deletion pair constituting |
260 | -- replacement. | 249 | -- replacement. |
261 | -- | 250 | -- |
262 | -- It might also be better to pass the timestamp of the transition here and | 251 | -- It might also be better to pass the timestamp of the transition here and |
263 | -- keep the refresh queue in better sync with the routing table by updating it | 252 | -- keep the refresh queue in better sync with the routing table by updating it |
264 | -- within the STM monad. | 253 | -- within the STM monad. |
254 | -- | ||
255 | -- We embed the result in the STM monad but currently, no STM state changes | ||
256 | -- occur until the returned IO action is invoked. TODO: simplify? | ||
265 | touchBucket :: BucketRefresher nid ni | 257 | touchBucket :: BucketRefresher nid ni |
266 | -> RoutingTransition ni -- ^ What happened to the bucket? | 258 | -> RoutingTransition ni -- ^ What happened to the bucket? |
267 | -> STM (IO ()) | 259 | -> STM (IO ()) |
268 | touchBucket BucketRefresher{ refreshSearch | 260 | touchBucket BucketRefresher{ refreshSearch |
269 | , refreshInterval | 261 | , refreshInterval |
270 | , refreshBuckets | 262 | , refreshBuckets |
271 | , refreshQueue | 263 | , refreshQueue } |
272 | } | 264 | RoutingTransition{ transitionedTo |
273 | tr | 265 | , transitioningNode } |
274 | | (transitionedTo tr == Applicant) | 266 | = case transitionedTo of |
275 | = return $ return () | 267 | Applicant -> return $ return () -- Ignore transition to applicant. |
276 | | otherwise = return $ do | 268 | _ -> return $ do -- Reschedule for any other transition. |
277 | now <- getPOSIXTime | 269 | now <- getPOSIXTime |
278 | atomically $ do | 270 | atomically $ do |
279 | let space = searchSpace refreshSearch | 271 | let space = searchSpace refreshSearch |
280 | nid = kademliaLocation space (transitioningNode tr) | 272 | nid = kademliaLocation space transitioningNode |
281 | num <- R.bucketNumber space nid <$> readTVar refreshBuckets | 273 | num <- R.bucketNumber space nid <$> readTVar refreshBuckets |
282 | modifyTVar' refreshQueue $ Int.insert num (now + refreshInterval) | 274 | modifyTVar' refreshQueue $ Int.insert num (now + refreshInterval) |