From 6ebe91b686ca8bef893f9a3dd704e45c04124b8f Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Wed, 16 Jan 2019 22:53:41 -0500 Subject: kademlia: support for async search queries. --- src/Network/Kademlia/Search.hs | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) (limited to 'src/Network/Kademlia') diff --git a/src/Network/Kademlia/Search.hs b/src/Network/Kademlia/Search.hs index e87a8618..1be1afc1 100644 --- a/src/Network/Kademlia/Search.hs +++ b/src/Network/Kademlia/Search.hs @@ -31,7 +31,8 @@ import GHC.Conc (labelThread) data Search nid addr tok ni r = Search { searchSpace :: KademliaSpace nid ni , searchNodeAddress :: ni -> addr - , searchQuery :: nid -> ni -> IO (Maybe ([ni], [r], Maybe tok)) + , searchQuery :: Either (nid -> ni -> IO (Maybe ([ni], [r], Maybe tok))) + (nid -> ni -> (Maybe ([ni],[r],Maybe tok) -> IO ()) -> IO ()) , searchAlpha :: Int -- α = 8 -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on -- how fast the queries are. For Tox's much slower onion-routed queries, we @@ -112,7 +113,7 @@ reset nearestNodes qsearch target st = do writeTVar (searchPendingCount st) 0 return st -sendQuery :: forall addr nid tok ni r. +sendAsyncQuery :: forall addr nid tok ni r. ( Ord addr , PSQKey nid , PSQKey ni @@ -123,15 +124,22 @@ sendQuery :: forall addr nid tok ni r. -> (r -> STM Bool) -- ^ return False to terminate the search. -> SearchState nid addr tok ni r -> Binding ni nid + -> TaskGroup -> IO () -sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do - myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) - reply <- searchQuery searchTarget ni `catchIOError` const (return Nothing) - -- (ns,rs) - let tok = error "TODO: token" - atomically $ do - modifyTVar searchPendingCount pred - maybe (return ()) go reply +sendAsyncQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) g = + case searchQuery of + Left blockingQuery -> + forkTask g "searchQuery" $ do + myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) + reply <- blockingQuery searchTarget ni `catchIOError` const (return Nothing) + atomically $ do + modifyTVar searchPendingCount pred + maybe (return ()) go reply + Right nonblockingQuery -> do + nonblockingQuery searchTarget ni $ \reply -> + atomically $ do + modifyTVar searchPendingCount pred + maybe (return ()) go reply where go (ns,rs,tok) = do vs <- readTVar searchVisited @@ -221,9 +229,7 @@ searchLoop sch@Search{..} target result s@SearchState{..} = do modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) modifyTVar searchPendingCount succ return $ do - forkTask g - "searchQuery" - $ sendQuery sch target result s (ni :-> d) + sendAsyncQuery sch target result s (ni :-> d) g again _ -> -- Otherwise, we are finished. do check (cnt == 0) -- cgit v1.2.3