{-# LANGUAGE CPP                 #-}
{-# LANGUAGE PatternSynonyms     #-}
{-# LANGUAGE RecordWildCards     #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE LambdaCase          #-}
module Network.Kademlia.Search
    ( Search(..)
    , SearchState(..)
    , searchCancel
    , searchIsFinished
    , search
    , newSearch
    , reset
    , searchLoop
    ) where

import Control.Concurrent.Tasks
import Control.Concurrent.STM
import Control.Monad
import Data.Function
import qualified Data.Map.Strict as Map
import Data.Map.Strict (Map)
import Data.Maybe
import qualified Data.Set as Set
import Data.Set (Set)
import Data.Hashable (Hashable(..)) -- for type sigs
import System.IO.Error

import qualified Data.MinMaxPSQ as MM
import Data.MinMaxPSQ (MinMaxPSQ, MinMaxPSQ')
import qualified Data.Wrapper.PSQ as PSQ
import Data.Wrapper.PSQ (pattern (:->), Binding, pattern Binding, Binding', PSQKey)
import Network.Kademlia.Routing as R
import Network.QueryResponse (Result(..))
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
import GHC.Conc (labelThread)
#endif

-- | Static description of how to run a search: the metric space, how to
-- issue and cancel a query, and the two tuning parameters α and K.
data Search nid addr tok ni r qk = Search
    { searchSpace       :: KademliaSpace nid ni
    , searchNodeAddress :: ni -> addr
    , searchQuery       :: nid -> ni -> (qk -> Result ([ni], [r], Maybe tok) -> IO ()) -> IO qk
    , searchQueryCancel :: qk -> STM ()
    , searchAlpha       :: Int -- α = 8
      -- | 'searchK' should be larger than 'searchAlpha'.  How much larger
      -- depends on how fast the queries are.  For Tox's much slower
      -- onion-routed queries, we need to ensure that closer non-responding
      -- queries don't completely push out farther away queries.
      --
      -- For BitTorrent, setting them both to 8 was not an issue, but that is
      -- no longer supported because the number of remembered informants is
      -- now the difference between these two numbers.  So, if searchK = 16
      -- and searchAlpha = 4, then the number of remembered query responses
      -- is 12.
    , searchK           :: Int -- K = 16
    }

-- | Mutable (STM) state of one running search.
data SearchState nid addr tok ni r qk = SearchState
    { -- | The number of pending queries.  Incremented before any query is
      -- sent and decremented when we get a reply.
      searchPendingCount :: TVar Int
    , searchPending      :: TVar (Map qk (STM ()))
      -- | Nodes scheduled to be queried (roughly at most K).
      --
      -- This will be set to Nothing when a search is canceled.
    , searchQueued       :: TVar (Maybe (MinMaxPSQ ni nid))
      -- | The nearest (K - α) nodes that issued a reply.
      --
      -- α is the maximum number of simultaneous queries.
    , searchInformant    :: TVar (MinMaxPSQ' ni nid (Maybe tok))
      -- | This tracks already-queried addresses so we avoid bothering them
      -- again.  XXX: We could probably keep only the pending queries in this
      -- set.  It also can be a bounded 'MinMaxPSQ', although searchAlpha
      -- should limit the number of outstanding queries.
    , searchVisited      :: TVar (Set addr)
    , searchSpec         :: Search nid addr tok ni r qk
    }

-- | Allocate a fresh 'SearchState' for the given target.
--
-- The initial nodes are queued with their XOR distance to the target as
-- priority; the pending, informant and visited collections start out empty.
newSearch :: ( Ord addr
             , PSQKey nid
             , PSQKey ni
             ) =>
             Search nid addr tok ni r qk
             -> nid  -- ^ Target of the search.
             -> [ni] -- ^ Initial nodes to query.
             -> STM (SearchState nid addr tok ni r qk)
newSearch spec target ns = do
    pendingCount <- newTVar 0
    pending      <- newTVar Map.empty
    queued       <- newTVar (Just (MM.fromList (map enqueue ns)))
    informants   <- newTVar MM.empty
    visited      <- newTVar Set.empty
    return SearchState
        { searchPendingCount = pendingCount
        , searchPending      = pending
        , searchQueued       = queued
        , searchInformant    = informants
        , searchVisited      = visited
        , searchSpec         = spec
        }
  where
    space = searchSpace spec
    -- Pair a node with its distance from the search target.
    enqueue n = n :-> kademliaXor space target (kademliaLocation space n)

-- | Drop the value from a key-priority-value binding, keeping only key and
-- priority.  This is useful for swapping items from a @MinMaxPSQ'@ to a
-- @MinMaxPSQ@.
stripValue :: Binding' k p v -> Binding k p
stripValue (Binding key _ priority) = key :-> priority

-- | Reset a 'SearchState' object to ready it for a repeated search.
--
-- Blocks (STM retry) until no query is pending and 'searchIsFinished' holds.
-- The queue is then rebuilt from the previous informants plus whatever
-- @nearestNodes target@ yields, and the informant and visited collections
-- are cleared.  The same (mutated) state object is returned.
--
-- Bucket nodes are queued with their XOR distance to the target as priority,
-- matching 'newSearch' and the priorities carried over from the informants.
reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) =>
    (nid -> STM [ni])
    -> Search nid addr1 tok1 ni r1 qk
    -> nid
    -> SearchState nid addr tok ni r qk
    -> STM (SearchState nid addr tok ni r qk)
reset nearestNodes qsearch target st = do
    pc <- readTVar (searchPendingCount st)
    check (pc == 0)
    searchIsFinished st >>= check -- Wait for a search to finish before resetting.
    let space         = searchSpace qsearch
        distanceTo ni = kademliaXor space target (kademliaLocation space ni)
    bktNodes <- map (\ni -> ni :-> distanceTo ni) <$> nearestNodes target
    priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st)
    writeTVar (searchQueued st) $ Just $ MM.fromList $ priorInformants ++ bktNodes
    writeTVar (searchInformant st) MM.empty
    writeTVar (searchVisited st) Set.empty
    writeTVar (searchPendingCount st) 0
    return st

-- | Reply handler for a single query.
--
-- In one transaction it decrements the pending count, forgets the query's
-- cancel action and, on a 'Success' reply, queues the returned nodes that
-- have not been visited, records the replying node as an informant and feeds
-- each result to the callback.  Any other reply only updates the bookkeeping.
grokQuery :: forall addr nid tok ni r qk.
             ( Ord addr
             , PSQKey nid
             , PSQKey ni
             , Show nid
             , Ord qk
             ) => Search nid addr tok ni r qk
               -> nid
               -> (r -> STM Bool) -- ^ return False to terminate the search.
               -> SearchState nid addr tok ni r qk
               -> Binding ni nid
               -> qk
               -> Result ([ni],[r],Maybe tok)
               -> IO ()
grokQuery Search{..} searchTarget withSearchResult sch@SearchState{..} (ni :-> d) qk reply =
    atomically $ do
        modifyTVar' searchPendingCount pred
        modifyTVar' searchPending $ Map.delete qk
        case reply of
            Success x -> go x
            _         -> return ()
  where
    go (ns,rs,tok) = do
        vs <- readTVar searchVisited
        -- We only queue a node if it is not yet visited.
        let xor = kademliaXor searchSpace
            loc = kademliaLocation searchSpace
            insertFoundNode :: Int -> ni -> MinMaxPSQ ni nid -> MinMaxPSQ ni nid
            insertFoundNode k n q
                | searchNodeAddress n `Set.member` vs = q
                | otherwise = MM.insertTake k n (xor searchTarget $ loc n) q
        qsize0 <- maybe 0 MM.size <$> readTVar searchQueued
        -- Allow searchQueued to grow only when there's fewer than K elements.
        let qsize = max searchK qsize0
        -- A canceled search (Nothing) stays canceled: fmap leaves it alone.
        modifyTVar' searchQueued $ fmap $ \q -> foldr (insertFoundNode qsize) q ns
        modifyTVar' searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d
        deliver rs

    -- Hand results to the callback one at a time; the first False cancels
    -- the search and the remaining results are dropped.
    deliver []         = return ()
    deliver (r : rest) = do
        wanting <- withSearchResult r
        if wanting then deliver rest
                   else searchCancel sch

-- | Whether the search has nothing left to do.
--
-- A canceled search (queue is 'Nothing') is always finished.  Otherwise no
-- query may be pending, and either the queue is empty or there are at least
-- K - α informants and the farthest of them is no farther than the nearest
-- queued node.
searchIsFinished :: ( PSQKey nid
                    , PSQKey ni
                    ) => SearchState nid addr tok ni r qk -> STM Bool
searchIsFinished SearchState{..} =
    readTVar searchQueued >>= \case
        Nothing -> return True
        Just q  -> do
            cnt        <- readTVar searchPendingCount
            informants <- readTVar searchInformant
            let enoughInformants =
                    MM.size informants >= (searchK searchSpec - searchAlpha searchSpec)
                -- Total replacement for the former 'fromJust' calls: with
                -- no informant (possible when K - α <= 0) there is nothing
                -- to compare against, so the search is not finished.
                queueIsFarther =
                    case (MM.findMax informants, MM.findMin q) of
                        (Just worst, Just next) -> PSQ.prio worst <= PSQ.prio next
                        _                       -> False
            return $ cnt == 0
                  && (MM.null q || (enoughInformants && queueIsFarther))

-- | Cancel a search: mark the queue as canceled ('Nothing') and run the
-- cancel action of every query still registered in 'searchPending'.
searchCancel :: SearchState nid addr tok ni r qk -> STM ()
searchCancel SearchState{..} = do
    writeTVar searchQueued Nothing
    readTVar searchPending >>= sequence_

-- | Start a search in a new thread, seeded with the 'searchK' nodes closest
-- to the target taken from the bucket list.
--
-- Returns the search state together with the id of the forked thread.  The
-- thread waits on a flag until it has been labeled, then runs 'searchLoop'.
search ::
    ( Ord r
    , Ord addr
    , PSQKey nid
    , PSQKey ni
    , Show nid
    , Ord qk
    ) => Search nid addr tok ni r qk
      -> R.BucketList ni
      -> nid
      -> (r -> STM Bool)
      -> IO (SearchState nid addr tok ni r qk, ThreadId)
search sch buckets target result = do
    let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets
    st <- atomically $ newSearch sch target ns
    labeled <- newTVarIO False
    t <- forkIO $ do
        atomically (readTVar labeled >>= check)
        searchLoop sch target result st
    labelThread t ("search.pending." ++ show target)
    atomically $ writeTVar labeled True
    return (st, t)

-- | Main loop of a search.  Repeatedly takes the nearest queued node and
-- queries it, keeping at most 'searchAlpha' queries in flight, until
-- 'searchIsFinished' holds.
searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni, Ord qk )
    => Search nid addr tok ni r qk      -- ^ Query and distance methods.
    -> nid                              -- ^ The target we are searching for.
    -> (r -> STM Bool)                  -- ^ Invoked on each result.  Return False to quit searching.
    -> SearchState nid addr tok ni r qk -- ^ Search-related state.
    -> IO ()
searchLoop sch@Search{..} target result s@SearchState{..} = do
    myThreadId >>= flip labelThread ("search."++show target)
    withTaskGroup ("search.g."++show target) searchAlpha $ \_group -> fix $ \again ->
        join $ atomically $ do
            cnt <- readTVar searchPendingCount
            -- Only searchAlpha pending queries at a time.  The bound is
            -- strict because the count is incremented below; @max 1@ keeps a
            -- degenerate α <= 0 configuration from blocking forever.
            check (cnt < max 1 searchAlpha)
            informants <- readTVar searchInformant
            found <- fmap MM.minView <$> readTVar searchQueued
            case found of
                Just (Just (ni :-> d, q))
                    | -- If there's fewer than /k - α/ informants and there's any
                      -- node we haven't yet got a response from.
                      (MM.size informants < searchK - searchAlpha) && (cnt > 0 || not (MM.null q))
                      -- Or there's no informants yet at all, or the closest
                      -- scheduled node is nearer than the farthest informant.
                      || maybe True (\worst -> d < PSQ.prio worst) (MM.findMax informants)
                    -> do -- Then the search continues, send a query.
                          writeTVar searchQueued $ Just q
                          modifyTVar' searchVisited $ Set.insert (searchNodeAddress ni)
                          modifyTVar' searchPendingCount succ
                          return $ do
                              -- NOTE(review): if the reply callback can run
                              -- before the insert below, its Map.delete comes
                              -- first and a stale cancel action stays in
                              -- searchPending -- confirm against searchQuery.
                              qk <- searchQuery target ni $ \qk' reply ->
                                      grokQuery sch target result s (ni :-> d) qk' reply
                              atomically $ modifyTVar' searchPending
                                         $ Map.insert qk (searchQueryCancel qk)
                              again
                _ -> do
                    -- Nothing to send right now: wait until the search is
                    -- finished, then leave the loop.
                    searchIsFinished s >>= check
                    return (return ())