diff options
-rw-r--r-- | Announcer.hs | 62 | ||||
-rw-r--r-- | Announcer/Tox.hs | 6 | ||||
-rw-r--r-- | examples/dhtd.hs | 4 |
3 files changed, 44 insertions, 28 deletions
diff --git a/Announcer.hs b/Announcer.hs index f0d65656..ad121f13 100644 --- a/Announcer.hs +++ b/Announcer.hs | |||
@@ -7,8 +7,10 @@ | |||
7 | {-# LANGUAGE NamedFieldPuns #-} | 7 | {-# LANGUAGE NamedFieldPuns #-} |
8 | {-# LANGUAGE NondecreasingIndentation #-} | 8 | {-# LANGUAGE NondecreasingIndentation #-} |
9 | module Announcer | 9 | module Announcer |
10 | ( Announcer(scheduled) | 10 | ( Announcer() |
11 | , AnnounceKey | 11 | , AnnounceKey |
12 | , scheduleAbs | ||
13 | , scheduleToList | ||
12 | , packAnnounceKey | 14 | , packAnnounceKey |
13 | , unpackAnnounceKey | 15 | , unpackAnnounceKey |
14 | , forkAnnouncer | 16 | , forkAnnouncer |
@@ -77,10 +79,24 @@ itemStatusNum (ScheduledItem {}) = 1 | |||
77 | itemStatusNum (StopAnnouncer ) = 2 | 79 | itemStatusNum (StopAnnouncer ) = 2 |
78 | itemStatusNum _ = error "itemStatusNum not in sync with ScheduledItem declaration." | 80 | itemStatusNum _ = error "itemStatusNum not in sync with ScheduledItem declaration." |
79 | 81 | ||
82 | newtype Schedule = Schedule { unSchedule :: PSQ' AnnounceKey POSIXTime ScheduledItem } | ||
83 | |||
84 | emptySchedule :: Schedule | ||
85 | emptySchedule = Schedule PSQ.empty | ||
86 | |||
87 | findNextScheduled :: Announcer -> STM (AnnounceKey, POSIXTime, ScheduledItem) | ||
88 | findNextScheduled announcer = maybe retry return =<< (findMin . unSchedule) <$> readTVar (scheduled announcer) | ||
89 | |||
90 | removeFromSchedule :: Announcer -> (AnnounceKey, POSIXTime, ScheduledItem) -> STM () | ||
91 | removeFromSchedule announcer item = modifyTVar' (scheduled announcer) (Schedule . PSQ.delete (key item) . unSchedule) | ||
92 | |||
93 | scheduleToList :: Announcer -> STM [(AnnounceKey, POSIXTime, ScheduledItem)] | ||
94 | scheduleToList announcer = PSQ.toList . unSchedule <$> readTVar (scheduled announcer) | ||
95 | |||
80 | data Announcer = Announcer | 96 | data Announcer = Announcer |
81 | { -- | The queue of scheduled events. The priority is the time at which an | 97 | { -- | The queue of scheduled events. The priority is the time at which an |
82 | -- event is supposed to occur. | 98 | -- event is supposed to occur. |
83 | scheduled :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem) | 99 | scheduled :: TVar Schedule |
84 | -- | This TVar is False when the Announcer thread has finished. | 100 | -- | This TVar is False when the Announcer thread has finished. |
85 | , announcerActive :: TVar Bool | 101 | , announcerActive :: TVar Bool |
86 | -- | This delay is used to wait until it's time to act on the earliest | 102 | -- | This delay is used to wait until it's time to act on the earliest |
@@ -95,7 +111,12 @@ data Announcer = Announcer | |||
95 | -- immediately. | 111 | -- immediately. |
96 | scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM () | 112 | scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM () |
97 | scheduleImmediately announcer k item | 113 | scheduleImmediately announcer k item |
98 | = modifyTVar' (scheduled announcer) (PSQ.insert' k item 0) | 114 | = modifyTVar' (scheduled announcer) (Schedule . PSQ.insert' k item 0 . unSchedule) |
115 | |||
116 | scheduleAbs :: Announcer -> AnnounceKey -> ScheduledItem -> POSIXTime -> STM () | ||
117 | scheduleAbs announcer k item absTime = | ||
118 | modifyTVar (scheduled announcer) | ||
119 | (Schedule . PSQ.insert' k item absTime . unSchedule) | ||
99 | 120 | ||
100 | -- | Terminate the 'Announcer' thread. Don't use the Announcer after this. | 121 | -- | Terminate the 'Announcer' thread. Don't use the Announcer after this. |
101 | stopAnnouncer :: Announcer -> IO () | 122 | stopAnnouncer :: Announcer -> IO () |
@@ -114,34 +135,31 @@ cancel announcer k = do | |||
114 | forkAnnouncer :: IO Announcer | 135 | forkAnnouncer :: IO Announcer |
115 | forkAnnouncer = do | 136 | forkAnnouncer = do |
116 | delay <- interruptibleDelay | 137 | delay <- interruptibleDelay |
117 | announcer <- atomically $ Announcer <$> newTVar PSQ.empty | 138 | announcer <- atomically $ Announcer <$> newTVar emptySchedule |
118 | <*> newTVar True | 139 | <*> newTVar True |
119 | <*> pure delay | 140 | <*> pure delay |
120 | fork $ announceThread announcer | 141 | fork $ announceThread announcer |
121 | return announcer | 142 | return announcer |
122 | 143 | ||
123 | |||
124 | announceThread :: Announcer -> IO () | 144 | announceThread :: Announcer -> IO () |
125 | announceThread announcer = do | 145 | announceThread announcer = do |
126 | myThreadId >>= flip labelThread "announcer" | 146 | myThreadId >>= flip labelThread "announcer" |
127 | fix $ \loop -> do | 147 | fix $ \loop -> do |
128 | join $ atomically $ do | 148 | item <- atomically $ findNextScheduled announcer |
129 | item <- maybe retry return =<< findMin <$> readTVar (scheduled announcer) | 149 | |
130 | return $ do | 150 | now <- getPOSIXTime |
131 | now <- getPOSIXTime | 151 | -- Is it time to do something? |
132 | -- Is it time to do something? | 152 | if (prio item <= now) |
133 | if (prio item <= now) | 153 | then do -- Yes. Dequeue and handle this event. |
134 | then do -- Yes. Dequeue and handle this event. | 154 | action <- atomically $ do |
135 | action <- atomically $ do | 155 | removeFromSchedule announcer item |
136 | modifyTVar' (scheduled announcer) | 156 | performScheduledItem announcer now item |
137 | (PSQ.delete (key item)) | 157 | -- Are we finished? |
138 | performScheduledItem announcer now item | 158 | mapM_ (>> loop) -- No? Okay, perform scheduled op and loop. |
139 | -- Are we finished? | 159 | action |
140 | mapM_ (>> loop) -- No? Okay, perform scheduled op and loop. | 160 | else do -- No. Wait a bit. |
141 | action | 161 | startDelay (interrutible announcer) (microseconds $ prio item - now) |
142 | else do -- No. Wait a bit. | 162 | loop |
143 | startDelay (interrutible announcer) (microseconds $ prio item - now) | ||
144 | loop | ||
145 | -- We're done. Let 'stopAnnouncer' know that it can stop blocking. | 163 | -- We're done. Let 'stopAnnouncer' know that it can stop blocking. |
146 | atomically $ writeTVar (announcerActive announcer) False | 164 | atomically $ writeTVar (announcerActive announcer) False |
147 | 165 | ||
diff --git a/Announcer/Tox.hs b/Announcer/Tox.hs index f79f8ee7..60a5a8b1 100644 --- a/Announcer/Tox.hs +++ b/Announcer/Tox.hs | |||
@@ -116,8 +116,7 @@ newAnnouncement :: STM (IO a) | |||
116 | -> POSIXTime | 116 | -> POSIXTime |
117 | -> STM (IO ()) | 117 | -> STM (IO ()) |
118 | newAnnouncement checkFin search announce interval = \announcer k now -> do | 118 | newAnnouncement checkFin search announce interval = \announcer k now -> do |
119 | modifyTVar (scheduled announcer) | 119 | scheduleAbs announcer k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval) |
120 | (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval)) | ||
121 | return $ void $ fork search | 120 | return $ void $ fork search |
122 | 121 | ||
123 | -- time for periodic announce: | 122 | -- time for periodic announce: |
@@ -132,8 +131,7 @@ reAnnounce :: STM (IO a) | |||
132 | -> STM (IO ()) | 131 | -> STM (IO ()) |
133 | reAnnounce checkFin announce interval = \announcer k now -> do | 132 | reAnnounce checkFin announce interval = \announcer k now -> do |
134 | isfin <- checkFin | 133 | isfin <- checkFin |
135 | modifyTVar (scheduled announcer) | 134 | scheduleAbs announcer k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval) |
136 | (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval)) | ||
137 | return $ do | 135 | return $ do |
138 | isfin | 136 | isfin |
139 | hPutStrLn stderr $ "This print avoids negative-time future scheduling. Weird bug. TODO: fix it. "++show now | 137 | hPutStrLn stderr $ "This print avoids negative-time future scheduling. Weird bug. TODO: fix it. "++show now |
diff --git a/examples/dhtd.hs b/examples/dhtd.hs index 7cf3393c..35035372 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs | |||
@@ -988,8 +988,8 @@ clientSession s@Session{..} sock cnum h = do | |||
988 | ("a", "") -> cmd0 $ do | 988 | ("a", "") -> cmd0 $ do |
989 | now <- getPOSIXTime | 989 | now <- getPOSIXTime |
990 | rs <- atomically $ do | 990 | rs <- atomically $ do |
991 | as <- readTVar (scheduled $ announcer) | 991 | as <- scheduleToList announcer |
992 | forM (PSQ.toList as) $ \(k,ptm,item) -> do | 992 | forM (as) $ \(k,ptm,item) -> do |
993 | kstr <- unpackAnnounceKey announcer k | 993 | kstr <- unpackAnnounceKey announcer k |
994 | return [ if ptm==0 then "now" | 994 | return [ if ptm==0 then "now" |
995 | else show (ptm - now) | 995 | else show (ptm - now) |