summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Announcer.hs62
-rw-r--r--Announcer/Tox.hs6
-rw-r--r--examples/dhtd.hs4
3 files changed, 44 insertions, 28 deletions
diff --git a/Announcer.hs b/Announcer.hs
index f0d65656..ad121f13 100644
--- a/Announcer.hs
+++ b/Announcer.hs
@@ -7,8 +7,10 @@
7{-# LANGUAGE NamedFieldPuns #-} 7{-# LANGUAGE NamedFieldPuns #-}
8{-# LANGUAGE NondecreasingIndentation #-} 8{-# LANGUAGE NondecreasingIndentation #-}
9module Announcer 9module Announcer
10 ( Announcer(scheduled) 10 ( Announcer()
11 , AnnounceKey 11 , AnnounceKey
12 , scheduleAbs
13 , scheduleToList
12 , packAnnounceKey 14 , packAnnounceKey
13 , unpackAnnounceKey 15 , unpackAnnounceKey
14 , forkAnnouncer 16 , forkAnnouncer
@@ -77,10 +79,24 @@ itemStatusNum (ScheduledItem {}) = 1
77itemStatusNum (StopAnnouncer ) = 2 79itemStatusNum (StopAnnouncer ) = 2
78itemStatusNum _ = error "itemStatusNum not in sync with ScheduledItem declaration." 80itemStatusNum _ = error "itemStatusNum not in sync with ScheduledItem declaration."
79 81
82newtype Schedule = Schedule { unSchedule :: PSQ' AnnounceKey POSIXTime ScheduledItem }
83
84emptySchedule :: Schedule
85emptySchedule = Schedule PSQ.empty
86
87findNextScheduled :: Announcer -> STM (AnnounceKey, POSIXTime, ScheduledItem)
88findNextScheduled announcer = maybe retry return =<< (findMin . unSchedule) <$> readTVar (scheduled announcer)
89
90removeFromSchedule :: Announcer -> (AnnounceKey, POSIXTime, ScheduledItem) -> STM ()
91removeFromSchedule announcer item = modifyTVar' (scheduled announcer) (Schedule . PSQ.delete (key item) . unSchedule)
92
93scheduleToList :: Announcer -> STM [(AnnounceKey, POSIXTime, ScheduledItem)]
94scheduleToList announcer = PSQ.toList . unSchedule <$> readTVar (scheduled announcer)
95
80data Announcer = Announcer 96data Announcer = Announcer
81 { -- | The queue of scheduled events. The priority is the time at which an 97 { -- | The queue of scheduled events. The priority is the time at which an
82 -- event is supposed to occur. 98 -- event is supposed to occur.
83 scheduled :: TVar (PSQ' AnnounceKey POSIXTime ScheduledItem) 99 scheduled :: TVar Schedule
84 -- | This TVar is False when the Announcer thread has finished. 100 -- | This TVar is False when the Announcer thread has finished.
85 , announcerActive :: TVar Bool 101 , announcerActive :: TVar Bool
86 -- | This delay is used to wait until it's time to act on the earliest 102 -- | This delay is used to wait until it's time to act on the earliest
@@ -95,7 +111,12 @@ data Announcer = Announcer
95-- immediately. 111-- immediately.
96scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM () 112scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM ()
97scheduleImmediately announcer k item 113scheduleImmediately announcer k item
98 = modifyTVar' (scheduled announcer) (PSQ.insert' k item 0) 114 = modifyTVar' (scheduled announcer) (Schedule . PSQ.insert' k item 0 . unSchedule)
115
116scheduleAbs :: Announcer -> AnnounceKey -> ScheduledItem -> POSIXTime -> STM ()
117scheduleAbs announcer k item absTime =
118 modifyTVar (scheduled announcer)
119 (Schedule . PSQ.insert' k item absTime . unSchedule)
99 120
100-- | Terminate the 'Announcer' thread. Don't use the Announcer after this. 121-- | Terminate the 'Announcer' thread. Don't use the Announcer after this.
101stopAnnouncer :: Announcer -> IO () 122stopAnnouncer :: Announcer -> IO ()
@@ -114,34 +135,31 @@ cancel announcer k = do
114forkAnnouncer :: IO Announcer 135forkAnnouncer :: IO Announcer
115forkAnnouncer = do 136forkAnnouncer = do
116 delay <- interruptibleDelay 137 delay <- interruptibleDelay
117 announcer <- atomically $ Announcer <$> newTVar PSQ.empty 138 announcer <- atomically $ Announcer <$> newTVar emptySchedule
118 <*> newTVar True 139 <*> newTVar True
119 <*> pure delay 140 <*> pure delay
120 fork $ announceThread announcer 141 fork $ announceThread announcer
121 return announcer 142 return announcer
122 143
123
124announceThread :: Announcer -> IO () 144announceThread :: Announcer -> IO ()
125announceThread announcer = do 145announceThread announcer = do
126 myThreadId >>= flip labelThread "announcer" 146 myThreadId >>= flip labelThread "announcer"
127 fix $ \loop -> do 147 fix $ \loop -> do
128 join $ atomically $ do 148 item <- atomically $ findNextScheduled announcer
129 item <- maybe retry return =<< findMin <$> readTVar (scheduled announcer) 149
130 return $ do 150 now <- getPOSIXTime
131 now <- getPOSIXTime 151 -- Is it time to do something?
132 -- Is it time to do something? 152 if (prio item <= now)
133 if (prio item <= now) 153 then do -- Yes. Dequeue and handle this event.
134 then do -- Yes. Dequeue and handle this event. 154 action <- atomically $ do
135 action <- atomically $ do 155 removeFromSchedule announcer item
136 modifyTVar' (scheduled announcer) 156 performScheduledItem announcer now item
137 (PSQ.delete (key item)) 157 -- Are we finished?
138 performScheduledItem announcer now item 158 mapM_ (>> loop) -- No? Okay, perform scheduled op and loop.
139 -- Are we finished? 159 action
140 mapM_ (>> loop) -- No? Okay, perform scheduled op and loop. 160 else do -- No. Wait a bit.
141 action 161 startDelay (interrutible announcer) (microseconds $ prio item - now)
142 else do -- No. Wait a bit. 162 loop
143 startDelay (interrutible announcer) (microseconds $ prio item - now)
144 loop
145 -- We're done. Let 'stopAnnouncer' know that it can stop blocking. 163 -- We're done. Let 'stopAnnouncer' know that it can stop blocking.
146 atomically $ writeTVar (announcerActive announcer) False 164 atomically $ writeTVar (announcerActive announcer) False
147 165
diff --git a/Announcer/Tox.hs b/Announcer/Tox.hs
index f79f8ee7..60a5a8b1 100644
--- a/Announcer/Tox.hs
+++ b/Announcer/Tox.hs
@@ -116,8 +116,7 @@ newAnnouncement :: STM (IO a)
116 -> POSIXTime 116 -> POSIXTime
117 -> STM (IO ()) 117 -> STM (IO ())
118newAnnouncement checkFin search announce interval = \announcer k now -> do 118newAnnouncement checkFin search announce interval = \announcer k now -> do
119 modifyTVar (scheduled announcer) 119 scheduleAbs announcer k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval)
120 (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval))
121 return $ void $ fork search 120 return $ void $ fork search
122 121
123-- time for periodic announce: 122-- time for periodic announce:
@@ -132,8 +131,7 @@ reAnnounce :: STM (IO a)
132 -> STM (IO ()) 131 -> STM (IO ())
133reAnnounce checkFin announce interval = \announcer k now -> do 132reAnnounce checkFin announce interval = \announcer k now -> do
134 isfin <- checkFin 133 isfin <- checkFin
135 modifyTVar (scheduled announcer) 134 scheduleAbs announcer k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval)
136 (PSQ.insert' k (ScheduledItem $ reAnnounce checkFin announce interval) (now + interval))
137 return $ do 135 return $ do
138 isfin 136 isfin
139 hPutStrLn stderr $ "This print avoids negative-time future scheduling. Weird bug. TODO: fix it. "++show now 137 hPutStrLn stderr $ "This print avoids negative-time future scheduling. Weird bug. TODO: fix it. "++show now
diff --git a/examples/dhtd.hs b/examples/dhtd.hs
index 7cf3393c..35035372 100644
--- a/examples/dhtd.hs
+++ b/examples/dhtd.hs
@@ -988,8 +988,8 @@ clientSession s@Session{..} sock cnum h = do
988 ("a", "") -> cmd0 $ do 988 ("a", "") -> cmd0 $ do
989 now <- getPOSIXTime 989 now <- getPOSIXTime
990 rs <- atomically $ do 990 rs <- atomically $ do
991 as <- readTVar (scheduled $ announcer) 991 as <- scheduleToList announcer
992 forM (PSQ.toList as) $ \(k,ptm,item) -> do 992 forM (as) $ \(k,ptm,item) -> do
993 kstr <- unpackAnnounceKey announcer k 993 kstr <- unpackAnnounceKey announcer k
994 return [ if ptm==0 then "now" 994 return [ if ptm==0 then "now"
995 else show (ptm - now) 995 else show (ptm - now)