{-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE NondecreasingIndentation #-} module Announcer ( Announcer() , AnnounceKey , scheduleAbs , scheduleToList , packAnnounceKey , unpackAnnounceKey , forkAnnouncer , stopAnnouncer , cancel , itemStatusNum , runAction , unschedule , delayAction , scheduleRel -- lower level, Announcer.Tox needs these. , scheduleImmediately , ScheduledItem(ScheduledItem) ) where import Data.Wrapper.PSQ as PSQ import Control.Applicative import Control.Concurrent.Lifted.Instrument import Control.Concurrent.STM import Control.Monad import Data.ByteString (ByteString) import qualified Data.ByteString.Char8 as Char8 import Data.Hashable import Data.Time.Clock.POSIX import qualified GHC.Generics as Generics -- import Generic.Data.Internal.Meta as Lyxia newtype AnnounceKey = AnnounceKey ByteString deriving (Hashable,Ord,Eq) instance Show AnnounceKey where show (AnnounceKey bs) = "AnnounceKey " ++ show (Char8.unpack bs) packAnnounceKey :: Announcer -> String -> AnnounceKey packAnnounceKey _ = AnnounceKey . Char8.pack unpackAnnounceKey :: Announcer -> AnnounceKey -> String unpackAnnounceKey _ (AnnounceKey bs) = Char8.unpack bs -- | Actions that can be scheduled to occur at some particular time in the -- future. Since periodic event handlers are responsible for re-scheduling -- themselves, they are typically bootstrapped using 'scheduleImmediately' with -- 'NewAnnouncement' which triggers the ordinary recurring scheduling of -- 'Announce'. data ScheduledItem = ScheduledItem (Announcer -> AnnounceKey -> POSIXTime -> STM (IO ())) -- Can't use Data because STM and IO. :( -- deriving Data {- itemStatusNum sch = constrIndex $ toConstr sch -} -- Using Generic works with Lyxia's generic-data package... -- {- itemStatusNum sch = Lyxia.conIdToInt $ Lyxia.conId sch -} -- For now, make sure to keep 'itemStatusNum' up to date with 'ScheduledItem'. deriving Generics.Generic itemStatusNum :: ScheduledItem -> Int itemStatusNum = const 1 newtype Schedule = Schedule { unSchedule :: PSQ' AnnounceKey POSIXTime ScheduledItem } emptySchedule :: Schedule emptySchedule = Schedule PSQ.empty type KPS = (AnnounceKey, POSIXTime, ScheduledItem) scheduleToList :: Announcer -> STM [KPS] scheduleToList announcer = PSQ.toList . unSchedule <$> readTVar (scheduled announcer) data Announcer = Announcer { -- | The queue of scheduled events. The priority is the time at which an -- event is supposed to occur. Do not write to this TVar ever. scheduled :: TVar Schedule -- | This TVar is False when the Announcer thread has finished. , announcerActive :: TVar Bool -- | This delay is used to wait until it's time to act on the earliest -- scheduled item. It will be interrupted whenever a new item is -- scheduled. , commander :: TChan SchedulerCommand } scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM () scheduleImmediately announcer k item = writeTChan (commander announcer) $ RunActionSTM k item scheduleAbs :: Announcer -> AnnounceKey -> ScheduledItem -> POSIXTime -> STM () scheduleAbs announcer k item absTime = writeTChan (commander announcer) $ ScheduleAction (k, absTime, item) scheduleRel, delayAction :: Announcer -> AnnounceKey -> ScheduledItem -> POSIXTime -> STM () scheduleRel announcer k item relTime = writeTChan (commander announcer) $ DelayAction (k, relTime, item) delayAction = scheduleRel runAction :: Announcer -> IO () -> STM () runAction announcer = writeTChan (commander announcer) . RunAction -- | Terminate the 'Announcer' thread. Don't use the Announcer after this. stopAnnouncer :: Announcer -> IO () stopAnnouncer announcer = do atomically $ writeTChan (commander announcer) ShutdownScheduler atomically $ readTVar (announcerActive announcer) >>= check . not unschedule :: Announcer -> AnnounceKey -> STM () unschedule announcer k = writeTChan (commander announcer) $ UnscheduleAction k cancel :: Announcer -> AnnounceKey -> IO () cancel = ((.).(.)) atomically unschedule -- | Construct an 'Announcer' object and fork a thread in which to perform the -- Kademlia searches and announces. forkAnnouncer :: IO Announcer forkAnnouncer = do announcer <- atomically $ Announcer <$> newTVar emptySchedule <*> newTVar True <*> newTChan fork $ announceThread announcer return announcer readTChanTimeout :: TVar Bool -> TChan a -> STM (Maybe a) readTChanTimeout delay pktChannel = do Just <$> readTChan pktChannel <|> pure Nothing <* (readTVar >=> check) delay data SchedulerCommand = ShutdownScheduler | ScheduleAction KPS -- run an action at an absolute time (todo: use UTCTime) | DelayAction KPS -- run an action at a time relative to the present (todo: use NominalDiffTime) | RunAction (IO ()) | RunActionSTM AnnounceKey ScheduledItem | UnscheduleAction AnnounceKey listener :: Announcer -> IO () listener announcer = relisten -- This function assumes it is the only writer to the (scheduled announcer :: TVar Schedule). -- If that TVar is written in another thread, the changes may be overwritten here. where chan = commander announcer note :: String -> IO () note = if False then print else const (return ()) relisten = do queue <- fmap unSchedule $ atomically $ readTVar $ scheduled announcer case minView queue of Nothing -> do note "queue empty - listening indefinitely" atomically (readTChan chan) >>= handleCommand Just ((k, p, ScheduledItem f), queue') -> do now <- getPOSIXTime note $ "queue full - listening with timeout - " ++ show (p, now) delay <- registerDelay (toMicroseconds (p - now)) join $ atomically $ do readTChanTimeout delay chan >>= \case Just cmd -> return $ handleCommand cmd Nothing -> do writeTVar (scheduled announcer) (Schedule queue') (fmap (>> relisten) (fmap fork (f announcer k now))) where modifyScheduled f = modifyTVar (scheduled announcer) (Schedule . f . unSchedule) declareInactive = writeTVar (announcerActive announcer) False handleCommand ShutdownScheduler = atomically declareInactive handleCommand cmd = (>> relisten) $ case cmd of ScheduleAction (k, p, s) -> atomically $ modifyScheduled $ PSQ.insert' k s p UnscheduleAction k -> atomically $ modifyScheduled $ PSQ.delete k RunAction io -> void $ fork io RunActionSTM k (ScheduledItem f) -> do now <- getPOSIXTime void . fork . join . atomically $ modifyScheduled (PSQ.delete k) >> f announcer k now DelayAction (k, p, s) -> do now <- getPOSIXTime atomically $ modifyScheduled $ PSQ.insert' k s (now + p) announceThread :: Announcer -> IO () announceThread announcer = do myThreadId >>= flip labelThread "announcer" listener announcer