diff options
Diffstat (limited to 'kad/src/Network/Kademlia/Search.hs')
-rw-r--r-- | kad/src/Network/Kademlia/Search.hs | 76 |
1 files changed, 45 insertions, 31 deletions
diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs index 8d9c997b..e30c40da 100644 --- a/kad/src/Network/Kademlia/Search.hs +++ b/kad/src/Network/Kademlia/Search.hs | |||
@@ -19,6 +19,8 @@ import Control.Concurrent.Tasks | |||
19 | import Control.Concurrent.STM | 19 | import Control.Concurrent.STM |
20 | import Control.Monad | 20 | import Control.Monad |
21 | import Data.Function | 21 | import Data.Function |
22 | import qualified Data.Map.Strict as Map | ||
23 | ;import Data.Map.Strict (Map) | ||
22 | import Data.Maybe | 24 | import Data.Maybe |
23 | import qualified Data.Set as Set | 25 | import qualified Data.Set as Set |
24 | ;import Data.Set (Set) | 26 | ;import Data.Set (Set) |
@@ -38,10 +40,11 @@ import Control.Concurrent.Lifted | |||
38 | import GHC.Conc (labelThread) | 40 | import GHC.Conc (labelThread) |
39 | #endif | 41 | #endif |
40 | 42 | ||
41 | data Search nid addr tok ni r = Search | 43 | data Search nid addr tok ni r qk = Search |
42 | { searchSpace :: KademliaSpace nid ni | 44 | { searchSpace :: KademliaSpace nid ni |
43 | , searchNodeAddress :: ni -> addr | 45 | , searchNodeAddress :: ni -> addr |
44 | , searchQuery :: nid -> ni -> IO (Result ([ni], [r], Maybe tok)) | 46 | , searchQuery :: nid -> ni -> (qk -> Result ([ni], [r], Maybe tok) -> IO ()) -> IO qk |
47 | , searchQueryCancel :: qk -> STM () | ||
45 | , searchAlpha :: Int -- α = 8 | 48 | , searchAlpha :: Int -- α = 8 |
46 | -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on | 49 | -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on |
47 | -- how fast the queries are. For Tox's much slower onion-routed queries, we | 50 | -- how fast the queries are. For Tox's much slower onion-routed queries, we |
@@ -55,10 +58,11 @@ data Search nid addr tok ni r = Search | |||
55 | , searchK :: Int -- K = 16 | 58 | , searchK :: Int -- K = 16 |
56 | } | 59 | } |
57 | 60 | ||
58 | data SearchState nid addr tok ni r = SearchState | 61 | data SearchState nid addr tok ni r qk = SearchState |
59 | { -- | The number of pending queries. Incremented before any query is sent | 62 | { -- | The number of pending queries. Incremented before any query is sent |
60 | -- and decremented when we get a reply. | 63 | -- and decremented when we get a reply. |
61 | searchPendingCount :: TVar Int | 64 | searchPendingCount :: TVar Int |
65 | , searchPending :: TVar (Map qk (STM ())) | ||
62 | -- | Nodes scheduled to be queried (roughly at most K). | 66 | -- | Nodes scheduled to be queried (roughly at most K). |
63 | -- | 67 | -- |
64 | -- This will be set to Nothing when a search is canceled. | 68 | -- This will be set to Nothing when a search is canceled. |
@@ -72,7 +76,7 @@ data SearchState nid addr tok ni r = SearchState | |||
72 | -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha | 76 | -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha |
73 | -- should limit the number of outstanding queries. | 77 | -- should limit the number of outstanding queries. |
74 | , searchVisited :: TVar (Set addr) | 78 | , searchVisited :: TVar (Set addr) |
75 | , searchSpec :: Search nid addr tok ni r | 79 | , searchSpec :: Search nid addr tok ni r qk |
76 | } | 80 | } |
77 | 81 | ||
78 | 82 | ||
@@ -87,12 +91,13 @@ newSearch :: ( Ord addr | |||
87 | -> (r -> STM Bool) -- receives search results. | 91 | -> (r -> STM Bool) -- receives search results. |
88 | -> nid -- target of search | 92 | -> nid -- target of search |
89 | -} | 93 | -} |
90 | Search nid addr tok ni r | 94 | Search nid addr tok ni r qk |
91 | -> nid | 95 | -> nid |
92 | -> [ni] -- Initial nodes to query. | 96 | -> [ni] -- Initial nodes to query. |
93 | -> STM (SearchState nid addr tok ni r) | 97 | -> STM (SearchState nid addr tok ni r qk) |
94 | newSearch s@(Search space nAddr qry _ _) target ns = do | 98 | newSearch s@(Search space nAddr qry _ _ _) target ns = do |
95 | c <- newTVar 0 | 99 | c <- newTVar 0 |
100 | p <- newTVar Map.empty | ||
96 | q <- newTVar $ Just | 101 | q <- newTVar $ Just |
97 | $ MM.fromList | 102 | $ MM.fromList |
98 | $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) | 103 | $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) |
@@ -100,7 +105,7 @@ newSearch s@(Search space nAddr qry _ _) target ns = do | |||
100 | i <- newTVar MM.empty | 105 | i <- newTVar MM.empty |
101 | v <- newTVar Set.empty | 106 | v <- newTVar Set.empty |
102 | return -- (Search space nAddr qry) , r , target | 107 | return -- (Search space nAddr qry) , r , target |
103 | ( SearchState c q i v s ) | 108 | ( SearchState c p q i v s ) |
104 | 109 | ||
105 | -- | Discard a value from a key-priority-value tuple. This is useful for | 110 | -- | Discard a value from a key-priority-value tuple. This is useful for |
106 | -- swaping items from a "MinMaxPSQ'" to a "MinMaxPSQ". | 111 | -- swaping items from a "MinMaxPSQ'" to a "MinMaxPSQ". |
@@ -110,11 +115,13 @@ stripValue (Binding ni _ nid) = (ni :-> nid) | |||
110 | -- | Reset a 'SearchState' object to ready it for a repeated search. | 115 | -- | Reset a 'SearchState' object to ready it for a repeated search. |
111 | reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) => | 116 | reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) => |
112 | (nid -> STM [ni]) | 117 | (nid -> STM [ni]) |
113 | -> Search nid addr1 tok1 ni r1 | 118 | -> Search nid addr1 tok1 ni r1 qk |
114 | -> nid | 119 | -> nid |
115 | -> SearchState nid addr tok ni r | 120 | -> SearchState nid addr tok ni r qk |
116 | -> STM (SearchState nid addr tok ni r) | 121 | -> STM (SearchState nid addr tok ni r qk) |
117 | reset nearestNodes qsearch target st = do | 122 | reset nearestNodes qsearch target st = do |
123 | pc <- readTVar (searchPendingCount st) | ||
124 | check (pc == 0) | ||
118 | searchIsFinished st >>= check -- Wait for a search to finish before resetting. | 125 | searchIsFinished st >>= check -- Wait for a search to finish before resetting. |
119 | bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) | 126 | bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) |
120 | <$> nearestNodes target | 127 | <$> nearestNodes target |
@@ -125,25 +132,29 @@ reset nearestNodes qsearch target st = do | |||
125 | writeTVar (searchPendingCount st) 0 | 132 | writeTVar (searchPendingCount st) 0 |
126 | return st | 133 | return st |
127 | 134 | ||
128 | sendQuery :: forall addr nid tok ni r. | 135 | grokQuery :: forall addr nid tok ni r qk. |
129 | ( Ord addr | 136 | ( Ord addr |
130 | , PSQKey nid | 137 | , PSQKey nid |
131 | , PSQKey ni | 138 | , PSQKey ni |
132 | , Show nid | 139 | , Show nid |
140 | , Ord qk | ||
133 | ) => | 141 | ) => |
134 | Search nid addr tok ni r | 142 | Search nid addr tok ni r qk |
135 | -> nid | 143 | -> nid |
136 | -> (r -> STM Bool) -- ^ return False to terminate the search. | 144 | -> (r -> STM Bool) -- ^ return False to terminate the search. |
137 | -> SearchState nid addr tok ni r | 145 | -> SearchState nid addr tok ni r qk |
138 | -> Binding ni nid | 146 | -> Binding ni nid |
147 | -> qk | ||
148 | -> Result ([ni],[r],Maybe tok) | ||
139 | -> IO () | 149 | -> IO () |
140 | sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do | 150 | grokQuery Search{..} searchTarget withSearchResult sch@SearchState{..} (ni :-> d) qk reply = do |
141 | myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) | 151 | -- myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) |
142 | reply <- searchQuery searchTarget ni `catchIOError` const (return Canceled) | 152 | -- reply <- searchQuery searchTarget ni `catchIOError` const (return Nothing) |
143 | -- (ns,rs) | 153 | -- (ns,rs) |
144 | let tok = error "TODO: token" | 154 | -- let tok = error "TODO: token" |
145 | atomically $ do | 155 | atomically $ do |
146 | modifyTVar searchPendingCount pred | 156 | modifyTVar' searchPendingCount pred |
157 | modifyTVar' searchPending $ Map.delete qk | ||
147 | case reply of | 158 | case reply of |
148 | Success x -> go x | 159 | Success x -> go x |
149 | _ -> return () | 160 | _ -> return () |
@@ -170,7 +181,7 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = | |||
170 | modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d | 181 | modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d |
171 | flip fix rs $ \loop -> \case | 182 | flip fix rs $ \loop -> \case |
172 | r:rs' -> do | 183 | r:rs' -> do |
173 | wanting <- searchResult r | 184 | wanting <- withSearchResult r |
174 | if wanting then loop rs' | 185 | if wanting then loop rs' |
175 | else searchCancel sch | 186 | else searchCancel sch |
176 | [] -> return () | 187 | [] -> return () |
@@ -178,7 +189,7 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = | |||
178 | 189 | ||
179 | searchIsFinished :: ( PSQKey nid | 190 | searchIsFinished :: ( PSQKey nid |
180 | , PSQKey ni | 191 | , PSQKey ni |
181 | ) => SearchState nid addr tok ni r -> STM Bool | 192 | ) => SearchState nid addr tok ni r qk -> STM Bool |
182 | searchIsFinished SearchState{..} = do | 193 | searchIsFinished SearchState{..} = do |
183 | readTVar searchQueued >>= \case | 194 | readTVar searchQueued >>= \case |
184 | Just q -> do | 195 | Just q -> do |
@@ -191,9 +202,11 @@ searchIsFinished SearchState{..} = do | |||
191 | <= PSQ.prio (fromJust $ MM.findMin q)))) | 202 | <= PSQ.prio (fromJust $ MM.findMin q)))) |
192 | Nothing -> return True | 203 | Nothing -> return True |
193 | 204 | ||
194 | searchCancel :: SearchState nid addr tok ni r -> STM () | 205 | searchCancel :: SearchState nid addr tok ni r qk -> STM () |
195 | searchCancel SearchState{..} = do | 206 | searchCancel SearchState{..} = do |
196 | writeTVar searchQueued Nothing | 207 | writeTVar searchQueued Nothing |
208 | m <- readTVar searchPending | ||
209 | foldr (>>) (return ()) m | ||
197 | 210 | ||
198 | search :: | 211 | search :: |
199 | ( Ord r | 212 | ( Ord r |
@@ -201,7 +214,8 @@ search :: | |||
201 | , PSQKey nid | 214 | , PSQKey nid |
202 | , PSQKey ni | 215 | , PSQKey ni |
203 | , Show nid | 216 | , Show nid |
204 | ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r, ThreadId) | 217 | , Ord qk |
218 | ) => Search nid addr tok ni r qk -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r qk, ThreadId) | ||
205 | search sch buckets target result = do | 219 | search sch buckets target result = do |
206 | let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets | 220 | let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets |
207 | st <- atomically $ newSearch sch target ns | 221 | st <- atomically $ newSearch sch target ns |
@@ -211,11 +225,11 @@ search sch buckets target result = do | |||
211 | atomically $ writeTVar v True | 225 | atomically $ writeTVar v True |
212 | return (st,t) | 226 | return (st,t) |
213 | 227 | ||
214 | searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni ) | 228 | searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni, Ord qk ) |
215 | => Search nid addr tok ni r -- ^ Query and distance methods. | 229 | => Search nid addr tok ni r qk -- ^ Query and distance methods. |
216 | -> nid -- ^ The target we are searching for. | 230 | -> nid -- ^ The target we are searching for. |
217 | -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching. | 231 | -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching. |
218 | -> SearchState nid addr tok ni r -- ^ Search-related state. | 232 | -> SearchState nid addr tok ni r qk -- ^ Search-related state. |
219 | -> IO () | 233 | -> IO () |
220 | searchLoop sch@Search{..} target result s@SearchState{..} = do | 234 | searchLoop sch@Search{..} target result s@SearchState{..} = do |
221 | myThreadId >>= flip labelThread ("search."++show target) | 235 | myThreadId >>= flip labelThread ("search."++show target) |
@@ -240,8 +254,8 @@ searchLoop sch@Search{..} target result s@SearchState{..} = do | |||
240 | modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) | 254 | modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) |
241 | modifyTVar searchPendingCount succ | 255 | modifyTVar searchPendingCount succ |
242 | return $ do | 256 | return $ do |
243 | forkTask g | 257 | qk <- searchQuery target ni $ |
244 | "searchQuery" | 258 | \qk reply -> grokQuery sch target result s (ni :-> d) qk reply |
245 | $ sendQuery sch target result s (ni :-> d) | 259 | atomically $ modifyTVar' searchPending $ Map.insert qk (searchQueryCancel qk) |
246 | again | 260 | again |
247 | _ -> searchIsFinished s >>= check >> return (return ()) | 261 | _ -> searchIsFinished s >>= check >> return (return ()) |