summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2019-12-30 20:19:57 -0500
committerJoe Crayne <joe@jerkface.net>2020-01-07 13:24:59 -0500
commit5ea2de4e858cc89282561922bae257b6f9041d2e (patch)
tree075e72ee8409c315cbdb281a6faba32c18f9ab4c
parent15ab3290ad04280764968ba4760474a8c0cbfa52 (diff)
Switch to async search query design.
-rw-r--r--kad/kad.cabal2
-rw-r--r--kad/src/Network/Kademlia/Bootstrap.hs36
-rw-r--r--kad/src/Network/Kademlia/CommonAPI.hs15
-rw-r--r--kad/src/Network/Kademlia/Search.hs76
-rw-r--r--kad/tests/searchCancel.hs12
5 files changed, 82 insertions, 59 deletions
diff --git a/kad/kad.cabal b/kad/kad.cabal
index 7c92f809..0483ded4 100644
--- a/kad/kad.cabal
+++ b/kad/kad.cabal
@@ -92,6 +92,6 @@ library
92 92
93executable testSearch 93executable testSearch
94 hs-source-dirs: tests 94 hs-source-dirs: tests
95 build-depends: kad, base, stm, containers, minmax-psq 95 build-depends: kad, base, stm, containers, minmax-psq, server
96 main-is: searchCancel.hs 96 main-is: searchCancel.hs
97 97
diff --git a/kad/src/Network/Kademlia/Bootstrap.hs b/kad/src/Network/Kademlia/Bootstrap.hs
index c07b3c6c..bd09214f 100644
--- a/kad/src/Network/Kademlia/Bootstrap.hs
+++ b/kad/src/Network/Kademlia/Bootstrap.hs
@@ -52,7 +52,7 @@ type SensibleNodeId nid ni =
52 , Hashable nid 52 , Hashable nid
53 , Hashable ni ) 53 , Hashable ni )
54 54
55data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher 55data BucketRefresher nid ni qk = forall tok addr. Ord addr => BucketRefresher
56 { -- | A staleness threshold (if a bucket goes this long without being 56 { -- | A staleness threshold (if a bucket goes this long without being
57 -- touched, a refresh will be triggered). 57 -- touched, a refresh will be triggered).
58 refreshInterval :: POSIXTime 58 refreshInterval :: POSIXTime
@@ -63,7 +63,7 @@ data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher
63 -- priority in this priority search queue. 63 -- priority in this priority search queue.
64 , refreshQueue :: TVar (Int.PSQ POSIXTime) 64 , refreshQueue :: TVar (Int.PSQ POSIXTime)
65 -- | This is the kademlia node search specification. 65 -- | This is the kademlia node search specification.
66 , refreshSearch :: Search nid addr tok ni ni 66 , refreshSearch :: Search nid addr tok ni ni qk
67 -- | The current kademlia routing table buckets. 67 -- | The current kademlia routing table buckets.
68 , refreshBuckets :: TVar (R.BucketList ni) 68 , refreshBuckets :: TVar (R.BucketList ni)
69 -- | Action to ping a node. This is used only during initial bootstrap 69 -- | Action to ping a node. This is used only during initial bootstrap
@@ -84,9 +84,9 @@ data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher
84newBucketRefresher :: ( Ord addr, Hashable addr 84newBucketRefresher :: ( Ord addr, Hashable addr
85 , SensibleNodeId nid ni ) 85 , SensibleNodeId nid ni )
86 => TVar (R.BucketList ni) 86 => TVar (R.BucketList ni)
87 -> Search nid addr tok ni ni 87 -> Search nid addr tok ni ni qk
88 -> (ni -> IO Bool) 88 -> (ni -> IO Bool)
89 -> STM (BucketRefresher nid ni) 89 -> STM (BucketRefresher nid ni qk)
90newBucketRefresher bkts sch ping = do 90newBucketRefresher bkts sch ping = do
91 let spc = searchSpace sch 91 let spc = searchSpace sch
92 nodeId = kademliaLocation spc 92 nodeId = kademliaLocation spc
@@ -112,9 +112,9 @@ newBucketRefresher bkts sch ping = do
112-- insufficiently polymorphic field" when trying to update the existentially 112-- insufficiently polymorphic field" when trying to update the existentially
113-- quantified field 'refreshSearch'. 113-- quantified field 'refreshSearch'.
114updateRefresherIO :: Ord addr 114updateRefresherIO :: Ord addr
115 => Search nid addr tok ni ni 115 => Search nid addr tok ni ni qk
116 -> (ni -> IO Bool) 116 -> (ni -> IO Bool)
117 -> BucketRefresher nid ni -> BucketRefresher nid ni 117 -> BucketRefresher nid ni qk -> BucketRefresher nid ni qk
118updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher 118updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher
119 { refreshSearch = sch 119 { refreshSearch = sch
120 , refreshPing = ping 120 , refreshPing = ping
@@ -128,7 +128,7 @@ updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher
128 } 128 }
129 129
130-- | Fork a refresh loop. Kill the returned thread to terminate it. 130-- | Fork a refresh loop. Kill the returned thread to terminate it.
131forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId 131forkPollForRefresh :: Ord qk => SensibleNodeId nid ni => BucketRefresher nid ni qk -> IO ThreadId
132forkPollForRefresh r@BucketRefresher{ refreshInterval 132forkPollForRefresh r@BucketRefresher{ refreshInterval
133 , refreshQueue 133 , refreshQueue
134 , refreshBuckets 134 , refreshBuckets
@@ -194,7 +194,7 @@ checkBucketFull space var resultCounter fin n found_node = do
194 194
195-- | Called from 'refreshBucket' with the current time when a refresh of the 195-- | Called from 'refreshBucket' with the current time when a refresh of the
196-- supplied bucket number finishes. 196-- supplied bucket number finishes.
197onFinishedRefresh :: BucketRefresher nid ni -> Int -> POSIXTime -> STM (IO ()) 197onFinishedRefresh :: BucketRefresher nid ni qk -> Int -> POSIXTime -> STM (IO ())
198onFinishedRefresh BucketRefresher { bootstrapCountdown 198onFinishedRefresh BucketRefresher { bootstrapCountdown
199 , bootstrapMode 199 , bootstrapMode
200 , refreshQueue 200 , refreshQueue
@@ -235,11 +235,11 @@ onFinishedRefresh BucketRefresher { bootstrapCountdown
235 return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")." 235 return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")."
236 else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt) 236 else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt)
237 237
238data BucketSearch nid ni = forall addr tok. BucketSearch 238data BucketSearch nid ni = forall addr tok qk. BucketSearch
239 { bucketSample :: nid 239 { bucketSample :: nid
240 , bucketResults :: TVar (Set.Set ni) 240 , bucketResults :: TVar (Set.Set ni)
241 , bucketFinFlag :: TVar Bool 241 , bucketFinFlag :: TVar Bool
242 , bucketState :: SearchState nid addr tok ni ni 242 , bucketState :: SearchState nid addr tok ni ni qk
243 , bucketThread :: ThreadId 243 , bucketThread :: ThreadId
244 } 244 }
245 245
@@ -253,8 +253,8 @@ insertBucketState :: BucketSearch nid ni -> Maybe [BucketSearch nid ni] -> Maybe
253insertBucketState bst Nothing = Just [bst] 253insertBucketState bst Nothing = Just [bst]
254insertBucketState bst (Just xs) = Just (bst : xs) 254insertBucketState bst (Just xs) = Just (bst : xs)
255 255
256refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) => 256refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni, Ord qk) =>
257 BucketRefresher nid ni -> Int -> IO Int 257 BucketRefresher nid ni qk -> Int -> IO Int
258refreshBucket r@BucketRefresher{ refreshSearch = sch 258refreshBucket r@BucketRefresher{ refreshSearch = sch
259 , refreshBuckets = var 259 , refreshBuckets = var
260 , refreshState = rstate } 260 , refreshState = rstate }
@@ -297,7 +297,7 @@ refreshBucket r@BucketRefresher{ refreshSearch = sch
297 return $ if b then 1 else c 297 return $ if b then 1 else c
298 return rcount 298 return rcount
299 299
300refreshLastBucket :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO () 300refreshLastBucket :: SensibleNodeId nid ni => BucketRefresher nid ni qk -> IO ()
301refreshLastBucket r@BucketRefresher { refreshBuckets 301refreshLastBucket r@BucketRefresher { refreshBuckets
302 , refreshQueue } = do 302 , refreshQueue } = do
303 303
@@ -308,7 +308,7 @@ refreshLastBucket r@BucketRefresher { refreshBuckets
308 modifyTVar' refreshQueue $ Int.insert (cnt-1) (now - 1) 308 modifyTVar' refreshQueue $ Int.insert (cnt-1) (now - 1)
309 309
310restartBootstrap :: (Hashable ni, Hashable nid, Ord ni, Ord nid, Show nid) => 310restartBootstrap :: (Hashable ni, Hashable nid, Ord ni, Ord nid, Show nid) =>
311 BucketRefresher nid ni -> STM (IO ()) 311 BucketRefresher nid ni qk -> STM (IO ())
312restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do 312restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do
313 unchanged <- readTVar bootstrapMode 313 unchanged <- readTVar bootstrapMode
314 writeTVar bootstrapMode True 314 writeTVar bootstrapMode True
@@ -319,7 +319,7 @@ restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do
319 else return $ dput XRefresh "BOOTSTRAP already bootstrapping" 319 else return $ dput XRefresh "BOOTSTRAP already bootstrapping"
320 320
321bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) => 321bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) =>
322 BucketRefresher nid ni 322 BucketRefresher nid ni qk
323 -> t1 ni -- ^ Nodes to bootstrap from. 323 -> t1 ni -- ^ Nodes to bootstrap from.
324 -> t ni -- ^ Fallback nodes; used only if the others are unresponsive. 324 -> t ni -- ^ Fallback nodes; used only if the others are unresponsive.
325 -> IO () 325 -> IO ()
@@ -356,7 +356,7 @@ bootstrap r@BucketRefresher { refreshSearch = sch
356 -- maintenance. 356 -- maintenance.
357 357
358 358
359effectiveRefreshInterval :: BucketRefresher nid ni -> Int -> STM POSIXTime 359effectiveRefreshInterval :: BucketRefresher nid ni qk -> Int -> STM POSIXTime
360effectiveRefreshInterval BucketRefresher{ refreshInterval 360effectiveRefreshInterval BucketRefresher{ refreshInterval
361 , refreshBuckets 361 , refreshBuckets
362 , bootstrapMode } num = do 362 , bootstrapMode } num = do
@@ -429,7 +429,7 @@ effectiveRefreshInterval BucketRefresher{ refreshInterval
429-- We embed the result in the STM monad but currently, no STM state changes 429-- We embed the result in the STM monad but currently, no STM state changes
430-- occur until the returned IO action is invoked. TODO: simplify? 430-- occur until the returned IO action is invoked. TODO: simplify?
431touchBucket :: SensibleNodeId nid ni 431touchBucket :: SensibleNodeId nid ni
432 => BucketRefresher nid ni 432 => BucketRefresher nid ni qk
433 -> RoutingTransition ni -- ^ What happened to the bucket? 433 -> RoutingTransition ni -- ^ What happened to the bucket?
434 -> STM (IO ()) 434 -> STM (IO ())
435touchBucket r@BucketRefresher{ refreshSearch 435touchBucket r@BucketRefresher{ refreshSearch
@@ -461,7 +461,7 @@ touchBucket r@BucketRefresher{ refreshSearch
461 writeTVar refreshLastTouch now 461 writeTVar refreshLastTouch now
462 return action 462 return action
463 463
464refreshKademlia :: SensibleNodeId nid ni => BucketRefresher nid ni -> Kademlia nid ni 464refreshKademlia :: SensibleNodeId nid ni => BucketRefresher nid ni qk -> Kademlia nid ni
465refreshKademlia r@BucketRefresher { refreshSearch = sch 465refreshKademlia r@BucketRefresher { refreshSearch = sch
466 , refreshPing = ping 466 , refreshPing = ping
467 , refreshBuckets = bkts 467 , refreshBuckets = bkts
diff --git a/kad/src/Network/Kademlia/CommonAPI.hs b/kad/src/Network/Kademlia/CommonAPI.hs
index 6d3fd16c..bcbfe9d8 100644
--- a/kad/src/Network/Kademlia/CommonAPI.hs
+++ b/kad/src/Network/Kademlia/CommonAPI.hs
@@ -20,7 +20,8 @@ import Network.Kademlia.Search
20import Network.Kademlia.Routing as R 20import Network.Kademlia.Routing as R
21import Crypto.Tox (SecretKey,PublicKey) 21import Crypto.Tox (SecretKey,PublicKey)
22 22
23data DHT = forall nid ni. ( Show ni 23data DHT = forall nid ni qk.
24 ( Show ni
24 , Read ni 25 , Read ni
25 , ToJSON ni 26 , ToJSON ni
26 , FromJSON ni 27 , FromJSON ni
@@ -31,9 +32,10 @@ data DHT = forall nid ni. ( Show ni
31 , Hashable nid 32 , Hashable nid
32 , Typeable ni 33 , Typeable ni
33 , S.Serialize nid 34 , S.Serialize nid
35 , Ord qk
34 ) => 36 ) =>
35 DHT 37 DHT
36 { dhtBuckets :: BucketRefresher nid ni 38 { dhtBuckets :: BucketRefresher nid ni qk
37 , dhtSecretKey :: STM (Maybe SecretKey) 39 , dhtSecretKey :: STM (Maybe SecretKey)
38 , dhtPing :: Map.Map String (DHTPing ni) 40 , dhtPing :: Map.Map String (DHTPing ni)
39 , dhtQuery :: Map.Map String (DHTQuery nid ni) 41 , dhtQuery :: Map.Map String (DHTQuery nid ni)
@@ -45,13 +47,14 @@ data DHT = forall nid ni. ( Show ni
45 , dhtShowHexId :: Maybe (nid -> String) 47 , dhtShowHexId :: Maybe (nid -> String)
46 } 48 }
47 49
48data DHTQuery nid ni = forall addr r tok. 50data DHTQuery nid ni = forall addr r tok qk.
49 ( Ord addr 51 ( Ord addr
50 , Typeable r 52 , Typeable r
51 , Typeable tok 53 , Typeable tok
52 , Typeable ni 54 , Typeable ni
55 , Ord qk
53 ) => DHTQuery 56 ) => DHTQuery
54 { qsearch :: Search nid addr tok ni r 57 { qsearch :: Search nid addr tok ni r qk
55 , qhandler :: ni -> nid -> IO ([ni], [r], Maybe tok) -- ^ Invoked on local node, when there is no query destination. 58 , qhandler :: ni -> nid -> IO ([ni], [r], Maybe tok) -- ^ Invoked on local node, when there is no query destination.
56 , qshowR :: r -> String 59 , qshowR :: r -> String
57 , qshowTok :: tok -> Maybe String 60 , qshowTok :: tok -> Maybe String
@@ -75,9 +78,9 @@ data DHTAnnouncable nid = forall dta tok ni r.
75 , announceTarget :: dta -> nid 78 , announceTarget :: dta -> nid
76 } 79 }
77 80
78data DHTSearch nid ni = forall addr tok r. DHTSearch 81data DHTSearch nid ni = forall addr tok r qk. DHTSearch
79 { searchThread :: ThreadId 82 { searchThread :: ThreadId
80 , searchState :: SearchState nid addr tok ni r 83 , searchState :: SearchState nid addr tok ni r qk
81 , searchShowTok :: tok -> Maybe String 84 , searchShowTok :: tok -> Maybe String
82 , searchResults :: TVar (Set.Set String) 85 , searchResults :: TVar (Set.Set String)
83 } 86 }
diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs
index 8d9c997b..e30c40da 100644
--- a/kad/src/Network/Kademlia/Search.hs
+++ b/kad/src/Network/Kademlia/Search.hs
@@ -19,6 +19,8 @@ import Control.Concurrent.Tasks
19import Control.Concurrent.STM 19import Control.Concurrent.STM
20import Control.Monad 20import Control.Monad
21import Data.Function 21import Data.Function
22import qualified Data.Map.Strict as Map
23 ;import Data.Map.Strict (Map)
22import Data.Maybe 24import Data.Maybe
23import qualified Data.Set as Set 25import qualified Data.Set as Set
24 ;import Data.Set (Set) 26 ;import Data.Set (Set)
@@ -38,10 +40,11 @@ import Control.Concurrent.Lifted
38import GHC.Conc (labelThread) 40import GHC.Conc (labelThread)
39#endif 41#endif
40 42
41data Search nid addr tok ni r = Search 43data Search nid addr tok ni r qk = Search
42 { searchSpace :: KademliaSpace nid ni 44 { searchSpace :: KademliaSpace nid ni
43 , searchNodeAddress :: ni -> addr 45 , searchNodeAddress :: ni -> addr
44 , searchQuery :: nid -> ni -> IO (Result ([ni], [r], Maybe tok)) 46 , searchQuery :: nid -> ni -> (qk -> Result ([ni], [r], Maybe tok) -> IO ()) -> IO qk
47 , searchQueryCancel :: qk -> STM ()
45 , searchAlpha :: Int -- α = 8 48 , searchAlpha :: Int -- α = 8
46 -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on 49 -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on
47 -- how fast the queries are. For Tox's much slower onion-routed queries, we 50 -- how fast the queries are. For Tox's much slower onion-routed queries, we
@@ -55,10 +58,11 @@ data Search nid addr tok ni r = Search
55 , searchK :: Int -- K = 16 58 , searchK :: Int -- K = 16
56 } 59 }
57 60
58data SearchState nid addr tok ni r = SearchState 61data SearchState nid addr tok ni r qk = SearchState
59 { -- | The number of pending queries. Incremented before any query is sent 62 { -- | The number of pending queries. Incremented before any query is sent
60 -- and decremented when we get a reply. 63 -- and decremented when we get a reply.
61 searchPendingCount :: TVar Int 64 searchPendingCount :: TVar Int
65 , searchPending :: TVar (Map qk (STM ()))
62 -- | Nodes scheduled to be queried (roughly at most K). 66 -- | Nodes scheduled to be queried (roughly at most K).
63 -- 67 --
64 -- This will be set to Nothing when a search is canceled. 68 -- This will be set to Nothing when a search is canceled.
@@ -72,7 +76,7 @@ data SearchState nid addr tok ni r = SearchState
72 -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha 76 -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha
73 -- should limit the number of outstanding queries. 77 -- should limit the number of outstanding queries.
74 , searchVisited :: TVar (Set addr) 78 , searchVisited :: TVar (Set addr)
75 , searchSpec :: Search nid addr tok ni r 79 , searchSpec :: Search nid addr tok ni r qk
76 } 80 }
77 81
78 82
@@ -87,12 +91,13 @@ newSearch :: ( Ord addr
87 -> (r -> STM Bool) -- receives search results. 91 -> (r -> STM Bool) -- receives search results.
88 -> nid -- target of search 92 -> nid -- target of search
89 -} 93 -}
90 Search nid addr tok ni r 94 Search nid addr tok ni r qk
91 -> nid 95 -> nid
92 -> [ni] -- Initial nodes to query. 96 -> [ni] -- Initial nodes to query.
93 -> STM (SearchState nid addr tok ni r) 97 -> STM (SearchState nid addr tok ni r qk)
94newSearch s@(Search space nAddr qry _ _) target ns = do 98newSearch s@(Search space nAddr qry _ _ _) target ns = do
95 c <- newTVar 0 99 c <- newTVar 0
100 p <- newTVar Map.empty
96 q <- newTVar $ Just 101 q <- newTVar $ Just
97 $ MM.fromList 102 $ MM.fromList
98 $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) 103 $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n))
@@ -100,7 +105,7 @@ newSearch s@(Search space nAddr qry _ _) target ns = do
100 i <- newTVar MM.empty 105 i <- newTVar MM.empty
101 v <- newTVar Set.empty 106 v <- newTVar Set.empty
102 return -- (Search space nAddr qry) , r , target 107 return -- (Search space nAddr qry) , r , target
103 ( SearchState c q i v s ) 108 ( SearchState c p q i v s )
104 109
105-- | Discard a value from a key-priority-value tuple. This is useful for 110-- | Discard a value from a key-priority-value tuple. This is useful for
106-- swaping items from a "MinMaxPSQ'" to a "MinMaxPSQ". 111-- swaping items from a "MinMaxPSQ'" to a "MinMaxPSQ".
@@ -110,11 +115,13 @@ stripValue (Binding ni _ nid) = (ni :-> nid)
110-- | Reset a 'SearchState' object to ready it for a repeated search. 115-- | Reset a 'SearchState' object to ready it for a repeated search.
111reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) => 116reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) =>
112 (nid -> STM [ni]) 117 (nid -> STM [ni])
113 -> Search nid addr1 tok1 ni r1 118 -> Search nid addr1 tok1 ni r1 qk
114 -> nid 119 -> nid
115 -> SearchState nid addr tok ni r 120 -> SearchState nid addr tok ni r qk
116 -> STM (SearchState nid addr tok ni r) 121 -> STM (SearchState nid addr tok ni r qk)
117reset nearestNodes qsearch target st = do 122reset nearestNodes qsearch target st = do
123 pc <- readTVar (searchPendingCount st)
124 check (pc == 0)
118 searchIsFinished st >>= check -- Wait for a search to finish before resetting. 125 searchIsFinished st >>= check -- Wait for a search to finish before resetting.
119 bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) 126 bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni)
120 <$> nearestNodes target 127 <$> nearestNodes target
@@ -125,25 +132,29 @@ reset nearestNodes qsearch target st = do
125 writeTVar (searchPendingCount st) 0 132 writeTVar (searchPendingCount st) 0
126 return st 133 return st
127 134
128sendQuery :: forall addr nid tok ni r. 135grokQuery :: forall addr nid tok ni r qk.
129 ( Ord addr 136 ( Ord addr
130 , PSQKey nid 137 , PSQKey nid
131 , PSQKey ni 138 , PSQKey ni
132 , Show nid 139 , Show nid
140 , Ord qk
133 ) => 141 ) =>
134 Search nid addr tok ni r 142 Search nid addr tok ni r qk
135 -> nid 143 -> nid
136 -> (r -> STM Bool) -- ^ return False to terminate the search. 144 -> (r -> STM Bool) -- ^ return False to terminate the search.
137 -> SearchState nid addr tok ni r 145 -> SearchState nid addr tok ni r qk
138 -> Binding ni nid 146 -> Binding ni nid
147 -> qk
148 -> Result ([ni],[r],Maybe tok)
139 -> IO () 149 -> IO ()
140sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do 150grokQuery Search{..} searchTarget withSearchResult sch@SearchState{..} (ni :-> d) qk reply = do
141 myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) 151 -- myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget)
142 reply <- searchQuery searchTarget ni `catchIOError` const (return Canceled) 152 -- reply <- searchQuery searchTarget ni `catchIOError` const (return Nothing)
143 -- (ns,rs) 153 -- (ns,rs)
144 let tok = error "TODO: token" 154 -- let tok = error "TODO: token"
145 atomically $ do 155 atomically $ do
146 modifyTVar searchPendingCount pred 156 modifyTVar' searchPendingCount pred
157 modifyTVar' searchPending $ Map.delete qk
147 case reply of 158 case reply of
148 Success x -> go x 159 Success x -> go x
149 _ -> return () 160 _ -> return ()
@@ -170,7 +181,7 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) =
170 modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d 181 modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d
171 flip fix rs $ \loop -> \case 182 flip fix rs $ \loop -> \case
172 r:rs' -> do 183 r:rs' -> do
173 wanting <- searchResult r 184 wanting <- withSearchResult r
174 if wanting then loop rs' 185 if wanting then loop rs'
175 else searchCancel sch 186 else searchCancel sch
176 [] -> return () 187 [] -> return ()
@@ -178,7 +189,7 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) =
178 189
179searchIsFinished :: ( PSQKey nid 190searchIsFinished :: ( PSQKey nid
180 , PSQKey ni 191 , PSQKey ni
181 ) => SearchState nid addr tok ni r -> STM Bool 192 ) => SearchState nid addr tok ni r qk -> STM Bool
182searchIsFinished SearchState{..} = do 193searchIsFinished SearchState{..} = do
183 readTVar searchQueued >>= \case 194 readTVar searchQueued >>= \case
184 Just q -> do 195 Just q -> do
@@ -191,9 +202,11 @@ searchIsFinished SearchState{..} = do
191 <= PSQ.prio (fromJust $ MM.findMin q)))) 202 <= PSQ.prio (fromJust $ MM.findMin q))))
192 Nothing -> return True 203 Nothing -> return True
193 204
194searchCancel :: SearchState nid addr tok ni r -> STM () 205searchCancel :: SearchState nid addr tok ni r qk -> STM ()
195searchCancel SearchState{..} = do 206searchCancel SearchState{..} = do
196 writeTVar searchQueued Nothing 207 writeTVar searchQueued Nothing
208 m <- readTVar searchPending
209 foldr (>>) (return ()) m
197 210
198search :: 211search ::
199 ( Ord r 212 ( Ord r
@@ -201,7 +214,8 @@ search ::
201 , PSQKey nid 214 , PSQKey nid
202 , PSQKey ni 215 , PSQKey ni
203 , Show nid 216 , Show nid
204 ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r, ThreadId) 217 , Ord qk
218 ) => Search nid addr tok ni r qk -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r qk, ThreadId)
205search sch buckets target result = do 219search sch buckets target result = do
206 let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets 220 let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets
207 st <- atomically $ newSearch sch target ns 221 st <- atomically $ newSearch sch target ns
@@ -211,11 +225,11 @@ search sch buckets target result = do
211 atomically $ writeTVar v True 225 atomically $ writeTVar v True
212 return (st,t) 226 return (st,t)
213 227
214searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni ) 228searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni, Ord qk )
215 => Search nid addr tok ni r -- ^ Query and distance methods. 229 => Search nid addr tok ni r qk -- ^ Query and distance methods.
216 -> nid -- ^ The target we are searching for. 230 -> nid -- ^ The target we are searching for.
217 -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching. 231 -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching.
218 -> SearchState nid addr tok ni r -- ^ Search-related state. 232 -> SearchState nid addr tok ni r qk -- ^ Search-related state.
219 -> IO () 233 -> IO ()
220searchLoop sch@Search{..} target result s@SearchState{..} = do 234searchLoop sch@Search{..} target result s@SearchState{..} = do
221 myThreadId >>= flip labelThread ("search."++show target) 235 myThreadId >>= flip labelThread ("search."++show target)
@@ -240,8 +254,8 @@ searchLoop sch@Search{..} target result s@SearchState{..} = do
240 modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) 254 modifyTVar searchVisited $ Set.insert (searchNodeAddress ni)
241 modifyTVar searchPendingCount succ 255 modifyTVar searchPendingCount succ
242 return $ do 256 return $ do
243 forkTask g 257 qk <- searchQuery target ni $
244 "searchQuery" 258 \qk reply -> grokQuery sch target result s (ni :-> d) qk reply
245 $ sendQuery sch target result s (ni :-> d) 259 atomically $ modifyTVar' searchPending $ Map.insert qk (searchQueryCancel qk)
246 again 260 again
247 _ -> searchIsFinished s >>= check >> return (return ()) 261 _ -> searchIsFinished s >>= check >> return (return ())
diff --git a/kad/tests/searchCancel.hs b/kad/tests/searchCancel.hs
index 85986674..33860a2f 100644
--- a/kad/tests/searchCancel.hs
+++ b/kad/tests/searchCancel.hs
@@ -10,11 +10,13 @@ import Network.Kademlia.Persistence
10import Network.Kademlia.Routing 10import Network.Kademlia.Routing
11import Network.Kademlia.Search 11import Network.Kademlia.Search
12 12
13import Network.QueryResponse as QR
13import qualified Data.MinMaxPSQ as MM 14import qualified Data.MinMaxPSQ as MM
14import qualified Data.Set as Set 15import qualified Data.Set as Set
15 16
16makeSchResults :: TVar (Maybe (SearchState Int Int () Int Int)) -> TVar Int -> IO (Maybe ([Int],[Int],Maybe ())) 17makeSchResults :: TVar (Maybe (SearchState Int Int () Int Int Int)) -> TVar Int -> IO (Maybe ([Int],[Int],Maybe ()))
17makeSchResults mbv var = do 18makeSchResults mbv var = do
19 putStrLn "makeSchResults"
18 threadDelay 200000 20 threadDelay 200000
19 (r,io) <- atomically $ do 21 (r,io) <- atomically $ do
20 n <- readTVar var 22 n <- readTVar var
@@ -42,11 +44,15 @@ kad = KademliaSpace
42 , kademliaSample = \_ x _ -> pure x 44 , kademliaSample = \_ x _ -> pure x
43 } 45 }
44 46
45sch :: TVar (Maybe (SearchState Int Int () Int Int)) -> TVar Int -> Search Int Int () Int Int 47sch :: TVar (Maybe (SearchState Int Int () Int Int Int)) -> TVar Int -> Search Int Int () Int Int Int
46sch mbv var = Search 48sch mbv var = Search
47 { searchSpace = kad 49 { searchSpace = kad
48 , searchNodeAddress = id 50 , searchNodeAddress = id
49 , searchQuery = \_ _ -> makeSchResults mbv var 51 , searchQuery = \_ _ f -> do r <- makeSchResults mbv var
52 let qk = maybe 0 (\(ns,_,_) -> head ns) r
53 f qk $ maybe TimedOut Success r
54 return qk
55 , searchQueryCancel = \_ -> return ()
50 , searchAlpha = 4 56 , searchAlpha = 4
51 , searchK = 8 57 , searchK = 8
52 } 58 }