{-# LANGUAGE CPP #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE FlexibleContexts #-}
-- | Iterative lookup over a Kademlia-style key space.
--
-- A search keeps a priority queue of nodes still to be queried (ordered by
-- xor distance to the target), a bounded set of nodes that have already been
-- queried, and the set of results collected so far.
module Network.BitTorrent.DHT.Search where

import Control.Concurrent
import Control.Concurrent.Async.Pool
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Data.Bool
import Data.Function
import Data.List
import qualified Data.Map.Strict as Map
import Data.Map.Strict (Map)
import Data.Maybe
import qualified Data.Set as Set
import Data.Set (Set)
import System.IO

import qualified Data.MinMaxPSQ as MM
import Data.MinMaxPSQ (MinMaxPSQ)
import qualified Data.Wrapper.PSQ as PSQ
import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ, PSQKey)
import Network.Address hiding (NodeId)
import Network.DatagramServer.Types
import Network.DHT.Routing as R

-- | Shared state of one iterative search.
--
-- Type parameters: @nid@ is the identifier (and distance) type, @addr@ the
-- address type used to de-duplicate visited nodes, @ni@ the node-info type
-- and @r@ the type of a single search result.
data IterativeSearch nid addr ni r = IterativeSearch
    { searchTarget       :: nid
      -- ^ The identifier every node is compared against (via 'kademliaXor').
    , searchSpace        :: KademliaSpace nid ni
      -- ^ Supplies 'kademliaLocation' and 'kademliaXor' for this search.
    , searchNodeAddress  :: ni -> addr
      -- ^ Extracts the key stored in 'searchVisited' from a node.
    , searchQuery        :: ni -> IO ([ni], [r])
      -- ^ Queries one node, yielding further nodes and results.
    , searchPendingCount :: TVar Int
      -- ^ Incremented when a query is dispatched by 'search' and decremented
      -- when 'sendQuery' has finished with it.
    , searchQueued       :: TVar (MinMaxPSQ ni nid)
      -- ^ Nodes waiting to be queried, prioritised by distance to the target.
    , searchInformant    :: TVar (MinMaxPSQ ni nid)
      -- ^ Nodes whose query has completed (successfully or not), inserted
      -- with @MM.insertTake searchK@.
    , searchVisited      :: TVar (Set addr)
      -- ^ Addresses of all nodes a query has been dispatched to.
    , searchResults      :: TVar (Set r)
      -- ^ Every result returned by any query so far.
    }

-- | Create the state for a new search.
--
-- The initial nodes @ns@ are queued with their xor distance to @target@ as
-- priority; the pending counter starts at zero and the informant, visited
-- and result collections start empty.  No query is sent here.
newSearch :: ( Ord addr
             , PSQKey nid
             , PSQKey ni
             ) =>
    KademliaSpace nid ni
    -> (ni -> addr)
    -> (ni -> IO ([ni], [r]))  -- ^ The query to perform against a node.
    -> nid                     -- ^ The target we are searching for.
    -> [ni]                    -- ^ Initial nodes to query.
    -> IO (IterativeSearch nid addr ni r)
newSearch space nAddr qry target ns = atomically $ do
    c <- newTVar 0
    q <- newTVar $ MM.fromList
                 $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n))
                 $ ns
    i <- newTVar MM.empty
    v <- newTVar Set.empty
    r <- newTVar Set.empty
    return $ IterativeSearch target space nAddr qry c q i v r

-- | Size of the task group created by 'search'.
searchAlpha :: Int
searchAlpha = 3

-- | Bound passed to @MM.insertTake@ for both queues, and the number of
-- informants required before the search is allowed to terminate early.
searchK :: Int
searchK = 8

-- | Query a single node and merge its answer into the search state.
--
-- Any exception thrown by 'searchQuery' is treated as an empty answer
-- (deliberate best effort: a failing node must not abort the search).  In
-- either case the pending counter is decremented and the queried node is
-- recorded as an informant.
sendQuery :: forall addr nid ni r.
             ( Ord addr
             , Ord r
             , PSQKey nid
             , PSQKey ni
             ) =>
    IterativeSearch nid addr ni r
    -> Binding ni nid
    -> IO ()
sendQuery IterativeSearch{..} (ni :-> d) = do
    (ns,rs) <- handle (\(SomeException _) -> return ([],[]))
                      (searchQuery ni)
    atomically $ do
        modifyTVar' searchPendingCount pred
        vs <- readTVar searchVisited
        -- We only queue a node if it is not yet visited
        let insertFoundNode :: ni -> MinMaxPSQ ni nid -> MinMaxPSQ ni nid
            insertFoundNode n q
             | searchNodeAddress n `Set.member` vs
                         = q
             | otherwise = MM.insertTake searchK n
                               ( kademliaXor searchSpace searchTarget
                                   $ kademliaLocation searchSpace n )
                               q
        -- Strict updates: avoid accumulating unevaluated folds in the TVars.
        modifyTVar' searchQueued $ \q -> foldr insertFoundNode q ns
        modifyTVar' searchInformant $ MM.insertTake searchK ni d
        modifyTVar' searchResults $ \s -> foldr Set.insert s rs

-- | Whether the search has nothing left to do.
--
-- True when no query is pending and either the queue is empty, or at least
-- 'searchK' informants are known and the farthest of them is no farther from
-- the target than the nearest queued node.
searchIsFinished :: ( Ord addr
                    , PSQKey nid
                    , PSQKey ni
                    ) => IterativeSearch nid addr ni r -> STM Bool
searchIsFinished IterativeSearch{..} = do
    q <- readTVar searchQueued
    cnt <- readTVar searchPendingCount
    informants <- readTVar searchInformant
    let farthestInformant = PSQ.prio <$> MM.findMax informants
        nearestQueued     = PSQ.prio <$> MM.findMin q
        -- Total replacement for the former fromJust calls.  Both values are
        -- present whenever the queue is non-empty and searchK is positive.
        queueIsNoCloser = case (farthestInformant, nearestQueued) of
            (Just far, Just near) -> far <= near
            _                     -> False
    return $ cnt == 0
             && ( MM.null q
                  || ( MM.size informants >= searchK
                       && queueIsNoCloser ))

-- | Stop scheduling new queries by emptying the queue and resetting the
-- pending counter.
--
-- NOTE(review): queries already in flight still decrement
-- 'searchPendingCount' in 'sendQuery' and may re-populate the queue, so the
-- counter can become negative after a cancel -- confirm this is acceptable
-- to callers.
searchCancel :: IterativeSearch nid addr ni r -> IO ()
searchCancel IterativeSearch{..} = atomically $ do
    writeTVar searchPendingCount 0
    writeTVar searchQueued MM.empty

-- | Run the search loop until it terminates.
--
-- Repeatedly takes the nearest queued node and dispatches 'sendQuery' for it
-- inside a task group of size 'searchAlpha'.  A node is queried when
--
-- * fewer than 'searchK' informants are known and something else is still
--   pending or queued, or
--
-- * there is no informant at all yet, or
--
-- * the node is nearer to the target than the farthest informant.
--
-- Otherwise the loop blocks (STM retry) until no query is pending and then
-- returns.
search ::
    ( Ord r
    , Ord addr
    , PSQKey nid
    , PSQKey ni
    ) => IterativeSearch nid addr ni r -> IO ()
search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do
    fix $ \again -> do
        join $ atomically $ do
            cnt <- readTVar $ searchPendingCount
            informants <- readTVar searchInformant
            found <- MM.minView <$> readTVar searchQueued
            case found of
                Just (ni :-> d, q)
                  | (MM.size informants < searchK) && (cnt > 0 || not (MM.null q))
                    -- With no informants there is nothing to compare against,
                    -- so keep searching.  (Previously this case hit fromJust
                    -- on Nothing, e.g. for a search seeded with one node.)
                    || maybe True ((> d) . PSQ.prio) (MM.findMax informants)
                  -> do writeTVar searchQueued q
                        modifyTVar' searchVisited $ Set.insert (searchNodeAddress ni)
                        modifyTVar' searchPendingCount succ
                        return $ withAsync g (sendQuery s (ni :-> d)) (const again)
                _ -> do check (cnt == 0)
                        return $ return ()