From 4060c2c717eeac95dd16f9222184d6b4e998cb7f Mon Sep 17 00:00:00 2001 From: joe Date: Tue, 25 Jul 2017 06:17:46 -0400 Subject: Fixes to IPv4 bootstrap. --- src/Network/BitTorrent/DHT/Search.hs | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) (limited to 'src/Network/BitTorrent') diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs index a9efba89..8c441cb0 100644 --- a/src/Network/BitTorrent/DHT/Search.hs +++ b/src/Network/BitTorrent/DHT/Search.hs @@ -6,7 +6,6 @@ {-# LANGUAGE LambdaCase #-} module Network.BitTorrent.DHT.Search where -import Control.Concurrent import Control.Concurrent.Async.Pool import Control.Concurrent.STM import Control.Exception @@ -28,11 +27,17 @@ import qualified Data.Wrapper.PSQ as PSQ import Network.Address hiding (NodeId) import Network.DatagramServer.Types import Network.DHT.Routing as R +#ifdef THREAD_DEBUG +import Control.Concurrent.Lifted.Instrument +#else +import Control.Concurrent.Lifted +import GHC.Conc (labelThread) +#endif data Search nid addr ni r = Search { searchSpace :: KademliaSpace nid ni , searchNodeAddress :: ni -> addr - , searchQuery :: ni -> IO ([ni], [r]) + , searchQuery :: nid -> ni -> IO ([ni], [r]) } data SearchState nid addr ni r = SearchState @@ -89,7 +94,7 @@ newSearch (Search space nAddr qry) target ns = atomically $ do ( SearchState c q i v ) searchAlpha :: Int -searchAlpha = 3 +searchAlpha = 8 searchK :: Int searchK = 8 @@ -99,6 +104,7 @@ sendQuery :: forall addr nid ni r. , Ord r , PSQKey nid , PSQKey ni + , Show nid ) => Search nid addr ni r -> nid @@ -107,8 +113,9 @@ sendQuery :: forall addr nid ni r. -> Binding ni nid -> IO () sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do + myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) (ns,rs) <- handle (\(SomeException e) -> return ([],[])) - (searchQuery ni) + (searchQuery searchTarget ni) atomically $ do modifyTVar searchPendingCount pred vs <- readTVar searchVisited @@ -156,11 +163,17 @@ search :: , Ord addr , PSQKey nid , PSQKey ni - ) => Search nid addr ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO () -search sch@Search{..} buckets target result = withTaskGroup searchAlpha $ \g -> do + , Show nid + ) => Search nid addr ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr ni r) +search sch@Search{..} buckets target result = do let ns = R.kclosest searchSpace searchK target buckets - s@SearchState{..} <- newSearch sch target ns - fix $ \again -> do + st <- newSearch sch target ns + fork $ go st + return st + where + go s@SearchState{..} = do + myThreadId >>= flip labelThread ("search."++show target) + withTaskGroup searchAlpha $ \g -> fix $ \again -> do join $ atomically $ do cnt <- readTVar $ searchPendingCount informants <- readTVar searchInformant @@ -170,9 +183,11 @@ search sch@Search{..} buckets target result = withTaskGroup searchAlpha $ \g -> | -- If there's fewer than /k/ informants and there's any -- node we haven't yet got a response from. (MM.size informants < searchK) && (cnt > 0 || not (MM.null q)) + -- Or there's no informants yet at all. + || MM.null informants -- Or if the closest scheduled node is nearer than the -- nearest /k/ informants. - || (PSQ.prio (fromJust $ MM.findMax informants) > d) + || (d < PSQ.prio (fromJust $ MM.findMax informants)) -> -- Then the search continues, send a query. do writeTVar searchQueued q modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) -- cgit v1.2.3