diff options
author | Joe Crayne <joe@jerkface.net> | 2019-12-28 20:08:03 -0500 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-01 23:28:00 -0500 |
commit | 36b07bb1396244b8b4ed8ad5b0c81351195d8428 (patch) | |
tree | 4e5d8fe0322c2b63cbf90ad5b7956370e1dec991 /kad/src | |
parent | cad7670b1f62ea03627e8cff009f598bb76ca067 (diff) |
Fix searchCancel to actually stop the search loop.
Diffstat (limited to 'kad/src')
-rw-r--r-- | kad/src/Network/Kademlia/Search.hs | 54 |
1 files changed, 28 insertions, 26 deletions
diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs index 856a7cfc..03c18d0e 100644 --- a/kad/src/Network/Kademlia/Search.hs +++ b/kad/src/Network/Kademlia/Search.hs | |||
@@ -59,7 +59,9 @@ data SearchState nid addr tok ni r = SearchState | |||
59 | -- and decremented when we get a reply. | 59 | -- and decremented when we get a reply. |
60 | searchPendingCount :: TVar Int | 60 | searchPendingCount :: TVar Int |
61 | -- | Nodes scheduled to be queried (roughly at most K). | 61 | -- | Nodes scheduled to be queried (roughly at most K). |
62 | , searchQueued :: TVar (MinMaxPSQ ni nid) | 62 | -- |
63 | -- This will be set to Nothing when a search is canceled. | ||
64 | , searchQueued :: TVar (Maybe (MinMaxPSQ ni nid)) | ||
63 | -- | The nearest (K - α) nodes that issued a reply. | 65 | -- | The nearest (K - α) nodes that issued a reply. |
64 | -- | 66 | -- |
65 | -- α is the maximum number of simultaneous queries. | 67 | -- α is the maximum number of simultaneous queries. |
@@ -90,7 +92,8 @@ newSearch :: ( Ord addr | |||
90 | -> STM (SearchState nid addr tok ni r) | 92 | -> STM (SearchState nid addr tok ni r) |
91 | newSearch s@(Search space nAddr qry _ _) target ns = do | 93 | newSearch s@(Search space nAddr qry _ _) target ns = do |
92 | c <- newTVar 0 | 94 | c <- newTVar 0 |
93 | q <- newTVar $ MM.fromList | 95 | q <- newTVar $ Just |
96 | $ MM.fromList | ||
94 | $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) | 97 | $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) |
95 | $ ns | 98 | $ ns |
96 | i <- newTVar MM.empty | 99 | i <- newTVar MM.empty |
@@ -115,7 +118,7 @@ reset nearestNodes qsearch target st = do | |||
115 | bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) | 118 | bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) |
116 | <$> nearestNodes target | 119 | <$> nearestNodes target |
117 | priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st) | 120 | priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st) |
118 | writeTVar (searchQueued st) $ MM.fromList $ priorInformants ++ bktNodes | 121 | writeTVar (searchQueued st) $ Just $ MM.fromList $ priorInformants ++ bktNodes |
119 | writeTVar (searchInformant st) MM.empty | 122 | writeTVar (searchInformant st) MM.empty |
120 | writeTVar (searchVisited st) Set.empty | 123 | writeTVar (searchVisited st) Set.empty |
121 | writeTVar (searchPendingCount st) 0 | 124 | writeTVar (searchPendingCount st) 0 |
@@ -145,22 +148,22 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = | |||
145 | go (ns,rs,tok) = do | 148 | go (ns,rs,tok) = do |
146 | vs <- readTVar searchVisited | 149 | vs <- readTVar searchVisited |
147 | -- We only queue a node if it is not yet visited | 150 | -- We only queue a node if it is not yet visited |
148 | let insertFoundNode :: Int | 151 | let xor = kademliaXor searchSpace |
152 | loc = kademliaLocation searchSpace | ||
153 | insertFoundNode :: Int | ||
149 | -> ni | 154 | -> ni |
150 | -> MinMaxPSQ ni nid | 155 | -> MinMaxPSQ ni nid |
151 | -> MinMaxPSQ ni nid | 156 | -> MinMaxPSQ ni nid |
152 | insertFoundNode k n q | 157 | insertFoundNode k n q |
153 | | searchNodeAddress n `Set.member` vs | 158 | | searchNodeAddress n `Set.member` vs |
154 | = q | 159 | = q |
155 | | otherwise = MM.insertTake k n ( kademliaXor searchSpace searchTarget | 160 | | otherwise = MM.insertTake k n ( xor searchTarget $ loc n ) q |
156 | $ kademliaLocation searchSpace n ) | ||
157 | q | ||
158 | 161 | ||
159 | qsize0 <- MM.size <$> readTVar searchQueued | 162 | qsize0 <- maybe 0 MM.size <$> readTVar searchQueued |
160 | let qsize = if qsize0 < searchK then searchK else qsize0 -- Allow searchQueued to grow | 163 | let qsize = if qsize0 < searchK then searchK else qsize0 -- Allow searchQueued to grow |
161 | -- only when there's fewer than | 164 | -- only when there's fewer than |
162 | -- K elements. | 165 | -- K elements. |
163 | modifyTVar searchQueued $ \q -> foldr (insertFoundNode qsize) q ns | 166 | modifyTVar searchQueued $ fmap $ \q -> foldr (insertFoundNode qsize) q ns |
164 | modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d | 167 | modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d |
165 | flip fix rs $ \loop -> \case | 168 | flip fix rs $ \loop -> \case |
166 | r:rs' -> do | 169 | r:rs' -> do |
@@ -174,19 +177,20 @@ searchIsFinished :: ( PSQKey nid | |||
174 | , PSQKey ni | 177 | , PSQKey ni |
175 | ) => SearchState nid addr tok ni r -> STM Bool | 178 | ) => SearchState nid addr tok ni r -> STM Bool |
176 | searchIsFinished SearchState{..} = do | 179 | searchIsFinished SearchState{..} = do |
177 | q <- readTVar searchQueued | 180 | readTVar searchQueued >>= \case |
178 | cnt <- readTVar searchPendingCount | 181 | Just q -> do |
179 | informants <- readTVar searchInformant | 182 | cnt <- readTVar searchPendingCount |
180 | return $ cnt == 0 | 183 | informants <- readTVar searchInformant |
181 | && ( MM.null q | 184 | return $ cnt == 0 |
182 | || ( MM.size informants >= (searchK searchSpec - searchAlpha searchSpec) | 185 | && ( MM.null q |
183 | && ( PSQ.prio (fromJust $ MM.findMax informants) | 186 | || ( MM.size informants >= (searchK searchSpec - searchAlpha searchSpec) |
184 | <= PSQ.prio (fromJust $ MM.findMin q)))) | 187 | && ( PSQ.prio (fromJust $ MM.findMax informants) |
188 | <= PSQ.prio (fromJust $ MM.findMin q)))) | ||
189 | Nothing -> return True | ||
185 | 190 | ||
186 | searchCancel :: SearchState nid addr tok ni r -> STM () | 191 | searchCancel :: SearchState nid addr tok ni r -> STM () |
187 | searchCancel SearchState{..} = do | 192 | searchCancel SearchState{..} = do |
188 | writeTVar searchPendingCount 0 | 193 | writeTVar searchQueued Nothing |
189 | writeTVar searchQueued MM.empty | ||
190 | 194 | ||
191 | search :: | 195 | search :: |
192 | ( Ord r | 196 | ( Ord r |
@@ -215,11 +219,11 @@ searchLoop sch@Search{..} target result s@SearchState{..} = do | |||
215 | withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do | 219 | withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do |
216 | join $ atomically $ do | 220 | join $ atomically $ do |
217 | cnt <- readTVar $ searchPendingCount | 221 | cnt <- readTVar $ searchPendingCount |
218 | check (cnt <= 8) -- Only 8 pending queries at a time. | 222 | check (cnt <= searchAlpha) -- Only searchAlpha pending queries at a time. |
219 | informants <- readTVar searchInformant | 223 | informants <- readTVar searchInformant |
220 | found <- MM.minView <$> readTVar searchQueued | 224 | found <- fmap MM.minView <$> readTVar searchQueued |
221 | case found of | 225 | case found of |
222 | Just (ni :-> d, q) | 226 | Just (Just (ni :-> d, q)) |
223 | | -- If there's fewer than /k - α/ informants and there's any | 227 | | -- If there's fewer than /k - α/ informants and there's any |
224 | -- node we haven't yet got a response from. | 228 | -- node we haven't yet got a response from. |
225 | (MM.size informants < searchK - searchAlpha) && (cnt > 0 || not (MM.null q)) | 229 | (MM.size informants < searchK - searchAlpha) && (cnt > 0 || not (MM.null q)) |
@@ -229,7 +233,7 @@ searchLoop sch@Search{..} target result s@SearchState{..} = do | |||
229 | -- nearest /k/ informants. | 233 | -- nearest /k/ informants. |
230 | || (d < PSQ.prio (fromJust $ MM.findMax informants)) | 234 | || (d < PSQ.prio (fromJust $ MM.findMax informants)) |
231 | -> -- Then the search continues, send a query. | 235 | -> -- Then the search continues, send a query. |
232 | do writeTVar searchQueued q | 236 | do writeTVar searchQueued $ Just q |
233 | modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) | 237 | modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) |
234 | modifyTVar searchPendingCount succ | 238 | modifyTVar searchPendingCount succ |
235 | return $ do | 239 | return $ do |
@@ -237,6 +241,4 @@ searchLoop sch@Search{..} target result s@SearchState{..} = do | |||
237 | "searchQuery" | 241 | "searchQuery" |
238 | $ sendQuery sch target result s (ni :-> d) | 242 | $ sendQuery sch target result s (ni :-> d) |
239 | again | 243 | again |
240 | _ -> -- Otherwise, we are finished. | 244 | _ -> searchIsFinished s >>= check >> return (return ()) |
241 | do check (cnt == 0) | ||
242 | return $ return () | ||