diff options
Diffstat (limited to 'kad/src/Network/Kademlia/Bootstrap.hs')
-rw-r--r-- | kad/src/Network/Kademlia/Bootstrap.hs | 44 |
1 files changed, 39 insertions, 5 deletions
diff --git a/kad/src/Network/Kademlia/Bootstrap.hs b/kad/src/Network/Kademlia/Bootstrap.hs index 08ba3318..c07b3c6c 100644 --- a/kad/src/Network/Kademlia/Bootstrap.hs +++ b/kad/src/Network/Kademlia/Bootstrap.hs | |||
@@ -16,6 +16,8 @@ | |||
16 | module Network.Kademlia.Bootstrap where | 16 | module Network.Kademlia.Bootstrap where |
17 | 17 | ||
18 | import Data.Function | 18 | import Data.Function |
19 | import qualified Data.IntMap.Strict as IntMap | ||
20 | ;import Data.IntMap.Strict (IntMap) | ||
19 | import Data.Maybe | 21 | import Data.Maybe |
20 | import qualified Data.Set as Set | 22 | import qualified Data.Set as Set |
21 | import Data.Time.Clock.POSIX (getPOSIXTime) | 23 | import Data.Time.Clock.POSIX (getPOSIXTime) |
@@ -71,10 +73,12 @@ data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher | |||
71 | , -- | Timestamp of last bucket event. | 73 | , -- | Timestamp of last bucket event. |
72 | refreshLastTouch :: TVar POSIXTime | 74 | refreshLastTouch :: TVar POSIXTime |
73 | , -- | This variable indicates whether or not we are in bootstrapping mode. | 75 | , -- | This variable indicates whether or not we are in bootstrapping mode. |
74 | bootstrapMode :: TVar Bool | 76 | bootstrapMode :: TVar Bool |
75 | , -- | When this countdown reaches 0, we exit bootstrap mode. It is decremented on | 77 | , -- | When this countdown reaches 0, we exit bootstrap mode. It is decremented on |
76 | -- every finished refresh. | 78 | -- every finished refresh. |
77 | bootstrapCountdown :: TVar (Maybe Int) | 79 | bootstrapCountdown :: TVar (Maybe Int) |
80 | -- | Internal state of background searches. Exposed for debugging purposes. | ||
81 | , refreshState :: TVar (IntMap [BucketSearch nid ni]) | ||
78 | } | 82 | } |
79 | 83 | ||
80 | newBucketRefresher :: ( Ord addr, Hashable addr | 84 | newBucketRefresher :: ( Ord addr, Hashable addr |
@@ -91,6 +95,7 @@ newBucketRefresher bkts sch ping = do | |||
91 | lasttouch <- newTVar 0 -- Would use getPOSIXTime here, or minBound, but alas... | 95 | lasttouch <- newTVar 0 -- Would use getPOSIXTime here, or minBound, but alas... |
92 | bootstrapVar <- newTVar True -- Start in bootstrapping mode. | 96 | bootstrapVar <- newTVar True -- Start in bootstrapping mode. |
93 | bootstrapCnt <- newTVar Nothing | 97 | bootstrapCnt <- newTVar Nothing |
98 | st <- newTVar IntMap.empty | ||
94 | return BucketRefresher | 99 | return BucketRefresher |
95 | { refreshInterval = 15 * 60 | 100 | { refreshInterval = 15 * 60 |
96 | , refreshQueue = sched | 101 | , refreshQueue = sched |
@@ -100,6 +105,7 @@ newBucketRefresher bkts sch ping = do | |||
100 | , refreshLastTouch = lasttouch | 105 | , refreshLastTouch = lasttouch |
101 | , bootstrapMode = bootstrapVar | 106 | , bootstrapMode = bootstrapVar |
102 | , bootstrapCountdown = bootstrapCnt | 107 | , bootstrapCountdown = bootstrapCnt |
108 | , refreshState = st | ||
103 | } | 109 | } |
104 | 110 | ||
105 | -- | This was added to avoid the compile error "Record update for | 111 | -- | This was added to avoid the compile error "Record update for |
@@ -118,6 +124,7 @@ updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher | |||
118 | , refreshLastTouch = refreshLastTouch | 124 | , refreshLastTouch = refreshLastTouch |
119 | , bootstrapMode = bootstrapMode | 125 | , bootstrapMode = bootstrapMode |
120 | , bootstrapCountdown = bootstrapCountdown | 126 | , bootstrapCountdown = bootstrapCountdown |
127 | , refreshState = refreshState | ||
121 | } | 128 | } |
122 | 129 | ||
123 | -- | Fork a refresh loop. Kill the returned thread to terminate it. | 130 | -- | Fork a refresh loop. Kill the returned thread to terminate it. |
@@ -228,10 +235,29 @@ onFinishedRefresh BucketRefresher { bootstrapCountdown | |||
228 | return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")." | 235 | return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")." |
229 | else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt) | 236 | else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt) |
230 | 237 | ||
238 | data BucketSearch nid ni = forall addr tok. BucketSearch | ||
239 | { bucketSample :: nid | ||
240 | , bucketResults :: TVar (Set.Set ni) | ||
241 | , bucketFinFlag :: TVar Bool | ||
242 | , bucketState :: SearchState nid addr tok ni ni | ||
243 | , bucketThread :: ThreadId | ||
244 | } | ||
245 | |||
246 | removeBucketState :: BucketSearch nid ni -> Maybe [BucketSearch nid ni] -> Maybe [BucketSearch nid ni] | ||
247 | removeBucketState bst Nothing = Nothing | ||
248 | removeBucketState bst (Just xs) = case filter (\b -> bucketThread b /= bucketThread bst) xs of | ||
249 | [] -> Nothing | ||
250 | ys -> Just ys | ||
251 | |||
252 | insertBucketState :: BucketSearch nid ni -> Maybe [BucketSearch nid ni] -> Maybe [BucketSearch nid ni] | ||
253 | insertBucketState bst Nothing = Just [bst] | ||
254 | insertBucketState bst (Just xs) = Just (bst : xs) | ||
255 | |||
231 | refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) => | 256 | refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) => |
232 | BucketRefresher nid ni -> Int -> IO Int | 257 | BucketRefresher nid ni -> Int -> IO Int |
233 | refreshBucket r@BucketRefresher{ refreshSearch = sch | 258 | refreshBucket r@BucketRefresher{ refreshSearch = sch |
234 | , refreshBuckets = var } | 259 | , refreshBuckets = var |
260 | , refreshState = rstate } | ||
235 | n = do | 261 | n = do |
236 | tbl <- atomically (readTVar var) | 262 | tbl <- atomically (readTVar var) |
237 | let count = bktCount tbl | 263 | let count = bktCount tbl |
@@ -248,13 +274,21 @@ refreshBucket r@BucketRefresher{ refreshSearch = sch | |||
248 | dput XRefresh $ "Start refresh " ++ show (n,sample) | 274 | dput XRefresh $ "Start refresh " ++ show (n,sample) |
249 | 275 | ||
250 | -- Set 15 minute timeout in order to avoid overlapping refreshes. | 276 | -- Set 15 minute timeout in order to avoid overlapping refreshes. |
251 | s <- search sch tbl sample $ if n+1 == R.defaultBucketCount | 277 | (s,thread) <- search sch tbl sample $ if n+1 == R.defaultBucketCount |
252 | then const $ return True -- Never short-circuit the last bucket. | 278 | then const $ return True -- Never short-circuit the last bucket. |
253 | else checkBucketFull (searchSpace sch) var resultCounter fin n | 279 | else checkBucketFull (searchSpace sch) var resultCounter fin n |
280 | let bstate = BucketSearch sample resultCounter fin s thread | ||
281 | atomically $ modifyTVar' rstate $ IntMap.alter (insertBucketState bstate) n | ||
254 | _ <- timeout (15*60*1000000) $ do | 282 | _ <- timeout (15*60*1000000) $ do |
255 | atomically $ searchIsFinished s >>= check | 283 | atomically $ searchIsFinished s >>= check |
256 | atomically $ searchCancel s | 284 | atomically $ searchCancel s |
257 | dput XDHT $ "Finish refresh " ++ show (n,sample) | 285 | dput XDHT $ "Finish refresh " ++ show (n,sample) |
286 | bg <- forkIO $ do | ||
287 | atomically $ do | ||
288 | searchIsFinished s >>= check | ||
289 | modifyTVar' rstate $ IntMap.alter (removeBucketState bstate) n | ||
290 | |||
291 | labelThread bg ("backgrounded." ++ show n ++ "." ++ show sample) | ||
258 | now <- getPOSIXTime | 292 | now <- getPOSIXTime |
259 | join $ atomically $ onFinishedRefresh r n now | 293 | join $ atomically $ onFinishedRefresh r n now |
260 | rcount <- atomically $ do | 294 | rcount <- atomically $ do |