{-# LANGUAGE CPP #-} {-# LANGUAGE ConstraintKinds #-} {-# LANGUAGE DeriveFunctor #-} {-# LANGUAGE DeriveTraversable #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE KindSignatures #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE PartialTypeSignatures #-} {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} module Network.Kademlia.Bootstrap where import Data.Function import Data.Maybe import qualified Data.Set as Set import Data.Time.Clock (getCurrentTime) import Data.Time.Clock.POSIX (getPOSIXTime, utcTimeToPOSIXSeconds) import Network.Kademlia.Routing as R #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument #else import Control.Concurrent.Lifted import GHC.Conc (labelThread) #endif import Control.Concurrent.STM import Control.Monad import Data.Bits import Data.Hashable import Data.IP import Data.Monoid import Data.Serialize (Serialize) import Data.Time.Clock.POSIX (POSIXTime) import Data.Ord import System.Entropy import System.Timeout import Text.PrettyPrint as PP hiding (($$), (<>)) import Text.PrettyPrint.HughesPJClass hiding (($$), (<>)) import System.IO import qualified Data.Wrapper.PSQInt as Int ;import Data.Wrapper.PSQInt (pattern (:->)) import Network.Address (bucketRange,genBucketSample) import Network.Kademlia.Search import Control.Concurrent.Tasks import Network.Kademlia type SensibleNodeId nid ni = ( Show nid , Ord nid , Ord ni , Hashable nid , Hashable ni ) data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher { -- | A staleness threshold (if a bucket goes this long without being -- touched, a refresh will be triggered). refreshInterval :: POSIXTime -- | A TVar with the time-to-refresh schedule for each bucket. -- -- To "touch" a bucket and prevent it from being refreshed, reschedule -- it's refresh time to some time into the future by modifying the -- 'Int.PSQ' in the TVar. (See 'touchBucket'). , refreshQueue :: TVar (Int.PSQ POSIXTime) -- | This is the kademlia node search specification. , refreshSearch :: Search nid addr tok ni ni -- | The current kademlia routing table buckets. , refreshBuckets :: TVar (R.BucketList ni) -- | Action to ping a node. This is used only during initial bootstrap -- to get some nodes in our table. A 'True' result is interpreted as a a -- pong, where 'False' is a non-response. , refreshPing :: ni -> IO Bool } newBucketRefresher :: (Ord addr, Ord a, Hashable a) => KademliaSpace a ni -> ni -> Search nid addr tok ni ni -> (ni -> IO Bool) -> STM (BucketRefresher nid ni) newBucketRefresher spc template_ni sch ping = do let nodeId = kademliaLocation spc bkts <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) template_ni R.defaultBucketCount sched <- newTVar Int.empty return BucketRefresher { refreshInterval = 15 * 60 , refreshQueue = sched , refreshSearch = sch , refreshBuckets = bkts , refreshPing = ping } -- | This was added to avoid the compile error "Record update for -- insufficiently polymorphic field" when trying to update the existentially -- quantified field 'refreshSearch'. updateRefresherIO :: Ord addr => Search nid addr tok ni ni -> (ni -> IO Bool) -> BucketRefresher nid ni -> BucketRefresher nid ni updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher { refreshSearch = sch , refreshPing = ping , refreshInterval = refreshInterval , refreshBuckets = refreshBuckets , refreshQueue = refreshQueue } -- | Fork a refresh loop. Kill the returned thread to terminate it. forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId forkPollForRefresh BucketRefresher{ refreshInterval , refreshQueue , refreshBuckets , refreshSearch } = fork $ do myThreadId >>= flip labelThread "pollForRefresh" fix $ \again -> do join $ atomically $ do nextup <- Int.findMin <$> readTVar refreshQueue maybe retry (return . go again) nextup where refresh :: Int -> IO Int refresh = refreshBucket refreshSearch refreshBuckets go again ( bktnum :-> refresh_time ) = do now <- getPOSIXTime case fromEnum (refresh_time - now) of x | x <= 0 -> do -- Refresh time! -- Move it to the back of the refresh queue. atomically $ modifyTVar' refreshQueue $ Int.insert bktnum (now + refreshInterval) -- Now fork the refresh operation. -- TODO: We should probably propogate the kill signal to this thread. fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum) _ <- refresh bktnum return () return () seconds -> threadDelay ( seconds * 1000000 ) again -- | This is a helper to 'refreshBucket' which does some book keeping to decide -- whether or not a bucket is sufficiently refreshed or not. It will return -- false when we can terminate a node search. checkBucketFull :: Ord ni => KademliaSpace nid ni -- ^ Obtain a node id from a node. -> TVar (BucketList ni) -- ^ The current routing table. -> TVar (Set.Set ni) -- ^ In-range nodes found so far. -> TVar Bool -- ^ The result will also be written here. -> Int -- ^ The bucket number of interest. -> ni -- ^ A newly found node. -> STM Bool checkBucketFull space var resultCounter fin n found_node = do let fullcount = R.defaultBucketSize saveit True = writeTVar fin True >> return True saveit _ = return False tbl <- readTVar var let counts = R.shape tbl nid = kademliaLocation space found_node -- Update the result set with every found node that is in the -- bucket of interest. when (n == R.bucketNumber space nid tbl) $ modifyTVar' resultCounter (Set.insert found_node) resultCount <- readTVar resultCounter saveit $ case drop (n - 1) counts of (cnt:_) | cnt < fullcount -> True -- bucket not full, keep going _ | Set.size resultCount < fullcount -> True -- we haven't got many results, keep going _ -> False -- okay, good enough, let's quit. refreshBucket :: (Hashable a, Hashable t, Ord t, Ord addr, Ord a, Show t) => Search t addr tok a a -> TVar (BucketList a) -> Int -> IO Int refreshBucket sch var n = do tbl <- atomically (readTVar var) let count = bktCount tbl nid = kademliaLocation (searchSpace sch) (thisNode tbl) sample <- if n+1 >= count -- Is this the last bucket? then return nid -- Yes? Search our own id. else kademliaSample (searchSpace sch) -- No? Generate a random id. getEntropy nid (bucketRange n (n + 1 < count)) fin <- atomically $ newTVar False resultCounter <- atomically $ newTVar Set.empty hPutStrLn stderr $ "Start refresh " ++ show (n,sample) -- Set 15 minute timeout in order to avoid overlapping refreshes. s <- search sch tbl sample $ if n+1 == R.defaultBucketCount then const $ return True -- Never short-circuit the last bucket. else checkBucketFull (searchSpace sch) var resultCounter fin n _ <- timeout (15*60*1000000) $ do atomically $ searchIsFinished s >>= check atomically $ searchCancel s hPutStrLn stderr $ "Finish refresh " ++ show (n,sample) rcount <- atomically $ do c <- Set.size <$> readTVar resultCounter b <- readTVar fin return $ if b then 1 else c return rcount bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) => BucketRefresher nid ni -> t1 ni -- ^ Nodes to bootstrap from. -> t ni -- ^ Fallback nodes; used only if the others are unresponsive. -> IO () bootstrap BucketRefresher { refreshSearch = sch , refreshBuckets = var , refreshPing = ping } ns ns0 = do gotPing <- atomically $ newTVar False -- First, ping the given nodes so that they are added to -- our routing table. withTaskGroup "bootstrap.resume" 20 $ \g -> do forM_ ns $ \n -> do let lbl = show $ kademliaLocation (searchSpace sch) n forkTask g lbl $ do b <- ping n when b $ atomically $ writeTVar gotPing True -- We resort to the hardcoded fallback nodes only when we got no -- responses. This is to lesson the burden on well-known boostrap -- nodes. fallback <- atomically (readTVar gotPing) >>= return . when . not fallback $ withTaskGroup "bootstrap.ping" 20 $ \g -> do forM_ ns0 $ \n -> do forkTask g (show $ kademliaLocation (searchSpace sch) n) (void $ ping n) hPutStrLn stderr "Finished bootstrap pings." -- Now search our own Id by refreshing the last bucket. last <- atomically $ bktCount <$> readTVar var void $ refreshBucket sch var last -- That's it. -- -- Hopefully 'forkPollForRefresh' was invoked and can take over -- maintenance. -- | Reschedule a bucket's refresh-time. It should be called whenever a bucket -- changes. This will typically be invoked from 'tblTransition'. -- -- From BEP 05: -- -- > Each bucket should maintain a "last changed" property to indicate how -- > "fresh" the contents are. -- -- We will use a "time to next refresh" property instead and store it in -- a priority search queue. -- -- In detail using an expository (not actually implemented) type -- 'BucketTouchEvent'... -- -- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus -- >>> bucketEvents = -- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds, -- >>> -- >>> , Stranger :--> Accepted -- or a node is added to a bucket, -- >>> -- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced -- >>> , Applicant :--> Accepted -- with another node, -- >>> ] -- -- the bucket's last changed property should be updated. Buckets that have not -- been changed in 15 minutes (see 'refreshInterval') should be "refreshed." -- This is done by picking a random ID in the range of the bucket and -- performing a find_nodes search on it. -- -- The only other possible BucketTouchEvents are as follows: -- -- >>> not_handled = -- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered: -- >>> -- (Applicant :--> Stranger) -- >>> -- (Applicant :--> Accepted) -- >>> , Accepted :--> Applicant -- Never happens -- >>> ] -- -- Because this BucketTouchEvent type is not actually implemented and we only -- receive notifications of a node's new state, it suffices to reschedule the -- bucket refresh 'touchBucket' on every transition to a state other than -- 'Applicant'. -- -- XXX: Unfortunately, this means redundantly triggering twice upon every node -- replacement because we do not currently distinguish between standalone -- insertion/deletion events and an insertion/deletion pair constituting -- replacement. -- -- It might also be better to pass the timestamp of the transition here and -- keep the refresh queue in better sync with the routing table by updating it -- within the STM monad. -- -- We embed the result in the STM monad but currently, no STM state changes -- occur until the returned IO action is invoked. TODO: simplify? touchBucket :: BucketRefresher nid ni -> RoutingTransition ni -- ^ What happened to the bucket? -> STM (IO ()) touchBucket BucketRefresher{ refreshSearch , refreshInterval , refreshBuckets , refreshQueue } RoutingTransition{ transitionedTo , transitioningNode } = case transitionedTo of Applicant -> return $ return () -- Ignore transition to applicant. _ -> return $ do -- Reschedule for any other transition. now <- getPOSIXTime atomically $ do let space = searchSpace refreshSearch nid = kademliaLocation space transitioningNode num <- R.bucketNumber space nid <$> readTVar refreshBuckets modifyTVar' refreshQueue $ Int.insert num (now + refreshInterval)