diff options
Diffstat (limited to 'kad/src/Network/Kademlia/Search.hs')
-rw-r--r-- | kad/src/Network/Kademlia/Search.hs | 236 |
1 files changed, 236 insertions, 0 deletions
diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs new file mode 100644 index 00000000..1be1afc1 --- /dev/null +++ b/kad/src/Network/Kademlia/Search.hs | |||
@@ -0,0 +1,236 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE PatternSynonyms #-} | ||
3 | {-# LANGUAGE RecordWildCards #-} | ||
4 | {-# LANGUAGE ScopedTypeVariables #-} | ||
5 | {-# LANGUAGE FlexibleContexts #-} | ||
6 | {-# LANGUAGE LambdaCase #-} | ||
7 | module Network.Kademlia.Search where | ||
8 | |||
9 | import Control.Concurrent.Tasks | ||
10 | import Control.Concurrent.STM | ||
11 | import Control.Monad | ||
12 | import Data.Function | ||
13 | import Data.Maybe | ||
14 | import qualified Data.Set as Set | ||
15 | ;import Data.Set (Set) | ||
16 | import Data.Hashable (Hashable(..)) -- for type sigs | ||
17 | import System.IO.Error | ||
18 | |||
19 | import qualified Data.MinMaxPSQ as MM | ||
20 | ;import Data.MinMaxPSQ (MinMaxPSQ, MinMaxPSQ') | ||
21 | import qualified Data.Wrapper.PSQ as PSQ | ||
22 | ;import Data.Wrapper.PSQ (pattern (:->), Binding, pattern Binding, Binding', PSQKey) | ||
23 | import Network.Kademlia.Routing as R | ||
24 | #ifdef THREAD_DEBUG | ||
25 | import Control.Concurrent.Lifted.Instrument | ||
26 | #else | ||
27 | import Control.Concurrent.Lifted | ||
28 | import GHC.Conc (labelThread) | ||
29 | #endif | ||
30 | |||
31 | data Search nid addr tok ni r = Search | ||
32 | { searchSpace :: KademliaSpace nid ni | ||
33 | , searchNodeAddress :: ni -> addr | ||
34 | , searchQuery :: Either (nid -> ni -> IO (Maybe ([ni], [r], Maybe tok))) | ||
35 | (nid -> ni -> (Maybe ([ni],[r],Maybe tok) -> IO ()) -> IO ()) | ||
36 | , searchAlpha :: Int -- α = 8 | ||
37 | -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on | ||
38 | -- how fast the queries are. For Tox's much slower onion-routed queries, we | ||
39 | -- need to ensure that closer non-responding queries don't completely push out | ||
40 | -- farther away queries. | ||
41 | -- | ||
42 | -- For BitTorrent, setting them both 8 was not an issue, but that is no longer | ||
43 | -- supported because now the number of remembered informants is now the | ||
44 | -- difference between these two numbers. So, if searchK = 16 and searchAlpha = | ||
45 | -- 4, then the number of remembered query responses is 12. | ||
46 | , searchK :: Int -- K = 16 | ||
47 | } | ||
48 | |||
49 | data SearchState nid addr tok ni r = SearchState | ||
50 | { -- | The number of pending queries. Incremented before any query is sent | ||
51 | -- and decremented when we get a reply. | ||
52 | searchPendingCount :: TVar Int | ||
53 | -- | Nodes scheduled to be queried (roughly at most K). | ||
54 | , searchQueued :: TVar (MinMaxPSQ ni nid) | ||
55 | -- | The nearest (K - α) nodes that issued a reply. | ||
56 | -- | ||
57 | -- α is the maximum number of simultaneous queries. | ||
58 | , searchInformant :: TVar (MinMaxPSQ' ni nid (Maybe tok)) | ||
59 | -- | This tracks already-queried addresses so we avoid bothering them | ||
60 | -- again. XXX: We could probably keep only the pending queries in this | ||
61 | -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha | ||
62 | -- should limit the number of outstanding queries. | ||
63 | , searchVisited :: TVar (Set addr) | ||
64 | , searchSpec :: Search nid addr tok ni r | ||
65 | } | ||
66 | |||
67 | |||
68 | newSearch :: ( Ord addr | ||
69 | , PSQKey nid | ||
70 | , PSQKey ni | ||
71 | ) => | ||
72 | {- | ||
73 | KademliaSpace nid ni | ||
74 | -> (ni -> addr) | ||
75 | -> (ni -> IO ([ni], [r])) -- the query action. | ||
76 | -> (r -> STM Bool) -- receives search results. | ||
77 | -> nid -- target of search | ||
78 | -} | ||
79 | Search nid addr tok ni r | ||
80 | -> nid | ||
81 | -> [ni] -- Initial nodes to query. | ||
82 | -> STM (SearchState nid addr tok ni r) | ||
83 | newSearch s@(Search space nAddr qry _ _) target ns = do | ||
84 | c <- newTVar 0 | ||
85 | q <- newTVar $ MM.fromList | ||
86 | $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) | ||
87 | $ ns | ||
88 | i <- newTVar MM.empty | ||
89 | v <- newTVar Set.empty | ||
90 | return -- (Search space nAddr qry) , r , target | ||
91 | ( SearchState c q i v s ) | ||
92 | |||
93 | -- | Discard a value from a key-priority-value tuple. This is useful for | ||
94 | -- swaping items from a "MinMaxPSQ'" to a "MinMaxPSQ". | ||
95 | stripValue :: Binding' k p v -> Binding k p | ||
96 | stripValue (Binding ni _ nid) = (ni :-> nid) | ||
97 | |||
98 | -- | Reset a 'SearchState' object to ready it for a repeated search. | ||
99 | reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) => | ||
100 | (nid -> STM [ni]) | ||
101 | -> Search nid addr1 tok1 ni r1 | ||
102 | -> nid | ||
103 | -> SearchState nid addr tok ni r | ||
104 | -> STM (SearchState nid addr tok ni r) | ||
105 | reset nearestNodes qsearch target st = do | ||
106 | searchIsFinished st >>= check -- Wait for a search to finish before resetting. | ||
107 | bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) | ||
108 | <$> nearestNodes target | ||
109 | priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st) | ||
110 | writeTVar (searchQueued st) $ MM.fromList $ priorInformants ++ bktNodes | ||
111 | writeTVar (searchInformant st) MM.empty | ||
112 | writeTVar (searchVisited st) Set.empty | ||
113 | writeTVar (searchPendingCount st) 0 | ||
114 | return st | ||
115 | |||
116 | sendAsyncQuery :: forall addr nid tok ni r. | ||
117 | ( Ord addr | ||
118 | , PSQKey nid | ||
119 | , PSQKey ni | ||
120 | , Show nid | ||
121 | ) => | ||
122 | Search nid addr tok ni r | ||
123 | -> nid | ||
124 | -> (r -> STM Bool) -- ^ return False to terminate the search. | ||
125 | -> SearchState nid addr tok ni r | ||
126 | -> Binding ni nid | ||
127 | -> TaskGroup | ||
128 | -> IO () | ||
129 | sendAsyncQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) g = | ||
130 | case searchQuery of | ||
131 | Left blockingQuery -> | ||
132 | forkTask g "searchQuery" $ do | ||
133 | myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) | ||
134 | reply <- blockingQuery searchTarget ni `catchIOError` const (return Nothing) | ||
135 | atomically $ do | ||
136 | modifyTVar searchPendingCount pred | ||
137 | maybe (return ()) go reply | ||
138 | Right nonblockingQuery -> do | ||
139 | nonblockingQuery searchTarget ni $ \reply -> | ||
140 | atomically $ do | ||
141 | modifyTVar searchPendingCount pred | ||
142 | maybe (return ()) go reply | ||
143 | where | ||
144 | go (ns,rs,tok) = do | ||
145 | vs <- readTVar searchVisited | ||
146 | -- We only queue a node if it is not yet visited | ||
147 | let insertFoundNode :: Int | ||
148 | -> ni | ||
149 | -> MinMaxPSQ ni nid | ||
150 | -> MinMaxPSQ ni nid | ||
151 | insertFoundNode k n q | ||
152 | | searchNodeAddress n `Set.member` vs | ||
153 | = q | ||
154 | | otherwise = MM.insertTake k n ( kademliaXor searchSpace searchTarget | ||
155 | $ kademliaLocation searchSpace n ) | ||
156 | q | ||
157 | |||
158 | qsize0 <- MM.size <$> readTVar searchQueued | ||
159 | let qsize = if qsize0 < searchK then searchK else qsize0 -- Allow searchQueued to grow | ||
160 | -- only when there's fewer than | ||
161 | -- K elements. | ||
162 | modifyTVar searchQueued $ \q -> foldr (insertFoundNode qsize) q ns | ||
163 | modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d | ||
164 | flip fix rs $ \loop -> \case | ||
165 | r:rs' -> do | ||
166 | wanting <- searchResult r | ||
167 | if wanting then loop rs' | ||
168 | else searchCancel sch | ||
169 | [] -> return () | ||
170 | |||
171 | |||
172 | searchIsFinished :: ( PSQKey nid | ||
173 | , PSQKey ni | ||
174 | ) => SearchState nid addr tok ni r -> STM Bool | ||
175 | searchIsFinished SearchState{..} = do | ||
176 | q <- readTVar searchQueued | ||
177 | cnt <- readTVar searchPendingCount | ||
178 | informants <- readTVar searchInformant | ||
179 | return $ cnt == 0 | ||
180 | && ( MM.null q | ||
181 | || ( MM.size informants >= (searchK searchSpec - searchAlpha searchSpec) | ||
182 | && ( PSQ.prio (fromJust $ MM.findMax informants) | ||
183 | <= PSQ.prio (fromJust $ MM.findMin q)))) | ||
184 | |||
185 | searchCancel :: SearchState nid addr tok ni r -> STM () | ||
186 | searchCancel SearchState{..} = do | ||
187 | writeTVar searchPendingCount 0 | ||
188 | writeTVar searchQueued MM.empty | ||
189 | |||
190 | search :: | ||
191 | ( Ord r | ||
192 | , Ord addr | ||
193 | , PSQKey nid | ||
194 | , PSQKey ni | ||
195 | , Show nid | ||
196 | ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r) | ||
197 | search sch buckets target result = do | ||
198 | let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets | ||
199 | st <- atomically $ newSearch sch target ns | ||
200 | forkIO $ searchLoop sch target result st | ||
201 | return st | ||
202 | |||
203 | searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni ) | ||
204 | => Search nid addr tok ni r -- ^ Query and distance methods. | ||
205 | -> nid -- ^ The target we are searching for. | ||
206 | -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching. | ||
207 | -> SearchState nid addr tok ni r -- ^ Search-related state. | ||
208 | -> IO () | ||
209 | searchLoop sch@Search{..} target result s@SearchState{..} = do | ||
210 | myThreadId >>= flip labelThread ("search."++show target) | ||
211 | withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do | ||
212 | join $ atomically $ do | ||
213 | cnt <- readTVar $ searchPendingCount | ||
214 | check (cnt <= 8) -- Only 8 pending queries at a time. | ||
215 | informants <- readTVar searchInformant | ||
216 | found <- MM.minView <$> readTVar searchQueued | ||
217 | case found of | ||
218 | Just (ni :-> d, q) | ||
219 | | -- If there's fewer than /k - α/ informants and there's any | ||
220 | -- node we haven't yet got a response from. | ||
221 | (MM.size informants < searchK - searchAlpha) && (cnt > 0 || not (MM.null q)) | ||
222 | -- Or there's no informants yet at all. | ||
223 | || MM.null informants | ||
224 | -- Or if the closest scheduled node is nearer than the | ||
225 | -- nearest /k/ informants. | ||
226 | || (d < PSQ.prio (fromJust $ MM.findMax informants)) | ||
227 | -> -- Then the search continues, send a query. | ||
228 | do writeTVar searchQueued q | ||
229 | modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) | ||
230 | modifyTVar searchPendingCount succ | ||
231 | return $ do | ||
232 | sendAsyncQuery sch target result s (ni :-> d) g | ||
233 | again | ||
234 | _ -> -- Otherwise, we are finished. | ||
235 | do check (cnt == 0) | ||
236 | return $ return () | ||