summaryrefslogtreecommitdiff
path: root/kad
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2019-12-28 20:08:03 -0500
committerJoe Crayne <joe@jerkface.net>2020-01-01 23:28:00 -0500
commit36b07bb1396244b8b4ed8ad5b0c81351195d8428 (patch)
tree4e5d8fe0322c2b63cbf90ad5b7956370e1dec991 /kad
parentcad7670b1f62ea03627e8cff009f598bb76ca067 (diff)
Fix searchCancel to actually stop the search loop.
Diffstat (limited to 'kad')
-rw-r--r--kad/kad.cabal2
-rw-r--r--kad/src/Network/Kademlia/Search.hs54
-rw-r--r--kad/tests/searchCancel.hs61
3 files changed, 76 insertions, 41 deletions
diff --git a/kad/kad.cabal b/kad/kad.cabal
index ee3754b1..4a86bc4f 100644
--- a/kad/kad.cabal
+++ b/kad/kad.cabal
@@ -91,6 +91,6 @@ library
91 91
92executable testSearch 92executable testSearch
93 hs-source-dirs: tests 93 hs-source-dirs: tests
94 build-depends: kad, base, stm 94 build-depends: kad, base, stm, containers, minmax-psq
95 main-is: searchCancel.hs 95 main-is: searchCancel.hs
96 96
diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs
index 856a7cfc..03c18d0e 100644
--- a/kad/src/Network/Kademlia/Search.hs
+++ b/kad/src/Network/Kademlia/Search.hs
@@ -59,7 +59,9 @@ data SearchState nid addr tok ni r = SearchState
59 -- and decremented when we get a reply. 59 -- and decremented when we get a reply.
60 searchPendingCount :: TVar Int 60 searchPendingCount :: TVar Int
61 -- | Nodes scheduled to be queried (roughly at most K). 61 -- | Nodes scheduled to be queried (roughly at most K).
62 , searchQueued :: TVar (MinMaxPSQ ni nid) 62 --
63 -- This will be set to Nothing when a search is canceled.
64 , searchQueued :: TVar (Maybe (MinMaxPSQ ni nid))
63 -- | The nearest (K - α) nodes that issued a reply. 65 -- | The nearest (K - α) nodes that issued a reply.
64 -- 66 --
65 -- α is the maximum number of simultaneous queries. 67 -- α is the maximum number of simultaneous queries.
@@ -90,7 +92,8 @@ newSearch :: ( Ord addr
90 -> STM (SearchState nid addr tok ni r) 92 -> STM (SearchState nid addr tok ni r)
91newSearch s@(Search space nAddr qry _ _) target ns = do 93newSearch s@(Search space nAddr qry _ _) target ns = do
92 c <- newTVar 0 94 c <- newTVar 0
93 q <- newTVar $ MM.fromList 95 q <- newTVar $ Just
96 $ MM.fromList
94 $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) 97 $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n))
95 $ ns 98 $ ns
96 i <- newTVar MM.empty 99 i <- newTVar MM.empty
@@ -115,7 +118,7 @@ reset nearestNodes qsearch target st = do
115 bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) 118 bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni)
116 <$> nearestNodes target 119 <$> nearestNodes target
117 priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st) 120 priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st)
118 writeTVar (searchQueued st) $ MM.fromList $ priorInformants ++ bktNodes 121 writeTVar (searchQueued st) $ Just $ MM.fromList $ priorInformants ++ bktNodes
119 writeTVar (searchInformant st) MM.empty 122 writeTVar (searchInformant st) MM.empty
120 writeTVar (searchVisited st) Set.empty 123 writeTVar (searchVisited st) Set.empty
121 writeTVar (searchPendingCount st) 0 124 writeTVar (searchPendingCount st) 0
@@ -145,22 +148,22 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) =
145 go (ns,rs,tok) = do 148 go (ns,rs,tok) = do
146 vs <- readTVar searchVisited 149 vs <- readTVar searchVisited
147 -- We only queue a node if it is not yet visited 150 -- We only queue a node if it is not yet visited
148 let insertFoundNode :: Int 151 let xor = kademliaXor searchSpace
152 loc = kademliaLocation searchSpace
153 insertFoundNode :: Int
149 -> ni 154 -> ni
150 -> MinMaxPSQ ni nid 155 -> MinMaxPSQ ni nid
151 -> MinMaxPSQ ni nid 156 -> MinMaxPSQ ni nid
152 insertFoundNode k n q 157 insertFoundNode k n q
153 | searchNodeAddress n `Set.member` vs 158 | searchNodeAddress n `Set.member` vs
154 = q 159 = q
155 | otherwise = MM.insertTake k n ( kademliaXor searchSpace searchTarget 160 | otherwise = MM.insertTake k n ( xor searchTarget $ loc n ) q
156 $ kademliaLocation searchSpace n )
157 q
158 161
159 qsize0 <- MM.size <$> readTVar searchQueued 162 qsize0 <- maybe 0 MM.size <$> readTVar searchQueued
160 let qsize = if qsize0 < searchK then searchK else qsize0 -- Allow searchQueued to grow 163 let qsize = if qsize0 < searchK then searchK else qsize0 -- Allow searchQueued to grow
161 -- only when there's fewer than 164 -- only when there's fewer than
162 -- K elements. 165 -- K elements.
163 modifyTVar searchQueued $ \q -> foldr (insertFoundNode qsize) q ns 166 modifyTVar searchQueued $ fmap $ \q -> foldr (insertFoundNode qsize) q ns
164 modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d 167 modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d
165 flip fix rs $ \loop -> \case 168 flip fix rs $ \loop -> \case
166 r:rs' -> do 169 r:rs' -> do
@@ -174,19 +177,20 @@ searchIsFinished :: ( PSQKey nid
174 , PSQKey ni 177 , PSQKey ni
175 ) => SearchState nid addr tok ni r -> STM Bool 178 ) => SearchState nid addr tok ni r -> STM Bool
176searchIsFinished SearchState{..} = do 179searchIsFinished SearchState{..} = do
177 q <- readTVar searchQueued 180 readTVar searchQueued >>= \case
178 cnt <- readTVar searchPendingCount 181 Just q -> do
179 informants <- readTVar searchInformant 182 cnt <- readTVar searchPendingCount
180 return $ cnt == 0 183 informants <- readTVar searchInformant
181 && ( MM.null q 184 return $ cnt == 0
182 || ( MM.size informants >= (searchK searchSpec - searchAlpha searchSpec) 185 && ( MM.null q
183 && ( PSQ.prio (fromJust $ MM.findMax informants) 186 || ( MM.size informants >= (searchK searchSpec - searchAlpha searchSpec)
184 <= PSQ.prio (fromJust $ MM.findMin q)))) 187 && ( PSQ.prio (fromJust $ MM.findMax informants)
188 <= PSQ.prio (fromJust $ MM.findMin q))))
189 Nothing -> return True
185 190
186searchCancel :: SearchState nid addr tok ni r -> STM () 191searchCancel :: SearchState nid addr tok ni r -> STM ()
187searchCancel SearchState{..} = do 192searchCancel SearchState{..} = do
188 writeTVar searchPendingCount 0 193 writeTVar searchQueued Nothing
189 writeTVar searchQueued MM.empty
190 194
191search :: 195search ::
192 ( Ord r 196 ( Ord r
@@ -215,11 +219,11 @@ searchLoop sch@Search{..} target result s@SearchState{..} = do
215 withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do 219 withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do
216 join $ atomically $ do 220 join $ atomically $ do
217 cnt <- readTVar $ searchPendingCount 221 cnt <- readTVar $ searchPendingCount
218 check (cnt <= 8) -- Only 8 pending queries at a time. 222 check (cnt <= searchAlpha) -- Only searchAlpha pending queries at a time.
219 informants <- readTVar searchInformant 223 informants <- readTVar searchInformant
220 found <- MM.minView <$> readTVar searchQueued 224 found <- fmap MM.minView <$> readTVar searchQueued
221 case found of 225 case found of
222 Just (ni :-> d, q) 226 Just (Just (ni :-> d, q))
223 | -- If there's fewer than /k - α/ informants and there's any 227 | -- If there's fewer than /k - α/ informants and there's any
224 -- node we haven't yet got a response from. 228 -- node we haven't yet got a response from.
225 (MM.size informants < searchK - searchAlpha) && (cnt > 0 || not (MM.null q)) 229 (MM.size informants < searchK - searchAlpha) && (cnt > 0 || not (MM.null q))
@@ -229,7 +233,7 @@ searchLoop sch@Search{..} target result s@SearchState{..} = do
229 -- nearest /k/ informants. 233 -- nearest /k/ informants.
230 || (d < PSQ.prio (fromJust $ MM.findMax informants)) 234 || (d < PSQ.prio (fromJust $ MM.findMax informants))
231 -> -- Then the search continues, send a query. 235 -> -- Then the search continues, send a query.
232 do writeTVar searchQueued q 236 do writeTVar searchQueued $ Just q
233 modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) 237 modifyTVar searchVisited $ Set.insert (searchNodeAddress ni)
234 modifyTVar searchPendingCount succ 238 modifyTVar searchPendingCount succ
235 return $ do 239 return $ do
@@ -237,6 +241,4 @@ searchLoop sch@Search{..} target result s@SearchState{..} = do
237 "searchQuery" 241 "searchQuery"
238 $ sendQuery sch target result s (ni :-> d) 242 $ sendQuery sch target result s (ni :-> d)
239 again 243 again
240 _ -> -- Otherwise, we are finished. 244 _ -> searchIsFinished s >>= check >> return (return ())
241 do check (cnt == 0)
242 return $ return ()
diff --git a/kad/tests/searchCancel.hs b/kad/tests/searchCancel.hs
index 3458ab37..85986674 100644
--- a/kad/tests/searchCancel.hs
+++ b/kad/tests/searchCancel.hs
@@ -10,14 +10,29 @@ import Network.Kademlia.Persistence
10import Network.Kademlia.Routing 10import Network.Kademlia.Routing
11import Network.Kademlia.Search 11import Network.Kademlia.Search
12 12
13makeSchResults :: TVar Int -> IO (Maybe ([Int],[Int],Maybe ())) 13import qualified Data.MinMaxPSQ as MM
14makeSchResults var = do 14import qualified Data.Set as Set
15 threadDelay 100000 15
16 atomically $ do 16makeSchResults :: TVar (Maybe (SearchState Int Int () Int Int)) -> TVar Int -> IO (Maybe ([Int],[Int],Maybe ()))
17makeSchResults mbv var = do
18 threadDelay 200000
19 (r,io) <- atomically $ do
17 n <- readTVar var 20 n <- readTVar var
18 let ns = take 4 [n .. ] 21 let ns = take 4 [n .. ]
19 writeTVar var $! n + 4 22 writeTVar var $! n + 4
20 return $ Just (ns, ns, Just ()) -- Maybe ([ni], [r], Maybe tok) 23 let r = Just (ns, ns, Just ()) -- Maybe ([ni], [r], Maybe tok)
24 stmio = if n > 490
25 then do
26 ms <- readTVar mbv
27 case ms of
28 Just s -> do report <- showSearchState s
29 return $ putStrLn $ "cnt=" ++ show n ++ " " ++ report
30 _ -> return $ return ()
31 else return $ return ()
32 io <- stmio
33 return (r,io)
34 io
35 return r
21 36
22kad :: KademliaSpace Int Int 37kad :: KademliaSpace Int Int
23kad = KademliaSpace 38kad = KademliaSpace
@@ -27,11 +42,11 @@ kad = KademliaSpace
27 , kademliaSample = \_ x _ -> pure x 42 , kademliaSample = \_ x _ -> pure x
28 } 43 }
29 44
30sch :: TVar Int -> Search Int Int () Int Int 45sch :: TVar (Maybe (SearchState Int Int () Int Int)) -> TVar Int -> Search Int Int () Int Int
31sch var = Search 46sch mbv var = Search
32 { searchSpace = kad 47 { searchSpace = kad
33 , searchNodeAddress = id 48 , searchNodeAddress = id
34 , searchQuery = \_ _ -> makeSchResults var 49 , searchQuery = \_ _ -> makeSchResults mbv var
35 , searchAlpha = 4 50 , searchAlpha = 4
36 , searchK = 8 51 , searchK = 8
37 } 52 }
@@ -39,11 +54,25 @@ sch var = Search
39onResult :: Int -> STM Bool 54onResult :: Int -> STM Bool
40onResult k = return (k < 50000) 55onResult k = return (k < 50000)
41 56
57showSearchState st = do
58 pc <- readTVar (searchPendingCount st)
59 q <- readTVar (searchQueued st)
60 inf <- readTVar (searchInformant st)
61 vset <- readTVar (searchVisited st)
62 return $ unwords
63 [ "pending=" ++ show pc
64 , "|q|=" ++ show (maybe 0 MM.size q)
65 , "|inf|=" ++ show (MM.size inf)
66 , "|visited|=" ++ show (Set.size vset)
67 ]
68
42main = do 69main = do
43 var <- newTVarIO 0 70 var <- newTVarIO 5
44 fin <- newTVarIO False 71 fin <- newTVarIO False
45 s <- atomically $ newSearch (sch var) maxBound [1..4] 72 mbstvar <- newTVarIO Nothing
46 t <- forkIO $ do searchLoop (sch var) maxBound onResult s 73 s <- atomically $ newSearch (sch mbstvar var) maxBound [1..4]
74 atomically $ writeTVar mbstvar (Just s)
75 t <- forkIO $ do searchLoop (sch mbstvar var) maxBound onResult s
47 atomically $ writeTVar fin True 76 atomically $ writeTVar fin True
48 77
49 putStrLn "Waiting on counter." 78 putStrLn "Waiting on counter."
@@ -54,13 +83,17 @@ main = do
54 then retry 83 then retry
55 else return (done,n) 84 else return (done,n)
56 putStrLn $ "(done,n) = " ++ show (done,n) 85 putStrLn $ "(done,n) = " ++ show (done,n)
57 atomically $ searchCancel s 86 report <- atomically $ do
87 report <- showSearchState s
88 searchCancel s
89 return report
90 putStrLn report
58 putStrLn "Canceled. Awaiting fin. The program should quit shortly without much output after this." 91 putStrLn "Canceled. Awaiting fin. The program should quit shortly without much output after this."
59 92
60 forkIO $ 93 forkIO $
61 let loop = do 94 let loop = do
62 x <- atomically $ readTVar var 95 (x,report) <- atomically $ (,) <$> readTVar var <*> showSearchState s
63 putStrLn $ "query after cancel! " ++ show x 96 putStrLn $ "query after cancel! " ++ show x ++ " " ++ report
64 atomically $ readTVar var >>= \y -> if x == y then retry else return () 97 atomically $ readTVar var >>= \y -> if x == y then retry else return ()
65 loop 98 loop
66 in loop 99 in loop