summaryrefslogtreecommitdiff
path: root/kad/src/Network/Kademlia/Search.hs
diff options
context:
space:
mode:
Diffstat (limited to 'kad/src/Network/Kademlia/Search.hs')
-rw-r--r--kad/src/Network/Kademlia/Search.hs76
1 files changed, 45 insertions, 31 deletions
diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs
index 8d9c997b..e30c40da 100644
--- a/kad/src/Network/Kademlia/Search.hs
+++ b/kad/src/Network/Kademlia/Search.hs
@@ -19,6 +19,8 @@ import Control.Concurrent.Tasks
19import Control.Concurrent.STM 19import Control.Concurrent.STM
20import Control.Monad 20import Control.Monad
21import Data.Function 21import Data.Function
22import qualified Data.Map.Strict as Map
23 ;import Data.Map.Strict (Map)
22import Data.Maybe 24import Data.Maybe
23import qualified Data.Set as Set 25import qualified Data.Set as Set
24 ;import Data.Set (Set) 26 ;import Data.Set (Set)
@@ -38,10 +40,11 @@ import Control.Concurrent.Lifted
38import GHC.Conc (labelThread) 40import GHC.Conc (labelThread)
39#endif 41#endif
40 42
41data Search nid addr tok ni r = Search 43data Search nid addr tok ni r qk = Search
42 { searchSpace :: KademliaSpace nid ni 44 { searchSpace :: KademliaSpace nid ni
43 , searchNodeAddress :: ni -> addr 45 , searchNodeAddress :: ni -> addr
44 , searchQuery :: nid -> ni -> IO (Result ([ni], [r], Maybe tok)) 46 , searchQuery :: nid -> ni -> (qk -> Result ([ni], [r], Maybe tok) -> IO ()) -> IO qk
47 , searchQueryCancel :: qk -> STM ()
45 , searchAlpha :: Int -- α = 8 48 , searchAlpha :: Int -- α = 8
46 -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on 49 -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on
47 -- how fast the queries are. For Tox's much slower onion-routed queries, we 50 -- how fast the queries are. For Tox's much slower onion-routed queries, we
@@ -55,10 +58,11 @@ data Search nid addr tok ni r = Search
55 , searchK :: Int -- K = 16 58 , searchK :: Int -- K = 16
56 } 59 }
57 60
58data SearchState nid addr tok ni r = SearchState 61data SearchState nid addr tok ni r qk = SearchState
59 { -- | The number of pending queries. Incremented before any query is sent 62 { -- | The number of pending queries. Incremented before any query is sent
60 -- and decremented when we get a reply. 63 -- and decremented when we get a reply.
61 searchPendingCount :: TVar Int 64 searchPendingCount :: TVar Int
65 , searchPending :: TVar (Map qk (STM ()))
62 -- | Nodes scheduled to be queried (roughly at most K). 66 -- | Nodes scheduled to be queried (roughly at most K).
63 -- 67 --
64 -- This will be set to Nothing when a search is canceled. 68 -- This will be set to Nothing when a search is canceled.
@@ -72,7 +76,7 @@ data SearchState nid addr tok ni r = SearchState
72 -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha 76 -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha
73 -- should limit the number of outstanding queries. 77 -- should limit the number of outstanding queries.
74 , searchVisited :: TVar (Set addr) 78 , searchVisited :: TVar (Set addr)
75 , searchSpec :: Search nid addr tok ni r 79 , searchSpec :: Search nid addr tok ni r qk
76 } 80 }
77 81
78 82
@@ -87,12 +91,13 @@ newSearch :: ( Ord addr
87 -> (r -> STM Bool) -- receives search results. 91 -> (r -> STM Bool) -- receives search results.
88 -> nid -- target of search 92 -> nid -- target of search
89 -} 93 -}
90 Search nid addr tok ni r 94 Search nid addr tok ni r qk
91 -> nid 95 -> nid
92 -> [ni] -- Initial nodes to query. 96 -> [ni] -- Initial nodes to query.
93 -> STM (SearchState nid addr tok ni r) 97 -> STM (SearchState nid addr tok ni r qk)
94newSearch s@(Search space nAddr qry _ _) target ns = do 98newSearch s@(Search space nAddr qry _ _ _) target ns = do
95 c <- newTVar 0 99 c <- newTVar 0
100 p <- newTVar Map.empty
96 q <- newTVar $ Just 101 q <- newTVar $ Just
97 $ MM.fromList 102 $ MM.fromList
98 $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) 103 $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n))
@@ -100,7 +105,7 @@ newSearch s@(Search space nAddr qry _ _) target ns = do
100 i <- newTVar MM.empty 105 i <- newTVar MM.empty
101 v <- newTVar Set.empty 106 v <- newTVar Set.empty
102 return -- (Search space nAddr qry) , r , target 107 return -- (Search space nAddr qry) , r , target
103 ( SearchState c q i v s ) 108 ( SearchState c p q i v s )
104 109
105-- | Discard a value from a key-priority-value tuple. This is useful for 110-- | Discard a value from a key-priority-value tuple. This is useful for
106-- swaping items from a "MinMaxPSQ'" to a "MinMaxPSQ". 111-- swaping items from a "MinMaxPSQ'" to a "MinMaxPSQ".
@@ -110,11 +115,13 @@ stripValue (Binding ni _ nid) = (ni :-> nid)
110-- | Reset a 'SearchState' object to ready it for a repeated search. 115-- | Reset a 'SearchState' object to ready it for a repeated search.
111reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) => 116reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) =>
112 (nid -> STM [ni]) 117 (nid -> STM [ni])
113 -> Search nid addr1 tok1 ni r1 118 -> Search nid addr1 tok1 ni r1 qk
114 -> nid 119 -> nid
115 -> SearchState nid addr tok ni r 120 -> SearchState nid addr tok ni r qk
116 -> STM (SearchState nid addr tok ni r) 121 -> STM (SearchState nid addr tok ni r qk)
117reset nearestNodes qsearch target st = do 122reset nearestNodes qsearch target st = do
123 pc <- readTVar (searchPendingCount st)
124 check (pc == 0)
118 searchIsFinished st >>= check -- Wait for a search to finish before resetting. 125 searchIsFinished st >>= check -- Wait for a search to finish before resetting.
119 bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) 126 bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni)
120 <$> nearestNodes target 127 <$> nearestNodes target
@@ -125,25 +132,29 @@ reset nearestNodes qsearch target st = do
125 writeTVar (searchPendingCount st) 0 132 writeTVar (searchPendingCount st) 0
126 return st 133 return st
127 134
128sendQuery :: forall addr nid tok ni r. 135grokQuery :: forall addr nid tok ni r qk.
129 ( Ord addr 136 ( Ord addr
130 , PSQKey nid 137 , PSQKey nid
131 , PSQKey ni 138 , PSQKey ni
132 , Show nid 139 , Show nid
140 , Ord qk
133 ) => 141 ) =>
134 Search nid addr tok ni r 142 Search nid addr tok ni r qk
135 -> nid 143 -> nid
136 -> (r -> STM Bool) -- ^ return False to terminate the search. 144 -> (r -> STM Bool) -- ^ return False to terminate the search.
137 -> SearchState nid addr tok ni r 145 -> SearchState nid addr tok ni r qk
138 -> Binding ni nid 146 -> Binding ni nid
147 -> qk
148 -> Result ([ni],[r],Maybe tok)
139 -> IO () 149 -> IO ()
140sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do 150grokQuery Search{..} searchTarget withSearchResult sch@SearchState{..} (ni :-> d) qk reply = do
141 myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) 151 -- myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget)
142 reply <- searchQuery searchTarget ni `catchIOError` const (return Canceled) 152 -- reply <- searchQuery searchTarget ni `catchIOError` const (return Nothing)
143 -- (ns,rs) 153 -- (ns,rs)
144 let tok = error "TODO: token" 154 -- let tok = error "TODO: token"
145 atomically $ do 155 atomically $ do
146 modifyTVar searchPendingCount pred 156 modifyTVar' searchPendingCount pred
157 modifyTVar' searchPending $ Map.delete qk
147 case reply of 158 case reply of
148 Success x -> go x 159 Success x -> go x
149 _ -> return () 160 _ -> return ()
@@ -170,7 +181,7 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) =
170 modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d 181 modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d
171 flip fix rs $ \loop -> \case 182 flip fix rs $ \loop -> \case
172 r:rs' -> do 183 r:rs' -> do
173 wanting <- searchResult r 184 wanting <- withSearchResult r
174 if wanting then loop rs' 185 if wanting then loop rs'
175 else searchCancel sch 186 else searchCancel sch
176 [] -> return () 187 [] -> return ()
@@ -178,7 +189,7 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) =
178 189
179searchIsFinished :: ( PSQKey nid 190searchIsFinished :: ( PSQKey nid
180 , PSQKey ni 191 , PSQKey ni
181 ) => SearchState nid addr tok ni r -> STM Bool 192 ) => SearchState nid addr tok ni r qk -> STM Bool
182searchIsFinished SearchState{..} = do 193searchIsFinished SearchState{..} = do
183 readTVar searchQueued >>= \case 194 readTVar searchQueued >>= \case
184 Just q -> do 195 Just q -> do
@@ -191,9 +202,11 @@ searchIsFinished SearchState{..} = do
191 <= PSQ.prio (fromJust $ MM.findMin q)))) 202 <= PSQ.prio (fromJust $ MM.findMin q))))
192 Nothing -> return True 203 Nothing -> return True
193 204
194searchCancel :: SearchState nid addr tok ni r -> STM () 205searchCancel :: SearchState nid addr tok ni r qk -> STM ()
195searchCancel SearchState{..} = do 206searchCancel SearchState{..} = do
196 writeTVar searchQueued Nothing 207 writeTVar searchQueued Nothing
208 m <- readTVar searchPending
209 foldr (>>) (return ()) m
197 210
198search :: 211search ::
199 ( Ord r 212 ( Ord r
@@ -201,7 +214,8 @@ search ::
201 , PSQKey nid 214 , PSQKey nid
202 , PSQKey ni 215 , PSQKey ni
203 , Show nid 216 , Show nid
204 ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r, ThreadId) 217 , Ord qk
218 ) => Search nid addr tok ni r qk -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r qk, ThreadId)
205search sch buckets target result = do 219search sch buckets target result = do
206 let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets 220 let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets
207 st <- atomically $ newSearch sch target ns 221 st <- atomically $ newSearch sch target ns
@@ -211,11 +225,11 @@ search sch buckets target result = do
211 atomically $ writeTVar v True 225 atomically $ writeTVar v True
212 return (st,t) 226 return (st,t)
213 227
214searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni ) 228searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni, Ord qk )
215 => Search nid addr tok ni r -- ^ Query and distance methods. 229 => Search nid addr tok ni r qk -- ^ Query and distance methods.
216 -> nid -- ^ The target we are searching for. 230 -> nid -- ^ The target we are searching for.
217 -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching. 231 -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching.
218 -> SearchState nid addr tok ni r -- ^ Search-related state. 232 -> SearchState nid addr tok ni r qk -- ^ Search-related state.
219 -> IO () 233 -> IO ()
220searchLoop sch@Search{..} target result s@SearchState{..} = do 234searchLoop sch@Search{..} target result s@SearchState{..} = do
221 myThreadId >>= flip labelThread ("search."++show target) 235 myThreadId >>= flip labelThread ("search."++show target)
@@ -240,8 +254,8 @@ searchLoop sch@Search{..} target result s@SearchState{..} = do
240 modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) 254 modifyTVar searchVisited $ Set.insert (searchNodeAddress ni)
241 modifyTVar searchPendingCount succ 255 modifyTVar searchPendingCount succ
242 return $ do 256 return $ do
243 forkTask g 257 qk <- searchQuery target ni $
244 "searchQuery" 258 \qk reply -> grokQuery sch target result s (ni :-> d) qk reply
245 $ sendQuery sch target result s (ni :-> d) 259 atomically $ modifyTVar' searchPending $ Map.insert qk (searchQueryCancel qk)
246 again 260 again
247 _ -> searchIsFinished s >>= check >> return (return ()) 261 _ -> searchIsFinished s >>= check >> return (return ())