diff options
author | joe <joe@jerkface.net> | 2017-07-22 01:15:44 -0400 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-07-22 01:15:44 -0400 |
commit | 77f6b96492223e7d7b147dac8d026e0b6f6a651b (patch) | |
tree | 661e2115a814de82ba251bccf0ab21ae4dfd1ff1 /src/Network/BitTorrent/DHT/Search.hs | |
parent | 7f1eb53d34ea6dda02cae1934b5011e38de248a6 (diff) |
Implemented bucket refresh for Mainline.
Diffstat (limited to 'src/Network/BitTorrent/DHT/Search.hs')
-rw-r--r-- | src/Network/BitTorrent/DHT/Search.hs | 78 |
1 files changed, 57 insertions, 21 deletions
diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs index c562b988..a9efba89 100644 --- a/src/Network/BitTorrent/DHT/Search.hs +++ b/src/Network/BitTorrent/DHT/Search.hs | |||
@@ -3,6 +3,7 @@ | |||
3 | {-# LANGUAGE RecordWildCards #-} | 3 | {-# LANGUAGE RecordWildCards #-} |
4 | {-# LANGUAGE ScopedTypeVariables #-} | 4 | {-# LANGUAGE ScopedTypeVariables #-} |
5 | {-# LANGUAGE FlexibleContexts #-} | 5 | {-# LANGUAGE FlexibleContexts #-} |
6 | {-# LANGUAGE LambdaCase #-} | ||
6 | module Network.BitTorrent.DHT.Search where | 7 | module Network.BitTorrent.DHT.Search where |
7 | 8 | ||
8 | import Control.Concurrent | 9 | import Control.Concurrent |
@@ -28,39 +29,64 @@ import Network.Address hiding (NodeId) | |||
28 | import Network.DatagramServer.Types | 29 | import Network.DatagramServer.Types |
29 | import Network.DHT.Routing as R | 30 | import Network.DHT.Routing as R |
30 | 31 | ||
31 | data IterativeSearch nid addr ni r = IterativeSearch | 32 | data Search nid addr ni r = Search |
32 | { searchTarget :: nid | 33 | { searchSpace :: KademliaSpace nid ni |
33 | , searchSpace :: KademliaSpace nid ni | ||
34 | , searchNodeAddress :: ni -> addr | 34 | , searchNodeAddress :: ni -> addr |
35 | , searchQuery :: ni -> IO ([ni], [r]) | 35 | , searchQuery :: ni -> IO ([ni], [r]) |
36 | -- | The number of pending queries. Incremented before any query is sent | 36 | } |
37 | |||
38 | data SearchState nid addr ni r = SearchState | ||
39 | {- | ||
40 | { searchParams :: Search nid addr ni r | ||
41 | |||
42 | , searchTarget :: nid | ||
43 | -- | This action will be performed at least once on each search result. | ||
44 | -- It may be invoked multiple times since different nodes may report the | ||
45 | -- same result. If the action returns 'False', the search will be | ||
46 | -- aborted, otherwise it will continue until it is decided that we've | ||
47 | -- asked the closest K nodes to the target. | ||
48 | , searchResult :: r -> STM Bool | ||
49 | |||
50 | -} | ||
51 | |||
52 | { -- | The number of pending queries. Incremented before any query is sent | ||
37 | -- and decremented when we get a reply. | 53 | -- and decremented when we get a reply. |
38 | , searchPendingCount :: TVar Int | 54 | searchPendingCount :: TVar Int |
39 | -- | Nodes scheduled to be queried. | 55 | -- | Nodes scheduled to be queried. |
40 | , searchQueued :: TVar (MinMaxPSQ ni nid) | 56 | , searchQueued :: TVar (MinMaxPSQ ni nid) |
41 | -- | The nearest K nodes that issued a reply. | 57 | -- | The nearest K nodes that issued a reply. |
42 | , searchInformant :: TVar (MinMaxPSQ ni nid) | 58 | , searchInformant :: TVar (MinMaxPSQ ni nid) |
59 | -- | This tracks already-queried addresses so we avoid bothering them | ||
60 | -- again. XXX: We could probably keep only the pending queries in this | ||
61 | -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha | ||
62 | -- should limit the number of outstanding queries. | ||
43 | , searchVisited :: TVar (Set addr) | 63 | , searchVisited :: TVar (Set addr) |
44 | , searchResults :: TVar (Set r) | ||
45 | } | 64 | } |
46 | 65 | ||
47 | newSearch :: ( Ord addr | 66 | newSearch :: ( Ord addr |
48 | , PSQKey nid | 67 | , PSQKey nid |
49 | , PSQKey ni | 68 | , PSQKey ni |
50 | ) => | 69 | ) => |
70 | {- | ||
51 | KademliaSpace nid ni | 71 | KademliaSpace nid ni |
52 | -> (ni -> addr) | 72 | -> (ni -> addr) |
53 | -> (ni -> IO ([ni], [r])) | 73 | -> (ni -> IO ([ni], [r])) -- the query action. |
54 | -> nid -> [ni] -> IO (IterativeSearch nid addr ni r) | 74 | -> (r -> STM Bool) -- receives search results. |
55 | newSearch space nAddr qry target ns = atomically $ do | 75 | -> nid -- target of search |
76 | -} | ||
77 | Search nid addr ni r | ||
78 | -> nid | ||
79 | -> [ni] -- Initial nodes to query. | ||
80 | -> IO (SearchState nid addr ni r) | ||
81 | newSearch (Search space nAddr qry) target ns = atomically $ do | ||
56 | c <- newTVar 0 | 82 | c <- newTVar 0 |
57 | q <- newTVar $ MM.fromList | 83 | q <- newTVar $ MM.fromList |
58 | $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) | 84 | $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) |
59 | $ ns | 85 | $ ns |
60 | i <- newTVar MM.empty | 86 | i <- newTVar MM.empty |
61 | v <- newTVar Set.empty | 87 | v <- newTVar Set.empty |
62 | r <- newTVar Set.empty | 88 | return -- (Search space nAddr qry) , r , target |
63 | return $ IterativeSearch target space nAddr qry c q i v r | 89 | ( SearchState c q i v ) |
64 | 90 | ||
65 | searchAlpha :: Int | 91 | searchAlpha :: Int |
66 | searchAlpha = 3 | 92 | searchAlpha = 3 |
@@ -74,10 +100,13 @@ sendQuery :: forall addr nid ni r. | |||
74 | , PSQKey nid | 100 | , PSQKey nid |
75 | , PSQKey ni | 101 | , PSQKey ni |
76 | ) => | 102 | ) => |
77 | IterativeSearch nid addr ni r | 103 | Search nid addr ni r |
104 | -> nid | ||
105 | -> (r -> STM Bool) | ||
106 | -> SearchState nid addr ni r | ||
78 | -> Binding ni nid | 107 | -> Binding ni nid |
79 | -> IO () | 108 | -> IO () |
80 | sendQuery IterativeSearch{..} (ni :-> d) = do | 109 | sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do |
81 | (ns,rs) <- handle (\(SomeException e) -> return ([],[])) | 110 | (ns,rs) <- handle (\(SomeException e) -> return ([],[])) |
82 | (searchQuery ni) | 111 | (searchQuery ni) |
83 | atomically $ do | 112 | atomically $ do |
@@ -95,14 +124,19 @@ sendQuery IterativeSearch{..} (ni :-> d) = do | |||
95 | q | 124 | q |
96 | modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns | 125 | modifyTVar searchQueued $ \q -> foldr insertFoundNode q ns |
97 | modifyTVar searchInformant $ MM.insertTake searchK ni d | 126 | modifyTVar searchInformant $ MM.insertTake searchK ni d |
98 | modifyTVar searchResults $ \s -> foldr Set.insert s rs | 127 | flip fix rs $ \loop -> \case |
128 | r:rs' -> do | ||
129 | wanting <- searchResult r | ||
130 | if wanting then loop rs' | ||
131 | else searchCancel sch | ||
132 | [] -> return () | ||
99 | 133 | ||
100 | 134 | ||
101 | searchIsFinished :: ( Ord addr | 135 | searchIsFinished :: ( Ord addr |
102 | , PSQKey nid | 136 | , PSQKey nid |
103 | , PSQKey ni | 137 | , PSQKey ni |
104 | ) => IterativeSearch nid addr ni r -> STM Bool | 138 | ) => SearchState nid addr ni r -> STM Bool |
105 | searchIsFinished IterativeSearch{..} = do | 139 | searchIsFinished SearchState{ ..} = do |
106 | q <- readTVar searchQueued | 140 | q <- readTVar searchQueued |
107 | cnt <- readTVar searchPendingCount | 141 | cnt <- readTVar searchPendingCount |
108 | informants <- readTVar searchInformant | 142 | informants <- readTVar searchInformant |
@@ -112,8 +146,8 @@ searchIsFinished IterativeSearch{..} = do | |||
112 | && ( PSQ.prio (fromJust $ MM.findMax informants) | 146 | && ( PSQ.prio (fromJust $ MM.findMax informants) |
113 | <= PSQ.prio (fromJust $ MM.findMin q)))) | 147 | <= PSQ.prio (fromJust $ MM.findMin q)))) |
114 | 148 | ||
115 | searchCancel :: IterativeSearch nid addr ni r -> IO () | 149 | searchCancel :: SearchState nid addr ni r -> STM () |
116 | searchCancel IterativeSearch{..} = atomically $ do | 150 | searchCancel SearchState{..} = do |
117 | writeTVar searchPendingCount 0 | 151 | writeTVar searchPendingCount 0 |
118 | writeTVar searchQueued MM.empty | 152 | writeTVar searchQueued MM.empty |
119 | 153 | ||
@@ -122,8 +156,10 @@ search :: | |||
122 | , Ord addr | 156 | , Ord addr |
123 | , PSQKey nid | 157 | , PSQKey nid |
124 | , PSQKey ni | 158 | , PSQKey ni |
125 | ) => IterativeSearch nid addr ni r -> IO () | 159 | ) => Search nid addr ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO () |
126 | search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do | 160 | search sch@Search{..} buckets target result = withTaskGroup searchAlpha $ \g -> do |
161 | let ns = R.kclosest searchSpace searchK target buckets | ||
162 | s@SearchState{..} <- newSearch sch target ns | ||
127 | fix $ \again -> do | 163 | fix $ \again -> do |
128 | join $ atomically $ do | 164 | join $ atomically $ do |
129 | cnt <- readTVar $ searchPendingCount | 165 | cnt <- readTVar $ searchPendingCount |
@@ -141,7 +177,7 @@ search s@IterativeSearch{..} = withTaskGroup searchAlpha $ \g -> do | |||
141 | do writeTVar searchQueued q | 177 | do writeTVar searchQueued q |
142 | modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) | 178 | modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) |
143 | modifyTVar searchPendingCount succ | 179 | modifyTVar searchPendingCount succ |
144 | return $ withAsync g (sendQuery s (ni :-> d)) (const again) | 180 | return $ withAsync g (sendQuery sch target result s (ni :-> d)) (const again) |
145 | _ -> -- Otherwise, we are finished. | 181 | _ -> -- Otherwise, we are finished. |
146 | do check (cnt == 0) | 182 | do check (cnt == 0) |
147 | return $ return () | 183 | return $ return () |