summaryrefslogtreecommitdiff
path: root/Kademlia.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Kademlia.hs')
-rw-r--r--Kademlia.hs78
1 files changed, 47 insertions, 31 deletions
diff --git a/Kademlia.hs b/Kademlia.hs
index 4920467a..57afb7fc 100644
--- a/Kademlia.hs
+++ b/Kademlia.hs
@@ -18,7 +18,6 @@ import Control.Concurrent.Lifted.Instrument
18import Control.Concurrent.Lifted 18import Control.Concurrent.Lifted
19import GHC.Conc (labelThread) 19import GHC.Conc (labelThread)
20#endif 20#endif
21import Control.Concurrent.Async.Pool
22import Control.Concurrent.STM 21import Control.Concurrent.STM
23import Control.Monad 22import Control.Monad
24import Data.Bits 23import Data.Bits
@@ -36,6 +35,7 @@ import System.Timeout
36import Text.PrettyPrint as PP hiding (($$), (<>)) 35import Text.PrettyPrint as PP hiding (($$), (<>))
37import Text.PrettyPrint.HughesPJClass hiding (($$), (<>)) 36import Text.PrettyPrint.HughesPJClass hiding (($$), (<>))
38import System.IO 37import System.IO
38import Tasks
39 39
40-- | The status of a given node with respect to a given routint table. 40-- | The status of a given node with respect to a given routint table.
41data RoutingStatus 41data RoutingStatus
@@ -241,7 +241,7 @@ touchBucket space interval bkts psq tr
241-- TO "touch" a bucket and prevent it from being refreshed, reschedule it's 241-- TO "touch" a bucket and prevent it from being refreshed, reschedule it's
242-- refresh time to some time into the future by modifying the 'Int.PSQ' in the 242-- refresh time to some time into the future by modifying the 'Int.PSQ' in the
243-- TVar. 243-- TVar.
244forkPollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO ()) -> IO ThreadId 244forkPollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO a) -> IO ThreadId
245forkPollForRefresh interval psq refresh = do 245forkPollForRefresh interval psq refresh = do
246 fork $ do 246 fork $ do
247 myThreadId >>= flip labelThread "pollForRefresh" 247 myThreadId >>= flip labelThread "pollForRefresh"
@@ -260,14 +260,15 @@ forkPollForRefresh interval psq refresh = do
260 -- Now fork the refresh operation. 260 -- Now fork the refresh operation.
261 -- TODO: We should probably propogate the kill signal to this thread. 261 -- TODO: We should probably propogate the kill signal to this thread.
262 fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum) 262 fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum)
263 refresh bktnum 263 _ <- refresh bktnum
264 return ()
264 return () 265 return ()
265 seconds -> threadDelay ( seconds * 1000000 ) 266 seconds -> threadDelay ( seconds * 1000000 )
266 again 267 again
267 268
268refreshBucket :: forall nid ni addr. 269refreshBucket :: forall nid ni addr.
269 ( Show nid, FiniteBits nid, Serialize nid, Ord nid, Ord ni, Hashable nid, Hashable ni, Ord addr ) => 270 ( Show nid, FiniteBits nid, Serialize nid, Ord nid, Ord ni, Hashable nid, Hashable ni, Ord addr ) =>
270 Search nid addr ni ni -> TVar (BucketList ni) -> nid -> Int -> IO () 271 Search nid addr ni ni -> TVar (BucketList ni) -> nid -> Int -> IO Int
271refreshBucket sch var nid n = do 272refreshBucket sch var nid n = do
272 tbl <- atomically (readTVar var) 273 tbl <- atomically (readTVar var)
273 let count = bktCount tbl 274 let count = bktCount tbl
@@ -275,19 +276,22 @@ refreshBucket sch var nid n = do
275 then return nid -- Yes? Search our own id. 276 then return nid -- Yes? Search our own id.
276 else genBucketSample nid -- No? Generate a random id. 277 else genBucketSample nid -- No? Generate a random id.
277 (bucketRange n (n + 1 < count)) 278 (bucketRange n (n + 1 < count))
279 fin <- atomically $ newTVar False
278 resultCounter <- atomically $ newTVar Set.empty 280 resultCounter <- atomically $ newTVar Set.empty
279 let fullcount = R.defaultBucketSize 281 let fullcount = R.defaultBucketSize
280 let checkBucketFull :: ni -> STM Bool 282 saveit True = writeTVar fin True >> return True
283 saveit _ = return False
284 checkBucketFull :: ni -> STM Bool
281 checkBucketFull found_node = do 285 checkBucketFull found_node = do
282 tbl <- readTVar var 286 tbl <- readTVar var
283 let counts = R.shape tbl 287 let counts = R.shape tbl
284 when (n == R.bucketNumber (searchSpace sch) (kademliaLocation (searchSpace sch) found_node) tbl) 288 when (n == R.bucketNumber (searchSpace sch) (kademliaLocation (searchSpace sch) found_node) tbl)
285 $ modifyTVar resultCounter (Set.insert found_node) 289 $ modifyTVar resultCounter (Set.insert found_node)
286 resultCount <- readTVar resultCounter 290 resultCount <- readTVar resultCounter
287 case drop (n - 1) counts of 291 saveit $ case drop (n - 1) counts of
288 (cnt:_) | cnt < fullcount -> return True 292 (cnt:_) | cnt < fullcount -> True
289 _ | Set.size resultCount < fullcount -> return True 293 _ | Set.size resultCount < fullcount -> True
290 _ -> return False 294 _ -> False
291 295
292 hPutStrLn stderr $ "Start refresh " ++ show (n,sample) 296 hPutStrLn stderr $ "Start refresh " ++ show (n,sample)
293 297
@@ -299,7 +303,11 @@ refreshBucket sch var nid n = do
299 atomically $ searchIsFinished s >>= check 303 atomically $ searchIsFinished s >>= check
300 atomically $ searchCancel s 304 atomically $ searchCancel s
301 hPutStrLn stderr $ "Finish refresh " ++ show (n,sample) 305 hPutStrLn stderr $ "Finish refresh " ++ show (n,sample)
302 return () 306 rcount <- atomically $ do
307 c <- Set.size <$> readTVar resultCounter
308 b <- readTVar fin
309 return $ if b then 1 else c
310 return rcount
303 311
304bootstrap :: 312bootstrap ::
305 ( Show nid 313 ( Show nid
@@ -312,33 +320,41 @@ bootstrap ::
312 , Ord nid 320 , Ord nid
313 , Traversable t1 321 , Traversable t1
314 , Traversable t 322 , Traversable t
315 ) => Search nid addr ni ni -> TVar (BucketList ni) -> (a -> IO Bool) -> t a -> t1 a -> IO () 323 ) => Search nid addr ni ni -> TVar (BucketList ni) -> (ni -> IO Bool) -> t ni -> t1 ni -> IO ()
316bootstrap sch var ping ns ns0 = do 324bootstrap sch var ping ns ns0 = do
325 gotPing <- atomically $ newTVar False
326
317 -- First, ping the given nodes so that they are added to 327 -- First, ping the given nodes so that they are added to
318 -- our routing table. 328 -- our routing table.
319 withTaskGroup 50 $ \g -> do 329 withTaskGroup "bootstrap.resume" 20 $ \g -> do
320 got_response <- or <$> mapConcurrently g ping ns 330 forM_ ns $ \n -> do
321 -- We resort to the hardcoded fallback nodes only when we got no 331 let lbl = show $ kademliaLocation (searchSpace sch) n
322 -- responses. This is to lesson the burden on well-known boostrap 332 forkTask g lbl $ do
323 -- nodes. 333 b <- ping n
324 when (not got_response) $ do 334 when b $ atomically $ writeTVar gotPing True
325 _ <- mapConcurrently g ping ns0 335
326 return () 336 -- We resort to the hardcoded fallback nodes only when we got no
327 337 -- responses. This is to lesson the burden on well-known boostrap
328 mvar <- newMVar () 338 -- nodes.
339 fallback <- atomically (readTVar gotPing) >>= return . when . not
340 fallback $ withTaskGroup "bootstrap.ping" 20 $ \g -> do
341 forM_ ns0 $ \n -> do
342 forkTask g (show $ kademliaLocation (searchSpace sch) n)
343 (void $ ping n)
344 hPutStrLn stderr "Finished bootstrap pings."
345
329 -- Now run searches until all the buckets are full. On a small network, 346 -- Now run searches until all the buckets are full. On a small network,
330 -- this may never quit. 347 -- this may never quit.
331 -- 348 --
332 -- TODO: For small networks, we should give up on filling a nearby bucket 349 -- TODO: For small networks, we should give up on filling a nearby bucket
333 -- at some point and move on to one farther away. 350 -- at some point and move on to one farther away.
334 fix $ \again -> do 351 flip fix 1 $ \again cnt -> do
335 takeMVar mvar 352 when (cnt==0) $ do
336 tbl <- atomically $ readTVar var
337 fork $ do
338 -- Force a delay in case the search returns too quickly 353 -- Force a delay in case the search returns too quickly
354 hPutStrLn stderr $ "Zero results, forcing 1 minute delay"
339 threadDelay (60 * 1000000) 355 threadDelay (60 * 1000000)
340 putMVar mvar () 356 tbl <- atomically $ readTVar var
341 let shp = reverse $ zip (R.shape tbl) [0 .. ] 357 let shp = zip (R.shape tbl) [0 .. ]
342 unfull = filter ( (< R.defaultBucketSize) . fst ) shp 358 unfull = filter ( (< R.defaultBucketSize) . fst ) shp
343 case dropWhile ((> R.defaultBucketCount - 1) . snd) unfull of 359 case dropWhile ((> R.defaultBucketCount - 1) . snd) unfull of
344 [] -> do 360 [] -> do
@@ -346,15 +362,15 @@ bootstrap sch var ping ns ns0 = do
346 -- Not enough buckets, keep trying. 362 -- Not enough buckets, keep trying.
347 hPutStrLn stderr 363 hPutStrLn stderr
348 $ "Not enough buckets, refresh " ++ show (R.defaultBucketCount - 1) 364 $ "Not enough buckets, refresh " ++ show (R.defaultBucketCount - 1)
349 refreshBucket sch var 365 cnt <- refreshBucket sch var
350 (kademliaLocation (searchSpace sch) (thisNode tbl)) 366 (kademliaLocation (searchSpace sch) (thisNode tbl))
351 (R.defaultBucketCount - 1) 367 (R.defaultBucketCount - 1)
352 again 368 again cnt
353 (size,num):_ -> do 369 (size,num):_ -> do
354 -- If we don't yet have enough buckets, we need to search our own id. 370 -- If we don't yet have enough buckets, we need to search our own id.
355 -- We indicate that by setting the bucket number to the target. 371 -- We indicate that by setting the bucket number to the target.
356 let num' | bktCount tbl < R.defaultBucketCount = R.defaultBucketCount - 1 372 let num' | bktCount tbl < R.defaultBucketCount = R.defaultBucketCount - 1
357 | otherwise = num 373 | otherwise = num
358 hPutStrLn stderr $ "Bucket too small, refresh "++ show (num',(size,num),shp) 374 hPutStrLn stderr $ "Bucket too small, refresh "++ show (num',(size,num),shp)
359 refreshBucket sch var (kademliaLocation (searchSpace sch) (thisNode tbl)) num' 375 cnt <- refreshBucket sch var (kademliaLocation (searchSpace sch) (thisNode tbl)) num'
360 again 376 again cnt