summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-07-22 01:15:44 -0400
committerjoe <joe@jerkface.net>2017-07-22 01:15:44 -0400
commit77f6b96492223e7d7b147dac8d026e0b6f6a651b (patch)
tree661e2115a814de82ba251bccf0ab21ae4dfd1ff1 /src
parent7f1eb53d34ea6dda02cae1934b5011e38de248a6 (diff)
Implemented bucket refresh for Mainline.
Diffstat (limited to 'src')
-rw-r--r--src/Network/BitTorrent/DHT/Search.hs78
-rw-r--r--src/Network/DHT/Routing.hs46
2 files changed, 86 insertions, 38 deletions
diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs
index c562b988..a9efba89 100644
--- a/src/Network/BitTorrent/DHT/Search.hs
+++ b/src/Network/BitTorrent/DHT/Search.hs
@@ -3,6 +3,7 @@
3{-# LANGUAGE RecordWildCards #-} 3{-# LANGUAGE RecordWildCards #-}
4{-# LANGUAGE ScopedTypeVariables #-} 4{-# LANGUAGE ScopedTypeVariables #-}
5{-# LANGUAGE FlexibleContexts #-} 5{-# LANGUAGE FlexibleContexts #-}
6{-# LANGUAGE LambdaCase #-}
6module Network.BitTorrent.DHT.Search where 7module Network.BitTorrent.DHT.Search where
7 8
8import Control.Concurrent 9import Control.Concurrent
@@ -28,39 +29,64 @@ import Network.Address hiding (NodeId)
28import Network.DatagramServer.Types 29import Network.DatagramServer.Types
29import Network.DHT.Routing as R 30import Network.DHT.Routing as R
30 31
31data IterativeSearch nid addr ni r = IterativeSearch 32data Search nid addr ni r = Search
32 { searchTarget :: nid 33 { searchSpace :: KademliaSpace nid ni
33 , searchSpace :: KademliaSpace nid ni
34 , searchNodeAddress :: ni -> addr 34 , searchNodeAddress :: ni -> addr
35 , searchQuery :: ni -> IO ([ni], [r]) 35 , searchQuery :: ni -> IO ([ni], [r])
36 -- | The number of pending queries. Incremented before any query is sent 36 }
37
38data SearchState nid addr ni r = SearchState
39 {-
40 { searchParams :: Search nid addr ni r
41
42 , searchTarget :: nid
43 -- | This action will be performed at least once on each search result.
44 -- It may be invoked multiple times since different nodes may report the
45 -- same result. If the action returns 'False', the search will be
46 -- aborted, otherwise it will continue until it is decided that we've
47 -- asked the closest K nodes to the target.
48 , searchResult :: r -> STM Bool
49
50 -}
51
52 { -- | The number of pending queries. Incremented before any query is sent
37 -- and decremented when we get a reply. 53 -- and decremented when we get a reply.
38 , searchPendingCount :: TVar Int 54 searchPendingCount :: TVar Int
39 -- | Nodes scheduled to be queried. 55 -- | Nodes scheduled to be queried.
40 , searchQueued :: TVar (MinMaxPSQ ni nid) 56 , searchQueued :: TVar (MinMaxPSQ ni nid)
41 -- | The nearest K nodes that issued a reply. 57 -- | The nearest K nodes that issued a reply.
42 , searchInformant :: TVar (MinMaxPSQ ni nid) 58 , searchInformant :: TVar (MinMaxPSQ ni nid)
59 -- | This tracks already-queried addresses so we avoid bothering them
60 -- again. XXX: We could probably keep only the pending queries in this
61 -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha
62 -- should limit the number of outstanding queries.
43 , searchVisited :: TVar (Set addr) 63 , searchVisited :: TVar (Set addr)
44 , searchResults :: TVar (Set r)
45 } 64 }
46 65
47newSearch :: ( Ord addr 66newSearch :: ( Ord addr
48 , PSQKey nid 67 , PSQKey nid
49 , PSQKey ni 68 , PSQKey ni
50 ) => 69 ) =>
70 {-
51 KademliaSpace nid ni 71 KademliaSpace nid ni
52 -> (ni -> addr) 72 -> (ni -> addr)
53 -> (ni -> IO ([ni], [r])) 73 -> (ni -> IO ([ni], [r])) -- the query action.
54 -> nid -> [ni] -> IO (IterativeSearch nid addr ni r) 74 -> (r -> STM Bool) -- receives search results.
55newSearch space nAddr qry target ns = atomically $ do 75 -> nid -- target of search
76 -}
77 Search nid addr ni r
78 -> nid
79 -> [ni] -- Initial nodes to query.
80 -> IO (SearchState nid addr ni r)
81newSearch (Search space nAddr qry) target ns = atomically $ do
56 c <- newTVar 0 82 c <- newTVar 0
57 q <- newTVar $ MM.fromList 83 q <- newTVar $ MM.fromList
58 $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) 84 $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n))
59 $ ns 85 $ ns
60 i <- newTVar MM.empty 86 i <- newTVar MM.empty
61 v <- newTVar Set.empty 87 v <- newTVar Set.empty
62 r <- newTVar Set.empty 88 return -- (Search space nAddr qry) , r , target
63 return $ IterativeSearch target space nAddr qry c q i v r 89 ( SearchState c q i v )
64 90
65searchAlpha :: Int 91searchAlpha :: Int
66searchAlpha = 3 92searchAlpha = 3
@@ -74,10 +100,13 @@ sendQuery :: forall addr nid ni r.
74 , PSQKey nid 100 , PSQKey nid
75 , PSQKey ni 101 , PSQKey ni
76 ) => 102 ) =>
77 IterativeSearch nid addr ni r 103 Search nid addr ni r
104 -> nid
105 -> (r -> STM Bool)
106 -> SearchState nid addr ni r
78 -> Binding ni nid 107 -> Binding ni nid
79 -> IO () 108 -> IO ()
80sendQuery IterativeSearch{..} (ni :-> d) = do 109sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do
81 (ns,rs) <- handle (\(SomeException e) -> return ([],[])) 110 (ns,rs) <- handle (\(SomeException e) -> return ([],[]))
82 (searchQuery ni) 111 (searchQuery ni)
83 atomically $ do 112 atomically $ do
@@ -95,14 +124,19 @@ sendQuery IterativeSearch{..} (ni :-> d) = do
95 q 124 q
96 modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns 125 modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns
97 modifyTVar searchInformant $ MM.insertTake searchK ni d 126 modifyTVar searchInformant $ MM.insertTake searchK ni d
98 modifyTVar searchResults $ \s -> foldr Set.insert s rs 127 flip fix rs $ \loop -> \case
128 r:rs' -> do
129 wanting <- searchResult r
130 if wanting then loop rs'
131 else searchCancel sch
132 [] -> return ()
99 133
100 134
101searchIsFinished :: ( Ord addr 135searchIsFinished :: ( Ord addr
102 , PSQKey nid 136 , PSQKey nid
103 , PSQKey ni 137 , PSQKey ni
104 ) => IterativeSearch nid addr ni r -> STM Bool 138 ) => SearchState nid addr ni r -> STM Bool
105searchIsFinished IterativeSearch{..} = do 139searchIsFinished SearchState{ ..} = do
106 q <- readTVar searchQueued 140 q <- readTVar searchQueued
107 cnt <- readTVar searchPendingCount 141 cnt <- readTVar searchPendingCount
108 informants <- readTVar searchInformant 142 informants <- readTVar searchInformant
@@ -112,8 +146,8 @@ searchIsFinished IterativeSearch{..} = do
112 && ( PSQ.prio (fromJust $ MM.findMax informants) 146 && ( PSQ.prio (fromJust $ MM.findMax informants)
113 <= PSQ.prio (fromJust $ MM.findMin q)))) 147 <= PSQ.prio (fromJust $ MM.findMin q))))
114 148
115searchCancel :: IterativeSearch nid addr ni r -> IO () 149searchCancel :: SearchState nid addr ni r -> STM ()
116searchCancel IterativeSearch{..} = atomically $ do 150searchCancel SearchState{..} = do
117 writeTVar searchPendingCount 0 151 writeTVar searchPendingCount 0
118 writeTVar searchQueued MM.empty 152 writeTVar searchQueued MM.empty
119 153
@@ -122,8 +156,10 @@ search ::
122 , Ord addr 156 , Ord addr
123 , PSQKey nid 157 , PSQKey nid
124 , PSQKey ni 158 , PSQKey ni
125 ) => IterativeSearch nid addr ni r -> IO () 159 ) => Search nid addr ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO ()
126search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do 160search sch@Search{..} buckets target result = withTaskGroup searchAlpha $ \g -> do
161 let ns = R.kclosest searchSpace searchK target buckets
162 s@SearchState{..} <- newSearch sch target ns
127 fix $ \again -> do 163 fix $ \again -> do
128 join $ atomically $ do 164 join $ atomically $ do
129 cnt <- readTVar $ searchPendingCount 165 cnt <- readTVar $ searchPendingCount
@@ -141,7 +177,7 @@ search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do
141 do writeTVar searchQueued q 177 do writeTVar searchQueued q
142 modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) 178 modifyTVar searchVisited $ Set.insert (searchNodeAddress ni)
143 modifyTVar searchPendingCount succ 179 modifyTVar searchPendingCount succ
144 return $ withAsync g (sendQuery s (ni :-> d)) (const again) 180 return $ withAsync g (sendQuery sch target result s (ni :-> d)) (const again)
145 _ -> -- Otherwise, we are finished. 181 _ -> -- Otherwise, we are finished.
146 do check (cnt == 0) 182 do check (cnt == 0)
147 return $ return () 183 return $ return ()
diff --git a/src/Network/DHT/Routing.hs b/src/Network/DHT/Routing.hs
index 396c4b1d..46ebe472 100644
--- a/src/Network/DHT/Routing.hs
+++ b/src/Network/DHT/Routing.hs
@@ -536,19 +536,31 @@ depth = L.length . shape
536#endif 536#endif
537 537
538lookupBucket :: forall ni nid x. 538lookupBucket :: forall ni nid x.
539 ( FiniteBits nid 539 ( -- FiniteBits nid
540 , Ord nid 540 Ord nid
541 ) => (ni -> nid) -> nid -> (forall s. Reifies s (Compare ni) => [Bucket s ni] -> x) -> BucketList ni -> x 541 ) => KademliaSpace nid ni -> nid -> (forall s. Reifies s (Compare ni) => [Bucket s ni] -> x) -> BucketList ni -> x
542lookupBucket nodeId nid kont (BucketList self _ bkts) = kont $ go 0 [] bkts 542lookupBucket space nid kont (BucketList self _ bkts) = kont $ go 0 [] bkts
543 where 543 where
544 d = nid `xor` nodeId self 544 d = kademliaXor space nid (kademliaLocation space self)
545 545
546 go :: Word -> [Bucket s ni] -> [Bucket s ni] -> [Bucket s ni] 546 go :: Word -> [Bucket s ni] -> [Bucket s ni] -> [Bucket s ni]
547 go i bs (bucket : buckets) 547 go i bs (bucket : buckets)
548 | testIdBit d i = go (succ i) (bucket:bs) buckets 548 | kademliaTestBit space d i = bucket : buckets ++ bs
549 | otherwise = bucket : buckets ++ bs 549 | otherwise = go (succ i) (bucket:bs) buckets
550 go _ bs [] = bs 550 go _ bs [] = bs
551 551
552bucketNumber :: forall ni nid.
553 KademliaSpace nid ni -> nid -> BucketList ni -> Int
554bucketNumber space nid (BucketList self _ bkts) = fromIntegral $ go 0 bkts
555 where
556 d = kademliaXor space nid (kademliaLocation space self)
557
558 go :: Word -> [Bucket s ni] -> Word
559 go i (bucket : buckets)
560 | kademliaTestBit space d i = i
561 | otherwise = go (succ i) buckets
562 go i [] = i
563
552 564
553compatibleNodeId :: forall ni nid. 565compatibleNodeId :: forall ni nid.
554 ( Serialize nid, FiniteBits nid) => 566 ( Serialize nid, FiniteBits nid) =>
@@ -614,23 +626,23 @@ distance :: Bits nid => nid -> nid -> NodeDistance nid
614distance a b = NodeDistance $ xor a b 626distance a b = NodeDistance $ xor a b
615 627
616-- | Order by closeness: nearest nodes first. 628-- | Order by closeness: nearest nodes first.
617rank :: ( FiniteBits nid 629rank :: ( Ord nid
618 , Ord nid 630 ) => KademliaSpace nid ni -> nid -> [ni] -> [ni]
619 ) => (x -> nid) -> nid -> [x] -> [x] 631rank space nid = L.sortBy (comparing (kademliaXor space nid . kademliaLocation space))
620rank f nid = L.sortBy (comparing (distance nid . f))
621 632
622 633
623-- | Get a list of /K/ closest nodes using XOR metric. Used in 634-- | Get a list of /K/ closest nodes using XOR metric. Used in
624-- 'find_node' and 'get_peers' queries. 635-- 'find_node' and 'get_peers' queries.
625kclosest :: ( FiniteBits nid 636kclosest :: ( -- FiniteBits nid
626 , Ord nid 637 Ord nid
627 ) => (ni -> nid) -> Int -> nid -> BucketList ni -> [ni] 638 ) =>
628kclosest nodeId k nid tbl = take k $ rank nodeId nid (L.concat bucket) 639 KademliaSpace nid ni -> Int -> nid -> BucketList ni -> [ni]
629 ++ rank nodeId nid (L.concat everyone) 640kclosest space k nid tbl = take k $ rank space nid (L.concat bucket)
641 ++ rank space nid (L.concat everyone)
630 where 642 where
631 (bucket,everyone) = 643 (bucket,everyone) =
632 L.splitAt 1 644 L.splitAt 1
633 . lookupBucket nodeId nid (L.map (coerce . L.map PSQ.key . PSQ.toList . bktNodes)) 645 . lookupBucket space nid (L.map (coerce . L.map PSQ.key . PSQ.toList . bktNodes))
634 $ tbl 646 $ tbl
635 647
636 648