From d8a7ad88bfdb76b7c481c0ce89de63528a06e734 Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Fri, 27 Dec 2019 01:20:59 -0500 Subject: Made the BucketRefresher state accessible from CommonAPI. --- dht/examples/dhtd.hs | 26 +++++++++---------- kad/src/Network/Kademlia/Bootstrap.hs | 44 +++++++++++++++++++++++++++++---- kad/src/Network/Kademlia/CommonAPI.hs | 8 ++++-- kad/src/Network/Kademlia/Persistence.hs | 2 +- kad/src/Network/Kademlia/Search.hs | 9 ++++--- 5 files changed, 65 insertions(+), 24 deletions(-) diff --git a/dht/examples/dhtd.hs b/dht/examples/dhtd.hs index ef3f6bd4..4f83beb2 100644 --- a/dht/examples/dhtd.hs +++ b/dht/examples/dhtd.hs @@ -551,7 +551,7 @@ clientSession s@Session{..} sock cnum h = do ("ls", _) | Just DHT{dhtBuckets} <- Map.lookup netname dhts -> cmd0 $ do - bkts <- atomically $ readTVar dhtBuckets + bkts <- atomically $ readTVar (refreshBuckets dhtBuckets) let r = reportTable bkts hPutClient h $ showReport $ @@ -562,7 +562,7 @@ clientSession s@Session{..} sock cnum h = do ("r", s) | Just DHT{dhtQuery,dhtBuckets} <- Map.lookup netname dhts , Just DHTQuery{qsearch} <- Map.lookup "node" dhtQuery -> cmd0 $ do - ni <- atomically $ thisNode <$> readTVar dhtBuckets + ni <- atomically $ thisNode <$> readTVar (refreshBuckets dhtBuckets) let kad = searchSpace qsearch nid = kademliaLocation kad ni b = case readMaybe $ strp s of @@ -768,7 +768,7 @@ clientSession s@Session{..} sock cnum h = do -- arguments: method -- nid -- (optional dest-ni) - self <- atomically $ thisNode <$> readTVar dhtBuckets + self <- atomically $ thisNode <$> readTVar (refreshBuckets dhtBuckets) let (method,xs) = break isSpace $ dropWhile isSpace s (nidstr,ys) = break isSpace $ dropWhile isSpace xs destination = dropWhile isSpace ys @@ -819,7 +819,7 @@ clientSession s@Session{..} sock cnum h = do -- data (jid or key) data -- dest-rendezvous(r) token -- (optional extra-text) (optional dest-ni) - self <- atomically $ thisNode <$> readTVar dhtBuckets + self <- atomically $ thisNode <$> readTVar (refreshBuckets dhtBuckets) let (method,xs) = break isSpace $ dropWhile isSpace s (dtastr,ys) = break isSpace $ dropWhile isSpace xs (tokenstr,zs) = break isSpace $ dropWhile isSpace ys @@ -941,7 +941,7 @@ clientSession s@Session{..} sock cnum h = do (\nid -> R.kclosest (searchSpace qsearch) (searchK qsearch) nid - <$> readTVar dhtBuckets) + <$> readTVar (refreshBuckets dhtBuckets)) (announceTarget dta) announceInterval) dta @@ -967,7 +967,7 @@ clientSession s@Session{..} sock cnum h = do (\nid -> R.kclosest (searchSpace qsearch) (searchK qsearch) nid - <$> readTVar dhtBuckets) + <$> readTVar (refreshBuckets dhtBuckets)) (announceTarget dta) announceInterval) dta @@ -1009,7 +1009,7 @@ clientSession s@Session{..} sock cnum h = do join $ atomically $ do schs <- readTVar dhtSearches case Map.lookup (method,nid) schs of - Nothing -> do forkSearch method nid qry dhtSearches dhtBuckets tid kvar + Nothing -> do forkSearch method nid qry dhtSearches (refreshBuckets dhtBuckets) tid kvar return $ presentSearches Just sch -> do writeTVar kvar (Just $ return ()) return $ reportSearchResults method h sch @@ -1614,7 +1614,7 @@ initTox opts ssvar keysdb mbxmpp invc = case porttox opts of tcpclient = tcpKademliaClient $ Tox.toxOnionRoutes tox tcpRefresher = tcpBucketRefresher $ Tox.toxOnionRoutes tox tcpDHT = DHT - { dhtBuckets = refreshBuckets tcpRefresher + { dhtBuckets = tcpRefresher , dhtSecretKey = return $ Just $ transportSecret (Tox.toxCryptoKeys tox) , dhtPing = Map.singleton "ping" DHTPing { pingQuery = noArgPing $ TCP.tcpPing (TCP.tcpClient tcpclient) @@ -1639,9 +1639,9 @@ initTox opts ssvar keysdb mbxmpp invc = case porttox opts of , dhtShowHexId = Just Tox.showHexId } dhts = Map.fromList $ - ("tox4", toxDHT Tox.routing4 Want_IP4) + ("tox4", toxDHT Tox.refresher4 Want_IP4) : (if ip6tox opts - then ( ("tox6", toxDHT Tox.routing6 Want_IP6) :) + then ( ("tox6", toxDHT Tox.refresher6 Want_IP6) :) else id) (if enableTCPDHT opts then [ ("toxtcp", tcpDHT) ] @@ -1674,7 +1674,7 @@ initJabber opts ssvar announcer mbtox toxchat = case portxmpp opts of let lookupBkts :: String -> Map.Map String DHT -> Maybe (String,TVar (BucketList Tox.NodeInfo)) lookupBkts name m = case Map.lookup name m of Nothing -> Nothing - Just DHT{dhtBuckets} -> cast (name, dhtBuckets) + Just DHT{dhtBuckets} -> cast (name, refreshBuckets dhtBuckets) sv <- xmppServer Tcp.noCleanUp (Just sport) tcp <- xmppConnections sv -- :: IO ( Manager TCPStatus T.Text ) let tman = toxman ssvar announcer <$> mbtox @@ -1795,9 +1795,9 @@ main = do , dhtShowHexId = Nothing } dhts = Map.fromList $ - ("bt4", mainlineDHT Mainline.routing4 Want_IP4) + ("bt4", mainlineDHT Mainline.refresher4 Want_IP4) : if ip6bt opts - then [ ("bt6", mainlineDHT Mainline.routing6 Want_IP6) ] + then [ ("bt6", mainlineDHT Mainline.refresher6 Want_IP6) ] else [] ips :: IO [SockAddr] ips = readExternals Mainline.nodeAddr diff --git a/kad/src/Network/Kademlia/Bootstrap.hs b/kad/src/Network/Kademlia/Bootstrap.hs index 08ba3318..c07b3c6c 100644 --- a/kad/src/Network/Kademlia/Bootstrap.hs +++ b/kad/src/Network/Kademlia/Bootstrap.hs @@ -16,6 +16,8 @@ module Network.Kademlia.Bootstrap where import Data.Function +import qualified Data.IntMap.Strict as IntMap + ;import Data.IntMap.Strict (IntMap) import Data.Maybe import qualified Data.Set as Set import Data.Time.Clock.POSIX (getPOSIXTime) @@ -71,10 +73,12 @@ data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher , -- | Timestamp of last bucket event. refreshLastTouch :: TVar POSIXTime , -- | This variable indicates whether or not we are in bootstrapping mode. - bootstrapMode :: TVar Bool + bootstrapMode :: TVar Bool , -- | When this countdown reaches 0, we exit bootstrap mode. It is decremented on -- every finished refresh. bootstrapCountdown :: TVar (Maybe Int) + -- | Internal state of background searches. Exposed for debugging purposes. + , refreshState :: TVar (IntMap [BucketSearch nid ni]) } newBucketRefresher :: ( Ord addr, Hashable addr @@ -91,6 +95,7 @@ newBucketRefresher bkts sch ping = do lasttouch <- newTVar 0 -- Would use getPOSIXTime here, or minBound, but alas... bootstrapVar <- newTVar True -- Start in bootstrapping mode. bootstrapCnt <- newTVar Nothing + st <- newTVar IntMap.empty return BucketRefresher { refreshInterval = 15 * 60 , refreshQueue = sched @@ -100,6 +105,7 @@ newBucketRefresher bkts sch ping = do , refreshLastTouch = lasttouch , bootstrapMode = bootstrapVar , bootstrapCountdown = bootstrapCnt + , refreshState = st } -- | This was added to avoid the compile error "Record update for @@ -118,6 +124,7 @@ updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher , refreshLastTouch = refreshLastTouch , bootstrapMode = bootstrapMode , bootstrapCountdown = bootstrapCountdown + , refreshState = refreshState } -- | Fork a refresh loop. Kill the returned thread to terminate it. @@ -228,10 +235,29 @@ onFinishedRefresh BucketRefresher { bootstrapCountdown return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")." else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt) +data BucketSearch nid ni = forall addr tok. BucketSearch + { bucketSample :: nid + , bucketResults :: TVar (Set.Set ni) + , bucketFinFlag :: TVar Bool + , bucketState :: SearchState nid addr tok ni ni + , bucketThread :: ThreadId + } + +removeBucketState :: BucketSearch nid ni -> Maybe [BucketSearch nid ni] -> Maybe [BucketSearch nid ni] +removeBucketState bst Nothing = Nothing +removeBucketState bst (Just xs) = case filter (\b -> bucketThread b /= bucketThread bst) xs of + [] -> Nothing + ys -> Just ys + +insertBucketState :: BucketSearch nid ni -> Maybe [BucketSearch nid ni] -> Maybe [BucketSearch nid ni] +insertBucketState bst Nothing = Just [bst] +insertBucketState bst (Just xs) = Just (bst : xs) + refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) => BucketRefresher nid ni -> Int -> IO Int refreshBucket r@BucketRefresher{ refreshSearch = sch - , refreshBuckets = var } + , refreshBuckets = var + , refreshState = rstate } n = do tbl <- atomically (readTVar var) let count = bktCount tbl @@ -248,13 +274,21 @@ refreshBucket r@BucketRefresher{ refreshSearch = sch dput XRefresh $ "Start refresh " ++ show (n,sample) -- Set 15 minute timeout in order to avoid overlapping refreshes. - s <- search sch tbl sample $ if n+1 == R.defaultBucketCount - then const $ return True -- Never short-circuit the last bucket. - else checkBucketFull (searchSpace sch) var resultCounter fin n + (s,thread) <- search sch tbl sample $ if n+1 == R.defaultBucketCount + then const $ return True -- Never short-circuit the last bucket. + else checkBucketFull (searchSpace sch) var resultCounter fin n + let bstate = BucketSearch sample resultCounter fin s thread + atomically $ modifyTVar' rstate $ IntMap.alter (insertBucketState bstate) n _ <- timeout (15*60*1000000) $ do atomically $ searchIsFinished s >>= check atomically $ searchCancel s dput XDHT $ "Finish refresh " ++ show (n,sample) + bg <- forkIO $ do + atomically $ do + searchIsFinished s >>= check + modifyTVar' rstate $ IntMap.alter (removeBucketState bstate) n + + labelThread bg ("backgrounded." ++ show n ++ "." ++ show sample) now <- getPOSIXTime join $ atomically $ onFinishedRefresh r n now rcount <- atomically $ do diff --git a/kad/src/Network/Kademlia/CommonAPI.hs b/kad/src/Network/Kademlia/CommonAPI.hs index 4714cecc..6d3fd16c 100644 --- a/kad/src/Network/Kademlia/CommonAPI.hs +++ b/kad/src/Network/Kademlia/CommonAPI.hs @@ -1,5 +1,8 @@ {-# LANGUAGE ExistentialQuantification #-} -module Network.Kademlia.CommonAPI where +module Network.Kademlia.CommonAPI + ( module Network.Kademlia.CommonAPI + , refreshBuckets + ) where import Control.Concurrent @@ -12,6 +15,7 @@ import qualified Data.Set as Set import Data.Time.Clock.POSIX import Data.Typeable +import Network.Kademlia.Bootstrap import Network.Kademlia.Search import Network.Kademlia.Routing as R import Crypto.Tox (SecretKey,PublicKey) @@ -29,7 +33,7 @@ data DHT = forall nid ni. ( Show ni , S.Serialize nid ) => DHT - { dhtBuckets :: TVar (BucketList ni) + { dhtBuckets :: BucketRefresher nid ni , dhtSecretKey :: STM (Maybe SecretKey) , dhtPing :: Map.Map String (DHTPing ni) , dhtQuery :: Map.Map String (DHTQuery nid ni) diff --git a/kad/src/Network/Kademlia/Persistence.hs b/kad/src/Network/Kademlia/Persistence.hs index 32ec169d..f89287fe 100644 --- a/kad/src/Network/Kademlia/Persistence.hs +++ b/kad/src/Network/Kademlia/Persistence.hs @@ -16,7 +16,7 @@ import System.IO.Error saveNodes :: String -> DHT -> IO () saveNodes netname DHT{dhtBuckets} = do - bkts <- atomically $ readTVar dhtBuckets + bkts <- atomically $ readTVar (refreshBuckets dhtBuckets) let ns = map fst $ concat $ R.toList bkts bs = J.encode ns fname = nodesFileName netname diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs index 5b60c303..856a7cfc 100644 --- a/kad/src/Network/Kademlia/Search.hs +++ b/kad/src/Network/Kademlia/Search.hs @@ -194,12 +194,15 @@ search :: , PSQKey nid , PSQKey ni , Show nid - ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r) + ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r, ThreadId) search sch buckets target result = do let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets st <- atomically $ newSearch sch target ns - t <- forkIO $ searchLoop sch target result st - return st + v <- newTVarIO False + t <- forkIO $ atomically (check =<< readTVar v) >> searchLoop sch target result st + labelThread t ("search.pending." ++ show target) + atomically $ writeTVar v True + return (st,t) searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni ) => Search nid addr tok ni r -- ^ Query and distance methods. -- cgit v1.2.3