diff options
author | Joe Crayne <joe@jerkface.net> | 2019-12-27 01:20:59 -0500 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-01 23:28:00 -0500 |
commit | d8a7ad88bfdb76b7c481c0ce89de63528a06e734 (patch) | |
tree | 621f3145cb3b08b5133229372501bef8c84a7cb6 | |
parent | 9f33d972b60959d69318e5f243ffae4252d6d3f5 (diff) |
Made the BucketRefresher state accessible from CommonAPI.
-rw-r--r-- | dht/examples/dhtd.hs | 26 | ||||
-rw-r--r-- | kad/src/Network/Kademlia/Bootstrap.hs | 44 | ||||
-rw-r--r-- | kad/src/Network/Kademlia/CommonAPI.hs | 8 | ||||
-rw-r--r-- | kad/src/Network/Kademlia/Persistence.hs | 2 | ||||
-rw-r--r-- | kad/src/Network/Kademlia/Search.hs | 9 |
5 files changed, 65 insertions, 24 deletions
diff --git a/dht/examples/dhtd.hs b/dht/examples/dhtd.hs index ef3f6bd4..4f83beb2 100644 --- a/dht/examples/dhtd.hs +++ b/dht/examples/dhtd.hs | |||
@@ -551,7 +551,7 @@ clientSession s@Session{..} sock cnum h = do | |||
551 | 551 | ||
552 | ("ls", _) | Just DHT{dhtBuckets} <- Map.lookup netname dhts | 552 | ("ls", _) | Just DHT{dhtBuckets} <- Map.lookup netname dhts |
553 | -> cmd0 $ do | 553 | -> cmd0 $ do |
554 | bkts <- atomically $ readTVar dhtBuckets | 554 | bkts <- atomically $ readTVar (refreshBuckets dhtBuckets) |
555 | let r = reportTable bkts | 555 | let r = reportTable bkts |
556 | hPutClient h $ | 556 | hPutClient h $ |
557 | showReport $ | 557 | showReport $ |
@@ -562,7 +562,7 @@ clientSession s@Session{..} sock cnum h = do | |||
562 | ("r", s) | Just DHT{dhtQuery,dhtBuckets} <- Map.lookup netname dhts | 562 | ("r", s) | Just DHT{dhtQuery,dhtBuckets} <- Map.lookup netname dhts |
563 | , Just DHTQuery{qsearch} <- Map.lookup "node" dhtQuery | 563 | , Just DHTQuery{qsearch} <- Map.lookup "node" dhtQuery |
564 | -> cmd0 $ do | 564 | -> cmd0 $ do |
565 | ni <- atomically $ thisNode <$> readTVar dhtBuckets | 565 | ni <- atomically $ thisNode <$> readTVar (refreshBuckets dhtBuckets) |
566 | let kad = searchSpace qsearch | 566 | let kad = searchSpace qsearch |
567 | nid = kademliaLocation kad ni | 567 | nid = kademliaLocation kad ni |
568 | b = case readMaybe $ strp s of | 568 | b = case readMaybe $ strp s of |
@@ -768,7 +768,7 @@ clientSession s@Session{..} sock cnum h = do | |||
768 | -- arguments: method | 768 | -- arguments: method |
769 | -- nid | 769 | -- nid |
770 | -- (optional dest-ni) | 770 | -- (optional dest-ni) |
771 | self <- atomically $ thisNode <$> readTVar dhtBuckets | 771 | self <- atomically $ thisNode <$> readTVar (refreshBuckets dhtBuckets) |
772 | let (method,xs) = break isSpace $ dropWhile isSpace s | 772 | let (method,xs) = break isSpace $ dropWhile isSpace s |
773 | (nidstr,ys) = break isSpace $ dropWhile isSpace xs | 773 | (nidstr,ys) = break isSpace $ dropWhile isSpace xs |
774 | destination = dropWhile isSpace ys | 774 | destination = dropWhile isSpace ys |
@@ -819,7 +819,7 @@ clientSession s@Session{..} sock cnum h = do | |||
819 | -- data (jid or key) data | 819 | -- data (jid or key) data |
820 | -- dest-rendezvous(r) token | 820 | -- dest-rendezvous(r) token |
821 | -- (optional extra-text) (optional dest-ni) | 821 | -- (optional extra-text) (optional dest-ni) |
822 | self <- atomically $ thisNode <$> readTVar dhtBuckets | 822 | self <- atomically $ thisNode <$> readTVar (refreshBuckets dhtBuckets) |
823 | let (method,xs) = break isSpace $ dropWhile isSpace s | 823 | let (method,xs) = break isSpace $ dropWhile isSpace s |
824 | (dtastr,ys) = break isSpace $ dropWhile isSpace xs | 824 | (dtastr,ys) = break isSpace $ dropWhile isSpace xs |
825 | (tokenstr,zs) = break isSpace $ dropWhile isSpace ys | 825 | (tokenstr,zs) = break isSpace $ dropWhile isSpace ys |
@@ -941,7 +941,7 @@ clientSession s@Session{..} sock cnum h = do | |||
941 | (\nid -> R.kclosest (searchSpace qsearch) | 941 | (\nid -> R.kclosest (searchSpace qsearch) |
942 | (searchK qsearch) | 942 | (searchK qsearch) |
943 | nid | 943 | nid |
944 | <$> readTVar dhtBuckets) | 944 | <$> readTVar (refreshBuckets dhtBuckets)) |
945 | (announceTarget dta) | 945 | (announceTarget dta) |
946 | announceInterval) | 946 | announceInterval) |
947 | dta | 947 | dta |
@@ -967,7 +967,7 @@ clientSession s@Session{..} sock cnum h = do | |||
967 | (\nid -> R.kclosest (searchSpace qsearch) | 967 | (\nid -> R.kclosest (searchSpace qsearch) |
968 | (searchK qsearch) | 968 | (searchK qsearch) |
969 | nid | 969 | nid |
970 | <$> readTVar dhtBuckets) | 970 | <$> readTVar (refreshBuckets dhtBuckets)) |
971 | (announceTarget dta) | 971 | (announceTarget dta) |
972 | announceInterval) | 972 | announceInterval) |
973 | dta | 973 | dta |
@@ -1009,7 +1009,7 @@ clientSession s@Session{..} sock cnum h = do | |||
1009 | join $ atomically $ do | 1009 | join $ atomically $ do |
1010 | schs <- readTVar dhtSearches | 1010 | schs <- readTVar dhtSearches |
1011 | case Map.lookup (method,nid) schs of | 1011 | case Map.lookup (method,nid) schs of |
1012 | Nothing -> do forkSearch method nid qry dhtSearches dhtBuckets tid kvar | 1012 | Nothing -> do forkSearch method nid qry dhtSearches (refreshBuckets dhtBuckets) tid kvar |
1013 | return $ presentSearches | 1013 | return $ presentSearches |
1014 | Just sch -> do writeTVar kvar (Just $ return ()) | 1014 | Just sch -> do writeTVar kvar (Just $ return ()) |
1015 | return $ reportSearchResults method h sch | 1015 | return $ reportSearchResults method h sch |
@@ -1614,7 +1614,7 @@ initTox opts ssvar keysdb mbxmpp invc = case porttox opts of | |||
1614 | tcpclient = tcpKademliaClient $ Tox.toxOnionRoutes tox | 1614 | tcpclient = tcpKademliaClient $ Tox.toxOnionRoutes tox |
1615 | tcpRefresher = tcpBucketRefresher $ Tox.toxOnionRoutes tox | 1615 | tcpRefresher = tcpBucketRefresher $ Tox.toxOnionRoutes tox |
1616 | tcpDHT = DHT | 1616 | tcpDHT = DHT |
1617 | { dhtBuckets = refreshBuckets tcpRefresher | 1617 | { dhtBuckets = tcpRefresher |
1618 | , dhtSecretKey = return $ Just $ transportSecret (Tox.toxCryptoKeys tox) | 1618 | , dhtSecretKey = return $ Just $ transportSecret (Tox.toxCryptoKeys tox) |
1619 | , dhtPing = Map.singleton "ping" DHTPing | 1619 | , dhtPing = Map.singleton "ping" DHTPing |
1620 | { pingQuery = noArgPing $ TCP.tcpPing (TCP.tcpClient tcpclient) | 1620 | { pingQuery = noArgPing $ TCP.tcpPing (TCP.tcpClient tcpclient) |
@@ -1639,9 +1639,9 @@ initTox opts ssvar keysdb mbxmpp invc = case porttox opts of | |||
1639 | , dhtShowHexId = Just Tox.showHexId | 1639 | , dhtShowHexId = Just Tox.showHexId |
1640 | } | 1640 | } |
1641 | dhts = Map.fromList $ | 1641 | dhts = Map.fromList $ |
1642 | ("tox4", toxDHT Tox.routing4 Want_IP4) | 1642 | ("tox4", toxDHT Tox.refresher4 Want_IP4) |
1643 | : (if ip6tox opts | 1643 | : (if ip6tox opts |
1644 | then ( ("tox6", toxDHT Tox.routing6 Want_IP6) :) | 1644 | then ( ("tox6", toxDHT Tox.refresher6 Want_IP6) :) |
1645 | else id) | 1645 | else id) |
1646 | (if enableTCPDHT opts | 1646 | (if enableTCPDHT opts |
1647 | then [ ("toxtcp", tcpDHT) ] | 1647 | then [ ("toxtcp", tcpDHT) ] |
@@ -1674,7 +1674,7 @@ initJabber opts ssvar announcer mbtox toxchat = case portxmpp opts of | |||
1674 | let lookupBkts :: String -> Map.Map String DHT -> Maybe (String,TVar (BucketList Tox.NodeInfo)) | 1674 | let lookupBkts :: String -> Map.Map String DHT -> Maybe (String,TVar (BucketList Tox.NodeInfo)) |
1675 | lookupBkts name m = case Map.lookup name m of | 1675 | lookupBkts name m = case Map.lookup name m of |
1676 | Nothing -> Nothing | 1676 | Nothing -> Nothing |
1677 | Just DHT{dhtBuckets} -> cast (name, dhtBuckets) | 1677 | Just DHT{dhtBuckets} -> cast (name, refreshBuckets dhtBuckets) |
1678 | sv <- xmppServer Tcp.noCleanUp (Just sport) | 1678 | sv <- xmppServer Tcp.noCleanUp (Just sport) |
1679 | tcp <- xmppConnections sv -- :: IO ( Manager TCPStatus T.Text ) | 1679 | tcp <- xmppConnections sv -- :: IO ( Manager TCPStatus T.Text ) |
1680 | let tman = toxman ssvar announcer <$> mbtox | 1680 | let tman = toxman ssvar announcer <$> mbtox |
@@ -1795,9 +1795,9 @@ main = do | |||
1795 | , dhtShowHexId = Nothing | 1795 | , dhtShowHexId = Nothing |
1796 | } | 1796 | } |
1797 | dhts = Map.fromList $ | 1797 | dhts = Map.fromList $ |
1798 | ("bt4", mainlineDHT Mainline.routing4 Want_IP4) | 1798 | ("bt4", mainlineDHT Mainline.refresher4 Want_IP4) |
1799 | : if ip6bt opts | 1799 | : if ip6bt opts |
1800 | then [ ("bt6", mainlineDHT Mainline.routing6 Want_IP6) ] | 1800 | then [ ("bt6", mainlineDHT Mainline.refresher6 Want_IP6) ] |
1801 | else [] | 1801 | else [] |
1802 | ips :: IO [SockAddr] | 1802 | ips :: IO [SockAddr] |
1803 | ips = readExternals Mainline.nodeAddr | 1803 | ips = readExternals Mainline.nodeAddr |
diff --git a/kad/src/Network/Kademlia/Bootstrap.hs b/kad/src/Network/Kademlia/Bootstrap.hs index 08ba3318..c07b3c6c 100644 --- a/kad/src/Network/Kademlia/Bootstrap.hs +++ b/kad/src/Network/Kademlia/Bootstrap.hs | |||
@@ -16,6 +16,8 @@ | |||
16 | module Network.Kademlia.Bootstrap where | 16 | module Network.Kademlia.Bootstrap where |
17 | 17 | ||
18 | import Data.Function | 18 | import Data.Function |
19 | import qualified Data.IntMap.Strict as IntMap | ||
20 | ;import Data.IntMap.Strict (IntMap) | ||
19 | import Data.Maybe | 21 | import Data.Maybe |
20 | import qualified Data.Set as Set | 22 | import qualified Data.Set as Set |
21 | import Data.Time.Clock.POSIX (getPOSIXTime) | 23 | import Data.Time.Clock.POSIX (getPOSIXTime) |
@@ -71,10 +73,12 @@ data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher | |||
71 | , -- | Timestamp of last bucket event. | 73 | , -- | Timestamp of last bucket event. |
72 | refreshLastTouch :: TVar POSIXTime | 74 | refreshLastTouch :: TVar POSIXTime |
73 | , -- | This variable indicates whether or not we are in bootstrapping mode. | 75 | , -- | This variable indicates whether or not we are in bootstrapping mode. |
74 | bootstrapMode :: TVar Bool | 76 | bootstrapMode :: TVar Bool |
75 | , -- | When this countdown reaches 0, we exit bootstrap mode. It is decremented on | 77 | , -- | When this countdown reaches 0, we exit bootstrap mode. It is decremented on |
76 | -- every finished refresh. | 78 | -- every finished refresh. |
77 | bootstrapCountdown :: TVar (Maybe Int) | 79 | bootstrapCountdown :: TVar (Maybe Int) |
80 | -- | Internal state of background searches. Exposed for debugging purposes. | ||
81 | , refreshState :: TVar (IntMap [BucketSearch nid ni]) | ||
78 | } | 82 | } |
79 | 83 | ||
80 | newBucketRefresher :: ( Ord addr, Hashable addr | 84 | newBucketRefresher :: ( Ord addr, Hashable addr |
@@ -91,6 +95,7 @@ newBucketRefresher bkts sch ping = do | |||
91 | lasttouch <- newTVar 0 -- Would use getPOSIXTime here, or minBound, but alas... | 95 | lasttouch <- newTVar 0 -- Would use getPOSIXTime here, or minBound, but alas... |
92 | bootstrapVar <- newTVar True -- Start in bootstrapping mode. | 96 | bootstrapVar <- newTVar True -- Start in bootstrapping mode. |
93 | bootstrapCnt <- newTVar Nothing | 97 | bootstrapCnt <- newTVar Nothing |
98 | st <- newTVar IntMap.empty | ||
94 | return BucketRefresher | 99 | return BucketRefresher |
95 | { refreshInterval = 15 * 60 | 100 | { refreshInterval = 15 * 60 |
96 | , refreshQueue = sched | 101 | , refreshQueue = sched |
@@ -100,6 +105,7 @@ newBucketRefresher bkts sch ping = do | |||
100 | , refreshLastTouch = lasttouch | 105 | , refreshLastTouch = lasttouch |
101 | , bootstrapMode = bootstrapVar | 106 | , bootstrapMode = bootstrapVar |
102 | , bootstrapCountdown = bootstrapCnt | 107 | , bootstrapCountdown = bootstrapCnt |
108 | , refreshState = st | ||
103 | } | 109 | } |
104 | 110 | ||
105 | -- | This was added to avoid the compile error "Record update for | 111 | -- | This was added to avoid the compile error "Record update for |
@@ -118,6 +124,7 @@ updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher | |||
118 | , refreshLastTouch = refreshLastTouch | 124 | , refreshLastTouch = refreshLastTouch |
119 | , bootstrapMode = bootstrapMode | 125 | , bootstrapMode = bootstrapMode |
120 | , bootstrapCountdown = bootstrapCountdown | 126 | , bootstrapCountdown = bootstrapCountdown |
127 | , refreshState = refreshState | ||
121 | } | 128 | } |
122 | 129 | ||
123 | -- | Fork a refresh loop. Kill the returned thread to terminate it. | 130 | -- | Fork a refresh loop. Kill the returned thread to terminate it. |
@@ -228,10 +235,29 @@ onFinishedRefresh BucketRefresher { bootstrapCountdown | |||
228 | return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")." | 235 | return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")." |
229 | else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt) | 236 | else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt) |
230 | 237 | ||
238 | data BucketSearch nid ni = forall addr tok. BucketSearch | ||
239 | { bucketSample :: nid | ||
240 | , bucketResults :: TVar (Set.Set ni) | ||
241 | , bucketFinFlag :: TVar Bool | ||
242 | , bucketState :: SearchState nid addr tok ni ni | ||
243 | , bucketThread :: ThreadId | ||
244 | } | ||
245 | |||
246 | removeBucketState :: BucketSearch nid ni -> Maybe [BucketSearch nid ni] -> Maybe [BucketSearch nid ni] | ||
247 | removeBucketState bst Nothing = Nothing | ||
248 | removeBucketState bst (Just xs) = case filter (\b -> bucketThread b /= bucketThread bst) xs of | ||
249 | [] -> Nothing | ||
250 | ys -> Just ys | ||
251 | |||
252 | insertBucketState :: BucketSearch nid ni -> Maybe [BucketSearch nid ni] -> Maybe [BucketSearch nid ni] | ||
253 | insertBucketState bst Nothing = Just [bst] | ||
254 | insertBucketState bst (Just xs) = Just (bst : xs) | ||
255 | |||
231 | refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) => | 256 | refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) => |
232 | BucketRefresher nid ni -> Int -> IO Int | 257 | BucketRefresher nid ni -> Int -> IO Int |
233 | refreshBucket r@BucketRefresher{ refreshSearch = sch | 258 | refreshBucket r@BucketRefresher{ refreshSearch = sch |
234 | , refreshBuckets = var } | 259 | , refreshBuckets = var |
260 | , refreshState = rstate } | ||
235 | n = do | 261 | n = do |
236 | tbl <- atomically (readTVar var) | 262 | tbl <- atomically (readTVar var) |
237 | let count = bktCount tbl | 263 | let count = bktCount tbl |
@@ -248,13 +274,21 @@ refreshBucket r@BucketRefresher{ refreshSearch = sch | |||
248 | dput XRefresh $ "Start refresh " ++ show (n,sample) | 274 | dput XRefresh $ "Start refresh " ++ show (n,sample) |
249 | 275 | ||
250 | -- Set 15 minute timeout in order to avoid overlapping refreshes. | 276 | -- Set 15 minute timeout in order to avoid overlapping refreshes. |
251 | s <- search sch tbl sample $ if n+1 == R.defaultBucketCount | 277 | (s,thread) <- search sch tbl sample $ if n+1 == R.defaultBucketCount |
252 | then const $ return True -- Never short-circuit the last bucket. | 278 | then const $ return True -- Never short-circuit the last bucket. |
253 | else checkBucketFull (searchSpace sch) var resultCounter fin n | 279 | else checkBucketFull (searchSpace sch) var resultCounter fin n |
280 | let bstate = BucketSearch sample resultCounter fin s thread | ||
281 | atomically $ modifyTVar' rstate $ IntMap.alter (insertBucketState bstate) n | ||
254 | _ <- timeout (15*60*1000000) $ do | 282 | _ <- timeout (15*60*1000000) $ do |
255 | atomically $ searchIsFinished s >>= check | 283 | atomically $ searchIsFinished s >>= check |
256 | atomically $ searchCancel s | 284 | atomically $ searchCancel s |
257 | dput XDHT $ "Finish refresh " ++ show (n,sample) | 285 | dput XDHT $ "Finish refresh " ++ show (n,sample) |
286 | bg <- forkIO $ do | ||
287 | atomically $ do | ||
288 | searchIsFinished s >>= check | ||
289 | modifyTVar' rstate $ IntMap.alter (removeBucketState bstate) n | ||
290 | |||
291 | labelThread bg ("backgrounded." ++ show n ++ "." ++ show sample) | ||
258 | now <- getPOSIXTime | 292 | now <- getPOSIXTime |
259 | join $ atomically $ onFinishedRefresh r n now | 293 | join $ atomically $ onFinishedRefresh r n now |
260 | rcount <- atomically $ do | 294 | rcount <- atomically $ do |
diff --git a/kad/src/Network/Kademlia/CommonAPI.hs b/kad/src/Network/Kademlia/CommonAPI.hs index 4714cecc..6d3fd16c 100644 --- a/kad/src/Network/Kademlia/CommonAPI.hs +++ b/kad/src/Network/Kademlia/CommonAPI.hs | |||
@@ -1,5 +1,8 @@ | |||
1 | {-# LANGUAGE ExistentialQuantification #-} | 1 | {-# LANGUAGE ExistentialQuantification #-} |
2 | module Network.Kademlia.CommonAPI where | 2 | module Network.Kademlia.CommonAPI |
3 | ( module Network.Kademlia.CommonAPI | ||
4 | , refreshBuckets | ||
5 | ) where | ||
3 | 6 | ||
4 | 7 | ||
5 | import Control.Concurrent | 8 | import Control.Concurrent |
@@ -12,6 +15,7 @@ import qualified Data.Set as Set | |||
12 | import Data.Time.Clock.POSIX | 15 | import Data.Time.Clock.POSIX |
13 | import Data.Typeable | 16 | import Data.Typeable |
14 | 17 | ||
18 | import Network.Kademlia.Bootstrap | ||
15 | import Network.Kademlia.Search | 19 | import Network.Kademlia.Search |
16 | import Network.Kademlia.Routing as R | 20 | import Network.Kademlia.Routing as R |
17 | import Crypto.Tox (SecretKey,PublicKey) | 21 | import Crypto.Tox (SecretKey,PublicKey) |
@@ -29,7 +33,7 @@ data DHT = forall nid ni. ( Show ni | |||
29 | , S.Serialize nid | 33 | , S.Serialize nid |
30 | ) => | 34 | ) => |
31 | DHT | 35 | DHT |
32 | { dhtBuckets :: TVar (BucketList ni) | 36 | { dhtBuckets :: BucketRefresher nid ni |
33 | , dhtSecretKey :: STM (Maybe SecretKey) | 37 | , dhtSecretKey :: STM (Maybe SecretKey) |
34 | , dhtPing :: Map.Map String (DHTPing ni) | 38 | , dhtPing :: Map.Map String (DHTPing ni) |
35 | , dhtQuery :: Map.Map String (DHTQuery nid ni) | 39 | , dhtQuery :: Map.Map String (DHTQuery nid ni) |
diff --git a/kad/src/Network/Kademlia/Persistence.hs b/kad/src/Network/Kademlia/Persistence.hs index 32ec169d..f89287fe 100644 --- a/kad/src/Network/Kademlia/Persistence.hs +++ b/kad/src/Network/Kademlia/Persistence.hs | |||
@@ -16,7 +16,7 @@ import System.IO.Error | |||
16 | 16 | ||
17 | saveNodes :: String -> DHT -> IO () | 17 | saveNodes :: String -> DHT -> IO () |
18 | saveNodes netname DHT{dhtBuckets} = do | 18 | saveNodes netname DHT{dhtBuckets} = do |
19 | bkts <- atomically $ readTVar dhtBuckets | 19 | bkts <- atomically $ readTVar (refreshBuckets dhtBuckets) |
20 | let ns = map fst $ concat $ R.toList bkts | 20 | let ns = map fst $ concat $ R.toList bkts |
21 | bs = J.encode ns | 21 | bs = J.encode ns |
22 | fname = nodesFileName netname | 22 | fname = nodesFileName netname |
diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs index 5b60c303..856a7cfc 100644 --- a/kad/src/Network/Kademlia/Search.hs +++ b/kad/src/Network/Kademlia/Search.hs | |||
@@ -194,12 +194,15 @@ search :: | |||
194 | , PSQKey nid | 194 | , PSQKey nid |
195 | , PSQKey ni | 195 | , PSQKey ni |
196 | , Show nid | 196 | , Show nid |
197 | ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r) | 197 | ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r, ThreadId) |
198 | search sch buckets target result = do | 198 | search sch buckets target result = do |
199 | let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets | 199 | let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets |
200 | st <- atomically $ newSearch sch target ns | 200 | st <- atomically $ newSearch sch target ns |
201 | t <- forkIO $ searchLoop sch target result st | 201 | v <- newTVarIO False |
202 | return st | 202 | t <- forkIO $ atomically (check =<< readTVar v) >> searchLoop sch target result st |
203 | labelThread t ("search.pending." ++ show target) | ||
204 | atomically $ writeTVar v True | ||
205 | return (st,t) | ||
203 | 206 | ||
204 | searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni ) | 207 | searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni ) |
205 | => Search nid addr tok ni r -- ^ Query and distance methods. | 208 | => Search nid addr tok ni r -- ^ Query and distance methods. |