diff options
author | Joe Crayne <joe@jerkface.net> | 2020-01-06 20:22:55 -0500 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-06 22:01:59 -0500 |
commit | 6732956abab4c78cdd4ec127881394c5265db5eb (patch) | |
tree | eb53d9334c054de250bdd32d3d3b26bac76f5e97 /dht/Announcer/Tox.hs | |
parent | 3f63b9bcbd5c3871f3a31fa10e4f1e49efea1c39 (diff) |
Avoid overlapping searches.
Diffstat (limited to 'dht/Announcer/Tox.hs')
-rw-r--r-- | dht/Announcer/Tox.hs | 55 |
1 files changed, 35 insertions, 20 deletions
diff --git a/dht/Announcer/Tox.hs b/dht/Announcer/Tox.hs index 4b775049..e2459e0e 100644 --- a/dht/Announcer/Tox.hs +++ b/dht/Announcer/Tox.hs | |||
@@ -139,6 +139,7 @@ scheduleAnnounce :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO () | |||
139 | scheduleAnnounce announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTarget,aInterval} r = do | 139 | scheduleAnnounce announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTarget,aInterval} r = do |
140 | st <- atomically $ newSearch aSearch aTarget [] | 140 | st <- atomically $ newSearch aSearch aTarget [] |
141 | ns <- atomically $ newTVar MM.empty | 141 | ns <- atomically $ newTVar MM.empty |
142 | mutex <- newMVar () -- This mutex insures one search at a time. | ||
142 | let astate = AnnounceState st ns | 143 | let astate = AnnounceState st ns |
143 | publishToNodes is = do | 144 | publishToNodes is = do |
144 | forM_ is $ \(Binding ni mtok _) -> do | 145 | forM_ is $ \(Binding ni mtok _) -> do |
@@ -156,24 +157,29 @@ scheduleAnnounce announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTar | |||
156 | searchAgain = do | 157 | searchAgain = do |
157 | -- Canceling a pending search here seems to make announcements more reliable. | 158 | -- Canceling a pending search here seems to make announcements more reliable. |
158 | searchCancel st | 159 | searchCancel st |
159 | isfin <- searchIsFinished st -- Always True, since we canceled. | 160 | return $ void $ do |
160 | return $ when isfin $ void $ fork search | 161 | t <- fork search |
162 | labelThread t ("scheduleAnnounce.sch." ++ show aTarget) | ||
161 | search = do -- thread to fork | 163 | search = do -- thread to fork |
162 | atomically $ reset aNearestNodes aSearch aTarget st | 164 | got <- tryTakeMVar mutex |
163 | searchLoop aSearch aTarget onResult st | 165 | case got of |
164 | fork $ do -- Announce to any nodes we haven't already announced to. | 166 | Just () -> do |
167 | atomically $ reset aNearestNodes aSearch aTarget st | ||
168 | searchLoop aSearch aTarget onResult st | ||
169 | -- Announce to any nodes we haven't already announced to. | ||
165 | is <- atomically $ do | 170 | is <- atomically $ do |
166 | bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) | 171 | bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) |
167 | nq <- readTVar ns | 172 | nq <- readTVar ns |
168 | return $ filter (\(Binding ni _ _) -> not $ isJust $ MM.lookup' ni nq) | 173 | return $ filter (\(Binding ni _ _) -> not $ isJust $ MM.lookup' ni nq) |
169 | $ MM.toList bs | 174 | $ MM.toList bs |
170 | publishToNodes is | 175 | publishToNodes is |
171 | return () | 176 | putMVar mutex () |
172 | {- | 177 | Nothing -> do |
173 | atomically $ scheduleImmediately announcer k | 178 | -- Previous search did not finish. Instead of starting a new search, |
174 | $ SearchFinished {- st -} search announce aInterval | 179 | -- we will re-announce only. |
175 | interruptDelay (interrutible announcer) | 180 | announce |
176 | -} | 181 | -- Cancel search so that a new one can start in the nest period. |
182 | atomically $ searchCancel st | ||
177 | atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval) | 183 | atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval) |
178 | 184 | ||
179 | -- | Schedule a recurring Search + Publish sequence. | 185 | -- | Schedule a recurring Search + Publish sequence. |
@@ -181,6 +187,7 @@ scheduleSearch :: Announcer -> AnnounceKey -> SearchMethod r -> r -> IO () | |||
181 | scheduleSearch announcer k SearchMethod{sSearch,sWithResult,sNearestNodes,sTarget,sInterval} r = do | 187 | scheduleSearch announcer k SearchMethod{sSearch,sWithResult,sNearestNodes,sTarget,sInterval} r = do |
182 | st <- atomically $ newSearch sSearch sTarget [] | 188 | st <- atomically $ newSearch sSearch sTarget [] |
183 | ns <- atomically $ newTVar MM.empty | 189 | ns <- atomically $ newTVar MM.empty |
190 | mutex <- newMVar () -- This mutex insures one search at a time. | ||
184 | let astate = AnnounceState st ns | 191 | let astate = AnnounceState st ns |
185 | onResult sr = do | 192 | onResult sr = do |
186 | runAction announcer "with-search-result" $ do | 193 | runAction announcer "with-search-result" $ do |
@@ -197,10 +204,18 @@ scheduleSearch announcer k SearchMethod{sSearch,sWithResult,sNearestNodes,sTarge | |||
197 | searchAgain = do | 204 | searchAgain = do |
198 | -- Canceling a pending search here seems to make announcements more reliable. | 205 | -- Canceling a pending search here seems to make announcements more reliable. |
199 | searchCancel st | 206 | searchCancel st |
200 | isfin <- searchIsFinished st -- Always True, since we canceled. | 207 | return $ void $ do |
201 | return $ when isfin $ void $ fork search | 208 | t <- fork search |
202 | search = do -- thread to fork | 209 | labelThread t ("scheduleSearch.sch." ++ show sTarget) |
203 | atomically $ reset sNearestNodes sSearch sTarget st | 210 | search = do -- thread to fork |
204 | searchLoop sSearch sTarget onResult st | 211 | got <- tryTakeMVar mutex |
212 | case got of | ||
213 | Just () -> do | ||
214 | atomically $ reset sNearestNodes sSearch sTarget st | ||
215 | searchLoop sSearch sTarget onResult st | ||
216 | putMVar mutex () | ||
217 | Nothing -> do | ||
218 | -- Cancel search so that a new one can start in the nest period. | ||
219 | atomically $ searchCancel st | ||
205 | atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search (return ()) sInterval) | 220 | atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search (return ()) sInterval) |
206 | 221 | ||