From 0e20eb6683761362ee282e3188fccdab46b02ee4 Mon Sep 17 00:00:00 2001 From: joe Date: Thu, 27 Jul 2017 00:09:36 -0400 Subject: peer search. --- src/Network/BitTorrent/DHT/Search.hs | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) (limited to 'src/Network') diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs index 8c441cb0..b263e339 100644 --- a/src/Network/BitTorrent/DHT/Search.hs +++ b/src/Network/BitTorrent/DHT/Search.hs @@ -21,7 +21,7 @@ import qualified Data.Set as Set import System.IO import qualified Data.MinMaxPSQ as MM - ;import Data.MinMaxPSQ (MinMaxPSQ) + ;import Data.MinMaxPSQ (MinMaxPSQ, MinMaxPSQ') import qualified Data.Wrapper.PSQ as PSQ ;import Data.Wrapper.PSQ (pattern (:->), Binding, PSQ, PSQKey) import Network.Address hiding (NodeId) @@ -34,13 +34,13 @@ import Control.Concurrent.Lifted import GHC.Conc (labelThread) #endif -data Search nid addr ni r = Search +data Search nid addr tok ni r = Search { searchSpace :: KademliaSpace nid ni , searchNodeAddress :: ni -> addr - , searchQuery :: nid -> ni -> IO ([ni], [r]) + , searchQuery :: nid -> ni -> IO (Maybe ([ni], [r], tok)) } -data SearchState nid addr ni r = SearchState +data SearchState nid addr tok ni r = SearchState {- { searchParams :: Search nid addr ni r @@ -60,7 +60,7 @@ data SearchState nid addr ni r = SearchState -- | Nodes scheduled to be queried. , searchQueued :: TVar (MinMaxPSQ ni nid) -- | The nearest K nodes that issued a reply. - , searchInformant :: TVar (MinMaxPSQ ni nid) + , searchInformant :: TVar (MinMaxPSQ' ni nid tok) -- | This tracks already-queried addresses so we avoid bothering them -- again. XXX: We could probably keep only the pending queries in this -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha @@ -79,10 +79,10 @@ newSearch :: ( Ord addr -> (r -> STM Bool) -- receives search results. -> nid -- target of search -} - Search nid addr ni r + Search nid addr tok ni r -> nid -> [ni] -- Initial nodes to query. - -> IO (SearchState nid addr ni r) + -> IO (SearchState nid addr tok ni r) newSearch (Search space nAddr qry) target ns = atomically $ do c <- newTVar 0 q <- newTVar $ MM.fromList @@ -99,25 +99,31 @@ searchAlpha = 8 searchK :: Int searchK = 8 -sendQuery :: forall addr nid ni r. +sendQuery :: forall addr nid tok ni r. ( Ord addr , Ord r , PSQKey nid , PSQKey ni , Show nid ) => - Search nid addr ni r + Search nid addr tok ni r -> nid -> (r -> STM Bool) - -> SearchState nid addr ni r + -> SearchState nid addr tok ni r -> Binding ni nid -> IO () sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) - (ns,rs) <- handle (\(SomeException e) -> return ([],[])) + -- TODO: Should we really be catching ThreadKilled ? + reply <- handle (\(SomeException e) -> return Nothing) (searchQuery searchTarget ni) + -- (ns,rs) + let tok = error "TODO: token" atomically $ do modifyTVar searchPendingCount pred + maybe (return ()) go reply + where + go (ns,rs,tok) = do vs <- readTVar searchVisited -- We only queue a node if it is not yet visited let insertFoundNode :: ni @@ -130,7 +136,7 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = $ kademliaLocation searchSpace n ) q modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns - modifyTVar searchInformant $ MM.insertTake searchK ni d + modifyTVar searchInformant $ MM.insertTake' searchK ni tok d flip fix rs $ \loop -> \case r:rs' -> do wanting <- searchResult r @@ -142,7 +148,7 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = searchIsFinished :: ( Ord addr , PSQKey nid , PSQKey ni - ) => SearchState nid addr ni r -> STM Bool + ) => SearchState nid addr tok ni r -> STM Bool searchIsFinished SearchState{ ..} = do q <- readTVar searchQueued cnt <- readTVar searchPendingCount @@ -153,7 +159,7 @@ searchIsFinished SearchState{ ..} = do && ( PSQ.prio (fromJust $ MM.findMax informants) <= PSQ.prio (fromJust $ MM.findMin q)))) -searchCancel :: SearchState nid addr ni r -> STM () +searchCancel :: SearchState nid addr tok ni r -> STM () searchCancel SearchState{..} = do writeTVar searchPendingCount 0 writeTVar searchQueued MM.empty @@ -164,7 +170,7 @@ search :: , PSQKey nid , PSQKey ni , Show nid - ) => Search nid addr ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr ni r) + ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r) search sch@Search{..} buckets target result = do let ns = R.kclosest searchSpace searchK target buckets st <- newSearch sch target ns -- cgit v1.2.3