diff options
Diffstat (limited to 'src/Network/Kademlia')
-rw-r--r-- | src/Network/Kademlia/Bootstrap.hs | 227 |
1 files changed, 127 insertions, 100 deletions
diff --git a/src/Network/Kademlia/Bootstrap.hs b/src/Network/Kademlia/Bootstrap.hs index 283e054a..42bff665 100644 --- a/src/Network/Kademlia/Bootstrap.hs +++ b/src/Network/Kademlia/Bootstrap.hs | |||
@@ -1,11 +1,14 @@ | |||
1 | {-# LANGUAGE CPP #-} | 1 | {-# LANGUAGE CPP #-} |
2 | {-# LANGUAGE ConstraintKinds #-} | ||
2 | {-# LANGUAGE DeriveFunctor #-} | 3 | {-# LANGUAGE DeriveFunctor #-} |
3 | {-# LANGUAGE DeriveTraversable #-} | 4 | {-# LANGUAGE DeriveTraversable #-} |
4 | {-# LANGUAGE FlexibleContexts #-} | 5 | {-# LANGUAGE FlexibleContexts #-} |
5 | {-# LANGUAGE GADTs #-} | 6 | {-# LANGUAGE GADTs #-} |
6 | {-# LANGUAGE KindSignatures #-} | 7 | {-# LANGUAGE KindSignatures #-} |
8 | {-# LANGUAGE NamedFieldPuns #-} | ||
7 | {-# LANGUAGE PartialTypeSignatures #-} | 9 | {-# LANGUAGE PartialTypeSignatures #-} |
8 | {-# LANGUAGE PatternSynonyms #-} | 10 | {-# LANGUAGE PatternSynonyms #-} |
11 | {-# LANGUAGE RankNTypes #-} | ||
9 | {-# LANGUAGE ScopedTypeVariables #-} | 12 | {-# LANGUAGE ScopedTypeVariables #-} |
10 | module Network.Kademlia.Bootstrap where | 13 | module Network.Kademlia.Bootstrap where |
11 | 14 | ||
@@ -42,34 +45,90 @@ import Network.Kademlia.Search | |||
42 | import Control.Concurrent.Tasks | 45 | import Control.Concurrent.Tasks |
43 | import Network.Kademlia | 46 | import Network.Kademlia |
44 | 47 | ||
45 | 48 | -- From BEP 05: | |
46 | -- | > pollForRefresh interval queue refresh | 49 | -- |
50 | -- Each bucket should maintain a "last changed" property to indicate how | ||
51 | -- "fresh" the contents are. | ||
52 | -- | ||
53 | -- Note: We will use a "time to next refresh" property instead and store it in | ||
54 | -- a priority search queue. | ||
55 | -- | ||
56 | -- When... | ||
57 | -- | ||
58 | -- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus | ||
59 | -- >>> bucketEvents = | ||
60 | -- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds, | ||
61 | -- >>> | ||
62 | -- >>> , Stranger :--> Accepted -- or a node is added to a bucket, | ||
63 | -- >>> | ||
64 | -- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced | ||
65 | -- >>> , Applicant :--> Accepted -- with another node, | ||
66 | -- >>> ] | ||
67 | -- | ||
68 | -- the bucket's last changed property should be updated. Buckets | ||
69 | -- that have not been changed in 15 minutes should be "refreshed." This is done | ||
70 | -- by picking a random ID in the range of the bucket and performing a | ||
71 | -- find_nodes search on it. | ||
72 | -- | ||
73 | -- The only other possible BucketTouchEvents are as follows: | ||
47 | -- | 74 | -- |
48 | -- Fork a refresh loop. Kill the returned thread to terminate it. The | 75 | -- >>> not_handled = |
49 | -- arguments are: a staleness threshold (if a bucket goes this long without | 76 | -- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered: |
50 | -- being touched, a refresh will be triggered), a TVar with the time-to-refresh | 77 | -- >>> -- (Applicant :--> Stranger) |
51 | -- schedule for each bucket, and a refresh action to be forked when a bucket | 78 | -- >>> -- (Applicant :--> Accepted) |
52 | -- excedes the staleness threshold. | 79 | -- >>> , Accepted :--> Applicant -- Never happens |
80 | -- >>> ] | ||
53 | -- | 81 | -- |
54 | -- TO "touch" a bucket and prevent it from being refreshed, reschedule it's | 82 | |
55 | -- refresh time to some time into the future by modifying the 'Int.PSQ' in the | 83 | type SensibleNodeId nid ni = |
56 | -- TVar. | 84 | ( Show nid |
57 | forkPollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO a) -> IO ThreadId | 85 | , Ord nid |
58 | forkPollForRefresh interval psq refresh = do | 86 | , Ord ni |
59 | fork $ do | 87 | , Hashable nid |
88 | , Hashable ni ) | ||
89 | |||
90 | data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher | ||
91 | { -- | A staleness threshold (if a bucket goes this long without being | ||
92 | -- touched, a refresh will be triggered). | ||
93 | refreshInterval :: POSIXTime | ||
94 | -- | A TVar with the time-to-refresh schedule for each bucket. | ||
95 | -- | ||
96 | -- To "touch" a bucket and prevent it from being refreshed, reschedule | ||
97 | -- it's refresh time to some time into the future by modifying the | ||
98 | -- 'Int.PSQ' in the TVar. (See 'touchBucket'). | ||
99 | , refreshQueue :: TVar (Int.PSQ POSIXTime) | ||
100 | -- | This is the kademlia node search specification. | ||
101 | , refreshSearch :: Search nid addr tok ni ni | ||
102 | -- | The current kademlia routing table buckets. | ||
103 | , refreshBuckets :: TVar (R.BucketList ni) | ||
104 | -- | Action to ping a node. This is used only during initial bootstrap | ||
105 | -- to get some nodes in our table. A 'True' result is interpreted as a a | ||
106 | -- pong, where 'False' is a non-response. | ||
107 | , refreshPing :: ni -> IO Bool | ||
108 | } | ||
109 | |||
110 | -- | Fork a refresh loop. Kill the returned thread to terminate it. | ||
111 | forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId | ||
112 | forkPollForRefresh BucketRefresher{ refreshInterval | ||
113 | , refreshQueue | ||
114 | , refreshBuckets | ||
115 | , refreshSearch } = fork $ do | ||
60 | myThreadId >>= flip labelThread "pollForRefresh" | 116 | myThreadId >>= flip labelThread "pollForRefresh" |
61 | fix $ \again -> do | 117 | fix $ \again -> do |
62 | join $ atomically $ do | 118 | join $ atomically $ do |
63 | nextup <- Int.findMin <$> readTVar psq | 119 | nextup <- Int.findMin <$> readTVar refreshQueue |
64 | maybe retry (return . go again) nextup | 120 | maybe retry (return . go again) nextup |
65 | where | 121 | where |
122 | refresh :: Int -> IO Int | ||
123 | refresh = refreshBucket refreshSearch refreshBuckets | ||
124 | |||
66 | go again ( bktnum :-> refresh_time ) = do | 125 | go again ( bktnum :-> refresh_time ) = do |
67 | now <- getPOSIXTime | 126 | now <- getPOSIXTime |
68 | case fromEnum (refresh_time - now) of | 127 | case fromEnum (refresh_time - now) of |
69 | x | x <= 0 -> do -- Refresh time! | 128 | x | x <= 0 -> do -- Refresh time! |
70 | -- Move it to the back of the refresh queue. | 129 | -- Move it to the back of the refresh queue. |
71 | atomically $ modifyTVar' psq | 130 | atomically $ modifyTVar' refreshQueue |
72 | $ Int.insert bktnum (now + interval) | 131 | $ Int.insert bktnum (now + refreshInterval) |
73 | -- Now fork the refresh operation. | 132 | -- Now fork the refresh operation. |
74 | -- TODO: We should probably propogate the kill signal to this thread. | 133 | -- TODO: We should probably propogate the kill signal to this thread. |
75 | fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum) | 134 | fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum) |
@@ -79,15 +138,37 @@ forkPollForRefresh interval psq refresh = do | |||
79 | seconds -> threadDelay ( seconds * 1000000 ) | 138 | seconds -> threadDelay ( seconds * 1000000 ) |
80 | again | 139 | again |
81 | 140 | ||
82 | refreshBucket :: forall nid tok ni addr. | 141 | |
83 | ( Show nid | 142 | -- | This is a helper to 'refreshBucket' which does some book keeping to decide |
84 | , Serialize nid | 143 | -- whether or not a bucket is sufficiently refreshed or not. It will return |
85 | , Ord nid | 144 | -- false when we can terminate a node search. |
86 | , Ord ni | 145 | checkBucketFull :: Ord ni => KademliaSpace nid ni -- ^ Obtain a node id from a node. |
87 | , Ord addr | 146 | -> TVar (BucketList ni) -- ^ The current routing table. |
88 | , Hashable nid | 147 | -> TVar (Set.Set ni) -- ^ In-range nodes found so far. |
89 | , Hashable ni ) => | 148 | -> TVar Bool -- ^ The result will also be written here. |
90 | Search nid addr tok ni ni -> TVar (BucketList ni) -> Int -> IO Int | 149 | -> Int -- ^ The bucket number of interest. |
150 | -> ni -- ^ A newly found node. | ||
151 | -> STM Bool | ||
152 | checkBucketFull space var resultCounter fin n found_node = do | ||
153 | let fullcount = R.defaultBucketSize | ||
154 | saveit True = writeTVar fin True >> return True | ||
155 | saveit _ = return False | ||
156 | tbl <- readTVar var | ||
157 | let counts = R.shape tbl | ||
158 | nid = kademliaLocation space found_node | ||
159 | -- Update the result set with every found node that is in the | ||
160 | -- bucket of interest. | ||
161 | when (n == R.bucketNumber space nid tbl) | ||
162 | $ modifyTVar' resultCounter (Set.insert found_node) | ||
163 | resultCount <- readTVar resultCounter | ||
164 | saveit $ case drop (n - 1) counts of | ||
165 | (cnt:_) | cnt < fullcount -> True -- bucket not full, keep going | ||
166 | _ | Set.size resultCount < fullcount -> True -- we haven't got many results, keep going | ||
167 | _ -> False -- okay, good enough, let's quit. | ||
168 | |||
169 | |||
170 | refreshBucket :: (Hashable a, Hashable t, Ord t, Ord addr, Ord a, Show t) => | ||
171 | Search t addr tok a a -> TVar (BucketList a) -> Int -> IO Int | ||
91 | refreshBucket sch var n = do | 172 | refreshBucket sch var n = do |
92 | tbl <- atomically (readTVar var) | 173 | tbl <- atomically (readTVar var) |
93 | let count = bktCount tbl | 174 | let count = bktCount tbl |
@@ -100,27 +181,13 @@ refreshBucket sch var n = do | |||
100 | (bucketRange n (n + 1 < count)) | 181 | (bucketRange n (n + 1 < count)) |
101 | fin <- atomically $ newTVar False | 182 | fin <- atomically $ newTVar False |
102 | resultCounter <- atomically $ newTVar Set.empty | 183 | resultCounter <- atomically $ newTVar Set.empty |
103 | let fullcount = R.defaultBucketSize | ||
104 | saveit True = writeTVar fin True >> return True | ||
105 | saveit _ = return False | ||
106 | checkBucketFull :: ni -> STM Bool | ||
107 | checkBucketFull found_node = do | ||
108 | tbl <- readTVar var | ||
109 | let counts = R.shape tbl | ||
110 | when (n == R.bucketNumber (searchSpace sch) (kademliaLocation (searchSpace sch) found_node) tbl) | ||
111 | $ modifyTVar' resultCounter (Set.insert found_node) | ||
112 | resultCount <- readTVar resultCounter | ||
113 | saveit $ case drop (n - 1) counts of | ||
114 | (cnt:_) | cnt < fullcount -> True | ||
115 | _ | Set.size resultCount < fullcount -> True | ||
116 | _ -> False | ||
117 | 184 | ||
118 | hPutStrLn stderr $ "Start refresh " ++ show (n,sample) | 185 | hPutStrLn stderr $ "Start refresh " ++ show (n,sample) |
119 | 186 | ||
120 | -- Set 15 minute timeout in order to avoid overlapping refreshes. | 187 | -- Set 15 minute timeout in order to avoid overlapping refreshes. |
121 | s <- search sch tbl sample $ if n+1 == R.defaultBucketCount | 188 | s <- search sch tbl sample $ if n+1 == R.defaultBucketCount |
122 | then const $ return True -- Never short-circuit the last bucket. | 189 | then const $ return True -- Never short-circuit the last bucket. |
123 | else checkBucketFull | 190 | else checkBucketFull (searchSpace sch) var resultCounter fin n |
124 | _ <- timeout (15*60*1000000) $ do | 191 | _ <- timeout (15*60*1000000) $ do |
125 | atomically $ searchIsFinished s >>= check | 192 | atomically $ searchIsFinished s >>= check |
126 | atomically $ searchCancel s | 193 | atomically $ searchCancel s |
@@ -131,20 +198,11 @@ refreshBucket sch var n = do | |||
131 | return $ if b then 1 else c | 198 | return $ if b then 1 else c |
132 | return rcount | 199 | return rcount |
133 | 200 | ||
134 | 201 | bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) => | |
135 | bootstrap :: | 202 | BucketRefresher nid ni -> t1 ni -> t ni -> IO () |
136 | ( Show nid | 203 | bootstrap BucketRefresher { refreshSearch = sch |
137 | , Serialize nid | 204 | , refreshBuckets = var |
138 | -- , FiniteBits nid | 205 | , refreshPing = ping } ns ns0 = do |
139 | , Hashable ni | ||
140 | , Hashable nid | ||
141 | , Ord ni | ||
142 | , Ord addr | ||
143 | , Ord nid | ||
144 | , Traversable t1 | ||
145 | , Traversable t | ||
146 | ) => Search nid addr tok ni ni -> TVar (BucketList ni) -> (ni -> IO Bool) -> t ni -> t1 ni -> IO () | ||
147 | bootstrap sch var ping ns ns0 = do | ||
148 | gotPing <- atomically $ newTVar False | 206 | gotPing <- atomically $ newTVar False |
149 | 207 | ||
150 | -- First, ping the given nodes so that they are added to | 208 | -- First, ping the given nodes so that they are added to |
@@ -193,6 +251,9 @@ bootstrap sch var ping ns ns0 = do | |||
193 | cnt <- refreshBucket sch var num | 251 | cnt <- refreshBucket sch var num |
194 | again cnt | 252 | again cnt |
195 | 253 | ||
254 | -- | Reschedule a bucket's refresh-time. It should be called whenever a bucket | ||
255 | -- changes. This will typically be invoked from 'tblTransition'. | ||
256 | -- | ||
196 | -- XXX: This will be redundantly triggered twice upon every node replacement | 257 | -- XXX: This will be redundantly triggered twice upon every node replacement |
197 | -- because we do not currently distinguish between standalone | 258 | -- because we do not currently distinguish between standalone |
198 | -- insertion/deletion events and an insertion/deletion pair constituting | 259 | -- insertion/deletion events and an insertion/deletion pair constituting |
@@ -201,55 +262,21 @@ bootstrap sch var ping ns ns0 = do | |||
201 | -- It might also be better to pass the timestamp of the transition here and | 262 | -- It might also be better to pass the timestamp of the transition here and |
202 | -- keep the refresh queue in better sync with the routing table by updating it | 263 | -- keep the refresh queue in better sync with the routing table by updating it |
203 | -- within the STM monad. | 264 | -- within the STM monad. |
204 | touchBucket :: KademliaSpace nid ni | 265 | touchBucket :: BucketRefresher nid ni |
205 | -> POSIXTime | 266 | -> RoutingTransition ni -- ^ What happened to the bucket? |
206 | -> TVar (BucketList ni) | ||
207 | -> TVar (Int.PSQ POSIXTime) | ||
208 | -> RoutingTransition ni | ||
209 | -> STM (IO ()) | 267 | -> STM (IO ()) |
210 | touchBucket space interval bkts psq tr | 268 | touchBucket BucketRefresher{ refreshSearch |
269 | , refreshInterval | ||
270 | , refreshBuckets | ||
271 | , refreshQueue | ||
272 | } | ||
273 | tr | ||
211 | | (transitionedTo tr == Applicant) | 274 | | (transitionedTo tr == Applicant) |
212 | = return $ return () | 275 | = return $ return () |
213 | | otherwise = return $ do | 276 | | otherwise = return $ do |
214 | now <- getPOSIXTime | 277 | now <- getPOSIXTime |
215 | atomically $ do | 278 | atomically $ do |
216 | let nid = kademliaLocation space (transitioningNode tr) | 279 | let space = searchSpace refreshSearch |
217 | num <- R.bucketNumber space nid <$> readTVar bkts | 280 | nid = kademliaLocation space (transitioningNode tr) |
218 | modifyTVar' psq $ Int.insert num (now + interval) | 281 | num <- R.bucketNumber space nid <$> readTVar refreshBuckets |
219 | 282 | modifyTVar' refreshQueue $ Int.insert num (now + refreshInterval) | |
220 | -- TODO: Bootstrap/Refresh | ||
221 | -- | ||
222 | -- From BEP 05: | ||
223 | -- | ||
224 | -- Each bucket should maintain a "last changed" property to indicate how | ||
225 | -- "fresh" the contents are. | ||
226 | -- | ||
227 | -- Note: We will use a "time to next refresh" property instead and store it in | ||
228 | -- a priority search queue. | ||
229 | -- | ||
230 | -- When... | ||
231 | -- | ||
232 | -- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus | ||
233 | -- >>> bucketEvents = | ||
234 | -- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds, | ||
235 | -- >>> | ||
236 | -- >>> , Stranger :--> Accepted -- or a node is added to a bucket, | ||
237 | -- >>> | ||
238 | -- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced | ||
239 | -- >>> , Applicant :--> Accepted -- with another node, | ||
240 | -- >>> ] | ||
241 | -- | ||
242 | -- the bucket's last changed property should be updated. Buckets | ||
243 | -- that have not been changed in 15 minutes should be "refreshed." This is done | ||
244 | -- by picking a random ID in the range of the bucket and performing a | ||
245 | -- find_nodes search on it. | ||
246 | -- | ||
247 | -- The only other possible BucketTouchEvents are as follows: | ||
248 | -- | ||
249 | -- >>> not_handled = | ||
250 | -- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered: | ||
251 | -- >>> -- (Applicant :--> Stranger) | ||
252 | -- >>> -- (Applicant :--> Accepted) | ||
253 | -- >>> , Accepted :--> Applicant -- Never happens | ||
254 | -- >>> ] | ||
255 | -- | ||