{-# LANGUAGE CPP                   #-}
{-# LANGUAGE ConstraintKinds       #-}
{-# LANGUAGE DeriveFunctor         #-}
{-# LANGUAGE DeriveTraversable     #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE GADTs                 #-}
{-# LANGUAGE KindSignatures        #-}
{-# LANGUAGE LambdaCase            #-}
{-# LANGUAGE NamedFieldPuns        #-}
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE PatternSynonyms       #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE RecordWildCards       #-}
{-# LANGUAGE ScopedTypeVariables   #-}
-- | Kademlia routing-table maintenance: a per-bucket refresh schedule, the
-- refresh loop, and the bootstrap procedure.
--
-- 'RecordWildCards' is enabled because record wildcard patterns
-- (@BucketRefresher{..}@) are used in this module's code.
module Network.Kademlia.Bootstrap where

import Data.Function
import Data.Maybe
import qualified Data.Set as Set
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock.POSIX (getPOSIXTime, utcTimeToPOSIXSeconds)
import Network.Kademlia.Routing as R
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
import GHC.Conc (labelThread)
#endif

import Control.Concurrent.STM
import Control.Monad
import Data.Bits
import Data.Hashable
import Data.IP
import Data.Monoid
import Data.Serialize (Serialize)
import Data.Time.Clock.POSIX (POSIXTime)
import Data.Ord
import System.Entropy
import System.Timeout
import Text.PrettyPrint as PP hiding (($$), (<>))
import Text.PrettyPrint.HughesPJClass hiding (($$), (<>))
import System.IO

import DPut
import qualified Data.Wrapper.PSQInt as Int
import Data.Wrapper.PSQInt (pattern (:->))
import Network.Address (bucketRange,genBucketSample)
import Network.Kademlia.Search
import Control.Concurrent.Tasks
import Network.Kademlia

-- | The constraints on node ids (@nid@) and node infos (@ni@) that the
-- functions in this module need.
type SensibleNodeId nid ni =
    ( Show nid
    , Ord nid
    , Ord ni
    , Hashable nid
    , Hashable ni
    )

-- | State shared by the bucket-refresh loop and the bootstrap procedure.
--
-- The search's address and token types are existentially hidden, so that a
-- refresher can be stored without mentioning them.
data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher
    { -- | A staleness threshold (if a bucket goes this long without being
      -- touched, a refresh will be triggered).
      refreshInterval    :: POSIXTime
      -- | A TVar with the time-to-refresh schedule for each bucket.
      --
      -- To "touch" a bucket and prevent it from being refreshed, reschedule
      -- its refresh time to some time into the future by modifying its
      -- priority in this priority search queue.
    , refreshQueue       :: TVar (Int.PSQ POSIXTime)
      -- | This is the kademlia node search specification.
    , refreshSearch      :: Search nid addr tok ni ni
      -- | The current kademlia routing table buckets.
    , refreshBuckets     :: TVar (R.BucketList ni)
      -- | Action to ping a node. This is used only during initial bootstrap
      -- to get some nodes in our table. A 'True' result is interpreted as a
      -- pong, where 'False' is a non-response.
    , refreshPing        :: ni -> IO Bool
      -- | Timestamp of last bucket event.
    , refreshLastTouch   :: TVar POSIXTime
      -- | This variable indicates whether or not we are in bootstrapping mode.
    , bootstrapMode      :: TVar Bool
      -- | When this countdown reaches 0, we exit bootstrap mode. It is
      -- decremented on every finished refresh.
    , bootstrapCountdown :: TVar (Maybe Int)
    }

-- | Create a refresher around the given search and ping action.
--
-- The routing table starts as 'R.nullTable' for @template_ni@ with
-- 'R.defaultBucketCount' buckets, and the refresh schedule starts empty. The
-- refresher starts in bootstrap mode with no countdown, and the refresh
-- interval is 15 minutes.
newBucketRefresher :: ( Ord addr, Hashable addr
                      , SensibleNodeId nid ni )
                   => ni                        -- ^ Our own node (table template).
                   -> Search nid addr tok ni ni -- ^ Node search specification.
                   -> (ni -> IO Bool)           -- ^ Ping action.
                   -> STM (BucketRefresher nid ni)
newBucketRefresher template_ni sch ping = do
    let spc    = searchSpace sch
        nodeId = kademliaLocation spc
    bkts <- newTVar $ R.nullTable (comparing nodeId)
                                  (\s -> hashWithSalt s . nodeId)
                                  template_ni
                                  R.defaultBucketCount
    sched <- newTVar Int.empty
    -- Would use getPOSIXTime here, or minBound, but alas... (we are in STM).
    -- 'touchBucket' treats a stamp of 0 as "never touched".
    lasttouch <- newTVar 0
    bootstrapVar <- newTVar True -- Start in bootstrapping mode.
    bootstrapCnt <- newTVar Nothing
    return BucketRefresher
        { refreshInterval    = 15 * 60
        , refreshQueue       = sched
        , refreshSearch      = sch
        , refreshBuckets     = bkts
        , refreshPing        = ping
        , refreshLastTouch   = lasttouch
        , bootstrapMode      = bootstrapVar
        , bootstrapCountdown = bootstrapCnt
        }

-- | This was added to avoid the compile error "Record update for
-- insufficiently polymorphic field" when trying to update the existentially
-- quantified field 'refreshSearch'.
updateRefresherIO :: Ord addr
                  => Search nid addr tok ni ni
                  -> (ni -> IO Bool)
                  -> BucketRefresher nid ni -> BucketRefresher nid ni
-- Replace the search and the ping action; every other field is carried over.
-- Fields are bound with NamedFieldPuns (enabled in this module's pragma list)
-- so this definition does not rely on RecordWildCards being switched on.
updateRefresherIO sch ping BucketRefresher{ refreshInterval
                                          , refreshBuckets
                                          , refreshQueue
                                          , refreshLastTouch
                                          , bootstrapMode
                                          , bootstrapCountdown } =
    BucketRefresher
        { refreshSearch = sch
        , refreshPing   = ping
        , refreshInterval
        , refreshBuckets
        , refreshQueue
        , refreshLastTouch
        , bootstrapMode
        , bootstrapCountdown
        }

-- | Fork a refresh loop. Kill the returned thread to terminate it.
--
-- The loop blocks (STM 'retry') while the refresh queue is empty. Otherwise
-- it inspects the bucket with the earliest scheduled time:
--
-- * if that time has passed, the bucket is rescheduled
--   ('effectiveRefreshInterval' from now) and 'refreshBucket' is forked;
--
-- * otherwise the loop sleeps until the scheduled time.
--
-- In both cases it then polls the queue again.
forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId
forkPollForRefresh r@BucketRefresher{ refreshQueue } = fork $ do
    myThreadId >>= flip labelThread "pollForRefresh"
    fix $ \again -> do
        join $ atomically $ do
            nextup <- Int.findMin <$> readTVar refreshQueue
            maybe retry (return . go again) nextup
  where
    go again ( bktnum :-> refresh_time ) = do
        now <- getPOSIXTime
        let remaining = refresh_time - now
        if remaining <= 0
          then do
            -- Refresh time!
            -- Move it to the back of the refresh queue.
            atomically $ do
                interval <- effectiveRefreshInterval r bktnum
                modifyTVar' refreshQueue $ Int.insert bktnum (now + interval)
            -- Now fork the refresh operation.
            -- TODO: We should probably propagate the kill signal to this thread.
            _ <- fork $ do
                myThreadId >>= flip labelThread ("refresh."++show bktnum)
                void $ refreshBucket r bktnum
            return ()
          else
            -- Sleep in microseconds, computed directly from the time
            -- difference (rounding up so we do not wake early).  Going through
            -- picoseconds as an 'Int' would overflow a 32-bit 'Int' for
            -- multi-minute delays.
            threadDelay (ceiling (remaining * 1000000))
        again

-- | This is a helper to 'refreshBucket' which does some book keeping to decide
-- whether or not a bucket is sufficiently refreshed or not. It will return
-- false when we can terminate a node search.
checkBucketFull :: Ord ni => KademliaSpace nid ni -- ^ Obtain a node id from a node.
                -> TVar (BucketList ni) -- ^ The current routing table.
                -> TVar (Set.Set ni)    -- ^ In-range nodes found so far.
                -> TVar Bool            -- ^ Set to 'True' whenever this check returns 'True'.
                -> Int                  -- ^ The bucket number of interest.
                -> ni                   -- ^ A newly found node.
                -> STM Bool
checkBucketFull space var resultCounter fin n found_node = do
    let fullcount = R.defaultBucketSize
        saveit True = writeTVar fin True >> return True
        saveit _    = return False
    tbl <- readTVar var
    let counts = R.shape tbl
        nid = kademliaLocation space found_node
    -- Update the result set with every found node that is in the
    -- bucket of interest.
    when (n == R.bucketNumber space nid tbl) $
        modifyTVar' resultCounter (Set.insert found_node)
    resultCount <- readTVar resultCounter
    -- 'R.shape' is indexed by bucket number (see the @zip [0..]@ in
    -- 'onFinishedRefresh'), so bucket @n@'s size is at position @n@.
    saveit $ case drop n counts of
        (cnt:_) | cnt < fullcount -> True -- bucket not full, keep going
        _ | Set.size resultCount < fullcount -> True -- we haven't got many results, keep going
        _ -> False -- okay, good enough, let's quit.

-- | Called from 'refreshBucket' with the current time when a refresh of the
-- supplied bucket number finishes.
--
-- Outside bootstrap mode this does nothing.  In bootstrap mode:
--
-- * a finished non-last bucket decrements 'bootstrapCountdown' (if set);
--
-- * a finished last bucket either starts the countdown or decrements it.
--   It starts the countdown when the countdown is unset, scheduling immediate
--   refreshes of every other non-full bucket (or of bucket 0 if there are
--   none).
--
-- When the countdown reaches @Just 0@, bootstrap mode is left.  The returned
-- IO action writes a progress line to stderr.
onFinishedRefresh :: BucketRefresher nid ni -> Int -> POSIXTime -> STM (IO ())
onFinishedRefresh BucketRefresher { bootstrapCountdown
                                  , bootstrapMode
                                  , refreshQueue
                                  , refreshBuckets } num now = do
    bootstrapping <- readTVar bootstrapMode
    if not bootstrapping
      then return $ return () -- hPutStrLn stderr $ "Finished non-bootstrapping refresh: "++show num
      else do
        tbl <- readTVar refreshBuckets
        action <-
            if num /= R.bktCount tbl - 1
              then do
                modifyTVar' bootstrapCountdown (fmap pred)
                return $ return () -- hPutStrLn stderr $ "BOOTSTRAP decrement"
              else do
                -- The last bucket finished.
                cnt <- readTVar bootstrapCountdown
                case cnt of
                    Nothing -> do
                        let fullsize = R.defaultBucketSize
                            notfull (n,len) | n==num        = False
                                            | len>=fullsize = False
                                            | otherwise     = True
                            unfull = case filter notfull $ zip [0..] (R.shape tbl) of
                                        [] -> [(0,0)] -- Schedule at least 1 more refresh.
                                        xs -> xs
                        forM_ unfull $ \(n,_) -> do
                            -- Schedule immediate refresh for unfull buckets (other than this one).
                            modifyTVar' refreshQueue $ Int.insert n (now - 1)
                        writeTVar bootstrapCountdown $! Just $! length unfull
                        return $ return () -- hPutStrLn stderr $ "BOOTSTRAP scheduling: "++show unfull
                    Just n -> do
                        writeTVar bootstrapCountdown $! Just $! pred n
                        return $ return () -- hPutStrLn stderr "BOOTSTRAP decrement (last bucket)"
        cnt <- readTVar bootstrapCountdown
        if cnt == Just 0
          then do
            -- Bootstrap finished!
            writeTVar bootstrapMode False
            writeTVar bootstrapCountdown Nothing
            return $ do action ; hPutStrLn stderr $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")."
          else return $ do action ; hPutStrLn stderr $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt)

-- | Refresh bucket @n@ by running a node search, then report the finish to
-- 'onFinishedRefresh'.
--
-- The last bucket searches for our own node id; any other bucket searches
-- for a random id in that bucket's range.  The search is given at most 15
-- minutes and is cancelled afterwards.  The result is 1 if the
-- 'checkBucketFull' flag was set, otherwise the number of in-bucket nodes
-- found.
refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) =>
                 BucketRefresher nid ni -> Int -> IO Int
refreshBucket r@BucketRefresher{ refreshSearch = sch
                               , refreshBuckets = var }
              n = do
    tbl <- atomically (readTVar var)
    let count = bktCount tbl
        nid = kademliaLocation (searchSpace sch) (thisNode tbl)
    sample <- if n+1 >= count -- Is this the last bucket?
        then return nid -- Yes? Search our own id.
        else kademliaSample (searchSpace sch) -- No? Generate a random id.
                 getEntropy
                 nid
                 (bucketRange n (n + 1 < count))
    fin <- atomically $ newTVar False
    resultCounter <- atomically $ newTVar Set.empty

    hPutStrLn stderr $ "Start refresh " ++ show (n,sample)

    -- NOTE(review): "last bucket" is tested against R.defaultBucketCount here
    -- but against the table's current bktCount above; confirm which is meant.
    s <- search sch tbl sample $ if n+1 == R.defaultBucketCount
        then const $ return True -- Never short-circuit the last bucket.
        else checkBucketFull (searchSpace sch) var resultCounter fin n

    -- Set 15 minute timeout in order to avoid overlapping refreshes.
    _ <- timeout (15*60*1000000) $
        atomically $ searchIsFinished s >>= check
    -- Cancel unconditionally: if the timeout fired, the search would otherwise
    -- keep running alongside later refreshes.
    atomically $ searchCancel s

    dput XDHT $ "Finish refresh " ++ show (n,sample)
    now <- getPOSIXTime
    join $ atomically $ onFinishedRefresh r n now
    atomically $ do
        c <- Set.size <$> readTVar resultCounter
        b <- readTVar fin
        return $ if b then 1 else c

-- | Schedule an immediate refresh of the table's last bucket.
refreshLastBucket :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ()
refreshLastBucket BucketRefresher { refreshBuckets
                                  , refreshQueue } = do
    now <- getPOSIXTime
    atomically $ do
        cnt <- bktCount <$> readTVar refreshBuckets
        -- Schedule immediate refresh.
        modifyTVar' refreshQueue $ Int.insert (cnt-1) (now - 1)

-- | Enter bootstrap mode and clear the countdown.
--
-- If we were not already bootstrapping, the returned action logs that fact and
-- schedules an immediate refresh of the last bucket ('refreshLastBucket');
-- otherwise it only logs.
restartBootstrap :: (Hashable ni, Hashable nid, Ord ni, Ord nid, Show nid) => BucketRefresher nid ni -> STM (IO ())
restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do
    wasBootstrapping <- readTVar bootstrapMode
    writeTVar bootstrapMode True
    writeTVar bootstrapCountdown Nothing
    if not wasBootstrapping
      then return $ do
        hPutStrLn stderr "BOOTSTRAP entered bootstrap mode"
        refreshLastBucket r
      else return $ hPutStrLn stderr "BOOTSTRAP already bootstrapping"

-- | Populate the routing table and start a bootstrap.
--
-- All of @ns@ are pinged (up to 20 concurrently).  Only if none of them
-- answered are the fallback nodes @ns0@ pinged.  Finally bootstrap mode is
-- forced to restart, which schedules an immediate refresh of the last bucket.
bootstrap ::
    (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid)
    => BucketRefresher nid ni
    -> t1 ni -- ^ Nodes to bootstrap from.
    -> t ni  -- ^ Fallback nodes; used only if the others are unresponsive.
    -> IO ()
bootstrap r@BucketRefresher { refreshSearch = sch
                            , refreshPing = ping
                            , bootstrapMode } ns ns0 = do
    gotPing <- atomically $ newTVar False

    -- First, ping the given nodes so that they are added to
    -- our routing table.
    withTaskGroup "bootstrap.resume" 20 $ \g -> do
        forM_ ns $ \n -> do
            let lbl = show $ kademliaLocation (searchSpace sch) n
            forkTask g lbl $ do
                b <- ping n
                when b $ atomically $ writeTVar gotPing True

    -- We resort to the hardcoded fallback nodes only when we got no
    -- responses.  This is to lessen the burden on well-known bootstrap
    -- nodes.
    gotAnyPong <- atomically (readTVar gotPing)
    unless gotAnyPong $ withTaskGroup "bootstrap.ping" 20 $ \g -> do
        forM_ ns0 $ \n -> do
            forkTask g (show $ kademliaLocation (searchSpace sch) n)
                       (void $ ping n)
    dput XDHT "Finished bootstrap pings."

    -- Now search our own Id by entering bootstrap mode from non-bootstrap mode.
    join $ atomically $ do
        writeTVar bootstrapMode False
        restartBootstrap r
    --
    -- Hopefully 'forkPollForRefresh' was invoked and can take over
    -- maintenance.

-- | The delay before a bucket's next refresh.
--
-- Outside bootstrap mode this is 'refreshInterval'.  While bootstrapping, a
-- bucket that is not full (fewer than 'R.defaultBucketSize' nodes) is
-- refreshed after only 15 seconds.
effectiveRefreshInterval :: BucketRefresher nid ni -> Int -> STM POSIXTime
effectiveRefreshInterval BucketRefresher{ refreshInterval
                                        , refreshBuckets
                                        , bootstrapMode } num = do
    tbl <- readTVar refreshBuckets
    bootstrapping <- readTVar bootstrapMode
    if not bootstrapping
      then return refreshInterval
      else do
        -- When bootstrapping, refresh interval for non-full buckets is only 15 seconds.
        -- 'R.shape' is indexed by bucket number, so bucket @num@ is at position @num@.
        let fullcount = R.defaultBucketSize
            count = fromMaybe fullcount $ listToMaybe $ drop num $ R.shape tbl
        if count == fullcount
          then return refreshInterval
          else return 15 -- seconds

-- | Reschedule a bucket's refresh-time.  It should be called whenever a bucket
-- changes.  This will typically be invoked from 'tblTransition'.
--
-- From BEP 05:
--
-- > Each bucket should maintain a "last changed" property to indicate how
-- > "fresh" the contents are.
--
-- We will use a "time to next refresh" property instead and store it in
-- a priority search queue.
--
-- In detail using an expository (not actually implemented) type
-- 'BucketTouchEvent'...
--
-- > data BucketTouchEvent = RoutingStatus :--> RoutingStatus
-- > bucketEvents =
-- >     [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
-- >
-- >     , Stranger  :--> Accepted -- or a node is added to a bucket,
-- >
-- >     , Accepted  :--> Stranger -- or a node in a bucket is replaced
-- >     , Applicant :--> Accepted -- with another node,
-- >     ]
--
-- the bucket's last changed property should be updated.  Buckets that have not
-- been changed in 15 minutes (see 'refreshInterval') should be "refreshed."
-- This is done by picking a random ID in the range of the bucket and
-- performing a find_nodes search on it.
--
-- The only other possible BucketTouchEvents are as follows:
--
-- > not_handled =
-- >     [ Stranger :--> Applicant -- A ping is pending, its result is covered:
-- >                               --   (Applicant :--> Stranger)
-- >                               --   (Applicant :--> Accepted)
-- >     , Accepted :--> Applicant -- Never happens
-- >     ]
--
-- Because this BucketTouchEvent type is not actually implemented and we only
-- receive notifications of a node's new state, it suffices to reschedule the
-- bucket refresh 'touchBucket' on every transition to a state other than
-- 'Applicant'.
--
-- XXX: Unfortunately, this means redundantly triggering twice upon every node
-- replacement because we do not currently distinguish between standalone
-- insertion/deletion events and an insertion/deletion pair constituting
-- replacement.
--
-- It might also be better to pass the timestamp of the transition here and
-- keep the refresh queue in better sync with the routing table by updating it
-- within the STM monad.
--
-- We embed the result in the STM monad but currently, no STM state changes
-- occur until the returned IO action is invoked.  TODO: simplify?
--
-- If more than 60 seconds have passed since the previous touch (and there was
-- one), bootstrap mode is re-entered via 'restartBootstrap'.
touchBucket :: SensibleNodeId nid ni
            => BucketRefresher nid ni
            -> RoutingTransition ni -- ^ What happened to the bucket?
            -> STM (IO ())
touchBucket r@BucketRefresher{ refreshSearch
                             , refreshBuckets
                             , refreshQueue
                             , refreshLastTouch }
            RoutingTransition{ transitionedTo
                             , transitioningNode }
    = case transitionedTo of
        Applicant -> return $ return () -- Ignore transition to applicant.
        _ -> return $ do -- Reschedule for any other transition.
            now <- getPOSIXTime
            join $ atomically $ do
                let space = searchSpace refreshSearch
                    nid = kademliaLocation space transitioningNode
                tbl <- readTVar refreshBuckets
                let num = R.bucketNumber space nid tbl
                stamp <- readTVar refreshLastTouch
                -- A stamp of 0 means no bucket has been touched yet.
                action <- if stamp /= 0 && (now - stamp > 60)
                    then -- It's been over a minute since any bucket was touched; re-enter bootstrap mode.
                         restartBootstrap r
                    else return $ return ()
                interval <- effectiveRefreshInterval r num
                modifyTVar' refreshQueue $ Int.insert num (now + interval)
                writeTVar refreshLastTouch now
                return action