diff options
Diffstat (limited to 'Kademlia.hs')
-rw-r--r-- | Kademlia.hs | 78 |
1 files changed, 47 insertions, 31 deletions
diff --git a/Kademlia.hs b/Kademlia.hs index 4920467a..57afb7fc 100644 --- a/Kademlia.hs +++ b/Kademlia.hs | |||
@@ -18,7 +18,6 @@ import Control.Concurrent.Lifted.Instrument | |||
18 | import Control.Concurrent.Lifted | 18 | import Control.Concurrent.Lifted |
19 | import GHC.Conc (labelThread) | 19 | import GHC.Conc (labelThread) |
20 | #endif | 20 | #endif |
21 | import Control.Concurrent.Async.Pool | ||
22 | import Control.Concurrent.STM | 21 | import Control.Concurrent.STM |
23 | import Control.Monad | 22 | import Control.Monad |
24 | import Data.Bits | 23 | import Data.Bits |
@@ -36,6 +35,7 @@ import System.Timeout | |||
36 | import Text.PrettyPrint as PP hiding (($$), (<>)) | 35 | import Text.PrettyPrint as PP hiding (($$), (<>)) |
37 | import Text.PrettyPrint.HughesPJClass hiding (($$), (<>)) | 36 | import Text.PrettyPrint.HughesPJClass hiding (($$), (<>)) |
38 | import System.IO | 37 | import System.IO |
38 | import Tasks | ||
39 | 39 | ||
40 | -- | The status of a given node with respect to a given routint table. | 40 | -- | The status of a given node with respect to a given routint table. |
41 | data RoutingStatus | 41 | data RoutingStatus |
@@ -241,7 +241,7 @@ touchBucket space interval bkts psq tr | |||
241 | -- TO "touch" a bucket and prevent it from being refreshed, reschedule it's | 241 | -- TO "touch" a bucket and prevent it from being refreshed, reschedule it's |
242 | -- refresh time to some time into the future by modifying the 'Int.PSQ' in the | 242 | -- refresh time to some time into the future by modifying the 'Int.PSQ' in the |
243 | -- TVar. | 243 | -- TVar. |
244 | forkPollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO ()) -> IO ThreadId | 244 | forkPollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO a) -> IO ThreadId |
245 | forkPollForRefresh interval psq refresh = do | 245 | forkPollForRefresh interval psq refresh = do |
246 | fork $ do | 246 | fork $ do |
247 | myThreadId >>= flip labelThread "pollForRefresh" | 247 | myThreadId >>= flip labelThread "pollForRefresh" |
@@ -260,14 +260,15 @@ forkPollForRefresh interval psq refresh = do | |||
260 | -- Now fork the refresh operation. | 260 | -- Now fork the refresh operation. |
261 | -- TODO: We should probably propogate the kill signal to this thread. | 261 | -- TODO: We should probably propogate the kill signal to this thread. |
262 | fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum) | 262 | fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum) |
263 | refresh bktnum | 263 | _ <- refresh bktnum |
264 | return () | ||
264 | return () | 265 | return () |
265 | seconds -> threadDelay ( seconds * 1000000 ) | 266 | seconds -> threadDelay ( seconds * 1000000 ) |
266 | again | 267 | again |
267 | 268 | ||
268 | refreshBucket :: forall nid ni addr. | 269 | refreshBucket :: forall nid ni addr. |
269 | ( Show nid, FiniteBits nid, Serialize nid, Ord nid, Ord ni, Hashable nid, Hashable ni, Ord addr ) => | 270 | ( Show nid, FiniteBits nid, Serialize nid, Ord nid, Ord ni, Hashable nid, Hashable ni, Ord addr ) => |
270 | Search nid addr ni ni -> TVar (BucketList ni) -> nid -> Int -> IO () | 271 | Search nid addr ni ni -> TVar (BucketList ni) -> nid -> Int -> IO Int |
271 | refreshBucket sch var nid n = do | 272 | refreshBucket sch var nid n = do |
272 | tbl <- atomically (readTVar var) | 273 | tbl <- atomically (readTVar var) |
273 | let count = bktCount tbl | 274 | let count = bktCount tbl |
@@ -275,19 +276,22 @@ refreshBucket sch var nid n = do | |||
275 | then return nid -- Yes? Search our own id. | 276 | then return nid -- Yes? Search our own id. |
276 | else genBucketSample nid -- No? Generate a random id. | 277 | else genBucketSample nid -- No? Generate a random id. |
277 | (bucketRange n (n + 1 < count)) | 278 | (bucketRange n (n + 1 < count)) |
279 | fin <- atomically $ newTVar False | ||
278 | resultCounter <- atomically $ newTVar Set.empty | 280 | resultCounter <- atomically $ newTVar Set.empty |
279 | let fullcount = R.defaultBucketSize | 281 | let fullcount = R.defaultBucketSize |
280 | let checkBucketFull :: ni -> STM Bool | 282 | saveit True = writeTVar fin True >> return True |
283 | saveit _ = return False | ||
284 | checkBucketFull :: ni -> STM Bool | ||
281 | checkBucketFull found_node = do | 285 | checkBucketFull found_node = do |
282 | tbl <- readTVar var | 286 | tbl <- readTVar var |
283 | let counts = R.shape tbl | 287 | let counts = R.shape tbl |
284 | when (n == R.bucketNumber (searchSpace sch) (kademliaLocation (searchSpace sch) found_node) tbl) | 288 | when (n == R.bucketNumber (searchSpace sch) (kademliaLocation (searchSpace sch) found_node) tbl) |
285 | $ modifyTVar resultCounter (Set.insert found_node) | 289 | $ modifyTVar resultCounter (Set.insert found_node) |
286 | resultCount <- readTVar resultCounter | 290 | resultCount <- readTVar resultCounter |
287 | case drop (n - 1) counts of | 291 | saveit $ case drop (n - 1) counts of |
288 | (cnt:_) | cnt < fullcount -> return True | 292 | (cnt:_) | cnt < fullcount -> True |
289 | _ | Set.size resultCount < fullcount -> return True | 293 | _ | Set.size resultCount < fullcount -> True |
290 | _ -> return False | 294 | _ -> False |
291 | 295 | ||
292 | hPutStrLn stderr $ "Start refresh " ++ show (n,sample) | 296 | hPutStrLn stderr $ "Start refresh " ++ show (n,sample) |
293 | 297 | ||
@@ -299,7 +303,11 @@ refreshBucket sch var nid n = do | |||
299 | atomically $ searchIsFinished s >>= check | 303 | atomically $ searchIsFinished s >>= check |
300 | atomically $ searchCancel s | 304 | atomically $ searchCancel s |
301 | hPutStrLn stderr $ "Finish refresh " ++ show (n,sample) | 305 | hPutStrLn stderr $ "Finish refresh " ++ show (n,sample) |
302 | return () | 306 | rcount <- atomically $ do |
307 | c <- Set.size <$> readTVar resultCounter | ||
308 | b <- readTVar fin | ||
309 | return $ if b then 1 else c | ||
310 | return rcount | ||
303 | 311 | ||
304 | bootstrap :: | 312 | bootstrap :: |
305 | ( Show nid | 313 | ( Show nid |
@@ -312,33 +320,41 @@ bootstrap :: | |||
312 | , Ord nid | 320 | , Ord nid |
313 | , Traversable t1 | 321 | , Traversable t1 |
314 | , Traversable t | 322 | , Traversable t |
315 | ) => Search nid addr ni ni -> TVar (BucketList ni) -> (a -> IO Bool) -> t a -> t1 a -> IO () | 323 | ) => Search nid addr ni ni -> TVar (BucketList ni) -> (ni -> IO Bool) -> t ni -> t1 ni -> IO () |
316 | bootstrap sch var ping ns ns0 = do | 324 | bootstrap sch var ping ns ns0 = do |
325 | gotPing <- atomically $ newTVar False | ||
326 | |||
317 | -- First, ping the given nodes so that they are added to | 327 | -- First, ping the given nodes so that they are added to |
318 | -- our routing table. | 328 | -- our routing table. |
319 | withTaskGroup 50 $ \g -> do | 329 | withTaskGroup "bootstrap.resume" 20 $ \g -> do |
320 | got_response <- or <$> mapConcurrently g ping ns | 330 | forM_ ns $ \n -> do |
321 | -- We resort to the hardcoded fallback nodes only when we got no | 331 | let lbl = show $ kademliaLocation (searchSpace sch) n |
322 | -- responses. This is to lesson the burden on well-known boostrap | 332 | forkTask g lbl $ do |
323 | -- nodes. | 333 | b <- ping n |
324 | when (not got_response) $ do | 334 | when b $ atomically $ writeTVar gotPing True |
325 | _ <- mapConcurrently g ping ns0 | 335 | |
326 | return () | 336 | -- We resort to the hardcoded fallback nodes only when we got no |
327 | 337 | -- responses. This is to lesson the burden on well-known boostrap | |
328 | mvar <- newMVar () | 338 | -- nodes. |
339 | fallback <- atomically (readTVar gotPing) >>= return . when . not | ||
340 | fallback $ withTaskGroup "bootstrap.ping" 20 $ \g -> do | ||
341 | forM_ ns0 $ \n -> do | ||
342 | forkTask g (show $ kademliaLocation (searchSpace sch) n) | ||
343 | (void $ ping n) | ||
344 | hPutStrLn stderr "Finished bootstrap pings." | ||
345 | |||
329 | -- Now run searches until all the buckets are full. On a small network, | 346 | -- Now run searches until all the buckets are full. On a small network, |
330 | -- this may never quit. | 347 | -- this may never quit. |
331 | -- | 348 | -- |
332 | -- TODO: For small networks, we should give up on filling a nearby bucket | 349 | -- TODO: For small networks, we should give up on filling a nearby bucket |
333 | -- at some point and move on to one farther away. | 350 | -- at some point and move on to one farther away. |
334 | fix $ \again -> do | 351 | flip fix 1 $ \again cnt -> do |
335 | takeMVar mvar | 352 | when (cnt==0) $ do |
336 | tbl <- atomically $ readTVar var | ||
337 | fork $ do | ||
338 | -- Force a delay in case the search returns too quickly | 353 | -- Force a delay in case the search returns too quickly |
354 | hPutStrLn stderr $ "Zero results, forcing 1 minute delay" | ||
339 | threadDelay (60 * 1000000) | 355 | threadDelay (60 * 1000000) |
340 | putMVar mvar () | 356 | tbl <- atomically $ readTVar var |
341 | let shp = reverse $ zip (R.shape tbl) [0 .. ] | 357 | let shp = zip (R.shape tbl) [0 .. ] |
342 | unfull = filter ( (< R.defaultBucketSize) . fst ) shp | 358 | unfull = filter ( (< R.defaultBucketSize) . fst ) shp |
343 | case dropWhile ((> R.defaultBucketCount - 1) . snd) unfull of | 359 | case dropWhile ((> R.defaultBucketCount - 1) . snd) unfull of |
344 | [] -> do | 360 | [] -> do |
@@ -346,15 +362,15 @@ bootstrap sch var ping ns ns0 = do | |||
346 | -- Not enough buckets, keep trying. | 362 | -- Not enough buckets, keep trying. |
347 | hPutStrLn stderr | 363 | hPutStrLn stderr |
348 | $ "Not enough buckets, refresh " ++ show (R.defaultBucketCount - 1) | 364 | $ "Not enough buckets, refresh " ++ show (R.defaultBucketCount - 1) |
349 | refreshBucket sch var | 365 | cnt <- refreshBucket sch var |
350 | (kademliaLocation (searchSpace sch) (thisNode tbl)) | 366 | (kademliaLocation (searchSpace sch) (thisNode tbl)) |
351 | (R.defaultBucketCount - 1) | 367 | (R.defaultBucketCount - 1) |
352 | again | 368 | again cnt |
353 | (size,num):_ -> do | 369 | (size,num):_ -> do |
354 | -- If we don't yet have enough buckets, we need to search our own id. | 370 | -- If we don't yet have enough buckets, we need to search our own id. |
355 | -- We indicate that by setting the bucket number to the target. | 371 | -- We indicate that by setting the bucket number to the target. |
356 | let num' | bktCount tbl < R.defaultBucketCount = R.defaultBucketCount - 1 | 372 | let num' | bktCount tbl < R.defaultBucketCount = R.defaultBucketCount - 1 |
357 | | otherwise = num | 373 | | otherwise = num |
358 | hPutStrLn stderr $ "Bucket too small, refresh "++ show (num',(size,num),shp) | 374 | hPutStrLn stderr $ "Bucket too small, refresh "++ show (num',(size,num),shp) |
359 | refreshBucket sch var (kademliaLocation (searchSpace sch) (thisNode tbl)) num' | 375 | cnt <- refreshBucket sch var (kademliaLocation (searchSpace sch) (thisNode tbl)) num' |
360 | again | 376 | again cnt |