From eab23d20425d43445a9fdab9d604344172f092f7 Mon Sep 17 00:00:00 2001 From: joe Date: Thu, 20 Jul 2017 01:11:19 -0400 Subject: Updated Search module for rewrite. --- src/Network/BitTorrent/DHT/Search.hs | 88 +++++++++++++++++++++--------------- 1 file changed, 52 insertions(+), 36 deletions(-) (limited to 'src/Network/BitTorrent/DHT/Search.hs') diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs index 07240755..91a1079b 100644 --- a/src/Network/BitTorrent/DHT/Search.hs +++ b/src/Network/BitTorrent/DHT/Search.hs @@ -26,31 +26,37 @@ import qualified Data.Wrapper.PSQ as PSQ ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ, PSQKey) import Network.Address hiding (NodeId) import Network.DatagramServer.Types -import Data.Bits +import Network.DHT.Routing as R -data IterativeSearch dht u ip r = IterativeSearch - { searchTarget :: NodeId dht - , searchQuery :: NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r]) +data IterativeSearch nid addr ni r = IterativeSearch + { searchTarget :: nid + , searchSpace :: KademliaSpace nid ni + , searchNodeAddress :: ni -> addr + , searchQuery :: ni -> IO ([ni], [r]) , searchPendingCount :: TVar Int - , searchQueued :: TVar (MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht))) - , searchInformant :: TVar (MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht))) - , searchVisited :: TVar (Set (NodeAddr ip)) + , searchQueued :: TVar (MinMaxPSQ ni nid) + , searchInformant :: TVar (MinMaxPSQ ni nid) + , searchVisited :: TVar (Set addr) , searchResults :: TVar (Set r) } -newSearch :: ( Eq ip - , PSQKey (NodeId dht) - , PSQKey (NodeInfo dht ip u) - , FiniteBits (NodeId dht) - ) => (NodeInfo dht ip u -> IO ([NodeInfo dht ip u], [r])) - -> NodeId dht -> [NodeInfo dht ip u] -> IO (IterativeSearch dht u ip r) -newSearch qry target ns = atomically $ do +newSearch :: ( Ord addr + , PSQKey nid + , PSQKey ni + ) => + KademliaSpace nid ni + -> (ni -> addr) + -> (ni -> IO ([ni], [r])) + -> nid -> [ni] -> IO (IterativeSearch nid addr ni r) +newSearch space nAddr qry target ns = atomically $ do c <- newTVar 0 - q <- newTVar $ MM.fromList $ map (\n -> n :-> distance target (nodeId n)) ns + q <- newTVar $ MM.fromList + $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) + $ ns i <- newTVar MM.empty v <- newTVar Set.empty r <- newTVar Set.empty - return $ IterativeSearch target qry c q i v r + return $ IterativeSearch target space nAddr qry c q i v r searchAlpha :: Int searchAlpha = 3 @@ -58,15 +64,14 @@ searchAlpha = 3 searchK :: Int searchK = 8 -sendQuery :: forall a ip dht u. - ( Ord a - , Ord ip - , PSQKey (NodeId dht) - , PSQKey (NodeInfo dht ip u) - , FiniteBits (NodeId dht) +sendQuery :: forall addr nid ni r. + ( Ord addr + , Ord r + , PSQKey nid + , PSQKey ni ) => - IterativeSearch dht u ip a - -> Binding (NodeInfo dht ip u) (NodeDistance (NodeId dht)) + IterativeSearch nid addr ni r + -> Binding ni nid -> IO () sendQuery IterativeSearch{..} (ni :-> d) = do (ns,rs) <- handle (\(SomeException e) -> return ([],[])) @@ -75,21 +80,24 @@ sendQuery IterativeSearch{..} (ni :-> d) = do modifyTVar searchPendingCount pred vs <- readTVar searchVisited -- We only queue a node if it is not yet visited - let insertFoundNode :: NodeInfo dht ip u - -> MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht)) - -> MinMaxPSQ (NodeInfo dht ip u) (NodeDistance (NodeId dht)) + let insertFoundNode :: ni + -> MinMaxPSQ ni nid + -> MinMaxPSQ ni nid insertFoundNode n q - | nodeAddr n `Set.member` vs = q - | otherwise = MM.insertTake searchK n (distance searchTarget $ nodeId n) q + | searchNodeAddress n `Set.member` vs + = q + | otherwise = MM.insertTake searchK n ( kademliaXor searchSpace searchTarget + $ kademliaLocation searchSpace n ) + q modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns modifyTVar searchInformant $ MM.insertTake searchK ni d modifyTVar searchResults $ \s -> foldr Set.insert s rs -searchIsFinished :: ( Ord ip - , PSQKey (NodeId dht) - , PSQKey (NodeInfo dht ip u) - ) => IterativeSearch dht u ip r -> STM Bool +searchIsFinished :: ( Ord addr + , PSQKey nid + , PSQKey ni + ) => IterativeSearch nid addr ni r -> STM Bool searchIsFinished IterativeSearch{..} = do q <- readTVar searchQueued cnt <- readTVar searchPendingCount @@ -100,9 +108,17 @@ searchIsFinished IterativeSearch{..} = do && ( PSQ.prio (fromJust $ MM.findMax informants) <= PSQ.prio (fromJust $ MM.findMin q)))) +searchCancel :: IterativeSearch nid addr ni r -> IO () +searchCancel IterativeSearch{..} = atomically $ do + writeTVar searchPendingCount 0 + writeTVar searchQueued MM.empty + search :: - (Ord r, Ord ip, PSQKey (NodeId dht), PSQKey (NodeInfo dht ip u), FiniteBits (NodeId dht)) => - IterativeSearch dht u ip r -> IO () + ( Ord r + , Ord addr + , PSQKey nid + , PSQKey ni + ) => IterativeSearch nid addr ni r -> IO () search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do fix $ \again -> do join $ atomically $ do @@ -114,7 +130,7 @@ search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do | (MM.size informants < searchK) && (cnt > 0 || not (MM.null q)) || (PSQ.prio (fromJust $ MM.findMax informants) > d) -> do writeTVar searchQueued q - modifyTVar searchVisited $ Set.insert (nodeAddr ni) + modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) modifyTVar searchPendingCount succ return $ withAsync g (sendQuery s (ni :-> d)) (const again) _ -> do check (cnt == 0) -- cgit v1.2.3