From 77f6b96492223e7d7b147dac8d026e0b6f6a651b Mon Sep 17 00:00:00 2001 From: joe Date: Sat, 22 Jul 2017 01:15:44 -0400 Subject: Implemented bucket refresh for Mainline. --- src/Network/BitTorrent/DHT/Search.hs | 78 ++++++++++++++++++++++++++---------- src/Network/DHT/Routing.hs | 46 +++++++++++++-------- 2 files changed, 86 insertions(+), 38 deletions(-) (limited to 'src') diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs index c562b988..a9efba89 100644 --- a/src/Network/BitTorrent/DHT/Search.hs +++ b/src/Network/BitTorrent/DHT/Search.hs @@ -3,6 +3,7 @@ {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE LambdaCase #-} module Network.BitTorrent.DHT.Search where import Control.Concurrent @@ -28,39 +29,64 @@ import Network.Address hiding (NodeId) import Network.DatagramServer.Types import Network.DHT.Routing as R -data IterativeSearch nid addr ni r = IterativeSearch - { searchTarget :: nid - , searchSpace :: KademliaSpace nid ni +data Search nid addr ni r = Search + { searchSpace :: KademliaSpace nid ni , searchNodeAddress :: ni -> addr , searchQuery :: ni -> IO ([ni], [r]) - -- | The number of pending queries. Incremented before any query is sent + } + +data SearchState nid addr ni r = SearchState + {- + { searchParams :: Search nid addr ni r + + , searchTarget :: nid + -- | This action will be performed at least once on each search result. + -- It may be invoked multiple times since different nodes may report the + -- same result. If the action returns 'False', the search will be + -- aborted, otherwise it will continue until it is decided that we've + -- asked the closest K nodes to the target. + , searchResult :: r -> STM Bool + + -} + + { -- | The number of pending queries. Incremented before any query is sent -- and decremented when we get a reply. - , searchPendingCount :: TVar Int + searchPendingCount :: TVar Int -- | Nodes scheduled to be queried. , searchQueued :: TVar (MinMaxPSQ ni nid) -- | The nearest K nodes that issued a reply. , searchInformant :: TVar (MinMaxPSQ ni nid) + -- | This tracks already-queried addresses so we avoid bothering them + -- again. XXX: We could probably keep only the pending queries in this + -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha + -- should limit the number of outstanding queries. , searchVisited :: TVar (Set addr) - , searchResults :: TVar (Set r) } newSearch :: ( Ord addr , PSQKey nid , PSQKey ni ) => + {- KademliaSpace nid ni -> (ni -> addr) - -> (ni -> IO ([ni], [r])) - -> nid -> [ni] -> IO (IterativeSearch nid addr ni r) -newSearch space nAddr qry target ns = atomically $ do + -> (ni -> IO ([ni], [r])) -- the query action. + -> (r -> STM Bool) -- receives search results. + -> nid -- target of search + -} + Search nid addr ni r + -> nid + -> [ni] -- Initial nodes to query. + -> IO (SearchState nid addr ni r) +newSearch (Search space nAddr qry) target ns = atomically $ do c <- newTVar 0 q <- newTVar $ MM.fromList $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) $ ns i <- newTVar MM.empty v <- newTVar Set.empty - r <- newTVar Set.empty - return $ IterativeSearch target space nAddr qry c q i v r + return -- (Search space nAddr qry) , r , target + ( SearchState c q i v ) searchAlpha :: Int searchAlpha = 3 @@ -74,10 +100,13 @@ sendQuery :: forall addr nid ni r. , PSQKey nid , PSQKey ni ) => - IterativeSearch nid addr ni r + Search nid addr ni r + -> nid + -> (r -> STM Bool) + -> SearchState nid addr ni r -> Binding ni nid -> IO () -sendQuery IterativeSearch{..} (ni :-> d) = do +sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do (ns,rs) <- handle (\(SomeException e) -> return ([],[])) (searchQuery ni) atomically $ do @@ -95,14 +124,19 @@ sendQuery IterativeSearch{..} (ni :-> d) = do q modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns modifyTVar searchInformant $ MM.insertTake searchK ni d - modifyTVar searchResults $ \s -> foldr Set.insert s rs + flip fix rs $ \loop -> \case + r:rs' -> do + wanting <- searchResult r + if wanting then loop rs' + else searchCancel sch + [] -> return () searchIsFinished :: ( Ord addr , PSQKey nid , PSQKey ni - ) => IterativeSearch nid addr ni r -> STM Bool -searchIsFinished IterativeSearch{..} = do + ) => SearchState nid addr ni r -> STM Bool +searchIsFinished SearchState{ ..} = do q <- readTVar searchQueued cnt <- readTVar searchPendingCount informants <- readTVar searchInformant @@ -112,8 +146,8 @@ searchIsFinished IterativeSearch{..} = do && ( PSQ.prio (fromJust $ MM.findMax informants) <= PSQ.prio (fromJust $ MM.findMin q)))) -searchCancel :: IterativeSearch nid addr ni r -> IO () -searchCancel IterativeSearch{..} = atomically $ do +searchCancel :: SearchState nid addr ni r -> STM () +searchCancel SearchState{..} = do writeTVar searchPendingCount 0 writeTVar searchQueued MM.empty @@ -122,8 +156,10 @@ search :: , Ord addr , PSQKey nid , PSQKey ni - ) => IterativeSearch nid addr ni r -> IO () -search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do + ) => Search nid addr ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO () +search sch@Search{..} buckets target result = withTaskGroup searchAlpha $ \g -> do + let ns = R.kclosest searchSpace searchK target buckets + s@SearchState{..} <- newSearch sch target ns fix $ \again -> do join $ atomically $ do cnt <- readTVar $ searchPendingCount @@ -141,7 +177,7 @@ search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do do writeTVar searchQueued q modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) modifyTVar searchPendingCount succ - return $ withAsync g (sendQuery s (ni :-> d)) (const again) + return $ withAsync g (sendQuery sch target result s (ni :-> d)) (const again) _ -> -- Otherwise, we are finished. do check (cnt == 0) return $ return () diff --git a/src/Network/DHT/Routing.hs b/src/Network/DHT/Routing.hs index 396c4b1d..46ebe472 100644 --- a/src/Network/DHT/Routing.hs +++ b/src/Network/DHT/Routing.hs @@ -536,19 +536,31 @@ depth = L.length . shape #endif lookupBucket :: forall ni nid x. - ( FiniteBits nid - , Ord nid - ) => (ni -> nid) -> nid -> (forall s. Reifies s (Compare ni) => [Bucket s ni] -> x) -> BucketList ni -> x -lookupBucket nodeId nid kont (BucketList self _ bkts) = kont $ go 0 [] bkts + ( -- FiniteBits nid + Ord nid + ) => KademliaSpace nid ni -> nid -> (forall s. Reifies s (Compare ni) => [Bucket s ni] -> x) -> BucketList ni -> x +lookupBucket space nid kont (BucketList self _ bkts) = kont $ go 0 [] bkts where - d = nid `xor` nodeId self + d = kademliaXor space nid (kademliaLocation space self) go :: Word -> [Bucket s ni] -> [Bucket s ni] -> [Bucket s ni] go i bs (bucket : buckets) - | testIdBit d i = go (succ i) (bucket:bs) buckets - | otherwise = bucket : buckets ++ bs + | kademliaTestBit space d i = bucket : buckets ++ bs + | otherwise = go (succ i) (bucket:bs) buckets go _ bs [] = bs +bucketNumber :: forall ni nid. + KademliaSpace nid ni -> nid -> BucketList ni -> Int +bucketNumber space nid (BucketList self _ bkts) = fromIntegral $ go 0 bkts + where + d = kademliaXor space nid (kademliaLocation space self) + + go :: Word -> [Bucket s ni] -> Word + go i (bucket : buckets) + | kademliaTestBit space d i = i + | otherwise = go (succ i) buckets + go i [] = i + compatibleNodeId :: forall ni nid. ( Serialize nid, FiniteBits nid) => @@ -614,23 +626,23 @@ distance :: Bits nid => nid -> nid -> NodeDistance nid distance a b = NodeDistance $ xor a b -- | Order by closeness: nearest nodes first. -rank :: ( FiniteBits nid - , Ord nid - ) => (x -> nid) -> nid -> [x] -> [x] -rank f nid = L.sortBy (comparing (distance nid . f)) +rank :: ( Ord nid + ) => KademliaSpace nid ni -> nid -> [ni] -> [ni] +rank space nid = L.sortBy (comparing (kademliaXor space nid . kademliaLocation space)) -- | Get a list of /K/ closest nodes using XOR metric. Used in -- 'find_node' and 'get_peers' queries. -kclosest :: ( FiniteBits nid - , Ord nid - ) => (ni -> nid) -> Int -> nid -> BucketList ni -> [ni] -kclosest nodeId k nid tbl = take k $ rank nodeId nid (L.concat bucket) - ++ rank nodeId nid (L.concat everyone) +kclosest :: ( -- FiniteBits nid + Ord nid + ) => + KademliaSpace nid ni -> Int -> nid -> BucketList ni -> [ni] +kclosest space k nid tbl = take k $ rank space nid (L.concat bucket) + ++ rank space nid (L.concat everyone) where (bucket,everyone) = L.splitAt 1 - . lookupBucket nodeId nid (L.map (coerce . L.map PSQ.key . PSQ.toList . bktNodes)) + . lookupBucket space nid (L.map (coerce . L.map PSQ.key . PSQ.toList . bktNodes)) $ tbl -- cgit v1.2.3