diff options
Diffstat (limited to 'kad')
-rw-r--r-- | kad/src/Network/Kademlia/Search.hs | 35 | ||||
-rw-r--r-- | kad/tests/searchCancel.hs | 2 |
2 files changed, 20 insertions, 17 deletions
diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs index e30c40da..19d0df69 100644 --- a/kad/src/Network/Kademlia/Search.hs +++ b/kad/src/Network/Kademlia/Search.hs | |||
@@ -44,7 +44,7 @@ data Search nid addr tok ni r qk = Search | |||
44 | { searchSpace :: KademliaSpace nid ni | 44 | { searchSpace :: KademliaSpace nid ni |
45 | , searchNodeAddress :: ni -> addr | 45 | , searchNodeAddress :: ni -> addr |
46 | , searchQuery :: nid -> ni -> (qk -> Result ([ni], [r], Maybe tok) -> IO ()) -> IO qk | 46 | , searchQuery :: nid -> ni -> (qk -> Result ([ni], [r], Maybe tok) -> IO ()) -> IO qk |
47 | , searchQueryCancel :: qk -> STM () | 47 | , searchQueryCancel :: (IO () -> STM ()) -> qk -> STM () |
48 | , searchAlpha :: Int -- α = 8 | 48 | , searchAlpha :: Int -- α = 8 |
49 | -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on | 49 | -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on |
50 | -- how fast the queries are. For Tox's much slower onion-routed queries, we | 50 | -- how fast the queries are. For Tox's much slower onion-routed queries, we |
@@ -118,19 +118,21 @@ reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) => | |||
118 | -> Search nid addr1 tok1 ni r1 qk | 118 | -> Search nid addr1 tok1 ni r1 qk |
119 | -> nid | 119 | -> nid |
120 | -> SearchState nid addr tok ni r qk | 120 | -> SearchState nid addr tok ni r qk |
121 | -> STM (SearchState nid addr tok ni r qk) | 121 | -> IO (SearchState nid addr tok ni r qk) |
122 | reset nearestNodes qsearch target st = do | 122 | reset nearestNodes qsearch target st = do |
123 | pc <- readTVar (searchPendingCount st) | 123 | atomically $ searchCancel st |
124 | check (pc == 0) | 124 | atomically $ do |
125 | searchIsFinished st >>= check -- Wait for a search to finish before resetting. | 125 | pc <- readTVar (searchPendingCount st) |
126 | bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) | 126 | check (pc == 0) |
127 | <$> nearestNodes target | 127 | searchIsFinished st >>= check -- Wait for a search to finish before resetting. |
128 | priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st) | 128 | bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) |
129 | writeTVar (searchQueued st) $ Just $ MM.fromList $ priorInformants ++ bktNodes | 129 | <$> nearestNodes target |
130 | writeTVar (searchInformant st) MM.empty | 130 | priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st) |
131 | writeTVar (searchVisited st) Set.empty | 131 | writeTVar (searchQueued st) $ Just $ MM.fromList $ priorInformants ++ bktNodes |
132 | writeTVar (searchPendingCount st) 0 | 132 | writeTVar (searchInformant st) MM.empty |
133 | return st | 133 | writeTVar (searchVisited st) Set.empty |
134 | writeTVar (searchPendingCount st) 0 | ||
135 | return st | ||
134 | 136 | ||
135 | grokQuery :: forall addr nid tok ni r qk. | 137 | grokQuery :: forall addr nid tok ni r qk. |
136 | ( Ord addr | 138 | ( Ord addr |
@@ -233,8 +235,9 @@ searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni, | |||
233 | -> IO () | 235 | -> IO () |
234 | searchLoop sch@Search{..} target result s@SearchState{..} = do | 236 | searchLoop sch@Search{..} target result s@SearchState{..} = do |
235 | myThreadId >>= flip labelThread ("search."++show target) | 237 | myThreadId >>= flip labelThread ("search."++show target) |
236 | withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do | 238 | iochan <- atomically newTChan |
237 | join $ atomically $ do | 239 | fix $ \again -> do |
240 | join $ atomically $ orElse (fmap (>> again) $ readTChan iochan) $ do | ||
238 | cnt <- readTVar $ searchPendingCount | 241 | cnt <- readTVar $ searchPendingCount |
239 | check (cnt <= searchAlpha) -- Only searchAlpha pending queries at a time. | 242 | check (cnt <= searchAlpha) -- Only searchAlpha pending queries at a time. |
240 | informants <- readTVar searchInformant | 243 | informants <- readTVar searchInformant |
@@ -256,6 +259,6 @@ searchLoop sch@Search{..} target result s@SearchState{..} = do | |||
256 | return $ do | 259 | return $ do |
257 | qk <- searchQuery target ni $ | 260 | qk <- searchQuery target ni $ |
258 | \qk reply -> grokQuery sch target result s (ni :-> d) qk reply | 261 | \qk reply -> grokQuery sch target result s (ni :-> d) qk reply |
259 | atomically $ modifyTVar' searchPending $ Map.insert qk (searchQueryCancel qk) | 262 | atomically $ modifyTVar' searchPending $ Map.insert qk (searchQueryCancel (writeTChan iochan) qk) |
260 | again | 263 | again |
261 | _ -> searchIsFinished s >>= check >> return (return ()) | 264 | _ -> searchIsFinished s >>= check >> return (return ()) |
diff --git a/kad/tests/searchCancel.hs b/kad/tests/searchCancel.hs index 33860a2f..e8aa33c7 100644 --- a/kad/tests/searchCancel.hs +++ b/kad/tests/searchCancel.hs | |||
@@ -52,7 +52,7 @@ sch mbv var = Search | |||
52 | let qk = maybe 0 (\(ns,_,_) -> head ns) r | 52 | let qk = maybe 0 (\(ns,_,_) -> head ns) r |
53 | f qk $ maybe TimedOut Success r | 53 | f qk $ maybe TimedOut Success r |
54 | return qk | 54 | return qk |
55 | , searchQueryCancel = \_ -> return () | 55 | , searchQueryCancel = \_ _ -> return () |
56 | , searchAlpha = 4 | 56 | , searchAlpha = 4 |
57 | , searchK = 8 | 57 | , searchK = 8 |
58 | } | 58 | } |