diff options
author | Joe Crayne <joe@jerkface.net> | 2019-12-28 20:08:03 -0500 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-01 23:28:00 -0500 |
commit | 36b07bb1396244b8b4ed8ad5b0c81351195d8428 (patch) | |
tree | 4e5d8fe0322c2b63cbf90ad5b7956370e1dec991 | |
parent | cad7670b1f62ea03627e8cff009f598bb76ca067 (diff) |
Fix searchCancel to actually stop the search loop.
-rw-r--r-- | kad/kad.cabal | 2 | ||||
-rw-r--r-- | kad/src/Network/Kademlia/Search.hs | 54 | ||||
-rw-r--r-- | kad/tests/searchCancel.hs | 61 |
3 files changed, 76 insertions, 41 deletions
diff --git a/kad/kad.cabal b/kad/kad.cabal index ee3754b1..4a86bc4f 100644 --- a/kad/kad.cabal +++ b/kad/kad.cabal | |||
@@ -91,6 +91,6 @@ library | |||
91 | 91 | ||
92 | executable testSearch | 92 | executable testSearch |
93 | hs-source-dirs: tests | 93 | hs-source-dirs: tests |
94 | build-depends: kad, base, stm | 94 | build-depends: kad, base, stm, containers, minmax-psq |
95 | main-is: searchCancel.hs | 95 | main-is: searchCancel.hs |
96 | 96 | ||
diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs index 856a7cfc..03c18d0e 100644 --- a/kad/src/Network/Kademlia/Search.hs +++ b/kad/src/Network/Kademlia/Search.hs | |||
@@ -59,7 +59,9 @@ data SearchState nid addr tok ni r = SearchState | |||
59 | -- and decremented when we get a reply. | 59 | -- and decremented when we get a reply. |
60 | searchPendingCount :: TVar Int | 60 | searchPendingCount :: TVar Int |
61 | -- | Nodes scheduled to be queried (roughly at most K). | 61 | -- | Nodes scheduled to be queried (roughly at most K). |
62 | , searchQueued :: TVar (MinMaxPSQ ni nid) | 62 | -- |
63 | -- This will be set to Nothing when a search is canceled. | ||
64 | , searchQueued :: TVar (Maybe (MinMaxPSQ ni nid)) | ||
63 | -- | The nearest (K - α) nodes that issued a reply. | 65 | -- | The nearest (K - α) nodes that issued a reply. |
64 | -- | 66 | -- |
65 | -- α is the maximum number of simultaneous queries. | 67 | -- α is the maximum number of simultaneous queries. |
@@ -90,7 +92,8 @@ newSearch :: ( Ord addr | |||
90 | -> STM (SearchState nid addr tok ni r) | 92 | -> STM (SearchState nid addr tok ni r) |
91 | newSearch s@(Search space nAddr qry _ _) target ns = do | 93 | newSearch s@(Search space nAddr qry _ _) target ns = do |
92 | c <- newTVar 0 | 94 | c <- newTVar 0 |
93 | q <- newTVar $ MM.fromList | 95 | q <- newTVar $ Just |
96 | $ MM.fromList | ||
94 | $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) | 97 | $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) |
95 | $ ns | 98 | $ ns |
96 | i <- newTVar MM.empty | 99 | i <- newTVar MM.empty |
@@ -115,7 +118,7 @@ reset nearestNodes qsearch target st = do | |||
115 | bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) | 118 | bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) |
116 | <$> nearestNodes target | 119 | <$> nearestNodes target |
117 | priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st) | 120 | priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st) |
118 | writeTVar (searchQueued st) $ MM.fromList $ priorInformants ++ bktNodes | 121 | writeTVar (searchQueued st) $ Just $ MM.fromList $ priorInformants ++ bktNodes |
119 | writeTVar (searchInformant st) MM.empty | 122 | writeTVar (searchInformant st) MM.empty |
120 | writeTVar (searchVisited st) Set.empty | 123 | writeTVar (searchVisited st) Set.empty |
121 | writeTVar (searchPendingCount st) 0 | 124 | writeTVar (searchPendingCount st) 0 |
@@ -145,22 +148,22 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = | |||
145 | go (ns,rs,tok) = do | 148 | go (ns,rs,tok) = do |
146 | vs <- readTVar searchVisited | 149 | vs <- readTVar searchVisited |
147 | -- We only queue a node if it is not yet visited | 150 | -- We only queue a node if it is not yet visited |
148 | let insertFoundNode :: Int | 151 | let xor = kademliaXor searchSpace |
152 | loc = kademliaLocation searchSpace | ||
153 | insertFoundNode :: Int | ||
149 | -> ni | 154 | -> ni |
150 | -> MinMaxPSQ ni nid | 155 | -> MinMaxPSQ ni nid |
151 | -> MinMaxPSQ ni nid | 156 | -> MinMaxPSQ ni nid |
152 | insertFoundNode k n q | 157 | insertFoundNode k n q |
153 | | searchNodeAddress n `Set.member` vs | 158 | | searchNodeAddress n `Set.member` vs |
154 | = q | 159 | = q |
155 | | otherwise = MM.insertTake k n ( kademliaXor searchSpace searchTarget | 160 | | otherwise = MM.insertTake k n ( xor searchTarget $ loc n ) q |
156 | $ kademliaLocation searchSpace n ) | ||
157 | q | ||
158 | 161 | ||
159 | qsize0 <- MM.size <$> readTVar searchQueued | 162 | qsize0 <- maybe 0 MM.size <$> readTVar searchQueued |
160 | let qsize = if qsize0 < searchK then searchK else qsize0 -- Allow searchQueued to grow | 163 | let qsize = if qsize0 < searchK then searchK else qsize0 -- Allow searchQueued to grow |
161 | -- only when there's fewer than | 164 | -- only when there's fewer than |
162 | -- K elements. | 165 | -- K elements. |
163 | modifyTVar searchQueued $ \q -> foldr (insertFoundNode qsize) q ns | 166 | modifyTVar searchQueued $ fmap $ \q -> foldr (insertFoundNode qsize) q ns |
164 | modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d | 167 | modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d |
165 | flip fix rs $ \loop -> \case | 168 | flip fix rs $ \loop -> \case |
166 | r:rs' -> do | 169 | r:rs' -> do |
@@ -174,19 +177,20 @@ searchIsFinished :: ( PSQKey nid | |||
174 | , PSQKey ni | 177 | , PSQKey ni |
175 | ) => SearchState nid addr tok ni r -> STM Bool | 178 | ) => SearchState nid addr tok ni r -> STM Bool |
176 | searchIsFinished SearchState{..} = do | 179 | searchIsFinished SearchState{..} = do |
177 | q <- readTVar searchQueued | 180 | readTVar searchQueued >>= \case |
178 | cnt <- readTVar searchPendingCount | 181 | Just q -> do |
179 | informants <- readTVar searchInformant | 182 | cnt <- readTVar searchPendingCount |
180 | return $ cnt == 0 | 183 | informants <- readTVar searchInformant |
181 | && ( MM.null q | 184 | return $ cnt == 0 |
182 | || ( MM.size informants >= (searchK searchSpec - searchAlpha searchSpec) | 185 | && ( MM.null q |
183 | && ( PSQ.prio (fromJust $ MM.findMax informants) | 186 | || ( MM.size informants >= (searchK searchSpec - searchAlpha searchSpec) |
184 | <= PSQ.prio (fromJust $ MM.findMin q)))) | 187 | && ( PSQ.prio (fromJust $ MM.findMax informants) |
188 | <= PSQ.prio (fromJust $ MM.findMin q)))) | ||
189 | Nothing -> return True | ||
185 | 190 | ||
186 | searchCancel :: SearchState nid addr tok ni r -> STM () | 191 | searchCancel :: SearchState nid addr tok ni r -> STM () |
187 | searchCancel SearchState{..} = do | 192 | searchCancel SearchState{..} = do |
188 | writeTVar searchPendingCount 0 | 193 | writeTVar searchQueued Nothing |
189 | writeTVar searchQueued MM.empty | ||
190 | 194 | ||
191 | search :: | 195 | search :: |
192 | ( Ord r | 196 | ( Ord r |
@@ -215,11 +219,11 @@ searchLoop sch@Search{..} target result s@SearchState{..} = do | |||
215 | withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do | 219 | withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do |
216 | join $ atomically $ do | 220 | join $ atomically $ do |
217 | cnt <- readTVar $ searchPendingCount | 221 | cnt <- readTVar $ searchPendingCount |
218 | check (cnt <= 8) -- Only 8 pending queries at a time. | 222 | check (cnt <= searchAlpha) -- Only searchAlpha pending queries at a time. |
219 | informants <- readTVar searchInformant | 223 | informants <- readTVar searchInformant |
220 | found <- MM.minView <$> readTVar searchQueued | 224 | found <- fmap MM.minView <$> readTVar searchQueued |
221 | case found of | 225 | case found of |
222 | Just (ni :-> d, q) | 226 | Just (Just (ni :-> d, q)) |
223 | | -- If there's fewer than /k - α/ informants and there's any | 227 | | -- If there's fewer than /k - α/ informants and there's any |
224 | -- node we haven't yet got a response from. | 228 | -- node we haven't yet got a response from. |
225 | (MM.size informants < searchK - searchAlpha) && (cnt > 0 || not (MM.null q)) | 229 | (MM.size informants < searchK - searchAlpha) && (cnt > 0 || not (MM.null q)) |
@@ -229,7 +233,7 @@ searchLoop sch@Search{..} target result s@SearchState{..} = do | |||
229 | -- nearest /k/ informants. | 233 | -- nearest /k/ informants. |
230 | || (d < PSQ.prio (fromJust $ MM.findMax informants)) | 234 | || (d < PSQ.prio (fromJust $ MM.findMax informants)) |
231 | -> -- Then the search continues, send a query. | 235 | -> -- Then the search continues, send a query. |
232 | do writeTVar searchQueued q | 236 | do writeTVar searchQueued $ Just q |
233 | modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) | 237 | modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) |
234 | modifyTVar searchPendingCount succ | 238 | modifyTVar searchPendingCount succ |
235 | return $ do | 239 | return $ do |
@@ -237,6 +241,4 @@ searchLoop sch@Search{..} target result s@SearchState{..} = do | |||
237 | "searchQuery" | 241 | "searchQuery" |
238 | $ sendQuery sch target result s (ni :-> d) | 242 | $ sendQuery sch target result s (ni :-> d) |
239 | again | 243 | again |
240 | _ -> -- Otherwise, we are finished. | 244 | _ -> searchIsFinished s >>= check >> return (return ()) |
241 | do check (cnt == 0) | ||
242 | return $ return () | ||
diff --git a/kad/tests/searchCancel.hs b/kad/tests/searchCancel.hs index 3458ab37..85986674 100644 --- a/kad/tests/searchCancel.hs +++ b/kad/tests/searchCancel.hs | |||
@@ -10,14 +10,29 @@ import Network.Kademlia.Persistence | |||
10 | import Network.Kademlia.Routing | 10 | import Network.Kademlia.Routing |
11 | import Network.Kademlia.Search | 11 | import Network.Kademlia.Search |
12 | 12 | ||
13 | makeSchResults :: TVar Int -> IO (Maybe ([Int],[Int],Maybe ())) | 13 | import qualified Data.MinMaxPSQ as MM |
14 | makeSchResults var = do | 14 | import qualified Data.Set as Set |
15 | threadDelay 100000 | 15 | |
16 | atomically $ do | 16 | makeSchResults :: TVar (Maybe (SearchState Int Int () Int Int)) -> TVar Int -> IO (Maybe ([Int],[Int],Maybe ())) |
17 | makeSchResults mbv var = do | ||
18 | threadDelay 200000 | ||
19 | (r,io) <- atomically $ do | ||
17 | n <- readTVar var | 20 | n <- readTVar var |
18 | let ns = take 4 [n .. ] | 21 | let ns = take 4 [n .. ] |
19 | writeTVar var $! n + 4 | 22 | writeTVar var $! n + 4 |
20 | return $ Just (ns, ns, Just ()) -- Maybe ([ni], [r], Maybe tok) | 23 | let r = Just (ns, ns, Just ()) -- Maybe ([ni], [r], Maybe tok) |
24 | stmio = if n > 490 | ||
25 | then do | ||
26 | ms <- readTVar mbv | ||
27 | case ms of | ||
28 | Just s -> do report <- showSearchState s | ||
29 | return $ putStrLn $ "cnt=" ++ show n ++ " " ++ report | ||
30 | _ -> return $ return () | ||
31 | else return $ return () | ||
32 | io <- stmio | ||
33 | return (r,io) | ||
34 | io | ||
35 | return r | ||
21 | 36 | ||
22 | kad :: KademliaSpace Int Int | 37 | kad :: KademliaSpace Int Int |
23 | kad = KademliaSpace | 38 | kad = KademliaSpace |
@@ -27,11 +42,11 @@ kad = KademliaSpace | |||
27 | , kademliaSample = \_ x _ -> pure x | 42 | , kademliaSample = \_ x _ -> pure x |
28 | } | 43 | } |
29 | 44 | ||
30 | sch :: TVar Int -> Search Int Int () Int Int | 45 | sch :: TVar (Maybe (SearchState Int Int () Int Int)) -> TVar Int -> Search Int Int () Int Int |
31 | sch var = Search | 46 | sch mbv var = Search |
32 | { searchSpace = kad | 47 | { searchSpace = kad |
33 | , searchNodeAddress = id | 48 | , searchNodeAddress = id |
34 | , searchQuery = \_ _ -> makeSchResults var | 49 | , searchQuery = \_ _ -> makeSchResults mbv var |
35 | , searchAlpha = 4 | 50 | , searchAlpha = 4 |
36 | , searchK = 8 | 51 | , searchK = 8 |
37 | } | 52 | } |
@@ -39,11 +54,25 @@ sch var = Search | |||
39 | onResult :: Int -> STM Bool | 54 | onResult :: Int -> STM Bool |
40 | onResult k = return (k < 50000) | 55 | onResult k = return (k < 50000) |
41 | 56 | ||
57 | showSearchState st = do | ||
58 | pc <- readTVar (searchPendingCount st) | ||
59 | q <- readTVar (searchQueued st) | ||
60 | inf <- readTVar (searchInformant st) | ||
61 | vset <- readTVar (searchVisited st) | ||
62 | return $ unwords | ||
63 | [ "pending=" ++ show pc | ||
64 | , "|q|=" ++ show (maybe 0 MM.size q) | ||
65 | , "|inf|=" ++ show (MM.size inf) | ||
66 | , "|visited|=" ++ show (Set.size vset) | ||
67 | ] | ||
68 | |||
42 | main = do | 69 | main = do |
43 | var <- newTVarIO 0 | 70 | var <- newTVarIO 5 |
44 | fin <- newTVarIO False | 71 | fin <- newTVarIO False |
45 | s <- atomically $ newSearch (sch var) maxBound [1..4] | 72 | mbstvar <- newTVarIO Nothing |
46 | t <- forkIO $ do searchLoop (sch var) maxBound onResult s | 73 | s <- atomically $ newSearch (sch mbstvar var) maxBound [1..4] |
74 | atomically $ writeTVar mbstvar (Just s) | ||
75 | t <- forkIO $ do searchLoop (sch mbstvar var) maxBound onResult s | ||
47 | atomically $ writeTVar fin True | 76 | atomically $ writeTVar fin True |
48 | 77 | ||
49 | putStrLn "Waiting on counter." | 78 | putStrLn "Waiting on counter." |
@@ -54,13 +83,17 @@ main = do | |||
54 | then retry | 83 | then retry |
55 | else return (done,n) | 84 | else return (done,n) |
56 | putStrLn $ "(done,n) = " ++ show (done,n) | 85 | putStrLn $ "(done,n) = " ++ show (done,n) |
57 | atomically $ searchCancel s | 86 | report <- atomically $ do |
87 | report <- showSearchState s | ||
88 | searchCancel s | ||
89 | return report | ||
90 | putStrLn report | ||
58 | putStrLn "Canceled. Awaiting fin. The program should quit shortly without much output after this." | 91 | putStrLn "Canceled. Awaiting fin. The program should quit shortly without much output after this." |
59 | 92 | ||
60 | forkIO $ | 93 | forkIO $ |
61 | let loop = do | 94 | let loop = do |
62 | x <- atomically $ readTVar var | 95 | (x,report) <- atomically $ (,) <$> readTVar var <*> showSearchState s |
63 | putStrLn $ "query after cancel! " ++ show x | 96 | putStrLn $ "query after cancel! " ++ show x ++ " " ++ report |
64 | atomically $ readTVar var >>= \y -> if x == y then retry else return () | 97 | atomically $ readTVar var >>= \y -> if x == y then retry else return () |
65 | loop | 98 | loop |
66 | in loop | 99 | in loop |