From 36b07bb1396244b8b4ed8ad5b0c81351195d8428 Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Sat, 28 Dec 2019 20:08:03 -0500 Subject: Fix searchCancel to actually stop the search loop. --- kad/kad.cabal | 2 +- kad/src/Network/Kademlia/Search.hs | 54 +++++++++++++++++---------------- kad/tests/searchCancel.hs | 61 +++++++++++++++++++++++++++++--------- 3 files changed, 76 insertions(+), 41 deletions(-) (limited to 'kad') diff --git a/kad/kad.cabal b/kad/kad.cabal index ee3754b1..4a86bc4f 100644 --- a/kad/kad.cabal +++ b/kad/kad.cabal @@ -91,6 +91,6 @@ library executable testSearch hs-source-dirs: tests - build-depends: kad, base, stm + build-depends: kad, base, stm, containers, minmax-psq main-is: searchCancel.hs diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs index 856a7cfc..03c18d0e 100644 --- a/kad/src/Network/Kademlia/Search.hs +++ b/kad/src/Network/Kademlia/Search.hs @@ -59,7 +59,9 @@ data SearchState nid addr tok ni r = SearchState -- and decremented when we get a reply. searchPendingCount :: TVar Int -- | Nodes scheduled to be queried (roughly at most K). - , searchQueued :: TVar (MinMaxPSQ ni nid) + -- + -- This will be set to Nothing when a search is canceled. + , searchQueued :: TVar (Maybe (MinMaxPSQ ni nid)) -- | The nearest (K - α) nodes that issued a reply. -- -- α is the maximum number of simultaneous queries. @@ -90,7 +92,8 @@ newSearch :: ( Ord addr -> STM (SearchState nid addr tok ni r) newSearch s@(Search space nAddr qry _ _) target ns = do c <- newTVar 0 - q <- newTVar $ MM.fromList + q <- newTVar $ Just + $ MM.fromList $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) $ ns i <- newTVar MM.empty @@ -115,7 +118,7 @@ reset nearestNodes qsearch target st = do bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) <$> nearestNodes target priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st) - writeTVar (searchQueued st) $ MM.fromList $ priorInformants ++ bktNodes + writeTVar (searchQueued st) $ Just $ MM.fromList $ priorInformants ++ bktNodes writeTVar (searchInformant st) MM.empty writeTVar (searchVisited st) Set.empty writeTVar (searchPendingCount st) 0 @@ -145,22 +148,22 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = go (ns,rs,tok) = do vs <- readTVar searchVisited -- We only queue a node if it is not yet visited - let insertFoundNode :: Int + let xor = kademliaXor searchSpace + loc = kademliaLocation searchSpace + insertFoundNode :: Int -> ni -> MinMaxPSQ ni nid -> MinMaxPSQ ni nid insertFoundNode k n q | searchNodeAddress n `Set.member` vs = q - | otherwise = MM.insertTake k n ( kademliaXor searchSpace searchTarget - $ kademliaLocation searchSpace n ) - q + | otherwise = MM.insertTake k n ( xor searchTarget $ loc n ) q - qsize0 <- MM.size <$> readTVar searchQueued + qsize0 <- maybe 0 MM.size <$> readTVar searchQueued let qsize = if qsize0 < searchK then searchK else qsize0 -- Allow searchQueued to grow -- only when there's fewer than -- K elements. - modifyTVar searchQueued $ \q -> foldr (insertFoundNode qsize) q ns + modifyTVar searchQueued $ fmap $ \q -> foldr (insertFoundNode qsize) q ns modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d flip fix rs $ \loop -> \case r:rs' -> do @@ -174,19 +177,20 @@ searchIsFinished :: ( PSQKey nid , PSQKey ni ) => SearchState nid addr tok ni r -> STM Bool searchIsFinished SearchState{..} = do - q <- readTVar searchQueued - cnt <- readTVar searchPendingCount - informants <- readTVar searchInformant - return $ cnt == 0 - && ( MM.null q - || ( MM.size informants >= (searchK searchSpec - searchAlpha searchSpec) - && ( PSQ.prio (fromJust $ MM.findMax informants) - <= PSQ.prio (fromJust $ MM.findMin q)))) + readTVar searchQueued >>= \case + Just q -> do + cnt <- readTVar searchPendingCount + informants <- readTVar searchInformant + return $ cnt == 0 + && ( MM.null q + || ( MM.size informants >= (searchK searchSpec - searchAlpha searchSpec) + && ( PSQ.prio (fromJust $ MM.findMax informants) + <= PSQ.prio (fromJust $ MM.findMin q)))) + Nothing -> return True searchCancel :: SearchState nid addr tok ni r -> STM () searchCancel SearchState{..} = do - writeTVar searchPendingCount 0 - writeTVar searchQueued MM.empty + writeTVar searchQueued Nothing search :: ( Ord r @@ -215,11 +219,11 @@ searchLoop sch@Search{..} target result s@SearchState{..} = do withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do join $ atomically $ do cnt <- readTVar $ searchPendingCount - check (cnt <= 8) -- Only 8 pending queries at a time. + check (cnt <= searchAlpha) -- Only searchAlpha pending queries at a time. informants <- readTVar searchInformant - found <- MM.minView <$> readTVar searchQueued + found <- fmap MM.minView <$> readTVar searchQueued case found of - Just (ni :-> d, q) + Just (Just (ni :-> d, q)) | -- If there's fewer than /k - α/ informants and there's any -- node we haven't yet got a response from. (MM.size informants < searchK - searchAlpha) && (cnt > 0 || not (MM.null q)) @@ -229,7 +233,7 @@ searchLoop sch@Search{..} target result s@SearchState{..} = do -- nearest /k/ informants. || (d < PSQ.prio (fromJust $ MM.findMax informants)) -> -- Then the search continues, send a query. - do writeTVar searchQueued q + do writeTVar searchQueued $ Just q modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) modifyTVar searchPendingCount succ return $ do @@ -237,6 +241,4 @@ searchLoop sch@Search{..} target result s@SearchState{..} = do "searchQuery" $ sendQuery sch target result s (ni :-> d) again - _ -> -- Otherwise, we are finished. - do check (cnt == 0) - return $ return () + _ -> searchIsFinished s >>= check >> return (return ()) diff --git a/kad/tests/searchCancel.hs b/kad/tests/searchCancel.hs index 3458ab37..85986674 100644 --- a/kad/tests/searchCancel.hs +++ b/kad/tests/searchCancel.hs @@ -10,14 +10,29 @@ import Network.Kademlia.Persistence import Network.Kademlia.Routing import Network.Kademlia.Search -makeSchResults :: TVar Int -> IO (Maybe ([Int],[Int],Maybe ())) -makeSchResults var = do - threadDelay 100000 - atomically $ do +import qualified Data.MinMaxPSQ as MM +import qualified Data.Set as Set + +makeSchResults :: TVar (Maybe (SearchState Int Int () Int Int)) -> TVar Int -> IO (Maybe ([Int],[Int],Maybe ())) +makeSchResults mbv var = do + threadDelay 200000 + (r,io) <- atomically $ do n <- readTVar var let ns = take 4 [n .. ] writeTVar var $! n + 4 - return $ Just (ns, ns, Just ()) -- Maybe ([ni], [r], Maybe tok) + let r = Just (ns, ns, Just ()) -- Maybe ([ni], [r], Maybe tok) + stmio = if n > 490 + then do + ms <- readTVar mbv + case ms of + Just s -> do report <- showSearchState s + return $ putStrLn $ "cnt=" ++ show n ++ " " ++ report + _ -> return $ return () + else return $ return () + io <- stmio + return (r,io) + io + return r kad :: KademliaSpace Int Int kad = KademliaSpace @@ -27,11 +42,11 @@ kad = KademliaSpace , kademliaSample = \_ x _ -> pure x } -sch :: TVar Int -> Search Int Int () Int Int -sch var = Search +sch :: TVar (Maybe (SearchState Int Int () Int Int)) -> TVar Int -> Search Int Int () Int Int +sch mbv var = Search { searchSpace = kad , searchNodeAddress = id - , searchQuery = \_ _ -> makeSchResults var + , searchQuery = \_ _ -> makeSchResults mbv var , searchAlpha = 4 , searchK = 8 } @@ -39,11 +54,25 @@ sch var = Search onResult :: Int -> STM Bool onResult k = return (k < 50000) +showSearchState st = do + pc <- readTVar (searchPendingCount st) + q <- readTVar (searchQueued st) + inf <- readTVar (searchInformant st) + vset <- readTVar (searchVisited st) + return $ unwords + [ "pending=" ++ show pc + , "|q|=" ++ show (maybe 0 MM.size q) + , "|inf|=" ++ show (MM.size inf) + , "|visited|=" ++ show (Set.size vset) + ] + main = do - var <- newTVarIO 0 + var <- newTVarIO 5 fin <- newTVarIO False - s <- atomically $ newSearch (sch var) maxBound [1..4] - t <- forkIO $ do searchLoop (sch var) maxBound onResult s + mbstvar <- newTVarIO Nothing + s <- atomically $ newSearch (sch mbstvar var) maxBound [1..4] + atomically $ writeTVar mbstvar (Just s) + t <- forkIO $ do searchLoop (sch mbstvar var) maxBound onResult s atomically $ writeTVar fin True putStrLn "Waiting on counter." @@ -54,13 +83,17 @@ main = do then retry else return (done,n) putStrLn $ "(done,n) = " ++ show (done,n) - atomically $ searchCancel s + report <- atomically $ do + report <- showSearchState s + searchCancel s + return report + putStrLn report putStrLn "Canceled. Awaiting fin. The program should quit shortly without much output after this." forkIO $ let loop = do - x <- atomically $ readTVar var - putStrLn $ "query after cancel! " ++ show x + (x,report) <- atomically $ (,) <$> readTVar var <*> showSearchState s + putStrLn $ "query after cancel! " ++ show x ++ " " ++ report atomically $ readTVar var >>= \y -> if x == y then retry else return () loop in loop -- cgit v1.2.3