diff options
author | Andrew Cady <d@jerkface.net> | 2018-06-18 08:28:47 -0400 |
---|---|---|
committer | Andrew Cady <d@jerkface.net> | 2018-06-18 18:20:29 -0400 |
commit | 58a6ff596876e8a3aa1bb55ac0fb2befb633fa75 (patch) | |
tree | 56117fe6e7038f29ea9da29a3c74fa431d9aa434 | |
parent | 7bf9b4bc9d327c8952559f1c670bdf607fc7ac82 (diff) |
compiles
-rw-r--r-- | Announcer.hs | 48 |
1 files changed, 45 insertions, 3 deletions
diff --git a/Announcer.hs b/Announcer.hs index ad121f13..7d1d605d 100644 --- a/Announcer.hs +++ b/Announcer.hs | |||
@@ -33,6 +33,7 @@ import Network.Kademlia.Search | |||
33 | import Control.Concurrent.Lifted.Instrument | 33 | import Control.Concurrent.Lifted.Instrument |
34 | import Control.Concurrent.STM | 34 | import Control.Concurrent.STM |
35 | import Control.Monad | 35 | import Control.Monad |
36 | import Control.Applicative | ||
36 | import Data.ByteString (ByteString) | 37 | import Data.ByteString (ByteString) |
37 | import qualified Data.ByteString.Char8 as Char8 | 38 | import qualified Data.ByteString.Char8 as Char8 |
38 | import Data.Function | 39 | import Data.Function |
@@ -84,13 +85,14 @@ newtype Schedule = Schedule { unSchedule :: PSQ' AnnounceKey POSIXTime Scheduled | |||
84 | emptySchedule :: Schedule | 85 | emptySchedule :: Schedule |
85 | emptySchedule = Schedule PSQ.empty | 86 | emptySchedule = Schedule PSQ.empty |
86 | 87 | ||
87 | findNextScheduled :: Announcer -> STM (AnnounceKey, POSIXTime, ScheduledItem) | 88 | type KPS = (AnnounceKey, POSIXTime, ScheduledItem) |
89 | findNextScheduled :: Announcer -> STM KPS | ||
88 | findNextScheduled announcer = maybe retry return =<< (findMin . unSchedule) <$> readTVar (scheduled announcer) | 90 | findNextScheduled announcer = maybe retry return =<< (findMin . unSchedule) <$> readTVar (scheduled announcer) |
89 | 91 | ||
90 | removeFromSchedule :: Announcer -> (AnnounceKey, POSIXTime, ScheduledItem) -> STM () | 92 | removeFromSchedule :: Announcer -> KPS -> STM () |
91 | removeFromSchedule announcer item = modifyTVar' (scheduled announcer) (Schedule . PSQ.delete (key item) . unSchedule) | 93 | removeFromSchedule announcer item = modifyTVar' (scheduled announcer) (Schedule . PSQ.delete (key item) . unSchedule) |
92 | 94 | ||
93 | scheduleToList :: Announcer -> STM [(AnnounceKey, POSIXTime, ScheduledItem)] | 95 | scheduleToList :: Announcer -> STM [KPS] |
94 | scheduleToList announcer = PSQ.toList . unSchedule <$> readTVar (scheduled announcer) | 96 | scheduleToList announcer = PSQ.toList . unSchedule <$> readTVar (scheduled announcer) |
95 | 97 | ||
96 | data Announcer = Announcer | 98 | data Announcer = Announcer |
@@ -141,6 +143,46 @@ forkAnnouncer = do | |||
141 | fork $ announceThread announcer | 143 | fork $ announceThread announcer |
142 | return announcer | 144 | return announcer |
143 | 145 | ||
146 | readTChanTimeout :: POSIXTime -> TChan a -> IO (Maybe a) | ||
147 | readTChanTimeout timeout pktChannel = do | ||
148 | delay <- registerDelay (toMicroseconds timeout) | ||
149 | atomically $ | ||
150 | Just <$> readTChan pktChannel | ||
151 | <|> pure Nothing <* (readTVar >=> check) delay | ||
152 | where | ||
153 | toMicroseconds :: POSIXTime -> Int | ||
154 | toMicroseconds = undefined | ||
155 | |||
156 | listener :: TChan KPS -> IO () | ||
157 | listener chan = relisten PSQ.empty | ||
158 | where | ||
159 | note :: String -> IO () | ||
160 | note = if False then print else const (return ()) | ||
161 | relisten queue = do | ||
162 | case minView queue of | ||
163 | Nothing -> do | ||
164 | note "queue empty - listening indefinitely" | ||
165 | (k, p, s) <- atomically $ readTChan chan | ||
166 | note "handling new event" | ||
167 | relisten $ PSQ.insert' k s p queue | ||
168 | Just ((k, p, s), queue') -> do | ||
169 | note "queue full - listening with timeout" | ||
170 | now <- getPOSIXTime | ||
171 | readTChanTimeout (p - now) chan >>= \case | ||
172 | Just (k, p, s) -> do | ||
173 | note "handling new event (event occurred before timeout)" | ||
174 | relisten $ PSQ.insert' k s p queue | ||
175 | Nothing -> do | ||
176 | note "timed out - executing from queue" | ||
177 | runAction s | ||
178 | mapM id =<< atomically (performScheduledItem undefined now (k, p, s)) | ||
179 | relisten queue' | ||
180 | |||
181 | runAction :: Monad m => ScheduledItem -> m () | ||
182 | runAction DeleteAnnouncement = return () | ||
183 | runAction StopAnnouncer = return () | ||
184 | runAction (ScheduledItem a) = undefined | ||
185 | |||
144 | announceThread :: Announcer -> IO () | 186 | announceThread :: Announcer -> IO () |
145 | announceThread announcer = do | 187 | announceThread announcer = do |
146 | myThreadId >>= flip labelThread "announcer" | 188 | myThreadId >>= flip labelThread "announcer" |