From fdfe7279339f91bad5ceb0cab8e699415686ab3f Mon Sep 17 00:00:00 2001 From: joe Date: Fri, 15 Sep 2017 17:36:53 -0400 Subject: Moved Network.BitTorrent.DHT.Search -> Network.Kademlia.Search --- src/Network/BitTorrent/DHT/Search.hs | 204 ---------------------------------- src/Network/BitTorrent/MainlineDHT.hs | 2 +- src/Network/Kademlia.hs | 2 +- src/Network/Kademlia/Search.hs | 204 ++++++++++++++++++++++++++++++++++ src/Network/Tox.hs | 2 +- src/Network/Tox/DHT/Handlers.hs | 2 +- 6 files changed, 208 insertions(+), 208 deletions(-) delete mode 100644 src/Network/BitTorrent/DHT/Search.hs create mode 100644 src/Network/Kademlia/Search.hs (limited to 'src') diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs deleted file mode 100644 index 29e91633..00000000 --- a/src/Network/BitTorrent/DHT/Search.hs +++ /dev/null @@ -1,204 +0,0 @@ -{-# LANGUAGE CPP #-} -{-# LANGUAGE PatternSynonyms #-} -{-# LANGUAGE RecordWildCards #-} -{-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE LambdaCase #-} -module Network.BitTorrent.DHT.Search where - -import Control.Concurrent.Tasks -import Control.Concurrent.STM -import Control.Exception -import Control.Monad -import Data.Bool -import Data.Function -import Data.List -import qualified Data.Map.Strict as Map - ;import Data.Map.Strict (Map) -import Data.Maybe -import qualified Data.Set as Set - ;import Data.Set (Set) -import System.IO -import System.IO.Error - -import qualified Data.MinMaxPSQ as MM - ;import Data.MinMaxPSQ (MinMaxPSQ, MinMaxPSQ') -import qualified Data.Wrapper.PSQ as PSQ - ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ, PSQKey) -import Network.Address hiding (NodeId) -import Network.DHT.Routing as R -#ifdef THREAD_DEBUG -import Control.Concurrent.Lifted.Instrument -#else -import Control.Concurrent.Lifted -import GHC.Conc (labelThread) -#endif - -data Search nid addr tok ni r = Search - { searchSpace :: KademliaSpace nid ni - , searchNodeAddress :: ni -> addr - , searchQuery :: nid -> ni -> IO (Maybe ([ni], [r], tok)) - } - -data SearchState nid addr tok ni r = SearchState - {- - { searchParams :: Search nid addr ni r - - , searchTarget :: nid - -- | This action will be performed at least once on each search result. - -- It may be invoked multiple times since different nodes may report the - -- same result. If the action returns 'False', the search will be - -- aborted, otherwise it will continue until it is decided that we've - -- asked the closest K nodes to the target. - , searchResult :: r -> STM Bool - - -} - - { -- | The number of pending queries. Incremented before any query is sent - -- and decremented when we get a reply. - searchPendingCount :: TVar Int - -- | Nodes scheduled to be queried. - , searchQueued :: TVar (MinMaxPSQ ni nid) - -- | The nearest K nodes that issued a reply. - , searchInformant :: TVar (MinMaxPSQ' ni nid tok) - -- | This tracks already-queried addresses so we avoid bothering them - -- again. XXX: We could probably keep only the pending queries in this - -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha - -- should limit the number of outstanding queries. - , searchVisited :: TVar (Set addr) - } - -newSearch :: ( Ord addr - , PSQKey nid - , PSQKey ni - ) => - {- - KademliaSpace nid ni - -> (ni -> addr) - -> (ni -> IO ([ni], [r])) -- the query action. - -> (r -> STM Bool) -- receives search results. - -> nid -- target of search - -} - Search nid addr tok ni r - -> nid - -> [ni] -- Initial nodes to query. - -> STM (SearchState nid addr tok ni r) -newSearch (Search space nAddr qry) target ns = do - c <- newTVar 0 - q <- newTVar $ MM.fromList - $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) - $ ns - i <- newTVar MM.empty - v <- newTVar Set.empty - return -- (Search space nAddr qry) , r , target - ( SearchState c q i v ) - -searchAlpha :: Int -searchAlpha = 8 - -searchK :: Int -searchK = 8 - -sendQuery :: forall addr nid tok ni r. - ( Ord addr - , PSQKey nid - , PSQKey ni - , Show nid - ) => - Search nid addr tok ni r - -> nid - -> (r -> STM Bool) - -> SearchState nid addr tok ni r - -> Binding ni nid - -> IO () -sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do - myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) - reply <- searchQuery searchTarget ni `catchIOError` const (return Nothing) - -- (ns,rs) - let tok = error "TODO: token" - atomically $ do - modifyTVar searchPendingCount pred - maybe (return ()) go reply - where - go (ns,rs,tok) = do - vs <- readTVar searchVisited - -- We only queue a node if it is not yet visited - let insertFoundNode :: ni - -> MinMaxPSQ ni nid - -> MinMaxPSQ ni nid - insertFoundNode n q - | searchNodeAddress n `Set.member` vs - = q - | otherwise = MM.insertTake searchK n ( kademliaXor searchSpace searchTarget - $ kademliaLocation searchSpace n ) - q - modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns - modifyTVar searchInformant $ MM.insertTake' searchK ni tok d - flip fix rs $ \loop -> \case - r:rs' -> do - wanting <- searchResult r - if wanting then loop rs' - else searchCancel sch - [] -> return () - - -searchIsFinished :: ( PSQKey nid - , PSQKey ni - ) => SearchState nid addr tok ni r -> STM Bool -searchIsFinished SearchState{ ..} = do - q <- readTVar searchQueued - cnt <- readTVar searchPendingCount - informants <- readTVar searchInformant - return $ cnt == 0 - && ( MM.null q - || ( MM.size informants >= searchK - && ( PSQ.prio (fromJust $ MM.findMax informants) - <= PSQ.prio (fromJust $ MM.findMin q)))) - -searchCancel :: SearchState nid addr tok ni r -> STM () -searchCancel SearchState{..} = do - writeTVar searchPendingCount 0 - writeTVar searchQueued MM.empty - -search :: - ( Ord r - , Ord addr - , PSQKey nid - , PSQKey ni - , Show nid - ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r) -search sch buckets target result = do - let ns = R.kclosest (searchSpace sch) searchK target buckets - st <- atomically $ newSearch sch target ns - fork $ searchLoop sch target result st - return st - -searchLoop sch@Search{..} target result s@SearchState{..} = do - myThreadId >>= flip labelThread ("search."++show target) - withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do - join $ atomically $ do - cnt <- readTVar $ searchPendingCount - informants <- readTVar searchInformant - found <- MM.minView <$> readTVar searchQueued - case found of - Just (ni :-> d, q) - | -- If there's fewer than /k/ informants and there's any - -- node we haven't yet got a response from. - (MM.size informants < searchK) && (cnt > 0 || not (MM.null q)) - -- Or there's no informants yet at all. - || MM.null informants - -- Or if the closest scheduled node is nearer than the - -- nearest /k/ informants. - || (d < PSQ.prio (fromJust $ MM.findMax informants)) - -> -- Then the search continues, send a query. - do writeTVar searchQueued q - modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) - modifyTVar searchPendingCount succ - return $ do - forkTask g - "searchQuery" - $ sendQuery sch target result s (ni :-> d) - again - _ -> -- Otherwise, we are finished. - do check (cnt == 0) - return $ return () diff --git a/src/Network/BitTorrent/MainlineDHT.hs b/src/Network/BitTorrent/MainlineDHT.hs index 6f03ef60..e7d702c3 100644 --- a/src/Network/BitTorrent/MainlineDHT.hs +++ b/src/Network/BitTorrent/MainlineDHT.hs @@ -58,7 +58,7 @@ import Network.Address (Address, fromAddr, fromSockAddr, toSockAddr, genBucketSample', WantIP(..), un4map,either4or6,ipFamily) import Network.BitTorrent.DHT.ContactInfo as Peers -import Network.BitTorrent.DHT.Search (Search (..)) +import Network.Kademlia.Search (Search (..)) import Network.BitTorrent.DHT.Token as Token import qualified Network.DHT.Routing as R ;import Network.DHT.Routing (Timestamp, getTimestamp) diff --git a/src/Network/Kademlia.hs b/src/Network/Kademlia.hs index 315cc652..5fb1e334 100644 --- a/src/Network/Kademlia.hs +++ b/src/Network/Kademlia.hs @@ -29,7 +29,7 @@ import Data.Time.Clock.POSIX (POSIXTime) import qualified Data.Wrapper.PSQInt as Int ;import Data.Wrapper.PSQInt (pattern (:->)) import Network.Address (bucketRange,genBucketSample) -import Network.BitTorrent.DHT.Search +import Network.Kademlia.Search import System.Entropy import System.Timeout import Text.PrettyPrint as PP hiding (($$), (<>)) diff --git a/src/Network/Kademlia/Search.hs b/src/Network/Kademlia/Search.hs new file mode 100644 index 00000000..195bed14 --- /dev/null +++ b/src/Network/Kademlia/Search.hs @@ -0,0 +1,204 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE PatternSynonyms #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE LambdaCase #-} +module Network.Kademlia.Search where + +import Control.Concurrent.Tasks +import Control.Concurrent.STM +import Control.Exception +import Control.Monad +import Data.Bool +import Data.Function +import Data.List +import qualified Data.Map.Strict as Map + ;import Data.Map.Strict (Map) +import Data.Maybe +import qualified Data.Set as Set + ;import Data.Set (Set) +import System.IO +import System.IO.Error + +import qualified Data.MinMaxPSQ as MM + ;import Data.MinMaxPSQ (MinMaxPSQ, MinMaxPSQ') +import qualified Data.Wrapper.PSQ as PSQ + ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ, PSQKey) +import Network.Address hiding (NodeId) +import Network.DHT.Routing as R +#ifdef THREAD_DEBUG +import Control.Concurrent.Lifted.Instrument +#else +import Control.Concurrent.Lifted +import GHC.Conc (labelThread) +#endif + +data Search nid addr tok ni r = Search + { searchSpace :: KademliaSpace nid ni + , searchNodeAddress :: ni -> addr + , searchQuery :: nid -> ni -> IO (Maybe ([ni], [r], tok)) + } + +data SearchState nid addr tok ni r = SearchState + {- + { searchParams :: Search nid addr ni r + + , searchTarget :: nid + -- | This action will be performed at least once on each search result. + -- It may be invoked multiple times since different nodes may report the + -- same result. If the action returns 'False', the search will be + -- aborted, otherwise it will continue until it is decided that we've + -- asked the closest K nodes to the target. + , searchResult :: r -> STM Bool + + -} + + { -- | The number of pending queries. Incremented before any query is sent + -- and decremented when we get a reply. + searchPendingCount :: TVar Int + -- | Nodes scheduled to be queried. + , searchQueued :: TVar (MinMaxPSQ ni nid) + -- | The nearest K nodes that issued a reply. + , searchInformant :: TVar (MinMaxPSQ' ni nid tok) + -- | This tracks already-queried addresses so we avoid bothering them + -- again. XXX: We could probably keep only the pending queries in this + -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha + -- should limit the number of outstanding queries. + , searchVisited :: TVar (Set addr) + } + +newSearch :: ( Ord addr + , PSQKey nid + , PSQKey ni + ) => + {- + KademliaSpace nid ni + -> (ni -> addr) + -> (ni -> IO ([ni], [r])) -- the query action. + -> (r -> STM Bool) -- receives search results. + -> nid -- target of search + -} + Search nid addr tok ni r + -> nid + -> [ni] -- Initial nodes to query. + -> STM (SearchState nid addr tok ni r) +newSearch (Search space nAddr qry) target ns = do + c <- newTVar 0 + q <- newTVar $ MM.fromList + $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) + $ ns + i <- newTVar MM.empty + v <- newTVar Set.empty + return -- (Search space nAddr qry) , r , target + ( SearchState c q i v ) + +searchAlpha :: Int +searchAlpha = 8 + +searchK :: Int +searchK = 8 + +sendQuery :: forall addr nid tok ni r. + ( Ord addr + , PSQKey nid + , PSQKey ni + , Show nid + ) => + Search nid addr tok ni r + -> nid + -> (r -> STM Bool) + -> SearchState nid addr tok ni r + -> Binding ni nid + -> IO () +sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do + myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) + reply <- searchQuery searchTarget ni `catchIOError` const (return Nothing) + -- (ns,rs) + let tok = error "TODO: token" + atomically $ do + modifyTVar searchPendingCount pred + maybe (return ()) go reply + where + go (ns,rs,tok) = do + vs <- readTVar searchVisited + -- We only queue a node if it is not yet visited + let insertFoundNode :: ni + -> MinMaxPSQ ni nid + -> MinMaxPSQ ni nid + insertFoundNode n q + | searchNodeAddress n `Set.member` vs + = q + | otherwise = MM.insertTake searchK n ( kademliaXor searchSpace searchTarget + $ kademliaLocation searchSpace n ) + q + modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns + modifyTVar searchInformant $ MM.insertTake' searchK ni tok d + flip fix rs $ \loop -> \case + r:rs' -> do + wanting <- searchResult r + if wanting then loop rs' + else searchCancel sch + [] -> return () + + +searchIsFinished :: ( PSQKey nid + , PSQKey ni + ) => SearchState nid addr tok ni r -> STM Bool +searchIsFinished SearchState{ ..} = do + q <- readTVar searchQueued + cnt <- readTVar searchPendingCount + informants <- readTVar searchInformant + return $ cnt == 0 + && ( MM.null q + || ( MM.size informants >= searchK + && ( PSQ.prio (fromJust $ MM.findMax informants) + <= PSQ.prio (fromJust $ MM.findMin q)))) + +searchCancel :: SearchState nid addr tok ni r -> STM () +searchCancel SearchState{..} = do + writeTVar searchPendingCount 0 + writeTVar searchQueued MM.empty + +search :: + ( Ord r + , Ord addr + , PSQKey nid + , PSQKey ni + , Show nid + ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r) +search sch buckets target result = do + let ns = R.kclosest (searchSpace sch) searchK target buckets + st <- atomically $ newSearch sch target ns + fork $ searchLoop sch target result st + return st + +searchLoop sch@Search{..} target result s@SearchState{..} = do + myThreadId >>= flip labelThread ("search."++show target) + withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do + join $ atomically $ do + cnt <- readTVar $ searchPendingCount + informants <- readTVar searchInformant + found <- MM.minView <$> readTVar searchQueued + case found of + Just (ni :-> d, q) + | -- If there's fewer than /k/ informants and there's any + -- node we haven't yet got a response from. + (MM.size informants < searchK) && (cnt > 0 || not (MM.null q)) + -- Or there's no informants yet at all. + || MM.null informants + -- Or if the closest scheduled node is nearer than the + -- nearest /k/ informants. + || (d < PSQ.prio (fromJust $ MM.findMax informants)) + -> -- Then the search continues, send a query. + do writeTVar searchQueued q + modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) + modifyTVar searchPendingCount succ + return $ do + forkTask g + "searchQuery" + $ sendQuery sch target result s (ni :-> d) + again + _ -> -- Otherwise, we are finished. + do check (cnt == 0) + return $ return () diff --git a/src/Network/Tox.hs b/src/Network/Tox.hs index c88dbcd4..7279c2e3 100644 --- a/src/Network/Tox.hs +++ b/src/Network/Tox.hs @@ -70,7 +70,7 @@ import Network.Address (Address, WantIP (..), either4or6, fromSockAddr, ipFamily, setPort, sockAddrPort, testIdBit, toSockAddr, un4map) -import Network.BitTorrent.DHT.Search (Search (..)) +import Network.Kademlia.Search (Search (..)) import qualified Network.DHT.Routing as R import Network.QueryResponse import Network.Socket diff --git a/src/Network/Tox/DHT/Handlers.hs b/src/Network/Tox/DHT/Handlers.hs index f22f9ffe..2dc183cd 100644 --- a/src/Network/Tox/DHT/Handlers.hs +++ b/src/Network/Tox/DHT/Handlers.hs @@ -7,7 +7,7 @@ import Network.Tox.DHT.Transport as DHTTransport import Network.QueryResponse as QR hiding (Client) import qualified Network.QueryResponse as QR (Client) import Crypto.Tox -import Network.BitTorrent.DHT.Search +import Network.Kademlia.Search import qualified Data.Wrapper.PSQInt as Int import Network.Kademlia import Network.Address (WantIP (..), ipFamily, testIdBit,fromSockAddr, sockAddrPort) -- cgit v1.2.3