{-# LANGUAGE CPP                 #-}
{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE LambdaCase          #-}
{-# LANGUAGE PatternSynonyms     #-}
{-# LANGUAGE RecordWildCards     #-}
{-# LANGUAGE ScopedTypeVariables #-}
-- | Iterative node search over a 'KademliaSpace'.
--
-- A search keeps a priority queue of nodes still to be queried (ordered by
-- XOR distance to the target), a bounded set of the nearest nodes that have
-- answered ("informants"), and a counter of queries currently in flight.
module Network.Kademlia.Search where

import Control.Concurrent.Tasks
import Control.Concurrent.STM
import Control.Monad
import Data.Function
import Data.Maybe
import qualified Data.Set as Set
import Data.Set (Set)
import Data.Hashable (Hashable(..)) -- for type sigs
import System.IO.Error

import qualified Data.MinMaxPSQ as MM
import Data.MinMaxPSQ (MinMaxPSQ, MinMaxPSQ')
import qualified Data.Wrapper.PSQ as PSQ
import Data.Wrapper.PSQ (pattern (:->), Binding, pattern Binding, Binding', PSQKey)
import Network.Kademlia.Routing as R
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
import GHC.Conc (labelThread)
#endif

-- | The fixed, per-protocol parameters of a search.
data Search nid addr tok ni r = Search
    { -- | Supplies 'kademliaLocation' and 'kademliaXor', which are used to
      -- rank nodes by distance to the target.
      searchSpace       :: KademliaSpace nid ni
      -- | Extracts the address used to remember which nodes were already
      -- queried.
    , searchNodeAddress :: ni -> addr
      -- | The query action: given the target and a node to ask, yields
      -- further nodes, results and an optional token, or 'Nothing' when no
      -- usable reply was obtained.
    , searchQuery       :: nid -> ni -> IO (Maybe ([ni], [r], Maybe tok))
    }

-- | The mutable state of one running (or finished) search.
data SearchState nid addr tok ni r = SearchState
    {-
    { searchParams :: Search nid addr ni r

    , searchTarget :: nid
      -- | This action will be performed at least once on each search result.
      -- It may be invoked multiple times since different nodes may report the
      -- same result.  If the action returns 'False', the search will be
      -- aborted, otherwise it will continue until it is decided that we've
      -- asked the closest K nodes to the target.
    , searchResult      :: r -> STM Bool

    -}

    { -- | The number of pending queries.  Incremented before any query is sent
      -- and decremented when we get a reply.
      searchPendingCount :: TVar Int
      -- | Nodes scheduled to be queried.
    , searchQueued       :: TVar (MinMaxPSQ ni nid)
      -- | The nearest (K - α) nodes that issued a reply.
    , searchInformant    :: TVar (MinMaxPSQ' ni nid (Maybe tok))
      -- | This tracks already-queried addresses so we avoid bothering them
      -- again.  XXX: We could probably keep only the pending queries in this
      -- set.  It also can be a bounded 'MinMaxPSQ', although 'searchAlpha'
      -- should limit the number of outstanding queries.
    , searchVisited      :: TVar (Set addr)
    }

-- | Allocate the state for a fresh search.  The initial nodes are queued with
-- their XOR distance to the target as priority; nothing is sent yet.
newSearch :: ( Ord addr
             , PSQKey nid
             , PSQKey ni
             ) =>
            {-
             KademliaSpace nid ni
             -> (ni -> addr)
             -> (ni -> IO ([ni], [r])) -- the query action.
             -> (r -> STM Bool)        -- receives search results.
             -> nid                    -- target of search
             -}
             Search nid addr tok ni r
             -> nid
             -> [ni]                   -- Initial nodes to query.
             -> STM (SearchState nid addr tok ni r)
newSearch Search{ searchSpace = space } target ns = do
    let distanceTo n = kademliaXor space target (kademliaLocation space n)
    pending    <- newTVar 0
    queued     <- newTVar $ MM.fromList [ n :-> distanceTo n | n <- ns ]
    informants <- newTVar MM.empty
    visited    <- newTVar Set.empty
    return (SearchState pending queued informants visited)

-- | Discard a value from a key-priority-value tuple.  This is useful for
-- swapping items from a "MinMaxPSQ'" to a "MinMaxPSQ".
stripValue :: Binding' k p v -> Binding k p
stripValue (Binding k _ p) = k :-> p

-- | Reset a 'SearchState' object to ready it for a repeated search.
--
-- Blocks (via 'check') until the previous search has finished.  The new queue
-- is seeded with the previous informants plus the nodes produced by the
-- supplied lookup; the informant and visited sets are cleared and the pending
-- count is zeroed.
reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) =>
         (nid -> STM [ni])
         -> Search nid addr1 tok1 ni r1
         -> nid
         -> SearchState nid addr tok ni r
         -> STM (SearchState nid addr tok ni r)
reset nearestNodes qsearch target st = do
    searchIsFinished st >>= check -- Wait for a search to finish before resetting.
    let space         = searchSpace qsearch
        -- The queue is ordered by XOR distance to the target, exactly as in
        -- 'newSearch'.  Using the raw node location here would rank the
        -- bucket nodes by their identifier instead of by closeness.
        distanceTo ni = kademliaXor space target (kademliaLocation space ni)
    bktNodes        <- map (\ni -> ni :-> distanceTo ni) <$> nearestNodes target
    priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st)
    writeTVar (searchQueued st) $ MM.fromList $ priorInformants ++ bktNodes
    writeTVar (searchInformant st) MM.empty
    writeTVar (searchVisited st) Set.empty
    writeTVar (searchPendingCount st) 0
    return st

-- | The number of queries allowed to be in flight at once.  It is also the
-- size of the task group used by 'searchLoop'.
searchAlpha :: Int
searchAlpha = 8

-- | 'searchK' should be larger than 'searchAlpha'.  How much larger depends on
-- how fast the queries are.  For Tox's much slower onion-routed queries, we
-- need to ensure that closer non-responding queries don't completely push out
-- farther away queries.
--
-- For BitTorrent, setting them both to 8 was not an issue, but that is no
-- longer supported because the number of remembered informants is now the
-- difference between these two numbers.  So, if searchK = 16 and searchAlpha =
-- 4, then the number of remembered query responses is 12.
searchK :: Int
searchK = 16

-- | Query a single node and fold its reply into the search state.
--
-- The pending count is decremented whether or not a reply was obtained.  On a
-- reply, unvisited nodes are queued, the queried node is recorded as an
-- informant, and each result is handed to the callback; the first 'False'
-- from the callback cancels the search.
sendQuery :: forall addr nid tok ni r.
             ( Ord addr
             , PSQKey nid
             , PSQKey ni
             , Show nid
             ) =>
             Search nid addr tok ni r
             -> nid
             -> (r -> STM Bool) -- ^ return False to terminate the search.
             -> SearchState nid addr tok ni r
             -> Binding ni nid
             -> IO ()
sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do
    myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget)
    -- An IOError is treated the same as "no reply".
    -- NOTE(review): only IOErrors are caught; any other exception raised by
    -- the query would skip the decrement below -- confirm that is acceptable.
    reply <- searchQuery searchTarget ni `catchIOError` const (return Nothing)
    atomically $ do
        modifyTVar' searchPendingCount pred
        maybe (return ()) go reply
  where
    go (ns, rs, tok) = do
        vs <- readTVar searchVisited
        -- We only queue a node if it is not yet visited.
        let insertFoundNode :: Int
                            -> ni
                            -> MinMaxPSQ ni nid
                            -> MinMaxPSQ ni nid
            insertFoundNode k n q
                | searchNodeAddress n `Set.member` vs = q
                | otherwise =
                    MM.insertTake k n
                        (kademliaXor searchSpace searchTarget (kademliaLocation searchSpace n))
                        q
        qsize0 <- MM.size <$> readTVar searchQueued
        -- The queue may hold at least 'searchK' entries, and is never forced
        -- below its current size.
        let qsize = max searchK qsize0
        modifyTVar' searchQueued    $ \q -> foldr (insertFoundNode qsize) q ns
        modifyTVar' searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d
        -- Deliver results in order; stop and cancel at the first rejection.
        let deliver []         = return ()
            deliver (x : rest) = do
                wanting <- searchResult x
                if wanting then deliver rest
                           else searchCancel sch
        deliver rs

-- | A search is finished when no query is pending and either nothing is left
-- in the queue, or the informant set is full and its farthest member is at
-- least as close to the target as the nearest queued node.
searchIsFinished :: ( PSQKey nid
                    , PSQKey ni
                    ) => SearchState nid addr tok ni r -> STM Bool
searchIsFinished SearchState{..} = do
    q          <- readTVar searchQueued
    cnt        <- readTVar searchPendingCount
    informants <- readTVar searchInformant
    let quotaFilled = MM.size informants >= searchK - searchAlpha
        -- Total replacement for the former 'fromJust' calls: if either
        -- structure is empty there is nothing to compare.
        nothingCloserQueued =
            case (MM.findMax informants, MM.findMin q) of
                (Just worst, Just next) -> PSQ.prio worst <= PSQ.prio next
                _                       -> False
    return $ cnt == 0 && (MM.null q || (quotaFilled && nothingCloserQueued))

-- | Abort a search by zeroing the pending count and emptying the queue.
--
-- NOTE(review): queries still in flight will decrement the pending count
-- after this, taking it below zero, and their replies may queue new nodes --
-- confirm this is the intended cancellation behaviour.
searchCancel :: SearchState nid addr tok ni r -> STM ()
searchCancel SearchState{..} = do
    writeTVar searchPendingCount 0
    writeTVar searchQueued       MM.empty

-- | Start a search in a background thread, seeded with the 'searchK' nodes of
-- the bucket list closest to the target, and return its state immediately.
search ::
    ( Ord r
    , Ord addr
    , PSQKey nid
    , PSQKey ni
    , Show nid
    ) => Search nid addr tok ni r
      -> R.BucketList ni
      -> nid
      -> (r -> STM Bool)
      -> IO (SearchState nid addr tok ni r)
search sch buckets target result = do
    let ns = R.kclosest (searchSpace sch) searchK target buckets
    st <- atomically $ newSearch sch target ns
    void $ fork $ searchLoop sch target result st
    return st

-- | Drive a search to completion: repeatedly take the nearest queued node and
-- query it in a task group of 'searchAlpha' workers until the termination
-- condition holds and no query is pending.
searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni )
           => Search nid addr tok ni r -- ^ Query and distance methods.
           -> nid                      -- ^ The target we are searching for.
           -> (r -> STM Bool)          -- ^ Invoked on each result.  Return False to quit searching.
           -> SearchState nid addr tok ni r -- ^ Search-related state.
           -> IO ()
searchLoop sch@Search{..} target result s@SearchState{..} = do
    myThreadId >>= flip labelThread ("search." ++ show target)
    withTaskGroup ("search.g." ++ show target) searchAlpha $ \g -> fix $ \again ->
        join $ atomically $ do
            cnt <- readTVar searchPendingCount
            -- At most 'searchAlpha' pending queries at a time.  The strict
            -- comparison matters: the counter is incremented below, so
            -- admitting cnt == searchAlpha would allow one query too many.
            check (cnt < searchAlpha)
            informants <- readTVar searchInformant
            found      <- MM.minView <$> readTVar searchQueued
            let informantQuota = searchK - searchAlpha
                -- False when there are no informants to compare against.
                closerThanWorstInformant dist =
                    maybe False ((dist <) . PSQ.prio) (MM.findMax informants)
            case found of
                Just (ni :-> d, q)
                  | -- If there's fewer than /k/ informants and there's any
                    -- node we haven't yet got a response from.
                    (MM.size informants < informantQuota && (cnt > 0 || not (MM.null q)))
                    -- Or there's no informants yet at all.
                    || MM.null informants
                    -- Or if the closest scheduled node is nearer than the
                    -- farthest remembered informant.
                    || closerThanWorstInformant d
                  -> -- Then the search continues, send a query.
                     do writeTVar searchQueued q
                        modifyTVar' searchVisited $ Set.insert (searchNodeAddress ni)
                        modifyTVar' searchPendingCount succ
                        return $ do
                            forkTask g "searchQuery" $ sendQuery sch target result s (ni :-> d)
                            again
                _ -> -- Otherwise, we are finished once every pending query
                     -- has been accounted for.
                     do check (cnt == 0)
                        return $ return ()