summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2020-01-06 20:22:55 -0500
committerJoe Crayne <joe@jerkface.net>2020-01-06 22:01:59 -0500
commit6732956abab4c78cdd4ec127881394c5265db5eb (patch)
treeeb53d9334c054de250bdd32d3d3b26bac76f5e97
parent3f63b9bcbd5c3871f3a31fa10e4f1e49efea1c39 (diff)
Avoid overlapping searches.
-rw-r--r--dht/Announcer/Tox.hs55
1 files changed, 35 insertions, 20 deletions
diff --git a/dht/Announcer/Tox.hs b/dht/Announcer/Tox.hs
index 4b775049..e2459e0e 100644
--- a/dht/Announcer/Tox.hs
+++ b/dht/Announcer/Tox.hs
@@ -139,6 +139,7 @@ scheduleAnnounce :: Announcer -> AnnounceKey -> AnnounceMethod r -> r -> IO ()
139scheduleAnnounce announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTarget,aInterval} r = do 139scheduleAnnounce announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTarget,aInterval} r = do
140 st <- atomically $ newSearch aSearch aTarget [] 140 st <- atomically $ newSearch aSearch aTarget []
141 ns <- atomically $ newTVar MM.empty 141 ns <- atomically $ newTVar MM.empty
142 mutex <- newMVar () -- This mutex insures one search at a time.
142 let astate = AnnounceState st ns 143 let astate = AnnounceState st ns
143 publishToNodes is = do 144 publishToNodes is = do
144 forM_ is $ \(Binding ni mtok _) -> do 145 forM_ is $ \(Binding ni mtok _) -> do
@@ -156,24 +157,29 @@ scheduleAnnounce announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTar
156 searchAgain = do 157 searchAgain = do
157 -- Canceling a pending search here seems to make announcements more reliable. 158 -- Canceling a pending search here seems to make announcements more reliable.
158 searchCancel st 159 searchCancel st
159 isfin <- searchIsFinished st -- Always True, since we canceled. 160 return $ void $ do
160 return $ when isfin $ void $ fork search 161 t <- fork search
162 labelThread t ("scheduleAnnounce.sch." ++ show aTarget)
161 search = do -- thread to fork 163 search = do -- thread to fork
162 atomically $ reset aNearestNodes aSearch aTarget st 164 got <- tryTakeMVar mutex
163 searchLoop aSearch aTarget onResult st 165 case got of
164 fork $ do -- Announce to any nodes we haven't already announced to. 166 Just () -> do
167 atomically $ reset aNearestNodes aSearch aTarget st
168 searchLoop aSearch aTarget onResult st
169 -- Announce to any nodes we haven't already announced to.
165 is <- atomically $ do 170 is <- atomically $ do
166 bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -}) 171 bs <- readTVar (searchInformant st {- :: TVar (MinMaxPSQ' ni nid tok -})
167 nq <- readTVar ns 172 nq <- readTVar ns
168 return $ filter (\(Binding ni _ _) -> not $ isJust $ MM.lookup' ni nq) 173 return $ filter (\(Binding ni _ _) -> not $ isJust $ MM.lookup' ni nq)
169 $ MM.toList bs 174 $ MM.toList bs
170 publishToNodes is 175 publishToNodes is
171 return () 176 putMVar mutex ()
172 {- 177 Nothing -> do
173 atomically $ scheduleImmediately announcer k 178 -- Previous search did not finish. Instead of starting a new search,
174 $ SearchFinished {- st -} search announce aInterval 179 -- we will re-announce only.
175 interruptDelay (interrutible announcer) 180 announce
176 -} 181 -- Cancel search so that a new one can start in the nest period.
182 atomically $ searchCancel st
177 atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval) 183 atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval)
178 184
179-- | Schedule a recurring Search + Publish sequence. 185-- | Schedule a recurring Search + Publish sequence.
@@ -181,6 +187,7 @@ scheduleSearch :: Announcer -> AnnounceKey -> SearchMethod r -> r -> IO ()
181scheduleSearch announcer k SearchMethod{sSearch,sWithResult,sNearestNodes,sTarget,sInterval} r = do 187scheduleSearch announcer k SearchMethod{sSearch,sWithResult,sNearestNodes,sTarget,sInterval} r = do
182 st <- atomically $ newSearch sSearch sTarget [] 188 st <- atomically $ newSearch sSearch sTarget []
183 ns <- atomically $ newTVar MM.empty 189 ns <- atomically $ newTVar MM.empty
190 mutex <- newMVar () -- This mutex insures one search at a time.
184 let astate = AnnounceState st ns 191 let astate = AnnounceState st ns
185 onResult sr = do 192 onResult sr = do
186 runAction announcer "with-search-result" $ do 193 runAction announcer "with-search-result" $ do
@@ -197,10 +204,18 @@ scheduleSearch announcer k SearchMethod{sSearch,sWithResult,sNearestNodes,sTarge
197 searchAgain = do 204 searchAgain = do
198 -- Canceling a pending search here seems to make announcements more reliable. 205 -- Canceling a pending search here seems to make announcements more reliable.
199 searchCancel st 206 searchCancel st
200 isfin <- searchIsFinished st -- Always True, since we canceled. 207 return $ void $ do
201 return $ when isfin $ void $ fork search 208 t <- fork search
202 search = do -- thread to fork 209 labelThread t ("scheduleSearch.sch." ++ show sTarget)
203 atomically $ reset sNearestNodes sSearch sTarget st 210 search = do -- thread to fork
204 searchLoop sSearch sTarget onResult st 211 got <- tryTakeMVar mutex
212 case got of
213 Just () -> do
214 atomically $ reset sNearestNodes sSearch sTarget st
215 searchLoop sSearch sTarget onResult st
216 putMVar mutex ()
217 Nothing -> do
218 -- Cancel search so that a new one can start in the nest period.
219 atomically $ searchCancel st
205 atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search (return ()) sInterval) 220 atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search (return ()) sInterval)
206 221