diff options
Diffstat (limited to 'kad/src/Network/Kademlia')
-rw-r--r-- | kad/src/Network/Kademlia/Bootstrap.hs | 36 | ||||
-rw-r--r-- | kad/src/Network/Kademlia/CommonAPI.hs | 15 | ||||
-rw-r--r-- | kad/src/Network/Kademlia/Search.hs | 76 |
3 files changed, 72 insertions, 55 deletions
diff --git a/kad/src/Network/Kademlia/Bootstrap.hs b/kad/src/Network/Kademlia/Bootstrap.hs index c07b3c6c..bd09214f 100644 --- a/kad/src/Network/Kademlia/Bootstrap.hs +++ b/kad/src/Network/Kademlia/Bootstrap.hs | |||
@@ -52,7 +52,7 @@ type SensibleNodeId nid ni = | |||
52 | , Hashable nid | 52 | , Hashable nid |
53 | , Hashable ni ) | 53 | , Hashable ni ) |
54 | 54 | ||
55 | data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher | 55 | data BucketRefresher nid ni qk = forall tok addr. Ord addr => BucketRefresher |
56 | { -- | A staleness threshold (if a bucket goes this long without being | 56 | { -- | A staleness threshold (if a bucket goes this long without being |
57 | -- touched, a refresh will be triggered). | 57 | -- touched, a refresh will be triggered). |
58 | refreshInterval :: POSIXTime | 58 | refreshInterval :: POSIXTime |
@@ -63,7 +63,7 @@ data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher | |||
63 | -- priority in this priority search queue. | 63 | -- priority in this priority search queue. |
64 | , refreshQueue :: TVar (Int.PSQ POSIXTime) | 64 | , refreshQueue :: TVar (Int.PSQ POSIXTime) |
65 | -- | This is the kademlia node search specification. | 65 | -- | This is the kademlia node search specification. |
66 | , refreshSearch :: Search nid addr tok ni ni | 66 | , refreshSearch :: Search nid addr tok ni ni qk |
67 | -- | The current kademlia routing table buckets. | 67 | -- | The current kademlia routing table buckets. |
68 | , refreshBuckets :: TVar (R.BucketList ni) | 68 | , refreshBuckets :: TVar (R.BucketList ni) |
69 | -- | Action to ping a node. This is used only during initial bootstrap | 69 | -- | Action to ping a node. This is used only during initial bootstrap |
@@ -84,9 +84,9 @@ data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher | |||
84 | newBucketRefresher :: ( Ord addr, Hashable addr | 84 | newBucketRefresher :: ( Ord addr, Hashable addr |
85 | , SensibleNodeId nid ni ) | 85 | , SensibleNodeId nid ni ) |
86 | => TVar (R.BucketList ni) | 86 | => TVar (R.BucketList ni) |
87 | -> Search nid addr tok ni ni | 87 | -> Search nid addr tok ni ni qk |
88 | -> (ni -> IO Bool) | 88 | -> (ni -> IO Bool) |
89 | -> STM (BucketRefresher nid ni) | 89 | -> STM (BucketRefresher nid ni qk) |
90 | newBucketRefresher bkts sch ping = do | 90 | newBucketRefresher bkts sch ping = do |
91 | let spc = searchSpace sch | 91 | let spc = searchSpace sch |
92 | nodeId = kademliaLocation spc | 92 | nodeId = kademliaLocation spc |
@@ -112,9 +112,9 @@ newBucketRefresher bkts sch ping = do | |||
112 | -- insufficiently polymorphic field" when trying to update the existentially | 112 | -- insufficiently polymorphic field" when trying to update the existentially |
113 | -- quantified field 'refreshSearch'. | 113 | -- quantified field 'refreshSearch'. |
114 | updateRefresherIO :: Ord addr | 114 | updateRefresherIO :: Ord addr |
115 | => Search nid addr tok ni ni | 115 | => Search nid addr tok ni ni qk |
116 | -> (ni -> IO Bool) | 116 | -> (ni -> IO Bool) |
117 | -> BucketRefresher nid ni -> BucketRefresher nid ni | 117 | -> BucketRefresher nid ni qk -> BucketRefresher nid ni qk |
118 | updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher | 118 | updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher |
119 | { refreshSearch = sch | 119 | { refreshSearch = sch |
120 | , refreshPing = ping | 120 | , refreshPing = ping |
@@ -128,7 +128,7 @@ updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher | |||
128 | } | 128 | } |
129 | 129 | ||
130 | -- | Fork a refresh loop. Kill the returned thread to terminate it. | 130 | -- | Fork a refresh loop. Kill the returned thread to terminate it. |
131 | forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId | 131 | forkPollForRefresh :: Ord qk => SensibleNodeId nid ni => BucketRefresher nid ni qk -> IO ThreadId |
132 | forkPollForRefresh r@BucketRefresher{ refreshInterval | 132 | forkPollForRefresh r@BucketRefresher{ refreshInterval |
133 | , refreshQueue | 133 | , refreshQueue |
134 | , refreshBuckets | 134 | , refreshBuckets |
@@ -194,7 +194,7 @@ checkBucketFull space var resultCounter fin n found_node = do | |||
194 | 194 | ||
195 | -- | Called from 'refreshBucket' with the current time when a refresh of the | 195 | -- | Called from 'refreshBucket' with the current time when a refresh of the |
196 | -- supplied bucket number finishes. | 196 | -- supplied bucket number finishes. |
197 | onFinishedRefresh :: BucketRefresher nid ni -> Int -> POSIXTime -> STM (IO ()) | 197 | onFinishedRefresh :: BucketRefresher nid ni qk -> Int -> POSIXTime -> STM (IO ()) |
198 | onFinishedRefresh BucketRefresher { bootstrapCountdown | 198 | onFinishedRefresh BucketRefresher { bootstrapCountdown |
199 | , bootstrapMode | 199 | , bootstrapMode |
200 | , refreshQueue | 200 | , refreshQueue |
@@ -235,11 +235,11 @@ onFinishedRefresh BucketRefresher { bootstrapCountdown | |||
235 | return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")." | 235 | return $ do action ; dput XRefresh $ "BOOTSTRAP complete (" ++ show (R.shape tbl) ++ ")." |
236 | else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt) | 236 | else return $ do action ; dput XRefresh $ "BOOTSTRAP progress " ++ show (num,R.shape tbl,cnt) |
237 | 237 | ||
238 | data BucketSearch nid ni = forall addr tok. BucketSearch | 238 | data BucketSearch nid ni = forall addr tok qk. BucketSearch |
239 | { bucketSample :: nid | 239 | { bucketSample :: nid |
240 | , bucketResults :: TVar (Set.Set ni) | 240 | , bucketResults :: TVar (Set.Set ni) |
241 | , bucketFinFlag :: TVar Bool | 241 | , bucketFinFlag :: TVar Bool |
242 | , bucketState :: SearchState nid addr tok ni ni | 242 | , bucketState :: SearchState nid addr tok ni ni qk |
243 | , bucketThread :: ThreadId | 243 | , bucketThread :: ThreadId |
244 | } | 244 | } |
245 | 245 | ||
@@ -253,8 +253,8 @@ insertBucketState :: BucketSearch nid ni -> Maybe [BucketSearch nid ni] -> Maybe | |||
253 | insertBucketState bst Nothing = Just [bst] | 253 | insertBucketState bst Nothing = Just [bst] |
254 | insertBucketState bst (Just xs) = Just (bst : xs) | 254 | insertBucketState bst (Just xs) = Just (bst : xs) |
255 | 255 | ||
256 | refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni) => | 256 | refreshBucket :: (Show nid, Ord ni, Ord nid, Hashable nid, Hashable ni, Ord qk) => |
257 | BucketRefresher nid ni -> Int -> IO Int | 257 | BucketRefresher nid ni qk -> Int -> IO Int |
258 | refreshBucket r@BucketRefresher{ refreshSearch = sch | 258 | refreshBucket r@BucketRefresher{ refreshSearch = sch |
259 | , refreshBuckets = var | 259 | , refreshBuckets = var |
260 | , refreshState = rstate } | 260 | , refreshState = rstate } |
@@ -297,7 +297,7 @@ refreshBucket r@BucketRefresher{ refreshSearch = sch | |||
297 | return $ if b then 1 else c | 297 | return $ if b then 1 else c |
298 | return rcount | 298 | return rcount |
299 | 299 | ||
300 | refreshLastBucket :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO () | 300 | refreshLastBucket :: SensibleNodeId nid ni => BucketRefresher nid ni qk -> IO () |
301 | refreshLastBucket r@BucketRefresher { refreshBuckets | 301 | refreshLastBucket r@BucketRefresher { refreshBuckets |
302 | , refreshQueue } = do | 302 | , refreshQueue } = do |
303 | 303 | ||
@@ -308,7 +308,7 @@ refreshLastBucket r@BucketRefresher { refreshBuckets | |||
308 | modifyTVar' refreshQueue $ Int.insert (cnt-1) (now - 1) | 308 | modifyTVar' refreshQueue $ Int.insert (cnt-1) (now - 1) |
309 | 309 | ||
310 | restartBootstrap :: (Hashable ni, Hashable nid, Ord ni, Ord nid, Show nid) => | 310 | restartBootstrap :: (Hashable ni, Hashable nid, Ord ni, Ord nid, Show nid) => |
311 | BucketRefresher nid ni -> STM (IO ()) | 311 | BucketRefresher nid ni qk -> STM (IO ()) |
312 | restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do | 312 | restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do |
313 | unchanged <- readTVar bootstrapMode | 313 | unchanged <- readTVar bootstrapMode |
314 | writeTVar bootstrapMode True | 314 | writeTVar bootstrapMode True |
@@ -319,7 +319,7 @@ restartBootstrap r@BucketRefresher{ bootstrapMode, bootstrapCountdown } = do | |||
319 | else return $ dput XRefresh "BOOTSTRAP already bootstrapping" | 319 | else return $ dput XRefresh "BOOTSTRAP already bootstrapping" |
320 | 320 | ||
321 | bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) => | 321 | bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) => |
322 | BucketRefresher nid ni | 322 | BucketRefresher nid ni qk |
323 | -> t1 ni -- ^ Nodes to bootstrap from. | 323 | -> t1 ni -- ^ Nodes to bootstrap from. |
324 | -> t ni -- ^ Fallback nodes; used only if the others are unresponsive. | 324 | -> t ni -- ^ Fallback nodes; used only if the others are unresponsive. |
325 | -> IO () | 325 | -> IO () |
@@ -356,7 +356,7 @@ bootstrap r@BucketRefresher { refreshSearch = sch | |||
356 | -- maintenance. | 356 | -- maintenance. |
357 | 357 | ||
358 | 358 | ||
359 | effectiveRefreshInterval :: BucketRefresher nid ni -> Int -> STM POSIXTime | 359 | effectiveRefreshInterval :: BucketRefresher nid ni qk -> Int -> STM POSIXTime |
360 | effectiveRefreshInterval BucketRefresher{ refreshInterval | 360 | effectiveRefreshInterval BucketRefresher{ refreshInterval |
361 | , refreshBuckets | 361 | , refreshBuckets |
362 | , bootstrapMode } num = do | 362 | , bootstrapMode } num = do |
@@ -429,7 +429,7 @@ effectiveRefreshInterval BucketRefresher{ refreshInterval | |||
429 | -- We embed the result in the STM monad but currently, no STM state changes | 429 | -- We embed the result in the STM monad but currently, no STM state changes |
430 | -- occur until the returned IO action is invoked. TODO: simplify? | 430 | -- occur until the returned IO action is invoked. TODO: simplify? |
431 | touchBucket :: SensibleNodeId nid ni | 431 | touchBucket :: SensibleNodeId nid ni |
432 | => BucketRefresher nid ni | 432 | => BucketRefresher nid ni qk |
433 | -> RoutingTransition ni -- ^ What happened to the bucket? | 433 | -> RoutingTransition ni -- ^ What happened to the bucket? |
434 | -> STM (IO ()) | 434 | -> STM (IO ()) |
435 | touchBucket r@BucketRefresher{ refreshSearch | 435 | touchBucket r@BucketRefresher{ refreshSearch |
@@ -461,7 +461,7 @@ touchBucket r@BucketRefresher{ refreshSearch | |||
461 | writeTVar refreshLastTouch now | 461 | writeTVar refreshLastTouch now |
462 | return action | 462 | return action |
463 | 463 | ||
464 | refreshKademlia :: SensibleNodeId nid ni => BucketRefresher nid ni -> Kademlia nid ni | 464 | refreshKademlia :: SensibleNodeId nid ni => BucketRefresher nid ni qk -> Kademlia nid ni |
465 | refreshKademlia r@BucketRefresher { refreshSearch = sch | 465 | refreshKademlia r@BucketRefresher { refreshSearch = sch |
466 | , refreshPing = ping | 466 | , refreshPing = ping |
467 | , refreshBuckets = bkts | 467 | , refreshBuckets = bkts |
diff --git a/kad/src/Network/Kademlia/CommonAPI.hs b/kad/src/Network/Kademlia/CommonAPI.hs index 6d3fd16c..bcbfe9d8 100644 --- a/kad/src/Network/Kademlia/CommonAPI.hs +++ b/kad/src/Network/Kademlia/CommonAPI.hs | |||
@@ -20,7 +20,8 @@ import Network.Kademlia.Search | |||
20 | import Network.Kademlia.Routing as R | 20 | import Network.Kademlia.Routing as R |
21 | import Crypto.Tox (SecretKey,PublicKey) | 21 | import Crypto.Tox (SecretKey,PublicKey) |
22 | 22 | ||
23 | data DHT = forall nid ni. ( Show ni | 23 | data DHT = forall nid ni qk. |
24 | ( Show ni | ||
24 | , Read ni | 25 | , Read ni |
25 | , ToJSON ni | 26 | , ToJSON ni |
26 | , FromJSON ni | 27 | , FromJSON ni |
@@ -31,9 +32,10 @@ data DHT = forall nid ni. ( Show ni | |||
31 | , Hashable nid | 32 | , Hashable nid |
32 | , Typeable ni | 33 | , Typeable ni |
33 | , S.Serialize nid | 34 | , S.Serialize nid |
35 | , Ord qk | ||
34 | ) => | 36 | ) => |
35 | DHT | 37 | DHT |
36 | { dhtBuckets :: BucketRefresher nid ni | 38 | { dhtBuckets :: BucketRefresher nid ni qk |
37 | , dhtSecretKey :: STM (Maybe SecretKey) | 39 | , dhtSecretKey :: STM (Maybe SecretKey) |
38 | , dhtPing :: Map.Map String (DHTPing ni) | 40 | , dhtPing :: Map.Map String (DHTPing ni) |
39 | , dhtQuery :: Map.Map String (DHTQuery nid ni) | 41 | , dhtQuery :: Map.Map String (DHTQuery nid ni) |
@@ -45,13 +47,14 @@ data DHT = forall nid ni. ( Show ni | |||
45 | , dhtShowHexId :: Maybe (nid -> String) | 47 | , dhtShowHexId :: Maybe (nid -> String) |
46 | } | 48 | } |
47 | 49 | ||
48 | data DHTQuery nid ni = forall addr r tok. | 50 | data DHTQuery nid ni = forall addr r tok qk. |
49 | ( Ord addr | 51 | ( Ord addr |
50 | , Typeable r | 52 | , Typeable r |
51 | , Typeable tok | 53 | , Typeable tok |
52 | , Typeable ni | 54 | , Typeable ni |
55 | , Ord qk | ||
53 | ) => DHTQuery | 56 | ) => DHTQuery |
54 | { qsearch :: Search nid addr tok ni r | 57 | { qsearch :: Search nid addr tok ni r qk |
55 | , qhandler :: ni -> nid -> IO ([ni], [r], Maybe tok) -- ^ Invoked on local node, when there is no query destination. | 58 | , qhandler :: ni -> nid -> IO ([ni], [r], Maybe tok) -- ^ Invoked on local node, when there is no query destination. |
56 | , qshowR :: r -> String | 59 | , qshowR :: r -> String |
57 | , qshowTok :: tok -> Maybe String | 60 | , qshowTok :: tok -> Maybe String |
@@ -75,9 +78,9 @@ data DHTAnnouncable nid = forall dta tok ni r. | |||
75 | , announceTarget :: dta -> nid | 78 | , announceTarget :: dta -> nid |
76 | } | 79 | } |
77 | 80 | ||
78 | data DHTSearch nid ni = forall addr tok r. DHTSearch | 81 | data DHTSearch nid ni = forall addr tok r qk. DHTSearch |
79 | { searchThread :: ThreadId | 82 | { searchThread :: ThreadId |
80 | , searchState :: SearchState nid addr tok ni r | 83 | , searchState :: SearchState nid addr tok ni r qk |
81 | , searchShowTok :: tok -> Maybe String | 84 | , searchShowTok :: tok -> Maybe String |
82 | , searchResults :: TVar (Set.Set String) | 85 | , searchResults :: TVar (Set.Set String) |
83 | } | 86 | } |
diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs index 8d9c997b..e30c40da 100644 --- a/kad/src/Network/Kademlia/Search.hs +++ b/kad/src/Network/Kademlia/Search.hs | |||
@@ -19,6 +19,8 @@ import Control.Concurrent.Tasks | |||
19 | import Control.Concurrent.STM | 19 | import Control.Concurrent.STM |
20 | import Control.Monad | 20 | import Control.Monad |
21 | import Data.Function | 21 | import Data.Function |
22 | import qualified Data.Map.Strict as Map | ||
23 | ;import Data.Map.Strict (Map) | ||
22 | import Data.Maybe | 24 | import Data.Maybe |
23 | import qualified Data.Set as Set | 25 | import qualified Data.Set as Set |
24 | ;import Data.Set (Set) | 26 | ;import Data.Set (Set) |
@@ -38,10 +40,11 @@ import Control.Concurrent.Lifted | |||
38 | import GHC.Conc (labelThread) | 40 | import GHC.Conc (labelThread) |
39 | #endif | 41 | #endif |
40 | 42 | ||
41 | data Search nid addr tok ni r = Search | 43 | data Search nid addr tok ni r qk = Search |
42 | { searchSpace :: KademliaSpace nid ni | 44 | { searchSpace :: KademliaSpace nid ni |
43 | , searchNodeAddress :: ni -> addr | 45 | , searchNodeAddress :: ni -> addr |
44 | , searchQuery :: nid -> ni -> IO (Result ([ni], [r], Maybe tok)) | 46 | , searchQuery :: nid -> ni -> (qk -> Result ([ni], [r], Maybe tok) -> IO ()) -> IO qk |
47 | , searchQueryCancel :: qk -> STM () | ||
45 | , searchAlpha :: Int -- α = 8 | 48 | , searchAlpha :: Int -- α = 8 |
46 | -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on | 49 | -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on |
47 | -- how fast the queries are. For Tox's much slower onion-routed queries, we | 50 | -- how fast the queries are. For Tox's much slower onion-routed queries, we |
@@ -55,10 +58,11 @@ data Search nid addr tok ni r = Search | |||
55 | , searchK :: Int -- K = 16 | 58 | , searchK :: Int -- K = 16 |
56 | } | 59 | } |
57 | 60 | ||
58 | data SearchState nid addr tok ni r = SearchState | 61 | data SearchState nid addr tok ni r qk = SearchState |
59 | { -- | The number of pending queries. Incremented before any query is sent | 62 | { -- | The number of pending queries. Incremented before any query is sent |
60 | -- and decremented when we get a reply. | 63 | -- and decremented when we get a reply. |
61 | searchPendingCount :: TVar Int | 64 | searchPendingCount :: TVar Int |
65 | , searchPending :: TVar (Map qk (STM ())) | ||
62 | -- | Nodes scheduled to be queried (roughly at most K). | 66 | -- | Nodes scheduled to be queried (roughly at most K). |
63 | -- | 67 | -- |
64 | -- This will be set to Nothing when a search is canceled. | 68 | -- This will be set to Nothing when a search is canceled. |
@@ -72,7 +76,7 @@ data SearchState nid addr tok ni r = SearchState | |||
72 | -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha | 76 | -- set. It also can be a bounded 'MinMaxPSQ', although searchAlpha |
73 | -- should limit the number of outstanding queries. | 77 | -- should limit the number of outstanding queries. |
74 | , searchVisited :: TVar (Set addr) | 78 | , searchVisited :: TVar (Set addr) |
75 | , searchSpec :: Search nid addr tok ni r | 79 | , searchSpec :: Search nid addr tok ni r qk |
76 | } | 80 | } |
77 | 81 | ||
78 | 82 | ||
@@ -87,12 +91,13 @@ newSearch :: ( Ord addr | |||
87 | -> (r -> STM Bool) -- receives search results. | 91 | -> (r -> STM Bool) -- receives search results. |
88 | -> nid -- target of search | 92 | -> nid -- target of search |
89 | -} | 93 | -} |
90 | Search nid addr tok ni r | 94 | Search nid addr tok ni r qk |
91 | -> nid | 95 | -> nid |
92 | -> [ni] -- Initial nodes to query. | 96 | -> [ni] -- Initial nodes to query. |
93 | -> STM (SearchState nid addr tok ni r) | 97 | -> STM (SearchState nid addr tok ni r qk) |
94 | newSearch s@(Search space nAddr qry _ _) target ns = do | 98 | newSearch s@(Search space nAddr qry _ _ _) target ns = do |
95 | c <- newTVar 0 | 99 | c <- newTVar 0 |
100 | p <- newTVar Map.empty | ||
96 | q <- newTVar $ Just | 101 | q <- newTVar $ Just |
97 | $ MM.fromList | 102 | $ MM.fromList |
98 | $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) | 103 | $ map (\n -> n :-> kademliaXor space target (kademliaLocation space n)) |
@@ -100,7 +105,7 @@ newSearch s@(Search space nAddr qry _ _) target ns = do | |||
100 | i <- newTVar MM.empty | 105 | i <- newTVar MM.empty |
101 | v <- newTVar Set.empty | 106 | v <- newTVar Set.empty |
102 | return -- (Search space nAddr qry) , r , target | 107 | return -- (Search space nAddr qry) , r , target |
103 | ( SearchState c q i v s ) | 108 | ( SearchState c p q i v s ) |
104 | 109 | ||
105 | -- | Discard a value from a key-priority-value tuple. This is useful for | 110 | -- | Discard a value from a key-priority-value tuple. This is useful for |
106 | -- swaping items from a "MinMaxPSQ'" to a "MinMaxPSQ". | 111 | -- swaping items from a "MinMaxPSQ'" to a "MinMaxPSQ". |
@@ -110,11 +115,13 @@ stripValue (Binding ni _ nid) = (ni :-> nid) | |||
110 | -- | Reset a 'SearchState' object to ready it for a repeated search. | 115 | -- | Reset a 'SearchState' object to ready it for a repeated search. |
111 | reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) => | 116 | reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) => |
112 | (nid -> STM [ni]) | 117 | (nid -> STM [ni]) |
113 | -> Search nid addr1 tok1 ni r1 | 118 | -> Search nid addr1 tok1 ni r1 qk |
114 | -> nid | 119 | -> nid |
115 | -> SearchState nid addr tok ni r | 120 | -> SearchState nid addr tok ni r qk |
116 | -> STM (SearchState nid addr tok ni r) | 121 | -> STM (SearchState nid addr tok ni r qk) |
117 | reset nearestNodes qsearch target st = do | 122 | reset nearestNodes qsearch target st = do |
123 | pc <- readTVar (searchPendingCount st) | ||
124 | check (pc == 0) | ||
118 | searchIsFinished st >>= check -- Wait for a search to finish before resetting. | 125 | searchIsFinished st >>= check -- Wait for a search to finish before resetting. |
119 | bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) | 126 | bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) |
120 | <$> nearestNodes target | 127 | <$> nearestNodes target |
@@ -125,25 +132,29 @@ reset nearestNodes qsearch target st = do | |||
125 | writeTVar (searchPendingCount st) 0 | 132 | writeTVar (searchPendingCount st) 0 |
126 | return st | 133 | return st |
127 | 134 | ||
128 | sendQuery :: forall addr nid tok ni r. | 135 | grokQuery :: forall addr nid tok ni r qk. |
129 | ( Ord addr | 136 | ( Ord addr |
130 | , PSQKey nid | 137 | , PSQKey nid |
131 | , PSQKey ni | 138 | , PSQKey ni |
132 | , Show nid | 139 | , Show nid |
140 | , Ord qk | ||
133 | ) => | 141 | ) => |
134 | Search nid addr tok ni r | 142 | Search nid addr tok ni r qk |
135 | -> nid | 143 | -> nid |
136 | -> (r -> STM Bool) -- ^ return False to terminate the search. | 144 | -> (r -> STM Bool) -- ^ return False to terminate the search. |
137 | -> SearchState nid addr tok ni r | 145 | -> SearchState nid addr tok ni r qk |
138 | -> Binding ni nid | 146 | -> Binding ni nid |
147 | -> qk | ||
148 | -> Result ([ni],[r],Maybe tok) | ||
139 | -> IO () | 149 | -> IO () |
140 | sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do | 150 | grokQuery Search{..} searchTarget withSearchResult sch@SearchState{..} (ni :-> d) qk reply = do |
141 | myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) | 151 | -- myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) |
142 | reply <- searchQuery searchTarget ni `catchIOError` const (return Canceled) | 152 | -- reply <- searchQuery searchTarget ni `catchIOError` const (return Nothing) |
143 | -- (ns,rs) | 153 | -- (ns,rs) |
144 | let tok = error "TODO: token" | 154 | -- let tok = error "TODO: token" |
145 | atomically $ do | 155 | atomically $ do |
146 | modifyTVar searchPendingCount pred | 156 | modifyTVar' searchPendingCount pred |
157 | modifyTVar' searchPending $ Map.delete qk | ||
147 | case reply of | 158 | case reply of |
148 | Success x -> go x | 159 | Success x -> go x |
149 | _ -> return () | 160 | _ -> return () |
@@ -170,7 +181,7 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = | |||
170 | modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d | 181 | modifyTVar searchInformant $ MM.insertTake' (searchK - searchAlpha) ni tok d |
171 | flip fix rs $ \loop -> \case | 182 | flip fix rs $ \loop -> \case |
172 | r:rs' -> do | 183 | r:rs' -> do |
173 | wanting <- searchResult r | 184 | wanting <- withSearchResult r |
174 | if wanting then loop rs' | 185 | if wanting then loop rs' |
175 | else searchCancel sch | 186 | else searchCancel sch |
176 | [] -> return () | 187 | [] -> return () |
@@ -178,7 +189,7 @@ sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = | |||
178 | 189 | ||
179 | searchIsFinished :: ( PSQKey nid | 190 | searchIsFinished :: ( PSQKey nid |
180 | , PSQKey ni | 191 | , PSQKey ni |
181 | ) => SearchState nid addr tok ni r -> STM Bool | 192 | ) => SearchState nid addr tok ni r qk -> STM Bool |
182 | searchIsFinished SearchState{..} = do | 193 | searchIsFinished SearchState{..} = do |
183 | readTVar searchQueued >>= \case | 194 | readTVar searchQueued >>= \case |
184 | Just q -> do | 195 | Just q -> do |
@@ -191,9 +202,11 @@ searchIsFinished SearchState{..} = do | |||
191 | <= PSQ.prio (fromJust $ MM.findMin q)))) | 202 | <= PSQ.prio (fromJust $ MM.findMin q)))) |
192 | Nothing -> return True | 203 | Nothing -> return True |
193 | 204 | ||
194 | searchCancel :: SearchState nid addr tok ni r -> STM () | 205 | searchCancel :: SearchState nid addr tok ni r qk -> STM () |
195 | searchCancel SearchState{..} = do | 206 | searchCancel SearchState{..} = do |
196 | writeTVar searchQueued Nothing | 207 | writeTVar searchQueued Nothing |
208 | m <- readTVar searchPending | ||
209 | foldr (>>) (return ()) m | ||
197 | 210 | ||
198 | search :: | 211 | search :: |
199 | ( Ord r | 212 | ( Ord r |
@@ -201,7 +214,8 @@ search :: | |||
201 | , PSQKey nid | 214 | , PSQKey nid |
202 | , PSQKey ni | 215 | , PSQKey ni |
203 | , Show nid | 216 | , Show nid |
204 | ) => Search nid addr tok ni r -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r, ThreadId) | 217 | , Ord qk |
218 | ) => Search nid addr tok ni r qk -> R.BucketList ni -> nid -> (r -> STM Bool) -> IO (SearchState nid addr tok ni r qk, ThreadId) | ||
205 | search sch buckets target result = do | 219 | search sch buckets target result = do |
206 | let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets | 220 | let ns = R.kclosest (searchSpace sch) (searchK sch) target buckets |
207 | st <- atomically $ newSearch sch target ns | 221 | st <- atomically $ newSearch sch target ns |
@@ -211,11 +225,11 @@ search sch buckets target result = do | |||
211 | atomically $ writeTVar v True | 225 | atomically $ writeTVar v True |
212 | return (st,t) | 226 | return (st,t) |
213 | 227 | ||
214 | searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni ) | 228 | searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni, Ord qk ) |
215 | => Search nid addr tok ni r -- ^ Query and distance methods. | 229 | => Search nid addr tok ni r qk -- ^ Query and distance methods. |
216 | -> nid -- ^ The target we are searching for. | 230 | -> nid -- ^ The target we are searching for. |
217 | -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching. | 231 | -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching. |
218 | -> SearchState nid addr tok ni r -- ^ Search-related state. | 232 | -> SearchState nid addr tok ni r qk -- ^ Search-related state. |
219 | -> IO () | 233 | -> IO () |
220 | searchLoop sch@Search{..} target result s@SearchState{..} = do | 234 | searchLoop sch@Search{..} target result s@SearchState{..} = do |
221 | myThreadId >>= flip labelThread ("search."++show target) | 235 | myThreadId >>= flip labelThread ("search."++show target) |
@@ -240,8 +254,8 @@ searchLoop sch@Search{..} target result s@SearchState{..} = do | |||
240 | modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) | 254 | modifyTVar searchVisited $ Set.insert (searchNodeAddress ni) |
241 | modifyTVar searchPendingCount succ | 255 | modifyTVar searchPendingCount succ |
242 | return $ do | 256 | return $ do |
243 | forkTask g | 257 | qk <- searchQuery target ni $ |
244 | "searchQuery" | 258 | \qk reply -> grokQuery sch target result s (ni :-> d) qk reply |
245 | $ sendQuery sch target result s (ni :-> d) | 259 | atomically $ modifyTVar' searchPending $ Map.insert qk (searchQueryCancel qk) |
246 | again | 260 | again |
247 | _ -> searchIsFinished s >>= check >> return (return ()) | 261 | _ -> searchIsFinished s >>= check >> return (return ()) |