From 5ea2de4e858cc89282561922bae257b6f9041d2e Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Mon, 30 Dec 2019 20:19:57 -0500 Subject: Switch to async search query design. --- kad/src/Network/Kademlia/Bootstrap.hs | 36 ++++++++--------- kad/src/Network/Kademlia/CommonAPI.hs | 15 ++++--- kad/src/Network/Kademlia/Search.hs | 76 +++++++++++++++++++++-------------- 3 files changed, 72 insertions(+), 55 deletions(-) (limited to 'kad/src/Network/Kademlia') diff --git a/kad/src/Network/Kademlia/Bootstrap.hs b/kad/src/Network/Kademlia/Bootstrap.hs index c07b3c6c..bd09214f 100644 --- a/kad/src/Network/Kademlia/Bootstrap.hs +++ b/kad/src/Network/Kademlia/Bootstrap.hs @@ -52,7 +52,7 @@ type SensibleNodeId nid ni = , Hashable nid , Hashable ni ) -data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher +data BucketRefresher nid ni qk = forall tok addr. Ord addr => BucketRefresher { -- | A staleness threshold (if a bucket goes this long without being -- touched, a refresh will be triggered). refreshInterval :: POSIXTime @@ -63,7 +63,7 @@ data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher -- priority in this priority search queue. , refreshQueue :: TVar (Int.PSQ POSIXTime) -- | This is the kademlia node search specification. - , refreshSearch :: Search nid addr tok ni ni + , refreshSearch :: Search nid addr tok ni ni qk -- | The current kademlia routing table buckets. , refreshBuckets :: TVar (R.BucketList ni) -- | Action to ping a node. This is used only during initial bootstrap @@ -84,9 +84,9 @@ data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher newBucketRefresher :: ( Ord addr, Hashable addr , SensibleNodeId nid ni ) => TVar (R.BucketList ni) - -> Search nid addr tok ni ni + -> Search nid addr tok ni ni qk -> (ni -> IO Bool) - -> STM (BucketRefresher nid ni) + -> STM (BucketRefresher nid ni qk) newBucketRefresher bkts sch ping = do let spc = searchSpace sch nodeId = kademliaLocation spc @@ -112,9 +112,9 @@ newBucketRefresher bkts sch ping = do -- insufficiently polymorphic field" when trying to update the existentially -- quantified field 'refreshSearch'. updateRefresherIO :: Ord addr - => Search nid addr tok ni ni + => Search nid addr tok ni ni qk -> (ni -> IO Bool) - -> BucketRefresher nid ni -> BucketRefresher nid ni + -> BucketRefresher nid ni qk -> BucketRefresher nid ni qk updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher { refreshSearch = sch , refreshPing = ping @@ -128,7 +128,7 @@ updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher } -- | Fork a refresh loop. Kill the returned thread to terminate it. -forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId +forkPollForRefresh :: Ord qk => SensibleNodeId nid ni => BucketRefresher nid ni qk -> IO ThreadId forkPollForRefresh r@BucketRefresher{ refreshInterval , refreshQueue , refreshBuckets @@ -194,7 +194,7 @@ checkBucketFull space var resultCounter fin n found_node = do -- | Called from 'refreshBucket' with the current time when a refresh of the -- supplied bucket number finishes. -onFinishedRefresh :: BucketRefresher nid ni -> Int -> POSIXTime -> STM (IO ()) +onFinishedRefresh :: BucketRefresher nid ni qk -> Int -> POSIXTime -> STM (IO ()) onFinishedRefresh BucketRefresher { bootstrapCountdown , bootstrapMode , refreshQueue @@ -235,11 +235,11 @@ onFinishedRefresh BucketRefresher { bootstrapCountdown return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")." else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt) -data BucketSearch nid ni = forall addr tok. BucketSearch +data BucketSearch nid ni = forall addr tok qk. BucketSearch { bucketSample :: nid , bucketResults :: TVar (Set.Set ni) , bucketFinFlag :: TVar Bool - , bucketState :: SearchState nid addr tok ni ni + , bucketState :: SearchState nid addr tok ni ni qk , bucketThread :: ThreadId } @@ -253,8 +253,8 @@ insertBucketState :: BucketSearch nid ni -> Maybe [BucketSearch nid ni] -> Maybe insertBucketState bst Nothing = Just [bst] insertBucketState bst (Just xs) = Just (bst : xs) -refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) => - BucketRefresher nid ni -> Int -> IO Int +refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni, Ord qk) => + BucketRefresher nid ni qk -> Int -> IO Int refreshBucket r@BucketRefresher{ refreshSearch = sch , refreshBuckets = var , refreshState = rstate } @@ -297,7 +297,7 @@ refreshBucket r@BucketRefresher{ refreshSearch = sch return $ if b then 1 else c return rcount -refreshLastBucket :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO () +refreshLastBucket :: SensibleNodeId nid ni => BucketRefresher nid ni qk -> IO () refreshLastBucket r@BucketRefresher { refreshBuckets , refreshQueue } = do @@ -308,7 +308,7 @@ refreshLastBucket r@BucketRefresher { refreshBuckets modifyTVar' refreshQueue $ Int.insert (cnt-1) (now - 1) restartBootstrap :: (Hashable ni, Hashable nid, Ord ni, Ord nid, Show nid) => - BucketRefresher nid ni -> STM (IO ()) + BucketRefresher nid ni qk -> STM (IO ()) restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do unchanged <- readTVar bootstrapMode writeTVar bootstrapMode True @@ -319,7 +319,7 @@ restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do else return $ dput XRefresh "BOOTSTRAP already bootstrapping" bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) => - BucketRefresher nid ni + BucketRefresher nid ni qk -> t1 ni -- ^ Nodes to bootstrap from. -> t ni -- ^ Fallback nodes; used only if the others are unresponsive. -> IO () @@ -356,7 +356,7 @@ bootstrap r@BucketRefresher { refreshSearch = sch -- maintenance. -effectiveRefreshInterval :: BucketRefresher nid ni -> Int -> STM POSIXTime +effectiveRefreshInterval :: BucketRefresher nid ni qk -> Int -> STM POSIXTime effectiveRefreshInterval BucketRefresher{ refreshInterval , refreshBuckets , bootstrapMode } num = do @@ -429,7 +429,7 @@ effectiveRefreshInterval BucketRefresher{ refreshInterval -- We embed the result in the STM monad but currently, no STM state changes -- occur until the returned IO action is invoked. TODO: simplify? touchBucket :: SensibleNodeId nid ni - => BucketRefresher nid ni + => BucketRefresher nid ni qk -> RoutingTransition ni -- ^ What happened to the bucket? -> STM (IO ()) touchBucket r@BucketRefresher{ refreshSearch @@ -461,7 +461,7 @@ touchBucket r@BucketRefresher{ refreshSearch writeTVar refreshLastTouch now return action -refreshKademlia :: SensibleNodeId nid ni => BucketRefresher nid ni -> Kademlia nid ni +refreshKademlia :: SensibleNodeId nid ni => BucketRefresher nid ni qk -> Kademlia nid ni refreshKademlia r@BucketRefresher { refreshSearch = sch , refreshPing = ping , refreshBuckets = bkts diff --git a/kad/src/Network/Kademlia/CommonAPI.hs b/kad/src/Network/Kademlia/CommonAPI.hs index 6d3fd16c..bcbfe9d8 100644 --- a/kad/src/Network/Kademlia/CommonAPI.hs +++ b/kad/src/Network/Kademlia/CommonAPI.hs @@ -20,7 +20,8 @@ import Network.Kademlia.Search import Network.Kademlia.Routing as R import Crypto.Tox (SecretKey,PublicKey) -data DHT = forall nid ni. ( Show ni +data DHT = forall nid ni qk. + ( Show ni , Read ni , ToJSON ni , FromJSON ni @@ -31,9 +32,10 @@ data DHT = forall nid ni. ( Show ni , Hashable nid , Typeable ni , S.Serialize nid + , Ord qk ) => DHT - { dhtBuckets :: BucketRefresher nid ni + { dhtBuckets :: BucketRefresher nid ni qk , dhtSecretKey :: STM (Maybe SecretKey) , dhtPing :: Map.Map String (DHTPing ni) , dhtQuery :: Map.Map String (DHTQuery nid ni) @@ -45,13 +47,14 @@ data DHT = forall nid ni. ( Show ni , dhtShowHexId :: Maybe (nid -> String) } -data DHTQuery nid ni = forall addr r tok. +data DHTQuery nid ni = forall addr r tok qk. ( Ord addr , Typeable r , Typeable tok , Typeable ni + , Ord qk ) => DHTQuery - { qsearch :: Search nid addr tok ni r + { qsearch :: Search nid addr tok ni r qk , qhandler :: ni -> nid -> IO ([ni], [r], Maybe tok) -- ^ Invoked on local node, when there is no query destination. , qshowR :: r -> String , qshowTok :: tok -> Maybe String @@ -75,9 +78,9 @@ data DHTAnnouncable nid = forall dta tok ni r. , announceTarget :: dta -> nid } -data DHTSearch nid ni = forall addr tok r. DHTSearch +data DHTSearch nid ni = forall addr tok r qk. DHTSearch { searchThread :: ThreadId - , searchState :: SearchState nid addr tok ni r + , searchState :: SearchState nid addr tok ni r qk , searchShowTok :: tok -> Maybe String , searchResults :: TVar (Set.Set String) } diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs index 8d9c997b..e30c40da 100644 --- a/kad/src/Network/Kademlia/Search.hs +++ b/kad/src/Network/Kademlia/Search.hs @@ -19,6 +19,8 @@ import Control.Concurrent.Tasks import Control.Concurrent.STM import Control.Monad import Data.Function +import qualified Data.Map.Strict as Map + ;import Data.Map.Strict (Map) import Data.Maybe import qualified Data.Set as Set ;import Data.Set (Set) @@ -38,10 +40,11 @@ import Control.Concurrent.Lifted import GHC.Conc (labelThread) #endif -data Search nid addr tok ni r = Search +data Search nid addr tok ni r qk = Search { searchSpace :: KademliaSpace nid ni , searchNodeAddress :: ni -> addr - , searchQuery :: nid -> ni -> IO (Result ([ni], [r], Maybe tok)) + , searchQuery :: nid -> ni -> (qk -> Result ([ni], [r], Maybe tok) -> IO ()) -> IO qk + , searchQueryCancel :: qk -> STM () , searchAlpha :: Int -- α = 8 -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on -- how fast the queries are. For Tox's much slower onion-routed queries, we @@ -55,10 +58,11 @@ data Search nid addr tok ni r = Search , searchK :: Int -- K = 16 } -data SearchState nid addr tok ni r = SearchState +data SearchState nid addr tok ni r qk = SearchState { -- | The number of pending queries. Incremented before any query is sent -- and decremented when we get a reply. searchPendingCount :: TVar Int + , searchPending :: TVar (Map qk (STM ())) -- | Nodes scheduled to be queried (roughly at most K). -- -- This will be set to Nothing when a search is canceled. @@ -72,7 +76,7 @@ data SearchState nid addr tok ni r = SearchState -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha -- should limit the number of outstanding queries. , searchVisited :: TVar (Set addr) - , searchSpec :: Search nid addr tok ni r + , searchSpec :: Search nid addr tok ni r qk } @@ -87,12 +91,13 @@ newSearch :: ( Ord addr -> (r -> STM Bool) -- receives search results. -> nid -- target of search -} - Search nid addr tok ni r + Search nid addr tok ni r qk -> nid -> [ni] -- Initial nodes to query. - -> STM (SearchState nid addr tok ni r) -newSearch s@(Search space nAddr qry _ _) target ns = do + -> STM (SearchState nid addr tok ni r qk) +newSearch s@(Search space nAddr qry _ _ _) target ns = do c <- newTVar 0 + p <- newTVar Map.empty q <- newTVar $ Just $ MM.fromList $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) @@ -100,7 +105,7 @@ newSearch s@(Search space nAddr qry _ _) target ns = do i <- newTVar MM.empty v <- newTVar Set.empty return -- (Search space nAddr qry) , r , target - ( SearchState c q i v s ) + ( SearchState c p q i v s ) -- | Discard a value from a key-priority-value tuple. This is useful for -- swaping items from a "MinMaxPSQ'" to a "MinMaxPSQ". @@ -110,11 +115,13 @@ stripValue (Binding ni _ nid) = (ni :-> nid) -- | Reset a 'SearchState' object to ready it for a repeated search. reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) => (nid -> STM [ni]) - -> Search nid addr1 tok1 ni r1 + -> Search nid addr1 tok1 ni r1 qk -> nid - -> SearchState nid addr tok ni r - -> STM (SearchState nid addr tok ni r) + -> SearchState nid addr tok ni r qk + -> STM (SearchState nid addr tok ni r qk) reset nearestNodes qsearch target st = do + pc <- readTVar (searchPendingCount st) + check (pc == 0) searchIsFinished st >>= check -- Wait for a search to finish before resetting. bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) <$> nearestNodes target @@ -125,25 +132,29 @@ reset nearestNodes qsearch target st = do writeTVar (searchPendingCount st) 0 return st -sendQuery :: forall addr nid tok ni r. +grokQuery :: forall addr nid tok ni r qk. ( Ord addr , PSQKey nid , PSQKey ni , Show nid + , Ord qk ) => - Search nid addr tok ni r + Search nid addr tok ni r qk -> nid -> (r -> STM Bool) -- ^ return False to terminate the search. - -> SearchState nid addr tok ni r + -> SearchState nid addr tok ni r qk -> Binding ni nid + -> qk + -> Result ([ni],[r],Maybe tok) -> IO () -sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do - myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) - reply <- searchQuery searchTarget ni `catchIOError` const (return Canceled) +grokQuery Search{..} searchTarget withSearchResult sch@SearchState{..} (ni :-> d) qk reply = do + -- myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) + -- reply <- searchQuery searchTarget ni `catchIOError` const (return Nothing) -- (ns,rs) - let tok = error "TODO: token" + -- let tok = error "TODO: token" atomically $ do - modifyTVar searchPendingCount pred + modifyTVar' searchPendingCount pred + modifyTVar' searchPending $ Map.delete qk case reply of Success x -> go x _ -> return () @@ -170,7 +181,7 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d flip fix rs $ \loop -> \case r:rs' -> do - wanting <- searchResult r + wanting <- withSearchResult r if wanting then loop rs' else searchCancel sch [] -> return () @@ -178,7 +189,7 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = searchIsFinished :: ( PSQKey nid , PSQKey ni - ) => SearchState nid addr tok ni r -> STM Bool + ) => SearchState nid addr tok ni r qk -> STM Bool searchIsFinished SearchState{..} = do readTVar searchQueued >>= \case Just q -> do @@ -191,9 +202,11 @@ searchIsFinished SearchState{..} = do <= PSQ.prio (fromJust $ MM.findMin q)))) Nothing -> return True -searchCancel :: SearchState nid addr tok ni r -> STM () +searchCancel :: SearchState nid addr tok ni r qk -> STM () searchCancel SearchState{..} = do writeTVar searchQueued Nothing + m <- readTVar searchPending + foldr (>>) (return ()) m search :: ( Ord r @@ -201,7 +214,8 @@ search :: , PSQKey nid , PSQKey ni , Show nid - ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r, ThreadId) + , Ord qk + ) => Search nid addr tok ni r qk -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r qk, ThreadId) search sch buckets target result = do let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets st <- atomically $ newSearch sch target ns @@ -211,11 +225,11 @@ search sch buckets target result = do atomically $ writeTVar v True return (st,t) -searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni ) - => Search nid addr tok ni r -- ^ Query and distance methods. - -> nid -- ^ The target we are searching for. - -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching. - -> SearchState nid addr tok ni r -- ^ Search-related state. +searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni, Ord qk ) + => Search nid addr tok ni r qk -- ^ Query and distance methods. + -> nid -- ^ The target we are searching for. + -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching. + -> SearchState nid addr tok ni r qk -- ^ Search-related state. -> IO () searchLoop sch@Search{..} target result s@SearchState{..} = do myThreadId >>= flip labelThread ("search."++show target) @@ -240,8 +254,8 @@ searchLoop sch@Search{..} target result s@SearchState{..} = do modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) modifyTVar searchPendingCount succ return $ do - forkTask g - "searchQuery" - $ sendQuery sch target result s (ni :-> d) + qk <- searchQuery target ni $ + \qk reply -> grokQuery sch target result s (ni :-> d) qk reply + atomically $ modifyTVar' searchPending $ Map.insert qk (searchQueryCancel qk) again _ -> searchIsFinished s >>= check >> return (return ()) -- cgit v1.2.3