From 58a6ff596876e8a3aa1bb55ac0fb2befb633fa75 Mon Sep 17 00:00:00 2001 From: Andrew Cady Date: Mon, 18 Jun 2018 08:28:47 -0400 Subject: compiles --- Announcer.hs | 48 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 45 insertions(+), 3 deletions(-) (limited to 'Announcer.hs') diff --git a/Announcer.hs b/Announcer.hs index ad121f13..7d1d605d 100644 --- a/Announcer.hs +++ b/Announcer.hs @@ -33,6 +33,7 @@ import Network.Kademlia.Search import Control.Concurrent.Lifted.Instrument import Control.Concurrent.STM import Control.Monad +import Control.Applicative import Data.ByteString (ByteString) import qualified Data.ByteString.Char8 as Char8 import Data.Function @@ -84,13 +85,14 @@ newtype Schedule = Schedule { unSchedule :: PSQ' AnnounceKey POSIXTime Scheduled emptySchedule :: Schedule emptySchedule = Schedule PSQ.empty -findNextScheduled :: Announcer -> STM (AnnounceKey, POSIXTime, ScheduledItem) +type KPS = (AnnounceKey, POSIXTime, ScheduledItem) +findNextScheduled :: Announcer -> STM KPS findNextScheduled announcer = maybe retry return =<< (findMin . unSchedule) <$> readTVar (scheduled announcer) -removeFromSchedule :: Announcer -> (AnnounceKey, POSIXTime, ScheduledItem) -> STM () +removeFromSchedule :: Announcer -> KPS -> STM () removeFromSchedule announcer item = modifyTVar' (scheduled announcer) (Schedule . PSQ.delete (key item) . unSchedule) -scheduleToList :: Announcer -> STM [(AnnounceKey, POSIXTime, ScheduledItem)] +scheduleToList :: Announcer -> STM [KPS] scheduleToList announcer = PSQ.toList . unSchedule <$> readTVar (scheduled announcer) data Announcer = Announcer @@ -141,6 +143,46 @@ forkAnnouncer = do fork $ announceThread announcer return announcer +readTChanTimeout :: POSIXTime -> TChan a -> IO (Maybe a) +readTChanTimeout timeout pktChannel = do + delay <- registerDelay (toMicroseconds timeout) + atomically $ + Just <$> readTChan pktChannel + <|> pure Nothing <* (readTVar >=> check) delay + where + toMicroseconds :: POSIXTime -> Int + toMicroseconds = undefined + +listener :: TChan KPS -> IO () +listener chan = relisten PSQ.empty + where + note :: String -> IO () + note = if False then print else const (return ()) + relisten queue = do + case minView queue of + Nothing -> do + note "queue empty - listening indefinitely" + (k, p, s) <- atomically $ readTChan chan + note "handling new event" + relisten $ PSQ.insert' k s p queue + Just ((k, p, s), queue') -> do + note "queue full - listening with timeout" + now <- getPOSIXTime + readTChanTimeout (p - now) chan >>= \case + Just (k, p, s) -> do + note "handling new event (event occurred before timeout)" + relisten $ PSQ.insert' k s p queue + Nothing -> do + note "timed out - executing from queue" + runAction s + mapM id =<< atomically (performScheduledItem undefined now (k, p, s)) + relisten queue' + +runAction :: Monad m => ScheduledItem -> m () +runAction DeleteAnnouncement = return () +runAction StopAnnouncer = return () +runAction (ScheduledItem a) = undefined + announceThread :: Announcer -> IO () announceThread announcer = do myThreadId >>= flip labelThread "announcer" -- cgit v1.2.3