diff options
author | Andrew Cady <d@jerkface.net> | 2018-06-18 17:17:15 -0400 |
---|---|---|
committer | Andrew Cady <d@jerkface.net> | 2018-06-18 18:26:26 -0400 |
commit | 4e0630466ec2fcdf3f5b5d4f581256be6927bb55 (patch) | |
tree | a6d61b15cc0eedfd357dc547e39cff1503fc5458 | |
parent | c87db3d4c1bc52396c160085d44e4bb814537e68 (diff) |
announcer scheduling redux
-rw-r--r-- | Announcer.hs | 96 | ||||
-rw-r--r-- | Announcer/Tox.hs | 3 |
2 files changed, 36 insertions, 63 deletions
diff --git a/Announcer.hs b/Announcer.hs index e8288756..c9c4b60b 100644 --- a/Announcer.hs +++ b/Announcer.hs | |||
@@ -21,12 +21,10 @@ module Announcer | |||
21 | -- lower level, Announcer.Tox needs these. | 21 | -- lower level, Announcer.Tox needs these. |
22 | , scheduleImmediately | 22 | , scheduleImmediately |
23 | , ScheduledItem(ScheduledItem) | 23 | , ScheduledItem(ScheduledItem) |
24 | , interrutible | ||
25 | ) where | 24 | ) where |
26 | 25 | ||
27 | import qualified Data.MinMaxPSQ as MM | 26 | import qualified Data.MinMaxPSQ as MM |
28 | import Data.Wrapper.PSQ as PSQ | 27 | import Data.Wrapper.PSQ as PSQ |
29 | import InterruptibleDelay | ||
30 | import Network.Kademlia.Routing as R | 28 | import Network.Kademlia.Routing as R |
31 | import Network.Kademlia.Search | 29 | import Network.Kademlia.Search |
32 | 30 | ||
@@ -86,25 +84,20 @@ emptySchedule :: Schedule | |||
86 | emptySchedule = Schedule PSQ.empty | 84 | emptySchedule = Schedule PSQ.empty |
87 | 85 | ||
88 | type KPS = (AnnounceKey, POSIXTime, ScheduledItem) | 86 | type KPS = (AnnounceKey, POSIXTime, ScheduledItem) |
89 | findNextScheduled :: Announcer -> STM KPS | ||
90 | findNextScheduled announcer = maybe retry return =<< (findMin . unSchedule) <$> readTVar (scheduled announcer) | ||
91 | |||
92 | removeFromSchedule :: Announcer -> KPS -> STM () | ||
93 | removeFromSchedule announcer item = modifyTVar' (scheduled announcer) (Schedule . PSQ.delete (key item) . unSchedule) | ||
94 | 87 | ||
95 | scheduleToList :: Announcer -> STM [KPS] | 88 | scheduleToList :: Announcer -> STM [KPS] |
96 | scheduleToList announcer = PSQ.toList . unSchedule <$> readTVar (scheduled announcer) | 89 | scheduleToList announcer = PSQ.toList . unSchedule <$> readTVar (scheduled announcer) |
97 | 90 | ||
98 | data Announcer = Announcer | 91 | data Announcer = Announcer |
99 | { -- | The queue of scheduled events. The priority is the time at which an | 92 | { -- | The queue of scheduled events. The priority is the time at which an |
100 | -- event is supposed to occur. | 93 | -- event is supposed to occur. Do not write to this TVar ever. |
101 | scheduled :: TVar Schedule | 94 | scheduled :: TVar Schedule |
102 | -- | This TVar is False when the Announcer thread has finished. | 95 | -- | This TVar is False when the Announcer thread has finished. |
103 | , announcerActive :: TVar Bool | 96 | , announcerActive :: TVar Bool |
104 | -- | This delay is used to wait until it's time to act on the earliest | 97 | -- | This delay is used to wait until it's time to act on the earliest |
105 | -- scheduled item. It will be interrupted whenever a new item is | 98 | -- scheduled item. It will be interrupted whenever a new item is |
106 | -- scheduled. | 99 | -- scheduled. |
107 | , interrutible :: InterruptibleDelay | 100 | , commander :: TChan SchedulerCommand |
108 | } | 101 | } |
109 | 102 | ||
110 | -- | Schedules an event to occur long ago at the epoch (which effectively makes | 103 | -- | Schedules an event to occur long ago at the epoch (which effectively makes |
@@ -112,34 +105,29 @@ data Announcer = Announcer | |||
112 | -- also want to interrupt the 'interrutible' delay so that it finds this item | 105 | -- also want to interrupt the 'interrutible' delay so that it finds this item |
113 | -- immediately. | 106 | -- immediately. |
114 | scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM () | 107 | scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM () |
115 | scheduleImmediately announcer k item | 108 | scheduleImmediately announcer k item = scheduleAbs announcer k item 0 |
116 | = modifyTVar' (scheduled announcer) (Schedule . PSQ.insert' k item 0 . unSchedule) | ||
117 | 109 | ||
118 | scheduleAbs :: Announcer -> AnnounceKey -> ScheduledItem -> POSIXTime -> STM () | 110 | scheduleAbs :: Announcer -> AnnounceKey -> ScheduledItem -> POSIXTime -> STM () |
119 | scheduleAbs announcer k item absTime = | 111 | scheduleAbs announcer k item absTime = |
120 | modifyTVar (scheduled announcer) | 112 | writeTChan (commander announcer) $ ScheduleAction (k, absTime, item) |
121 | (Schedule . PSQ.insert' k item absTime . unSchedule) | ||
122 | 113 | ||
123 | -- | Terminate the 'Announcer' thread. Don't use the Announcer after this. | 114 | -- | Terminate the 'Announcer' thread. Don't use the Announcer after this. |
124 | stopAnnouncer :: Announcer -> IO () | 115 | stopAnnouncer :: Announcer -> IO () |
125 | stopAnnouncer announcer = do | 116 | stopAnnouncer announcer = do |
126 | atomically $ scheduleImmediately announcer (AnnounceKey "*stop*") StopAnnouncer | 117 | atomically $ writeTChan (commander announcer) ShutdownScheduler |
127 | interruptDelay (interrutible announcer) | ||
128 | atomically $ readTVar (announcerActive announcer) >>= check . not | 118 | atomically $ readTVar (announcerActive announcer) >>= check . not |
129 | 119 | ||
130 | cancel :: Announcer -> AnnounceKey -> IO () | 120 | cancel :: Announcer -> AnnounceKey -> IO () |
131 | cancel announcer k = do | 121 | cancel announcer k = do |
132 | atomically $ scheduleImmediately announcer k $ DeleteAnnouncement | 122 | atomically $ writeTChan (commander announcer) $ ScheduleAction (k, 0, (ScheduledItem (\a k p -> return $ atomically $ return ()))) |
133 | interruptDelay (interrutible announcer) | ||
134 | 123 | ||
135 | -- | Construct an 'Announcer' object and fork a thread in which to perform the | 124 | -- | Construct an 'Announcer' object and fork a thread in which to perform the |
136 | -- Kademlia searches and announces. | 125 | -- Kademlia searches and announces. |
137 | forkAnnouncer :: IO Announcer | 126 | forkAnnouncer :: IO Announcer |
138 | forkAnnouncer = do | 127 | forkAnnouncer = do |
139 | delay <- interruptibleDelay | ||
140 | announcer <- atomically $ Announcer <$> newTVar emptySchedule | 128 | announcer <- atomically $ Announcer <$> newTVar emptySchedule |
141 | <*> newTVar True | 129 | <*> newTVar True |
142 | <*> pure delay | 130 | <*> newTChan |
143 | fork $ announceThread announcer | 131 | fork $ announceThread announcer |
144 | return announcer | 132 | return announcer |
145 | 133 | ||
@@ -151,66 +139,54 @@ readTChanTimeout timeout pktChannel = do | |||
151 | <|> pure Nothing <* (readTVar >=> check) delay | 139 | <|> pure Nothing <* (readTVar >=> check) delay |
152 | where | 140 | where |
153 | toMicroseconds :: POSIXTime -> Int | 141 | toMicroseconds :: POSIXTime -> Int |
154 | toMicroseconds = undefined | 142 | toMicroseconds = round . (* 1000) . (* 1000) |
155 | 143 | ||
156 | data SchedulerCommand = ShutdownScheduler | ScheduledAction KPS | SendNudes (TChan [KPS]) | 144 | data SchedulerCommand = ShutdownScheduler | ScheduleAction KPS |
157 | 145 | ||
158 | listener :: TChan SchedulerCommand -> IO () | 146 | listener :: Announcer -> IO () |
159 | listener chan = relisten PSQ.empty | 147 | listener announcer = relisten |
148 | -- This function assumes it is the only writer to the (scheduled announcer :: TVar Schedule). | ||
149 | -- If that TVar is written in another thread, the changes may be overwritten here. | ||
160 | where | 150 | where |
151 | chan = commander announcer | ||
161 | note :: String -> IO () | 152 | note :: String -> IO () |
162 | note = | 153 | note = if False then print else const (return ()) |
163 | if False | 154 | relisten = do |
164 | then print | 155 | queue <- fmap unSchedule $ atomically $ readTVar $ scheduled announcer |
165 | else const (return ()) | ||
166 | relisten queue = do | ||
167 | case minView queue of | 156 | case minView queue of |
168 | Nothing -> do | 157 | Nothing -> do |
169 | note "queue empty - listening indefinitely" | 158 | note "queue empty - listening indefinitely" |
170 | atomically (readTChan chan) >>= handleCommand | 159 | atomically (readTChan chan) >>= handleCommand |
171 | Just ((k, p, s), queue') -> do | 160 | Just ((k, p, s), queue') -> do |
172 | note "queue full - listening with timeout" | 161 | -- note "queue full - listening with timeout" |
173 | now <- getPOSIXTime | 162 | now <- getPOSIXTime |
163 | note $ "queue full - listening with timeout - " ++ show (p, now) | ||
174 | readTChanTimeout (p - now) chan >>= \case | 164 | readTChanTimeout (p - now) chan >>= \case |
175 | Just cmd -> handleCommand cmd | 165 | Just cmd -> handleCommand cmd |
176 | Nothing -> do | 166 | Nothing -> do |
177 | note "timed out - executing from queue" | 167 | note "timed out - executing from queue" |
178 | runAction s | 168 | atomically $ writeTVar (scheduled announcer) (Schedule queue') |
179 | mapM id =<< atomically (performScheduledItem undefined now (k, p, s)) | 169 | case s of |
180 | relisten queue' | 170 | DeleteAnnouncement -> relisten |
171 | StopAnnouncer -> declareInactive | ||
172 | ScheduledItem f -> (fork $ join $ atomically $ f announcer k now) >> relisten | ||
173 | relisten | ||
181 | where | 174 | where |
182 | handleCommand = \case | 175 | declareInactive = atomically $ writeTVar (announcerActive announcer) False |
183 | ShutdownScheduler -> return () | 176 | handleCommand = |
184 | ScheduledAction (k, p, s) -> relisten $ PSQ.insert' k s p queue | 177 | \case |
185 | SendNudes nudesChan -> do atomically $ writeTChan nudesChan (PSQ.toList queue) | 178 | ShutdownScheduler -> declareInactive |
186 | relisten queue | 179 | ScheduleAction (k, p, s) -> do |
187 | 180 | atomically $ | |
188 | runAction :: Monad m => ScheduledItem -> m () | 181 | modifyTVar |
189 | runAction DeleteAnnouncement = return () | 182 | (scheduled announcer) |
190 | runAction StopAnnouncer = return () | 183 | (Schedule . PSQ.insert' k s p . unSchedule) |
191 | runAction (ScheduledItem a) = undefined | 184 | relisten |
192 | 185 | ||
193 | announceThread :: Announcer -> IO () | 186 | announceThread :: Announcer -> IO () |
194 | announceThread announcer = do | 187 | announceThread announcer = do |
195 | myThreadId >>= flip labelThread "announcer" | 188 | myThreadId >>= flip labelThread "announcer" |
196 | fix $ \loop -> do | 189 | listener announcer |
197 | item <- atomically $ findNextScheduled announcer | ||
198 | |||
199 | now <- getPOSIXTime | ||
200 | -- Is it time to do something? | ||
201 | if (prio item <= now) | ||
202 | then do -- Yes. Dequeue and handle this event. | ||
203 | action <- atomically $ do | ||
204 | removeFromSchedule announcer item | ||
205 | performScheduledItem announcer now item | ||
206 | -- Are we finished? | ||
207 | mapM_ (>> loop) -- No? Okay, perform scheduled op and loop. | ||
208 | action | ||
209 | else do -- No. Wait a bit. | ||
210 | startDelay (interrutible announcer) (microseconds $ prio item - now) | ||
211 | loop | ||
212 | -- We're done. Let 'stopAnnouncer' know that it can stop blocking. | ||
213 | atomically $ writeTVar (announcerActive announcer) False | ||
214 | 190 | ||
215 | -- | Returns 'Nothing' to stop the announcer thread (when the event is | 191 | -- | Returns 'Nothing' to stop the announcer thread (when the event is |
216 | -- StopAnnouncer). Otherwise, returns an action that will be performed in the | 192 | -- StopAnnouncer). Otherwise, returns an action that will be performed in the |
diff --git a/Announcer/Tox.hs b/Announcer/Tox.hs index 60a5a8b1..d0fc828f 100644 --- a/Announcer/Tox.hs +++ b/Announcer/Tox.hs | |||
@@ -134,7 +134,6 @@ reAnnounce checkFin announce interval = \announcer k now -> do | |||
134 | scheduleAbs announcer k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval) | 134 | scheduleAbs announcer k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval) |
135 | return $ do | 135 | return $ do |
136 | isfin | 136 | isfin |
137 | hPutStrLn stderr $ "This print avoids negative-time future scheduling. Weird bug. TODO: fix it. "++show now | ||
138 | announce | 137 | announce |
139 | 138 | ||
140 | -- | Schedule a recurring Search + Announce sequence. | 139 | -- | Schedule a recurring Search + Announce sequence. |
@@ -178,7 +177,6 @@ scheduleAnnounce announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTar | |||
178 | interruptDelay (interrutible announcer) | 177 | interruptDelay (interrutible announcer) |
179 | -} | 178 | -} |
180 | atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval) | 179 | atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval) |
181 | interruptDelay (interrutible announcer) | ||
182 | 180 | ||
183 | -- | Schedule a recurring Search + Publish sequence. | 181 | -- | Schedule a recurring Search + Publish sequence. |
184 | scheduleSearch :: Announcer -> AnnounceKey -> SearchMethod r -> r -> IO () | 182 | scheduleSearch :: Announcer -> AnnounceKey -> SearchMethod r -> r -> IO () |
@@ -208,5 +206,4 @@ scheduleSearch announcer k SearchMethod{sSearch,sWithResult,sNearestNodes,sTarge | |||
208 | atomically $ reset sNearestNodes sSearch sTarget st | 206 | atomically $ reset sNearestNodes sSearch sTarget st |
209 | searchLoop sSearch sTarget onResult st | 207 | searchLoop sSearch sTarget onResult st |
210 | atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search (return ()) sInterval) | 208 | atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search (return ()) sInterval) |
211 | interruptDelay (interrutible announcer) | ||
212 | 209 | ||