summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Cady <d@jerkface.net>2018-06-18 17:17:15 -0400
committerAndrew Cady <d@jerkface.net>2018-06-18 18:26:26 -0400
commit4e0630466ec2fcdf3f5b5d4f581256be6927bb55 (patch)
treea6d61b15cc0eedfd357dc547e39cff1503fc5458
parentc87db3d4c1bc52396c160085d44e4bb814537e68 (diff)
announcer scheduling redux
-rw-r--r--Announcer.hs96
-rw-r--r--Announcer/Tox.hs3
2 files changed, 36 insertions, 63 deletions
diff --git a/Announcer.hs b/Announcer.hs
index e8288756..c9c4b60b 100644
--- a/Announcer.hs
+++ b/Announcer.hs
@@ -21,12 +21,10 @@ module Announcer
21 -- lower level, Announcer.Tox needs these. 21 -- lower level, Announcer.Tox needs these.
22 , scheduleImmediately 22 , scheduleImmediately
23 , ScheduledItem(ScheduledItem) 23 , ScheduledItem(ScheduledItem)
24 , interrutible
25 ) where 24 ) where
26 25
27import qualified Data.MinMaxPSQ as MM 26import qualified Data.MinMaxPSQ as MM
28import Data.Wrapper.PSQ as PSQ 27import Data.Wrapper.PSQ as PSQ
29import InterruptibleDelay
30import Network.Kademlia.Routing as R 28import Network.Kademlia.Routing as R
31import Network.Kademlia.Search 29import Network.Kademlia.Search
32 30
@@ -86,25 +84,20 @@ emptySchedule :: Schedule
86emptySchedule = Schedule PSQ.empty 84emptySchedule = Schedule PSQ.empty
87 85
88type KPS = (AnnounceKey, POSIXTime, ScheduledItem) 86type KPS = (AnnounceKey, POSIXTime, ScheduledItem)
89findNextScheduled :: Announcer -> STM KPS
90findNextScheduled announcer = maybe retry return =<< (findMin . unSchedule) <$> readTVar (scheduled announcer)
91
92removeFromSchedule :: Announcer -> KPS -> STM ()
93removeFromSchedule announcer item = modifyTVar' (scheduled announcer) (Schedule . PSQ.delete (key item) . unSchedule)
94 87
95scheduleToList :: Announcer -> STM [KPS] 88scheduleToList :: Announcer -> STM [KPS]
96scheduleToList announcer = PSQ.toList . unSchedule <$> readTVar (scheduled announcer) 89scheduleToList announcer = PSQ.toList . unSchedule <$> readTVar (scheduled announcer)
97 90
98data Announcer = Announcer 91data Announcer = Announcer
99 { -- | The queue of scheduled events. The priority is the time at which an 92 { -- | The queue of scheduled events. The priority is the time at which an
100 -- event is supposed to occur. 93 -- event is supposed to occur. Do not write to this TVar ever.
101 scheduled :: TVar Schedule 94 scheduled :: TVar Schedule
102 -- | This TVar is False when the Announcer thread has finished. 95 -- | This TVar is False when the Announcer thread has finished.
103 , announcerActive :: TVar Bool 96 , announcerActive :: TVar Bool
104 -- | This delay is used to wait until it's time to act on the earliest 97 -- | This delay is used to wait until it's time to act on the earliest
105 -- scheduled item. It will be interrupted whenever a new item is 98 -- scheduled item. It will be interrupted whenever a new item is
106 -- scheduled. 99 -- scheduled.
107 , interrutible :: InterruptibleDelay 100 , commander :: TChan SchedulerCommand
108 } 101 }
109 102
110-- | Schedules an event to occur long ago at the epoch (which effectively makes 103-- | Schedules an event to occur long ago at the epoch (which effectively makes
@@ -112,34 +105,29 @@ data Announcer = Announcer
112-- also want to interrupt the 'interrutible' delay so that it finds this item 105-- also want to interrupt the 'interrutible' delay so that it finds this item
113-- immediately. 106-- immediately.
114scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM () 107scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM ()
115scheduleImmediately announcer k item 108scheduleImmediately announcer k item = scheduleAbs announcer k item 0
116 = modifyTVar' (scheduled announcer) (Schedule . PSQ.insert' k item 0 . unSchedule)
117 109
118scheduleAbs :: Announcer -> AnnounceKey -> ScheduledItem -> POSIXTime -> STM () 110scheduleAbs :: Announcer -> AnnounceKey -> ScheduledItem -> POSIXTime -> STM ()
119scheduleAbs announcer k item absTime = 111scheduleAbs announcer k item absTime =
120 modifyTVar (scheduled announcer) 112 writeTChan (commander announcer) $ ScheduleAction (k, absTime, item)
121 (Schedule . PSQ.insert' k item absTime . unSchedule)
122 113
123-- | Terminate the 'Announcer' thread. Don't use the Announcer after this. 114-- | Terminate the 'Announcer' thread. Don't use the Announcer after this.
124stopAnnouncer :: Announcer -> IO () 115stopAnnouncer :: Announcer -> IO ()
125stopAnnouncer announcer = do 116stopAnnouncer announcer = do
126 atomically $ scheduleImmediately announcer (AnnounceKey "*stop*") StopAnnouncer 117 atomically $ writeTChan (commander announcer) ShutdownScheduler
127 interruptDelay (interrutible announcer)
128 atomically $ readTVar (announcerActive announcer) >>= check . not 118 atomically $ readTVar (announcerActive announcer) >>= check . not
129 119
130cancel :: Announcer -> AnnounceKey -> IO () 120cancel :: Announcer -> AnnounceKey -> IO ()
131cancel announcer k = do 121cancel announcer k = do
132 atomically $ scheduleImmediately announcer k $ DeleteAnnouncement 122 atomically $ writeTChan (commander announcer) $ ScheduleAction (k, 0, (ScheduledItem (\a k p -> return $ atomically $ return ())))
133 interruptDelay (interrutible announcer)
134 123
135-- | Construct an 'Announcer' object and fork a thread in which to perform the 124-- | Construct an 'Announcer' object and fork a thread in which to perform the
136-- Kademlia searches and announces. 125-- Kademlia searches and announces.
137forkAnnouncer :: IO Announcer 126forkAnnouncer :: IO Announcer
138forkAnnouncer = do 127forkAnnouncer = do
139 delay <- interruptibleDelay
140 announcer <- atomically $ Announcer <$> newTVar emptySchedule 128 announcer <- atomically $ Announcer <$> newTVar emptySchedule
141 <*> newTVar True 129 <*> newTVar True
142 <*> pure delay 130 <*> newTChan
143 fork $ announceThread announcer 131 fork $ announceThread announcer
144 return announcer 132 return announcer
145 133
@@ -151,66 +139,54 @@ readTChanTimeout timeout pktChannel = do
151 <|> pure Nothing <* (readTVar >=> check) delay 139 <|> pure Nothing <* (readTVar >=> check) delay
152 where 140 where
153 toMicroseconds :: POSIXTime -> Int 141 toMicroseconds :: POSIXTime -> Int
154 toMicroseconds = undefined 142 toMicroseconds = round . (* 1000) . (* 1000)
155 143
156data SchedulerCommand = ShutdownScheduler | ScheduledAction KPS | SendNudes (TChan [KPS]) 144data SchedulerCommand = ShutdownScheduler | ScheduleAction KPS
157 145
158listener :: TChan SchedulerCommand -> IO () 146listener :: Announcer -> IO ()
159listener chan = relisten PSQ.empty 147listener announcer = relisten
148 -- This function assumes it is the only writer to the (scheduled announcer :: TVar Schedule).
149 -- If that TVar is written in another thread, the changes may be overwritten here.
160 where 150 where
151 chan = commander announcer
161 note :: String -> IO () 152 note :: String -> IO ()
162 note = 153 note = if False then print else const (return ())
163 if False 154 relisten = do
164 then print 155 queue <- fmap unSchedule $ atomically $ readTVar $ scheduled announcer
165 else const (return ())
166 relisten queue = do
167 case minView queue of 156 case minView queue of
168 Nothing -> do 157 Nothing -> do
169 note "queue empty - listening indefinitely" 158 note "queue empty - listening indefinitely"
170 atomically (readTChan chan) >>= handleCommand 159 atomically (readTChan chan) >>= handleCommand
171 Just ((k, p, s), queue') -> do 160 Just ((k, p, s), queue') -> do
172 note "queue full - listening with timeout" 161 -- note "queue full - listening with timeout"
173 now <- getPOSIXTime 162 now <- getPOSIXTime
163 note $ "queue full - listening with timeout - " ++ show (p, now)
174 readTChanTimeout (p - now) chan >>= \case 164 readTChanTimeout (p - now) chan >>= \case
175 Just cmd -> handleCommand cmd 165 Just cmd -> handleCommand cmd
176 Nothing -> do 166 Nothing -> do
177 note "timed out - executing from queue" 167 note "timed out - executing from queue"
178 runAction s 168 atomically $ writeTVar (scheduled announcer) (Schedule queue')
179 mapM id =<< atomically (performScheduledItem undefined now (k, p, s)) 169 case s of
180 relisten queue' 170 DeleteAnnouncement -> relisten
171 StopAnnouncer -> declareInactive
172 ScheduledItem f -> (fork $ join $ atomically $ f announcer k now) >> relisten
173 relisten
181 where 174 where
182 handleCommand = \case 175 declareInactive = atomically $ writeTVar (announcerActive announcer) False
183 ShutdownScheduler -> return () 176 handleCommand =
184 ScheduledAction (k, p, s) -> relisten $ PSQ.insert' k s p queue 177 \case
185 SendNudes nudesChan -> do atomically $ writeTChan nudesChan (PSQ.toList queue) 178 ShutdownScheduler -> declareInactive
186 relisten queue 179 ScheduleAction (k, p, s) -> do
187 180 atomically $
188runAction :: Monad m => ScheduledItem -> m () 181 modifyTVar
189runAction DeleteAnnouncement = return () 182 (scheduled announcer)
190runAction StopAnnouncer = return () 183 (Schedule . PSQ.insert' k s p . unSchedule)
191runAction (ScheduledItem a) = undefined 184 relisten
192 185
193announceThread :: Announcer -> IO () 186announceThread :: Announcer -> IO ()
194announceThread announcer = do 187announceThread announcer = do
195 myThreadId >>= flip labelThread "announcer" 188 myThreadId >>= flip labelThread "announcer"
196 fix $ \loop -> do 189 listener announcer
197 item <- atomically $ findNextScheduled announcer
198
199 now <- getPOSIXTime
200 -- Is it time to do something?
201 if (prio item <= now)
202 then do -- Yes. Dequeue and handle this event.
203 action <- atomically $ do
204 removeFromSchedule announcer item
205 performScheduledItem announcer now item
206 -- Are we finished?
207 mapM_ (>> loop) -- No? Okay, perform scheduled op and loop.
208 action
209 else do -- No. Wait a bit.
210 startDelay (interrutible announcer) (microseconds $ prio item - now)
211 loop
212 -- We're done. Let 'stopAnnouncer' know that it can stop blocking.
213 atomically $ writeTVar (announcerActive announcer) False
214 190
215-- | Returns 'Nothing' to stop the announcer thread (when the event is 191-- | Returns 'Nothing' to stop the announcer thread (when the event is
216-- StopAnnouncer). Otherwise, returns an action that will be performed in the 192-- StopAnnouncer). Otherwise, returns an action that will be performed in the
diff --git a/Announcer/Tox.hs b/Announcer/Tox.hs
index 60a5a8b1..d0fc828f 100644
--- a/Announcer/Tox.hs
+++ b/Announcer/Tox.hs
@@ -134,7 +134,6 @@ reAnnounce checkFin announce interval = \announcer k now -> do
134 scheduleAbs announcer k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval) 134 scheduleAbs announcer k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval)
135 return $ do 135 return $ do
136 isfin 136 isfin
137 hPutStrLn stderr $ "This print avoids negative-time future scheduling. Weird bug. TODO: fix it. "++show now
138 announce 137 announce
139 138
140-- | Schedule a recurring Search + Announce sequence. 139-- | Schedule a recurring Search + Announce sequence.
@@ -178,7 +177,6 @@ scheduleAnnounce announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTar
178 interruptDelay (interrutible announcer) 177 interruptDelay (interrutible announcer)
179 -} 178 -}
180 atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval) 179 atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search announce aInterval)
181 interruptDelay (interrutible announcer)
182 180
183-- | Schedule a recurring Search + Publish sequence. 181-- | Schedule a recurring Search + Publish sequence.
184scheduleSearch :: Announcer -> AnnounceKey -> SearchMethod r -> r -> IO () 182scheduleSearch :: Announcer -> AnnounceKey -> SearchMethod r -> r -> IO ()
@@ -208,5 +206,4 @@ scheduleSearch announcer k SearchMethod{sSearch,sWithResult,sNearestNodes,sTarge
208 atomically $ reset sNearestNodes sSearch sTarget st 206 atomically $ reset sNearestNodes sSearch sTarget st
209 searchLoop sSearch sTarget onResult st 207 searchLoop sSearch sTarget onResult st
210 atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search (return ()) sInterval) 208 atomically $ scheduleImmediately announcer k $ ScheduledItem (newAnnouncement searchAgain search (return ()) sInterval)
211 interruptDelay (interrutible announcer)
212 209