summaryrefslogtreecommitdiff
path: root/kad
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2020-01-03 21:27:50 -0500
committerJoe Crayne <joe@jerkface.net>2020-01-07 13:24:59 -0500
commitc7fb8cfe16f821e4e148d1855a18cb81255743bc (patch)
treec035afc9ff870ea3bfc5b1dc7c4254ad0c0bf4b3 /kad
parent5ea2de4e858cc89282561922bae257b6f9041d2e (diff)
Async search.
Diffstat (limited to 'kad')
-rw-r--r--kad/src/Network/Kademlia/Search.hs35
-rw-r--r--kad/tests/searchCancel.hs2
2 files changed, 20 insertions, 17 deletions
diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs
index e30c40da..19d0df69 100644
--- a/kad/src/Network/Kademlia/Search.hs
+++ b/kad/src/Network/Kademlia/Search.hs
@@ -44,7 +44,7 @@ data Search nid addr tok ni r qk = Search
44 { searchSpace :: KademliaSpace nid ni 44 { searchSpace :: KademliaSpace nid ni
45 , searchNodeAddress :: ni -> addr 45 , searchNodeAddress :: ni -> addr
46 , searchQuery :: nid -> ni -> (qk -> Result ([ni], [r], Maybe tok) -> IO ()) -> IO qk 46 , searchQuery :: nid -> ni -> (qk -> Result ([ni], [r], Maybe tok) -> IO ()) -> IO qk
47 , searchQueryCancel :: qk -> STM () 47 , searchQueryCancel :: (IO () -> STM ()) -> qk -> STM ()
48 , searchAlpha :: Int -- α = 8 48 , searchAlpha :: Int -- α = 8
49 -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on 49 -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on
50 -- how fast the queries are. For Tox's much slower onion-routed queries, we 50 -- how fast the queries are. For Tox's much slower onion-routed queries, we
@@ -118,19 +118,21 @@ reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) =>
118 -> Search nid addr1 tok1 ni r1 qk 118 -> Search nid addr1 tok1 ni r1 qk
119 -> nid 119 -> nid
120 -> SearchState nid addr tok ni r qk 120 -> SearchState nid addr tok ni r qk
121 -> STM (SearchState nid addr tok ni r qk) 121 -> IO (SearchState nid addr tok ni r qk)
122reset nearestNodes qsearch target st = do 122reset nearestNodes qsearch target st = do
123 pc <- readTVar (searchPendingCount st) 123 atomically $ searchCancel st
124 check (pc == 0) 124 atomically $ do
125 searchIsFinished st >>= check -- Wait for a search to finish before resetting. 125 pc <- readTVar (searchPendingCount st)
126 bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) 126 check (pc == 0)
127 <$> nearestNodes target 127 searchIsFinished st >>= check -- Wait for a search to finish before resetting.
128 priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st) 128 bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni)
129 writeTVar (searchQueued st) $ Just $ MM.fromList $ priorInformants ++ bktNodes 129 <$> nearestNodes target
130 writeTVar (searchInformant st) MM.empty 130 priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st)
131 writeTVar (searchVisited st) Set.empty 131 writeTVar (searchQueued st) $ Just $ MM.fromList $ priorInformants ++ bktNodes
132 writeTVar (searchPendingCount st) 0 132 writeTVar (searchInformant st) MM.empty
133 return st 133 writeTVar (searchVisited st) Set.empty
134 writeTVar (searchPendingCount st) 0
135 return st
134 136
135grokQuery :: forall addr nid tok ni r qk. 137grokQuery :: forall addr nid tok ni r qk.
136 ( Ord addr 138 ( Ord addr
@@ -233,8 +235,9 @@ searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni,
233 -> IO () 235 -> IO ()
234searchLoop sch@Search{..} target result s@SearchState{..} = do 236searchLoop sch@Search{..} target result s@SearchState{..} = do
235 myThreadId >>= flip labelThread ("search."++show target) 237 myThreadId >>= flip labelThread ("search."++show target)
236 withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do 238 iochan <- atomically newTChan
237 join $ atomically $ do 239 fix $ \again -> do
240 join $ atomically $ orElse (fmap (>> again) $ readTChan iochan) $ do
238 cnt <- readTVar $ searchPendingCount 241 cnt <- readTVar $ searchPendingCount
239 check (cnt <= searchAlpha) -- Only searchAlpha pending queries at a time. 242 check (cnt <= searchAlpha) -- Only searchAlpha pending queries at a time.
240 informants <- readTVar searchInformant 243 informants <- readTVar searchInformant
@@ -256,6 +259,6 @@ searchLoop sch@Search{..} target result s@SearchState{..} = do
256 return $ do 259 return $ do
257 qk <- searchQuery target ni $ 260 qk <- searchQuery target ni $
258 \qk reply -> grokQuery sch target result s (ni :-> d) qk reply 261 \qk reply -> grokQuery sch target result s (ni :-> d) qk reply
259 atomically $ modifyTVar' searchPending $ Map.insert qk (searchQueryCancel qk) 262 atomically $ modifyTVar' searchPending $ Map.insert qk (searchQueryCancel (writeTChan iochan) qk)
260 again 263 again
261 _ -> searchIsFinished s >>= check >> return (return ()) 264 _ -> searchIsFinished s >>= check >> return (return ())
diff --git a/kad/tests/searchCancel.hs b/kad/tests/searchCancel.hs
index 33860a2f..e8aa33c7 100644
--- a/kad/tests/searchCancel.hs
+++ b/kad/tests/searchCancel.hs
@@ -52,7 +52,7 @@ sch mbv var = Search
52 let qk = maybe 0 (\(ns,_,_) -> head ns) r 52 let qk = maybe 0 (\(ns,_,_) -> head ns) r
53 f qk $ maybe TimedOut Success r 53 f qk $ maybe TimedOut Success r
54 return qk 54 return qk
55 , searchQueryCancel = \_ -> return () 55 , searchQueryCancel = \_ _ -> return ()
56 , searchAlpha = 4 56 , searchAlpha = 4
57 , searchK = 8 57 , searchK = 8
58 } 58 }