{-# LANGUAGE CPP                   #-}
{-# LANGUAGE ConstraintKinds       #-}
{-# LANGUAGE DeriveFunctor         #-}
{-# LANGUAGE DeriveTraversable     #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE GADTs                 #-}
{-# LANGUAGE KindSignatures        #-}
{-# LANGUAGE LambdaCase            #-}
{-# LANGUAGE NamedFieldPuns        #-}
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE PatternSynonyms       #-}
{-# LANGUAGE RankNTypes            #-}
{-# LANGUAGE ScopedTypeVariables   #-}
module Network.Kademlia.Bootstrap where

import Data.Function
import Data.Maybe
import qualified Data.Set as Set
import Data.Time.Clock (getCurrentTime)
import Data.Time.Clock.POSIX (getPOSIXTime, utcTimeToPOSIXSeconds)
import Network.Kademlia.Routing as R
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
import GHC.Conc (labelThread)
#endif
import Control.Concurrent.STM
import Control.Monad
import Data.Bits
import Data.Hashable
#if MIN_VERSION_iproute(1,7,4)
import Data.IP hiding (fromSockAddr)
#else
import Data.IP
#endif
import Data.Monoid
import Data.Serialize (Serialize)
import Data.Time.Clock.POSIX (POSIXTime)
import Data.Ord
import System.Entropy
import System.Timeout
import Text.PrettyPrint as PP hiding (($$), (<>))
import Text.PrettyPrint.HughesPJClass hiding (($$), (<>))
import System.IO
import DPut
import qualified Data.Wrapper.PSQInt as Int
import Data.Wrapper.PSQInt (pattern (:->))
import Network.Address (bucketRange,genBucketSample)
import Network.Kademlia.Search
import Control.Concurrent.Tasks
import Network.Kademlia

-- | The class constraints required of a node-id type @nid@ and a node-info
-- type @ni@ by the functions in this module.
type SensibleNodeId nid ni =
    ( Show nid
    , Ord nid
    , Ord ni
    , Hashable nid
    , Hashable ni
    )

-- | State shared between the routing table and the bucket refresh machinery.
--
-- The address and token types of the underlying 'Search' are existentially
-- quantified, so 'refreshSearch' cannot be changed with ordinary record
-- update syntax.
data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher
    { -- | A staleness threshold (if a bucket goes this long without being
      -- touched, a refresh will be triggered).
      refreshInterval :: POSIXTime
    , -- | A TVar with the time-to-refresh schedule for each bucket.
      --
      -- To "touch" a bucket and prevent it from being refreshed, reschedule
      -- its refresh time to some time into the future by modifying its
      -- priority in this priority search queue.
      refreshQueue :: TVar (Int.PSQ POSIXTime)
    , -- | This is the kademlia node search specification.
      refreshSearch :: Search nid addr tok ni ni
    , -- | The current kademlia routing table buckets.
      refreshBuckets :: TVar (R.BucketList ni)
    , -- | Action to ping a node.  This is used only during initial bootstrap
      -- to get some nodes in our table.  A 'True' result is interpreted as a
      -- pong, where 'False' is a non-response.
      refreshPing :: ni -> IO Bool
    , -- | Timestamp of last bucket event.
      refreshLastTouch :: TVar POSIXTime
    , -- | This variable indicates whether or not we are in bootstrapping mode.
      bootstrapMode :: TVar Bool
    , -- | When this countdown reaches 0, we exit bootstrap mode.  It is
      -- decremented on every finished refresh.
      bootstrapCountdown :: TVar (Maybe Int)
    }

-- | Allocate a fresh 'BucketRefresher'.
--
-- The routing table is created with 'R.nullTable' around the supplied
-- template node and 'R.defaultBucketCount'.  The refresh schedule starts out
-- empty, the last-touch stamp starts at 0, the refresh interval is
-- @15 * 60@, and the refresher begins in bootstrap mode with no countdown.
newBucketRefresher :: ( Ord addr, Hashable addr
                      , SensibleNodeId nid ni )
                   => ni                        -- ^ Template node handed to 'R.nullTable'.
                   -> Search nid addr tok ni ni -- ^ Node search specification.
                   -> (ni -> IO Bool)           -- ^ Ping action.
                   -> STM (BucketRefresher nid ni)
newBucketRefresher template_ni sch ping = do
    table      <- newTVar emptyTable
    schedule   <- newTVar Int.empty
    lastTouch  <- newTVar 0       -- Would use getPOSIXTime here, or minBound, but alas...
    inBootMode <- newTVar True    -- Start in bootstrapping mode.
    countdown  <- newTVar Nothing
    return BucketRefresher
        { refreshInterval    = 15 * 60
        , refreshQueue       = schedule
        , refreshSearch      = sch
        , refreshBuckets     = table
        , refreshPing        = ping
        , refreshLastTouch   = lastTouch
        , bootstrapMode      = inBootMode
        , bootstrapCountdown = countdown
        }
  where
    -- Project a node onto its location in the search's kademlia space.
    locate = kademliaLocation (searchSpace sch)

    -- Nodes are ordered and hashed by their location.
    emptyTable = R.nullTable (comparing locate)
                             (\salt node -> hashWithSalt salt (locate node))
                             template_ni
                             R.defaultBucketCount

-- | This was added to avoid the compile error "Record update for
-- insufficiently polymorphic field" when trying to update the existentially
-- quantified field 'refreshSearch'.
updateRefresherIO :: Ord addr
                  => Search nid addr tok ni ni
                  -> (ni -> IO Bool)
                  -> BucketRefresher nid ni -> BucketRefresher nid ni
-- The fields that are carried over are matched with explicit field puns
-- (the style used everywhere else in this module) rather than a record
-- wildcard, so this definition does not depend on the RecordWildCards
-- extension being enabled.  The old search and ping fields are simply not
-- bound; they are replaced by the arguments.
updateRefresherIO sch ping BucketRefresher{ refreshInterval
                                          , refreshBuckets
                                          , refreshQueue
                                          , refreshLastTouch
                                          , bootstrapMode
                                          , bootstrapCountdown
                                          } = BucketRefresher
    { refreshSearch      = sch
    , refreshPing        = ping
    , refreshInterval    = refreshInterval
    , refreshBuckets     = refreshBuckets
    , refreshQueue       = refreshQueue
    , refreshLastTouch   = refreshLastTouch
    , bootstrapMode      = bootstrapMode
    , bootstrapCountdown = bootstrapCountdown
    }

-- | Fork a refresh loop.  Kill the returned thread to terminate it.
--
-- The loop blocks (via STM 'retry') until the refresh queue is non-empty,
-- then either sleeps until the earliest scheduled refresh is due or, when it
-- is already due, reschedules that bucket and forks a 'refreshBucket' for it.
forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId
forkPollForRefresh r@BucketRefresher{ refreshQueue } = fork $ do
    myThreadId >>= flip labelThread "pollForRefresh"
    fix $ \again -> do
        join $ atomically $ do
            nextup <- Int.findMin <$> readTVar refreshQueue
            maybe retry (return . go again) nextup
  where
    refresh :: Int -> IO Int
    refresh n = do
        -- dput XRefresh $ "Refresh time! "++ show n
        refreshBucket r n

    go again ( bktnum :-> refresh_time ) = do
        now <- getPOSIXTime
        -- The enumerated time difference is treated as a count of
        -- picoseconds; it is scaled down to the microseconds that
        -- 'threadDelay' expects.
        case fromEnum (refresh_time - now) of
            x | x <= 0 -> do -- Refresh time!
                -- Move it to the back of the refresh queue.
                atomically $ do
                    interval <- effectiveRefreshInterval r bktnum
                    modifyTVar' refreshQueue
                        $ Int.insert bktnum (now + interval)
                -- Now fork the refresh operation.
                -- TODO: We should probably propagate the kill signal to this thread.
                void $ fork $ do
                    myThreadId >>= flip labelThread ("refresh."++show bktnum)
                    void $ refresh bktnum
            picoseconds -> do
                -- dput XRefresh $ show (picoseconds `div` 10^12) ++ " seconds until refresh " ++ show bktnum
                threadDelay ( picoseconds `div` 10^6 )
        -- Loop regardless of which branch was taken.
        again

-- | This is a helper to 'refreshBucket' which does some book keeping to decide
-- whether or not a bucket is sufficiently refreshed or not.  It will return
-- false when we can terminate a node search.
--
-- Whenever the answer is 'True' (keep searching), 'True' is also written to
-- the supplied flag; the flag is never reset to 'False' here.
checkBucketFull :: Ord ni
                => KademliaSpace nid ni -- ^ Obtain a node id from a node.
                -> TVar (BucketList ni) -- ^ The current routing table.
                -> TVar (Set.Set ni)    -- ^ In-range nodes found so far.
                -> TVar Bool            -- ^ The result will also be written here.
                -> Int                  -- ^ The bucket number of interest.
                -> ni                   -- ^ A newly found node.
                -> STM Bool
checkBucketFull space var resultCounter fin n found_node = do
    let fullcount = R.defaultBucketSize
    tbl <- readTVar var
    let counts = R.shape tbl
        nid    = kademliaLocation space found_node
    -- Update the result set with every found node that is in the
    -- bucket of interest.
    when (n == R.bucketNumber space nid tbl)
        $ modifyTVar' resultCounter (Set.insert found_node)
    resultCount <- readTVar resultCounter
    -- NOTE(review): the bucket's size is looked up with @drop (n - 1)@,
    -- whereas 'onFinishedRefresh' pairs 'R.shape' with indices starting at
    -- 0.  This may be an off-by-one; confirm against 'R.shape' before
    -- changing it.
    let keepGoing = case drop (n - 1) counts of
            (cnt:_) | cnt < fullcount                  -> True  -- bucket not full, keep going
            _       | Set.size resultCount < fullcount -> True  -- we haven't got many results, keep going
            _                                          -> False -- okay, good enough, let's quit.
    when keepGoing $ writeTVar fin True
    return keepGoing

-- | Called from 'refreshBucket' with the current time when a refresh of the
-- supplied bucket number finishes.
--
-- Outside of bootstrap mode this does nothing.  In bootstrap mode it
-- maintains 'bootstrapCountdown':
--
-- * When a bucket other than the last one finishes, the countdown (if any)
--   is decremented.
--
-- * When the last bucket finishes and there is no countdown yet, every other
--   bucket that is not full is scheduled for immediate refresh and the
--   countdown is set to the number of buckets scheduled (at least one).
--
-- * When the last bucket finishes and a countdown exists, it is decremented.
--
-- Once the countdown reads @Just 0@, bootstrap mode is left and the countdown
-- is cleared.  The returned IO action only emits debug output.
onFinishedRefresh :: BucketRefresher nid ni -> Int -> POSIXTime -> STM (IO ())
onFinishedRefresh BucketRefresher { bootstrapCountdown
                                  , bootstrapMode
                                  , refreshQueue
                                  , refreshBuckets } num now = do
    bootstrapping <- readTVar bootstrapMode
    if not bootstrapping
        then return $ return () -- dput XRefresh $ "Finished non-boostrapping refresh: "++show num
        else do
            tbl <- readTVar refreshBuckets
            action <-
                if num /= R.bktCount tbl - 1
                    then do
                        modifyTVar' bootstrapCountdown (fmap pred)
                        return $ return () -- dput XRefresh $ "BOOTSTRAP decrement"
                    else do
                        -- The last bucket finished.
                        cnt <- readTVar bootstrapCountdown
                        case cnt of
                            Nothing -> do
                                let fullsize = R.defaultBucketSize
                                    notfull (n,len) | n==num        = False
                                                    | len>=fullsize = False
                                                    | otherwise     = True
                                    unfull = case filter notfull $ zip [0..] (R.shape tbl) of
                                        [] -> [(0,0)] -- Schedule at least 1 more refresh.
                                        xs -> xs
                                forM_ unfull $ \(n,_) -> do
                                    -- Schedule immediate refresh for unfull buckets (other than this one).
                                    modifyTVar' refreshQueue $ Int.insert n (now - 1)
                                writeTVar bootstrapCountdown $! Just $! length unfull
                                return $ return () -- dput XRefresh $ "BOOTSTRAP scheduling: "++show unfull
                            Just n -> do
                                writeTVar bootstrapCountdown $! Just $! pred n
                                return $ return () -- dput XRefresh "BOOTSTRAP decrement (last bucket)"
            cnt <- readTVar bootstrapCountdown
            if cnt == Just 0
                then do
                    -- Bootstrap finished!
                    writeTVar bootstrapMode False
                    writeTVar bootstrapCountdown Nothing
                    return $ do
                        action
                        dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")."
                else return $ do
                    action
                    dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt)

-- | Refresh the given bucket by running a node search.
--
-- For the last bucket of the current table the search target is our own node
-- id; for any other bucket a sample id is generated with 'kademliaSample' for
-- that bucket's range.  The search is given at most 15 minutes, after which
-- it is cancelled, and 'onFinishedRefresh' is then run.
--
-- The result is 1 if 'checkBucketFull' ever asked the search to keep going,
-- and otherwise the number of in-bucket nodes that were collected.
refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) =>
    BucketRefresher nid ni -> Int -> IO Int
refreshBucket r@BucketRefresher{ refreshSearch  = sch
                               , refreshBuckets = var }
              n = do
    tbl <- atomically (readTVar var)
    let count = bktCount tbl
        nid   = kademliaLocation (searchSpace sch) (thisNode tbl)
    sample <- if n+1 >= count -- Is this the last bucket?
                then return nid -- Yes? Search our own id.
                else kademliaSample (searchSpace sch) -- No? Generate a random id.
                                    getEntropy
                                    nid
                                    (bucketRange n (n + 1 < count))
    fin <- atomically $ newTVar False
    resultCounter <- atomically $ newTVar Set.empty

    dput XRefresh $ "Start refresh " ++ show (n,sample)

    -- NOTE(review): "last bucket" is decided with 'R.defaultBucketCount'
    -- here but with the table's current 'bktCount' above; confirm that the
    -- difference is intended.
    s <- search sch tbl sample $ if n+1 == R.defaultBucketCount
                                    then const $ return True -- Never short-circuit the last bucket.
                                    else checkBucketFull (searchSpace sch) var resultCounter fin n
    -- Set 15 minute timeout in order to avoid overlapping refreshes.
    _ <- timeout (15*60*1000000) $ atomically $ searchIsFinished s >>= check
    -- Cancel whether the search finished or the timeout fired, so that a
    -- timed-out search is not left running alongside the next refresh.
    atomically $ searchCancel s
    dput XDHT $ "Finish refresh " ++ show (n,sample)
    now <- getPOSIXTime
    join $ atomically $ onFinishedRefresh r n now
    atomically $ do
        c <- Set.size <$> readTVar resultCounter
        b <- readTVar fin
        return $ if b then 1 else c

-- | Schedule an immediate refresh of the last bucket of the current routing
-- table by giving it a refresh time one second in the past.
refreshLastBucket :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ()
refreshLastBucket BucketRefresher { refreshBuckets
                                  , refreshQueue } = do
    now <- getPOSIXTime
    atomically $ do
        cnt <- bktCount <$> readTVar refreshBuckets
        -- Schedule immediate refresh.
        modifyTVar' refreshQueue $ Int.insert (cnt-1) (now - 1)

-- | Enter bootstrap mode and clear the bootstrap countdown.
--
-- The returned IO action logs the outcome and, if we were not already
-- bootstrapping, schedules an immediate refresh of the last bucket.
restartBootstrap :: (Hashable ni, Hashable nid, Ord ni, Ord nid, Show nid) =>
    BucketRefresher nid ni -> STM (IO ())
restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do
    wasBootstrapping <- readTVar bootstrapMode
    writeTVar bootstrapMode True
    writeTVar bootstrapCountdown Nothing
    if wasBootstrapping
        then return $ dput XRefresh "BOOTSTRAP already bootstrapping"
        else return $ do
            dput XRefresh "BOOTSTRAP entered bootstrap mode"
            refreshLastBucket r

-- | Bootstrap the routing table.
--
-- Every node in the first collection is pinged.  Only if none of them
-- answers are the fallback nodes pinged as well.  Afterwards bootstrap mode
-- is (re-)entered via 'restartBootstrap'.
bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) =>
    BucketRefresher nid ni
    -> t1 ni -- ^ Nodes to bootstrap from.
    -> t ni  -- ^ Fallback nodes; used only if the others are unresponsive.
    -> IO ()
bootstrap r@BucketRefresher { refreshSearch = sch
                            , refreshPing   = ping
                            , bootstrapMode
                            } ns ns0 = do
    gotPing <- atomically $ newTVar False

    -- First, ping the given nodes so that they are added to
    -- our routing table.
    withTaskGroup "bootstrap.resume" 20 $ \g -> do
        forM_ ns $ \n -> do
            let lbl = show $ kademliaLocation (searchSpace sch) n
            forkTask g lbl $ do
                b <- ping n
                when b $ atomically $ writeTVar gotPing True

    -- We resort to the hardcoded fallback nodes only when we got no
    -- responses.  This is to lessen the burden on well-known bootstrap
    -- nodes.
    responded <- atomically (readTVar gotPing)
    unless responded $ withTaskGroup "bootstrap.ping" 20 $ \g -> do
        forM_ ns0 $ \n -> do
            forkTask g (show $ kademliaLocation (searchSpace sch) n)
                       (void $ ping n)
    dput XDHT "Finished bootstrap pings."
    -- Now search our own Id by entering bootstrap mode from non-bootstrap mode.
    join $ atomically $ do
        writeTVar bootstrapMode False
        restartBootstrap r
    --
    -- Hopefully 'forkPollForRefresh' was invoked and can take over
    -- maintenance.

-- | The delay to use when rescheduling the given bucket.
--
-- This is 'refreshInterval', except that while bootstrapping a bucket whose
-- recorded size differs from 'R.defaultBucketSize' is rescheduled after only
-- 15 seconds.  A bucket with no entry in the table's shape is treated as
-- full.
effectiveRefreshInterval :: BucketRefresher nid ni -> Int -> STM POSIXTime
effectiveRefreshInterval BucketRefresher{ refreshInterval
                                        , refreshBuckets
                                        , bootstrapMode } num = do
    tbl <- readTVar refreshBuckets
    bootstrapping <- readTVar bootstrapMode
    -- NOTE(review): @drop (num - 1)@ mirrors 'checkBucketFull'; with
    -- 0-based bucket numbers this may be an off-by-one.  Confirm against
    -- 'R.shape'.
    let fullcount = R.defaultBucketSize
        count     = fromMaybe fullcount $ listToMaybe $ drop (num - 1) $ R.shape tbl
    -- When bootstrapping, refresh interval for non-full buckets is only 15 seconds.
    return $ if bootstrapping && count /= fullcount
                then 15 -- seconds
                else refreshInterval

-- | Reschedule a bucket's refresh-time.  It should be called whenever a bucket
-- changes.  This will typically be invoked from 'tblTransition'.
--
-- From BEP 05:
--
-- > Each bucket should maintain a "last changed" property to indicate how
-- > "fresh" the contents are.
--
-- We will use a "time to next refresh" property instead and store it in
-- a priority search queue.
--
-- In detail using an expository (not actually implemented) type
-- 'BucketTouchEvent'...
--
-- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus
-- >>> bucketEvents =
-- >>>    [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds,
-- >>>
-- >>>    , Stranger  :--> Accepted -- or a node is added to a bucket,
-- >>>
-- >>>    , Accepted  :--> Stranger -- or a node in a bucket is replaced
-- >>>    , Applicant :--> Accepted -- with another node,
-- >>>    ]
--
-- the bucket's last changed property should be updated.  Buckets that have not
-- been changed in 15 minutes (see 'refreshInterval') should be "refreshed."
-- This is done by picking a random ID in the range of the bucket and
-- performing a find_nodes search on it.
--
-- The only other possible BucketTouchEvents are as follows:
--
-- >>> not_handled =
-- >>>    [ Stranger  :--> Applicant -- A ping is pending, its result is covered:
-- >>>                               --   (Applicant :--> Stranger)
-- >>>                               --   (Applicant :--> Accepted)
-- >>>    , Accepted  :--> Applicant -- Never happens
-- >>>    ]
--
-- Because this BucketTouchEvent type is not actually implemented and we only
-- receive notifications of a node's new state, it suffices to reschedule the
-- bucket refresh 'touchBucket' on every transition to a state other than
-- 'Applicant'.
--
-- XXX: Unfortunately, this means redundantly triggering twice upon every node
-- replacement because we do not currently distinguish between standalone
-- insertion/deletion events and an insertion/deletion pair constituting
-- replacement.
--
-- It might also be better to pass the timestamp of the transition here and
-- keep the refresh queue in better sync with the routing table by updating it
-- within the STM monad.
--
-- We embed the result in the STM monad but currently, no STM state changes
-- occur until the returned IO action is invoked.  TODO: simplify?
touchBucket :: SensibleNodeId nid ni
            => BucketRefresher nid ni
            -> RoutingTransition ni -- ^ What happened to the bucket?
            -> STM (IO ())
touchBucket r@BucketRefresher{ refreshSearch
                             , refreshBuckets
                             , refreshQueue
                             , refreshLastTouch
                             }
            RoutingTransition{ transitionedTo
                             , transitioningNode
                             }
    = case transitionedTo of
        Applicant -> return $ return () -- Ignore transition to applicant.
        _         -> return $ do -- Reschedule for any other transition.
            now <- getPOSIXTime
            join $ atomically $ do
                let space = searchSpace refreshSearch
                    nid   = kademliaLocation space transitioningNode
                tbl <- readTVar refreshBuckets
                let num = R.bucketNumber space nid tbl
                stamp <- readTVar refreshLastTouch
                -- A stamp of 0 means no bucket has been touched yet.
                action <- if stamp /= 0 && (now - stamp > 60)
                            -- It's been one minute since any bucket has been
                            -- touched, re-enter bootstrap mode.
                            then restartBootstrap r
                            else return $ return ()
                -- This must follow 'restartBootstrap', which may have just
                -- switched bootstrap mode on and thereby changed the interval.
                interval <- effectiveRefreshInterval r num
                modifyTVar' refreshQueue $ Int.insert num (now + interval)
                writeTVar refreshLastTouch now
                return action