{-# LANGUAGE CPP                 #-}
{-# LANGUAGE PatternSynonyms     #-}
{-# LANGUAGE RecordWildCards     #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE FlexibleContexts    #-}
{-# LANGUAGE LambdaCase          #-}
-- | Iterative search over a Kademlia-style key space.
--
-- A search keeps a bounded queue of candidate nodes ordered by XOR distance
-- to the target, queries them concurrently, and records the closest nodes
-- that answered.
module Network.BitTorrent.DHT.Search where

import Control.Concurrent.Tasks
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Data.Bool
import Data.Function
import Data.List
import qualified Data.Map.Strict as Map
import Data.Map.Strict (Map)
import Data.Maybe
import qualified Data.Set as Set
import Data.Set (Set)
import System.IO
import System.IO.Error

import qualified Data.MinMaxPSQ as MM
import Data.MinMaxPSQ (MinMaxPSQ, MinMaxPSQ')
import qualified Data.Wrapper.PSQ as PSQ
import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ, PSQKey)
import Network.Address hiding (NodeId)
import Network.DHT.Routing as R
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
import GHC.Conc (labelThread)
#endif

-- | Static description of how to perform a search: the key space, how to
-- obtain an address from node info, and how to query a single node.
data Search nid addr tok ni r = Search
    { -- | Key space used to locate nodes and compute XOR distances.
      searchSpace       :: KademliaSpace nid ni
      -- | Extracts the address used to remember which nodes were visited.
    , searchNodeAddress :: ni -> addr
      -- | Query one node about a target.  On success it yields the nodes,
      -- the results and the token reported by that node; 'Nothing' is
      -- treated as "no reply".
    , searchQuery       :: nid -> ni -> IO (Maybe ([ni], [r], tok))
    }

-- | Mutable state of a running search.
data SearchState nid addr tok ni r = SearchState
    {-
    { searchParams :: Search nid addr ni r

    , searchTarget :: nid
    -- | This action will be performed at least once on each search result.
    -- It may be invoked multiple times since different nodes may report the
    -- same result.  If the action returns 'False', the search will be
    -- aborted, otherwise it will continue until it is decided that we've
    -- asked the closest K nodes to the target.
    , searchResult :: r -> STM Bool
    -}

    { -- | The number of pending queries.  Incremented before any query is sent
      -- and decremented when we get a reply.
      searchPendingCount :: TVar Int
      -- | Nodes scheduled to be queried.
    , searchQueued       :: TVar (MinMaxPSQ ni nid)
      -- | The nearest K nodes that issued a reply.
    , searchInformant    :: TVar (MinMaxPSQ' ni nid tok)
      -- | This tracks already-queried addresses so we avoid bothering them
      -- again. XXX: We could probably keep only the pending queries in this
      -- set.  It also can be a bounded 'MinMaxPSQ', although searchAlpha
      -- should limit the number of outstanding queries.
    , searchVisited      :: TVar (Set addr)
    }

-- | Create the state for a new search.  The initial nodes are queued by
-- their XOR distance to the target; nothing is queried yet and the visited
-- set starts out empty.
newSearch :: ( Ord addr
             , PSQKey nid
             , PSQKey ni
             ) =>
            {-
               KademliaSpace nid ni
            -> (ni -> addr)
            -> (ni -> IO ([ni], [r])) -- the query action.
            -> (r -> STM Bool) -- receives search results.
            -> nid -- target of search
            -}
               Search nid addr tok ni r
            -> nid
            -> [ni] -- Initial nodes to query.
            -> STM (SearchState nid addr tok ni r)
newSearch (Search space _nAddr _qry) target ns = do
    c <- newTVar 0
    q <- newTVar $ MM.fromList
                 $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n))
                 $ ns
    i <- newTVar MM.empty
    v <- newTVar Set.empty
    return -- (Search space nAddr qry) , r , target
           ( SearchState c q i v )

-- | Size passed to 'withTaskGroup' by 'searchLoop'; presumably the maximum
-- number of queries in flight at once (confirm against
-- "Control.Concurrent.Tasks").
searchAlpha :: Int
searchAlpha = 8

-- | Bound on the number of queued candidates and of remembered informants,
-- and the number of initial nodes taken from the routing table.
searchK :: Int
searchK = 8

-- | Query a single node and merge its answer into the search state.
--
-- The pending-query counter is decremented exactly once per call, whether
-- the query replies, fails with an 'IOError' (treated as no reply), or
-- throws any other exception (which is re-thrown after the counter has been
-- released).
--
-- On a reply, unvisited nodes it mentions are queued, the queried node is
-- recorded as an informant together with its token, and each result is
-- handed to the callback until the callback returns 'False', at which point
-- the search is cancelled.
sendQuery :: forall addr nid tok ni r.
             ( Ord addr
             , PSQKey nid
             , PSQKey ni
             , Show nid
             ) =>
             Search nid addr tok ni r
             -> nid
             -> (r -> STM Bool)
             -> SearchState nid addr tok ni r
             -> Binding ni nid
             -> IO ()
sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do
    myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget)
    -- 'catchIOError' only handles 'IOError'.  Without 'onException' any
    -- other exception would skip the decrement below and leave 'searchLoop'
    -- waiting forever for the pending count to reach zero.
    reply <- (searchQuery searchTarget ni `catchIOError` const (return Nothing))
                 `onException` atomically (modifyTVar' searchPendingCount pred)
    atomically $ do
        modifyTVar' searchPendingCount pred
        maybe (return ()) go reply
  where
    go (ns, rs, tok) = do
        vs <- readTVar searchVisited
        -- We only queue a node if it is not yet visited
        let insertFoundNode :: ni -> MinMaxPSQ ni nid -> MinMaxPSQ ni nid
            insertFoundNode n q
                | searchNodeAddress n `Set.member` vs = q
                | otherwise = MM.insertTake searchK
                                            n
                                            ( kademliaXor searchSpace searchTarget
                                                $ kademliaLocation searchSpace n )
                                            q
        modifyTVar' searchQueued $ \q -> foldr insertFoundNode q ns
        modifyTVar' searchInformant $ MM.insertTake' searchK ni tok d
        -- Feed results to the callback, stopping at the first 'False'.
        flip fix rs $ \loop -> \case
            r:rs' -> do
                wanting <- searchResult r
                if wanting then loop rs'
                           else searchCancel sch
            []    -> return ()

-- | 'True' when no query is pending and either nothing is queued, or at
-- least 'searchK' informants are known and the farthest of them is no
-- farther from the target than the nearest queued node.
searchIsFinished :: ( PSQKey nid
                    , PSQKey ni
                    ) => SearchState nid addr tok ni r -> STM Bool
searchIsFinished SearchState{..} = do
    q          <- readTVar searchQueued
    cnt        <- readTVar searchPendingCount
    informants <- readTVar searchInformant
    let nothingCloserQueued =
            case (MM.findMax informants, MM.findMin q) of
                (Just farthest, Just nearest) ->
                    MM.size informants >= searchK
                        && PSQ.prio farthest <= PSQ.prio nearest
                -- No informants at all (so fewer than 'searchK'); the
                -- empty-queue case is covered by 'MM.null' below.
                _ -> False
    return $ cnt == 0 && (MM.null q || nothingCloserQueued)

-- | Abort a search by zeroing the pending count and emptying the queue.
--
-- NOTE(review): queries already in flight still decrement
-- 'searchPendingCount' when they return (see 'sendQuery') and may queue new
-- nodes, so the counter can become negative after a cancel.  Confirm whether
-- any caller consults 'searchIsFinished' on a cancelled search.
searchCancel :: SearchState nid addr tok ni r -> STM ()
searchCancel SearchState{..} = do
    writeTVar searchPendingCount 0
    writeTVar searchQueued MM.empty

-- | Start a search for the target in a background thread, seeded with the
-- 'searchK' closest nodes from the given routing table, and return its
-- state immediately.
search ::
    ( Ord r
    , Ord addr
    , PSQKey nid
    , PSQKey ni
    , Show nid
    ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r)
search sch buckets target result = do
    let ns = R.kclosest (searchSpace sch) searchK target buckets
    st <- atomically $ newSearch sch target ns
    _ <- fork $ searchLoop sch target result st
    return st

-- | Drive a search to completion: repeatedly take the nearest queued node
-- and query it in a task of a 'searchAlpha'-sized task group, until the
-- continuation condition below fails and no query is pending.
searchLoop ::
    ( Ord addr
    , PSQKey nid
    , PSQKey ni
    , Show nid
    ) => Search nid addr tok ni r          -- ^ Query and distance methods.
      -> nid                               -- ^ Target of the search.
      -> (r -> STM Bool)                   -- ^ Result callback; 'False' cancels.
      -> SearchState nid addr tok ni r     -- ^ State created by 'newSearch'.
      -> IO ()
searchLoop sch@Search{..} target result s@SearchState{..} = do
    myThreadId >>= flip labelThread ("search."++show target)
    withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do
        -- The transaction decides what to do next and returns it as an IO
        -- action, which 'join' then runs outside of STM.
        join $ atomically $ do
            cnt <- readTVar $ searchPendingCount
            informants <- readTVar searchInformant
            found <- MM.minView <$> readTVar searchQueued
            case found of
                Just (ni :-> d, q)
                  | -- If there's fewer than /k/ informants and there's any
                    -- node we haven't yet got a response from.
                    (MM.size informants < searchK) && (cnt > 0 || not (MM.null q))
                    -- Or there's no informants yet at all ('findMax' yields
                    -- 'Nothing'), or the closest scheduled node is nearer
                    -- than the farthest of the nearest /k/ informants.
                    || maybe True (\farthest -> d < PSQ.prio farthest)
                                  (MM.findMax informants)
                  -> -- Then the search continues, send a query.
                     do writeTVar searchQueued q
                        modifyTVar' searchVisited $ Set.insert (searchNodeAddress ni)
                        modifyTVar' searchPendingCount succ
                        return $ do
                            forkTask g "searchQuery" $ sendQuery sch target result s (ni :-> d)
                            again
                _ -> -- Otherwise, we are finished.
                     do check (cnt == 0)
                        return $ return ()