From c51e64666b672637843a04c2f279d7d0c9eed01c Mon Sep 17 00:00:00 2001 From: joe Date: Wed, 1 Feb 2017 03:21:52 -0500 Subject: New improved iterative search algorithm. --- src/Network/BitTorrent/DHT/Search.hs | 92 ++++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) create mode 100644 src/Network/BitTorrent/DHT/Search.hs (limited to 'src/Network/BitTorrent/DHT/Search.hs') diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs new file mode 100644 index 00000000..1fe73c30 --- /dev/null +++ b/src/Network/BitTorrent/DHT/Search.hs @@ -0,0 +1,92 @@ +{-# LANGUAGE PatternSynonyms #-} +{-# LANGUAGE RecordWildCards #-} +module Network.BitTorrent.DHT.Search where + +import Control.Concurrent +import Control.Concurrent.Async.Pool +import Control.Concurrent.STM +import Control.Exception +import Control.Monad +import Data.Bool +import Data.Function +import Data.List +import qualified Data.Map.Strict as Map + ;import Data.Map.Strict (Map) +import Data.Maybe +import qualified Data.Set as Set + ;import Data.Set (Set) +import System.IO + +import qualified Data.MinMaxPSQ as MM + ;import Data.MinMaxPSQ (MinMaxPSQ) +import qualified Data.Wrapper.PSQ as PSQ + ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ) +import Network.BitTorrent.Address + +data IterativeSearch ip r = IterativeSearch + { searchTarget :: NodeId + , searchQuery :: NodeInfo ip -> IO ([NodeInfo ip], [r]) + , searchPendingCount :: TVar Int + , searchQueued :: TVar (MinMaxPSQ (NodeInfo ip) NodeDistance) + , searchInformant :: TVar (MinMaxPSQ (NodeInfo ip) NodeDistance) + , searchVisited :: TVar (Set (NodeAddr ip)) + , searchResults :: TVar (Set r) + } + +newSearch :: Eq ip => (NodeInfo ip -> IO ([NodeInfo ip], [r])) + -> NodeId -> [NodeInfo ip] -> IO (IterativeSearch ip r) +newSearch qry target ns = atomically $ do + c <- newTVar 0 + q <- newTVar $ MM.fromList $ map (\n -> n :-> distance target (nodeId n)) ns + i <- newTVar MM.empty + v <- newTVar Set.empty + r <- newTVar Set.empty + return $ IterativeSearch target qry c q i v r + +searchAlpha :: Int +searchAlpha = 3 + +searchK :: Int +searchK = 8 + +sendQuery :: (Ord a, Ord t) => + IterativeSearch t a + -> Binding (NodeInfo t) NodeDistance + -> IO () +sendQuery IterativeSearch{..} (ni :-> d) = do + (ns,rs) <- handle (\(SomeException e) -> return ([],[])) + (searchQuery ni) + atomically $ do + modifyTVar searchPendingCount pred + vs <- readTVar searchVisited + -- We only queue a node if it is not yet visited + let insertFoundNode n q + | nodeAddr n `Set.member` vs = q + | otherwise = MM.insertTake searchK n (distance searchTarget $ nodeId n) q + modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns + modifyTVar searchInformant $ MM.insertTake searchK ni d + modifyTVar searchResults $ \s -> foldr Set.insert s rs + +search :: + (Ord r, Ord ip) => + IterativeSearch ip r -> IO () +search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do + fix $ \again -> do + join $ atomically $ do + found <- MM.minView <$> readTVar searchQueued + cnt <- readTVar $ searchPendingCount + case found of + Nothing -> retry + Just (ni :-> d, q) -> do + informants <- readTVar searchInformant + if MM.size informants < searchK + && (cnt > 0 || not (MM.null q)) + || PSQ.prio (fromJust $ MM.findMax informants) > d + then do + writeTVar searchQueued q + modifyTVar searchVisited $ Set.insert (nodeAddr ni) + modifyTVar searchPendingCount succ + return $ withAsync g (sendQuery s (ni :-> d)) (const again) + else do + check (cnt == 0) + return $ return () -- cgit v1.2.3