summaryrefslogtreecommitdiff
path: root/kad/src/Network/Kademlia
diff options
context:
space:
mode:
Diffstat (limited to 'kad/src/Network/Kademlia')
-rw-r--r--kad/src/Network/Kademlia/Bootstrap.hs36
-rw-r--r--kad/src/Network/Kademlia/CommonAPI.hs15
-rw-r--r--kad/src/Network/Kademlia/Search.hs76
3 files changed, 72 insertions, 55 deletions
diff --git a/kad/src/Network/Kademlia/Bootstrap.hs b/kad/src/Network/Kademlia/Bootstrap.hs
index c07b3c6c..bd09214f 100644
--- a/kad/src/Network/Kademlia/Bootstrap.hs
+++ b/kad/src/Network/Kademlia/Bootstrap.hs
@@ -52,7 +52,7 @@ type SensibleNodeId nid ni =
52 , Hashable nid 52 , Hashable nid
53 , Hashable ni ) 53 , Hashable ni )
54 54
55data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher 55data BucketRefresher nid ni qk = forall tok addr. Ord addr => BucketRefresher
56 { -- | A staleness threshold (if a bucket goes this long without being 56 { -- | A staleness threshold (if a bucket goes this long without being
57 -- touched, a refresh will be triggered). 57 -- touched, a refresh will be triggered).
58 refreshInterval :: POSIXTime 58 refreshInterval :: POSIXTime
@@ -63,7 +63,7 @@ data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher
63 -- priority in this priority search queue. 63 -- priority in this priority search queue.
64 , refreshQueue :: TVar (Int.PSQ POSIXTime) 64 , refreshQueue :: TVar (Int.PSQ POSIXTime)
65 -- | This is the kademlia node search specification. 65 -- | This is the kademlia node search specification.
66 , refreshSearch :: Search nid addr tok ni ni 66 , refreshSearch :: Search nid addr tok ni ni qk
67 -- | The current kademlia routing table buckets. 67 -- | The current kademlia routing table buckets.
68 , refreshBuckets :: TVar (R.BucketList ni) 68 , refreshBuckets :: TVar (R.BucketList ni)
69 -- | Action to ping a node. This is used only during initial bootstrap 69 -- | Action to ping a node. This is used only during initial bootstrap
@@ -84,9 +84,9 @@ data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher
84newBucketRefresher :: ( Ord addr, Hashable addr 84newBucketRefresher :: ( Ord addr, Hashable addr
85 , SensibleNodeId nid ni ) 85 , SensibleNodeId nid ni )
86 => TVar (R.BucketList ni) 86 => TVar (R.BucketList ni)
87 -> Search nid addr tok ni ni 87 -> Search nid addr tok ni ni qk
88 -> (ni -> IO Bool) 88 -> (ni -> IO Bool)
89 -> STM (BucketRefresher nid ni) 89 -> STM (BucketRefresher nid ni qk)
90newBucketRefresher bkts sch ping = do 90newBucketRefresher bkts sch ping = do
91 let spc = searchSpace sch 91 let spc = searchSpace sch
92 nodeId = kademliaLocation spc 92 nodeId = kademliaLocation spc
@@ -112,9 +112,9 @@ newBucketRefresher bkts sch ping = do
112-- insufficiently polymorphic field" when trying to update the existentially 112-- insufficiently polymorphic field" when trying to update the existentially
113-- quantified field 'refreshSearch'. 113-- quantified field 'refreshSearch'.
114updateRefresherIO :: Ord addr 114updateRefresherIO :: Ord addr
115 => Search nid addr tok ni ni 115 => Search nid addr tok ni ni qk
116 -> (ni -> IO Bool) 116 -> (ni -> IO Bool)
117 -> BucketRefresher nid ni -> BucketRefresher nid ni 117 -> BucketRefresher nid ni qk -> BucketRefresher nid ni qk
118updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher 118updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher
119 { refreshSearch = sch 119 { refreshSearch = sch
120 , refreshPing = ping 120 , refreshPing = ping
@@ -128,7 +128,7 @@ updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher
128 } 128 }
129 129
130-- | Fork a refresh loop. Kill the returned thread to terminate it. 130-- | Fork a refresh loop. Kill the returned thread to terminate it.
131forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId 131forkPollForRefresh :: Ord qk => SensibleNodeId nid ni => BucketRefresher nid ni qk -> IO ThreadId
132forkPollForRefresh r@BucketRefresher{ refreshInterval 132forkPollForRefresh r@BucketRefresher{ refreshInterval
133 , refreshQueue 133 , refreshQueue
134 , refreshBuckets 134 , refreshBuckets
@@ -194,7 +194,7 @@ checkBucketFull space var resultCounter fin n found_node = do
194 194
195-- | Called from 'refreshBucket' with the current time when a refresh of the 195-- | Called from 'refreshBucket' with the current time when a refresh of the
196-- supplied bucket number finishes. 196-- supplied bucket number finishes.
197onFinishedRefresh :: BucketRefresher nid ni -> Int -> POSIXTime -> STM (IO ()) 197onFinishedRefresh :: BucketRefresher nid ni qk -> Int -> POSIXTime -> STM (IO ())
198onFinishedRefresh BucketRefresher { bootstrapCountdown 198onFinishedRefresh BucketRefresher { bootstrapCountdown
199 , bootstrapMode 199 , bootstrapMode
200 , refreshQueue 200 , refreshQueue
@@ -235,11 +235,11 @@ onFinishedRefresh BucketRefresher { bootstrapCountdown
235 return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")." 235 return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")."
236 else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt) 236 else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt)
237 237
238data BucketSearch nid ni = forall addr tok. BucketSearch 238data BucketSearch nid ni = forall addr tok qk. BucketSearch
239 { bucketSample :: nid 239 { bucketSample :: nid
240 , bucketResults :: TVar (Set.Set ni) 240 , bucketResults :: TVar (Set.Set ni)
241 , bucketFinFlag :: TVar Bool 241 , bucketFinFlag :: TVar Bool
242 , bucketState :: SearchState nid addr tok ni ni 242 , bucketState :: SearchState nid addr tok ni ni qk
243 , bucketThread :: ThreadId 243 , bucketThread :: ThreadId
244 } 244 }
245 245
@@ -253,8 +253,8 @@ insertBucketState :: BucketSearch nid ni -> Maybe [BucketSearch nid ni] -> Maybe
253insertBucketState bst Nothing = Just [bst] 253insertBucketState bst Nothing = Just [bst]
254insertBucketState bst (Just xs) = Just (bst : xs) 254insertBucketState bst (Just xs) = Just (bst : xs)
255 255
256refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) => 256refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni, Ord qk) =>
257 BucketRefresher nid ni -> Int -> IO Int 257 BucketRefresher nid ni qk -> Int -> IO Int
258refreshBucket r@BucketRefresher{ refreshSearch = sch 258refreshBucket r@BucketRefresher{ refreshSearch = sch
259 , refreshBuckets = var 259 , refreshBuckets = var
260 , refreshState = rstate } 260 , refreshState = rstate }
@@ -297,7 +297,7 @@ refreshBucket r@BucketRefresher{ refreshSearch = sch
297 return $ if b then 1 else c 297 return $ if b then 1 else c
298 return rcount 298 return rcount
299 299
300refreshLastBucket :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO () 300refreshLastBucket :: SensibleNodeId nid ni => BucketRefresher nid ni qk -> IO ()
301refreshLastBucket r@BucketRefresher { refreshBuckets 301refreshLastBucket r@BucketRefresher { refreshBuckets
302 , refreshQueue } = do 302 , refreshQueue } = do
303 303
@@ -308,7 +308,7 @@ refreshLastBucket r@BucketRefresher { refreshBuckets
308 modifyTVar' refreshQueue $ Int.insert (cnt-1) (now - 1) 308 modifyTVar' refreshQueue $ Int.insert (cnt-1) (now - 1)
309 309
310restartBootstrap :: (Hashable ni, Hashable nid, Ord ni, Ord nid, Show nid) => 310restartBootstrap :: (Hashable ni, Hashable nid, Ord ni, Ord nid, Show nid) =>
311 BucketRefresher nid ni -> STM (IO ()) 311 BucketRefresher nid ni qk -> STM (IO ())
312restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do 312restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do
313 unchanged <- readTVar bootstrapMode 313 unchanged <- readTVar bootstrapMode
314 writeTVar bootstrapMode True 314 writeTVar bootstrapMode True
@@ -319,7 +319,7 @@ restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do
319 else return $ dput XRefresh "BOOTSTRAP already bootstrapping" 319 else return $ dput XRefresh "BOOTSTRAP already bootstrapping"
320 320
321bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) => 321bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) =>
322 BucketRefresher nid ni 322 BucketRefresher nid ni qk
323 -> t1 ni -- ^ Nodes to bootstrap from. 323 -> t1 ni -- ^ Nodes to bootstrap from.
324 -> t ni -- ^ Fallback nodes; used only if the others are unresponsive. 324 -> t ni -- ^ Fallback nodes; used only if the others are unresponsive.
325 -> IO () 325 -> IO ()
@@ -356,7 +356,7 @@ bootstrap r@BucketRefresher { refreshSearch = sch
356 -- maintenance. 356 -- maintenance.
357 357
358 358
359effectiveRefreshInterval :: BucketRefresher nid ni -> Int -> STM POSIXTime 359effectiveRefreshInterval :: BucketRefresher nid ni qk -> Int -> STM POSIXTime
360effectiveRefreshInterval BucketRefresher{ refreshInterval 360effectiveRefreshInterval BucketRefresher{ refreshInterval
361 , refreshBuckets 361 , refreshBuckets
362 , bootstrapMode } num = do 362 , bootstrapMode } num = do
@@ -429,7 +429,7 @@ effectiveRefreshInterval BucketRefresher{ refreshInterval
429-- We embed the result in the STM monad but currently, no STM state changes 429-- We embed the result in the STM monad but currently, no STM state changes
430-- occur until the returned IO action is invoked. TODO: simplify? 430-- occur until the returned IO action is invoked. TODO: simplify?
431touchBucket :: SensibleNodeId nid ni 431touchBucket :: SensibleNodeId nid ni
432 => BucketRefresher nid ni 432 => BucketRefresher nid ni qk
433 -> RoutingTransition ni -- ^ What happened to the bucket? 433 -> RoutingTransition ni -- ^ What happened to the bucket?
434 -> STM (IO ()) 434 -> STM (IO ())
435touchBucket r@BucketRefresher{ refreshSearch 435touchBucket r@BucketRefresher{ refreshSearch
@@ -461,7 +461,7 @@ touchBucket r@BucketRefresher{ refreshSearch
461 writeTVar refreshLastTouch now 461 writeTVar refreshLastTouch now
462 return action 462 return action
463 463
464refreshKademlia :: SensibleNodeId nid ni => BucketRefresher nid ni -> Kademlia nid ni 464refreshKademlia :: SensibleNodeId nid ni => BucketRefresher nid ni qk -> Kademlia nid ni
465refreshKademlia r@BucketRefresher { refreshSearch = sch 465refreshKademlia r@BucketRefresher { refreshSearch = sch
466 , refreshPing = ping 466 , refreshPing = ping
467 , refreshBuckets = bkts 467 , refreshBuckets = bkts
diff --git a/kad/src/Network/Kademlia/CommonAPI.hs b/kad/src/Network/Kademlia/CommonAPI.hs
index 6d3fd16c..bcbfe9d8 100644
--- a/kad/src/Network/Kademlia/CommonAPI.hs
+++ b/kad/src/Network/Kademlia/CommonAPI.hs
@@ -20,7 +20,8 @@ import Network.Kademlia.Search
20import Network.Kademlia.Routing as R 20import Network.Kademlia.Routing as R
21import Crypto.Tox (SecretKey,PublicKey) 21import Crypto.Tox (SecretKey,PublicKey)
22 22
23data DHT = forall nid ni. ( Show ni 23data DHT = forall nid ni qk.
24 ( Show ni
24 , Read ni 25 , Read ni
25 , ToJSON ni 26 , ToJSON ni
26 , FromJSON ni 27 , FromJSON ni
@@ -31,9 +32,10 @@ data DHT = forall nid ni. ( Show ni
31 , Hashable nid 32 , Hashable nid
32 , Typeable ni 33 , Typeable ni
33 , S.Serialize nid 34 , S.Serialize nid
35 , Ord qk
34 ) => 36 ) =>
35 DHT 37 DHT
36 { dhtBuckets :: BucketRefresher nid ni 38 { dhtBuckets :: BucketRefresher nid ni qk
37 , dhtSecretKey :: STM (Maybe SecretKey) 39 , dhtSecretKey :: STM (Maybe SecretKey)
38 , dhtPing :: Map.Map String (DHTPing ni) 40 , dhtPing :: Map.Map String (DHTPing ni)
39 , dhtQuery :: Map.Map String (DHTQuery nid ni) 41 , dhtQuery :: Map.Map String (DHTQuery nid ni)
@@ -45,13 +47,14 @@ data DHT = forall nid ni. ( Show ni
45 , dhtShowHexId :: Maybe (nid -> String) 47 , dhtShowHexId :: Maybe (nid -> String)
46 } 48 }
47 49
48data DHTQuery nid ni = forall addr r tok. 50data DHTQuery nid ni = forall addr r tok qk.
49 ( Ord addr 51 ( Ord addr
50 , Typeable r 52 , Typeable r
51 , Typeable tok 53 , Typeable tok
52 , Typeable ni 54 , Typeable ni
55 , Ord qk
53 ) => DHTQuery 56 ) => DHTQuery
54 { qsearch :: Search nid addr tok ni r 57 { qsearch :: Search nid addr tok ni r qk
55 , qhandler :: ni -> nid -> IO ([ni], [r], Maybe tok) -- ^ Invoked on local node, when there is no query destination. 58 , qhandler :: ni -> nid -> IO ([ni], [r], Maybe tok) -- ^ Invoked on local node, when there is no query destination.
56 , qshowR :: r -> String 59 , qshowR :: r -> String
57 , qshowTok :: tok -> Maybe String 60 , qshowTok :: tok -> Maybe String
@@ -75,9 +78,9 @@ data DHTAnnouncable nid = forall dta tok ni r.
75 , announceTarget :: dta -> nid 78 , announceTarget :: dta -> nid
76 } 79 }
77 80
78data DHTSearch nid ni = forall addr tok r. DHTSearch 81data DHTSearch nid ni = forall addr tok r qk. DHTSearch
79 { searchThread :: ThreadId 82 { searchThread :: ThreadId
80 , searchState :: SearchState nid addr tok ni r 83 , searchState :: SearchState nid addr tok ni r qk
81 , searchShowTok :: tok -> Maybe String 84 , searchShowTok :: tok -> Maybe String
82 , searchResults :: TVar (Set.Set String) 85 , searchResults :: TVar (Set.Set String)
83 } 86 }
diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs
index 8d9c997b..e30c40da 100644
--- a/kad/src/Network/Kademlia/Search.hs
+++ b/kad/src/Network/Kademlia/Search.hs
@@ -19,6 +19,8 @@ import Control.Concurrent.Tasks
19import Control.Concurrent.STM 19import Control.Concurrent.STM
20import Control.Monad 20import Control.Monad
21import Data.Function 21import Data.Function
22import qualified Data.Map.Strict as Map
23 ;import Data.Map.Strict (Map)
22import Data.Maybe 24import Data.Maybe
23import qualified Data.Set as Set 25import qualified Data.Set as Set
24 ;import Data.Set (Set) 26 ;import Data.Set (Set)
@@ -38,10 +40,11 @@ import Control.Concurrent.Lifted
38import GHC.Conc (labelThread) 40import GHC.Conc (labelThread)
39#endif 41#endif
40 42
41data Search nid addr tok ni r = Search 43data Search nid addr tok ni r qk = Search
42 { searchSpace :: KademliaSpace nid ni 44 { searchSpace :: KademliaSpace nid ni
43 , searchNodeAddress :: ni -> addr 45 , searchNodeAddress :: ni -> addr
44 , searchQuery :: nid -> ni -> IO (Result ([ni], [r], Maybe tok)) 46 , searchQuery :: nid -> ni -> (qk -> Result ([ni], [r], Maybe tok) -> IO ()) -> IO qk
47 , searchQueryCancel :: qk -> STM ()
45 , searchAlpha :: Int -- α = 8 48 , searchAlpha :: Int -- α = 8
46 -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on 49 -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on
47 -- how fast the queries are. For Tox's much slower onion-routed queries, we 50 -- how fast the queries are. For Tox's much slower onion-routed queries, we
@@ -55,10 +58,11 @@ data Search nid addr tok ni r = Search
55 , searchK :: Int -- K = 16 58 , searchK :: Int -- K = 16
56 } 59 }
57 60
58data SearchState nid addr tok ni r = SearchState 61data SearchState nid addr tok ni r qk = SearchState
59 { -- | The number of pending queries. Incremented before any query is sent 62 { -- | The number of pending queries. Incremented before any query is sent
60 -- and decremented when we get a reply. 63 -- and decremented when we get a reply.
61 searchPendingCount :: TVar Int 64 searchPendingCount :: TVar Int
65 , searchPending :: TVar (Map qk (STM ()))
62 -- | Nodes scheduled to be queried (roughly at most K). 66 -- | Nodes scheduled to be queried (roughly at most K).
63 -- 67 --
64 -- This will be set to Nothing when a search is canceled. 68 -- This will be set to Nothing when a search is canceled.
@@ -72,7 +76,7 @@ data SearchState nid addr tok ni r = SearchState
72 -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha 76 -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha
73 -- should limit the number of outstanding queries. 77 -- should limit the number of outstanding queries.
74 , searchVisited :: TVar (Set addr) 78 , searchVisited :: TVar (Set addr)
75 , searchSpec :: Search nid addr tok ni r 79 , searchSpec :: Search nid addr tok ni r qk
76 } 80 }
77 81
78 82
@@ -87,12 +91,13 @@ newSearch :: ( Ord addr
87 -> (r -> STM Bool) -- receives search results. 91 -> (r -> STM Bool) -- receives search results.
88 -> nid -- target of search 92 -> nid -- target of search
89 -} 93 -}
90 Search nid addr tok ni r 94 Search nid addr tok ni r qk
91 -> nid 95 -> nid
92 -> [ni] -- Initial nodes to query. 96 -> [ni] -- Initial nodes to query.
93 -> STM (SearchState nid addr tok ni r) 97 -> STM (SearchState nid addr tok ni r qk)
94newSearch s@(Search space nAddr qry _ _) target ns = do 98newSearch s@(Search space nAddr qry _ _ _) target ns = do
95 c <- newTVar 0 99 c <- newTVar 0
100 p <- newTVar Map.empty
96 q <- newTVar $ Just 101 q <- newTVar $ Just
97 $ MM.fromList 102 $ MM.fromList
98 $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) 103 $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n))
@@ -100,7 +105,7 @@ newSearch s@(Search space nAddr qry _ _) target ns = do
100 i <- newTVar MM.empty 105 i <- newTVar MM.empty
101 v <- newTVar Set.empty 106 v <- newTVar Set.empty
102 return -- (Search space nAddr qry) , r , target 107 return -- (Search space nAddr qry) , r , target
103 ( SearchState c q i v s ) 108 ( SearchState c p q i v s )
104 109
105-- | Discard a value from a key-priority-value tuple. This is useful for 110-- | Discard a value from a key-priority-value tuple. This is useful for
106-- swaping items from a "MinMaxPSQ'" to a "MinMaxPSQ". 111-- swaping items from a "MinMaxPSQ'" to a "MinMaxPSQ".
@@ -110,11 +115,13 @@ stripValue (Binding ni _ nid) = (ni :-> nid)
110-- | Reset a 'SearchState' object to ready it for a repeated search. 115-- | Reset a 'SearchState' object to ready it for a repeated search.
111reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) => 116reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) =>
112 (nid -> STM [ni]) 117 (nid -> STM [ni])
113 -> Search nid addr1 tok1 ni r1 118 -> Search nid addr1 tok1 ni r1 qk
114 -> nid 119 -> nid
115 -> SearchState nid addr tok ni r 120 -> SearchState nid addr tok ni r qk
116 -> STM (SearchState nid addr tok ni r) 121 -> STM (SearchState nid addr tok ni r qk)
117reset nearestNodes qsearch target st = do 122reset nearestNodes qsearch target st = do
123 pc <- readTVar (searchPendingCount st)
124 check (pc == 0)
118 searchIsFinished st >>= check -- Wait for a search to finish before resetting. 125 searchIsFinished st >>= check -- Wait for a search to finish before resetting.
119 bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) 126 bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni)
120 <$> nearestNodes target 127 <$> nearestNodes target
@@ -125,25 +132,29 @@ reset nearestNodes qsearch target st = do
125 writeTVar (searchPendingCount st) 0 132 writeTVar (searchPendingCount st) 0
126 return st 133 return st
127 134
128sendQuery :: forall addr nid tok ni r. 135grokQuery :: forall addr nid tok ni r qk.
129 ( Ord addr 136 ( Ord addr
130 , PSQKey nid 137 , PSQKey nid
131 , PSQKey ni 138 , PSQKey ni
132 , Show nid 139 , Show nid
140 , Ord qk
133 ) => 141 ) =>
134 Search nid addr tok ni r 142 Search nid addr tok ni r qk
135 -> nid 143 -> nid
136 -> (r -> STM Bool) -- ^ return False to terminate the search. 144 -> (r -> STM Bool) -- ^ return False to terminate the search.
137 -> SearchState nid addr tok ni r 145 -> SearchState nid addr tok ni r qk
138 -> Binding ni nid 146 -> Binding ni nid
147 -> qk
148 -> Result ([ni],[r],Maybe tok)
139 -> IO () 149 -> IO ()
140sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do 150grokQuery Search{..} searchTarget withSearchResult sch@SearchState{..} (ni :-> d) qk reply = do
141 myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) 151 -- myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget)
142 reply <- searchQuery searchTarget ni `catchIOError` const (return Canceled) 152 -- reply <- searchQuery searchTarget ni `catchIOError` const (return Nothing)
143 -- (ns,rs) 153 -- (ns,rs)
144 let tok = error "TODO: token" 154 -- let tok = error "TODO: token"
145 atomically $ do 155 atomically $ do
146 modifyTVar searchPendingCount pred 156 modifyTVar' searchPendingCount pred
157 modifyTVar' searchPending $ Map.delete qk
147 case reply of 158 case reply of
148 Success x -> go x 159 Success x -> go x
149 _ -> return () 160 _ -> return ()
@@ -170,7 +181,7 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) =
170 modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d 181 modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d
171 flip fix rs $ \loop -> \case 182 flip fix rs $ \loop -> \case
172 r:rs' -> do 183 r:rs' -> do
173 wanting <- searchResult r 184 wanting <- withSearchResult r
174 if wanting then loop rs' 185 if wanting then loop rs'
175 else searchCancel sch 186 else searchCancel sch
176 [] -> return () 187 [] -> return ()
@@ -178,7 +189,7 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) =
178 189
179searchIsFinished :: ( PSQKey nid 190searchIsFinished :: ( PSQKey nid
180 , PSQKey ni 191 , PSQKey ni
181 ) => SearchState nid addr tok ni r -> STM Bool 192 ) => SearchState nid addr tok ni r qk -> STM Bool
182searchIsFinished SearchState{..} = do 193searchIsFinished SearchState{..} = do
183 readTVar searchQueued >>= \case 194 readTVar searchQueued >>= \case
184 Just q -> do 195 Just q -> do
@@ -191,9 +202,11 @@ searchIsFinished SearchState{..} = do
191 <= PSQ.prio (fromJust $ MM.findMin q)))) 202 <= PSQ.prio (fromJust $ MM.findMin q))))
192 Nothing -> return True 203 Nothing -> return True
193 204
194searchCancel :: SearchState nid addr tok ni r -> STM () 205searchCancel :: SearchState nid addr tok ni r qk -> STM ()
195searchCancel SearchState{..} = do 206searchCancel SearchState{..} = do
196 writeTVar searchQueued Nothing 207 writeTVar searchQueued Nothing
208 m <- readTVar searchPending
209 foldr (>>) (return ()) m
197 210
198search :: 211search ::
199 ( Ord r 212 ( Ord r
@@ -201,7 +214,8 @@ search ::
201 , PSQKey nid 214 , PSQKey nid
202 , PSQKey ni 215 , PSQKey ni
203 , Show nid 216 , Show nid
204 ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r, ThreadId) 217 , Ord qk
218 ) => Search nid addr tok ni r qk -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r qk, ThreadId)
205search sch buckets target result = do 219search sch buckets target result = do
206 let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets 220 let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets
207 st <- atomically $ newSearch sch target ns 221 st <- atomically $ newSearch sch target ns
@@ -211,11 +225,11 @@ search sch buckets target result = do
211 atomically $ writeTVar v True 225 atomically $ writeTVar v True
212 return (st,t) 226 return (st,t)
213 227
214searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni ) 228searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni, Ord qk )
215 => Search nid addr tok ni r -- ^ Query and distance methods. 229 => Search nid addr tok ni r qk -- ^ Query and distance methods.
216 -> nid -- ^ The target we are searching for. 230 -> nid -- ^ The target we are searching for.
217 -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching. 231 -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching.
218 -> SearchState nid addr tok ni r -- ^ Search-related state. 232 -> SearchState nid addr tok ni r qk -- ^ Search-related state.
219 -> IO () 233 -> IO ()
220searchLoop sch@Search{..} target result s@SearchState{..} = do 234searchLoop sch@Search{..} target result s@SearchState{..} = do
221 myThreadId >>= flip labelThread ("search."++show target) 235 myThreadId >>= flip labelThread ("search."++show target)
@@ -240,8 +254,8 @@ searchLoop sch@Search{..} target result s@SearchState{..} = do
240 modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) 254 modifyTVar searchVisited $ Set.insert (searchNodeAddress ni)
241 modifyTVar searchPendingCount succ 255 modifyTVar searchPendingCount succ
242 return $ do 256 return $ do
243 forkTask g 257 qk <- searchQuery target ni $
244 "searchQuery" 258 \qk reply -> grokQuery sch target result s (ni :-> d) qk reply
245 $ sendQuery sch target result s (ni :-> d) 259 atomically $ modifyTVar' searchPending $ Map.insert qk (searchQueryCancel qk)
246 again 260 again
247 _ -> searchIsFinished s >>= check >> return (return ()) 261 _ -> searchIsFinished s >>= check >> return (return ())