{-# LANGUAGE DeriveDataTypeable         #-}
{-# LANGUAGE DeriveGeneric              #-}
{-# LANGUAGE ExistentialQuantification  #-}
{-# LANGUAGE FlexibleContexts           #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase                 #-}
{-# LANGUAGE NamedFieldPuns             #-}
{-# LANGUAGE NondecreasingIndentation   #-}
module Announcer
    ( Announcer()
    , AnnounceKey
    , scheduleAbs
    , scheduleToList
    , packAnnounceKey
    , unpackAnnounceKey
    , encodeAnnounceKey
    , decodeAnnounceKey
    , forkAnnouncer
    , stopAnnouncer
    , cancel
    , runAction
    , unschedule
    , delayAction
    , scheduleRel
    -- lower level, Announcer.Tox needs these.
    , scheduleImmediately
    , ScheduledItem(ScheduledItem)
    ) where

import Data.Wrapper.PSQ as PSQ

import Control.Applicative
import Control.Concurrent.Lifted.Instrument
import Control.Concurrent.STM
import Control.Monad
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as Char8
import Data.Hashable
import Data.Time.Clock (NominalDiffTime)
import Data.Time.Clock.POSIX
import qualified GHC.Generics as Generics
-- import Generic.Data.Internal.Meta as Lyxia

-- | Opaque key under which an action is scheduled and by which it can later
-- be canceled.  Internally this is just a strict 'ByteString'.
newtype AnnounceKey = AnnounceKey ByteString
    deriving (Eq, Ord, Hashable)

-- | Build an 'AnnounceKey' from a 'String' using 'Char8.pack'.  The
-- 'Announcer' argument is ignored.
packAnnounceKey :: Announcer -> String -> AnnounceKey
packAnnounceKey _ str = AnnounceKey (Char8.pack str)

-- | Render an 'AnnounceKey' as a 'String' using 'Char8.unpack'.  The
-- 'Announcer' argument is ignored.
unpackAnnounceKey :: Announcer -> AnnounceKey -> String
unpackAnnounceKey _ (AnnounceKey raw) = Char8.unpack raw

-- | Expose the raw bytes of an 'AnnounceKey'.  The 'Announcer' argument is
-- ignored.
encodeAnnounceKey :: Announcer -> AnnounceKey -> ByteString
encodeAnnounceKey _ (AnnounceKey raw) = raw

-- | Wrap raw bytes as an 'AnnounceKey'.  The 'Announcer' argument is ignored.
decodeAnnounceKey :: Announcer -> ByteString -> AnnounceKey
decodeAnnounceKey _ = AnnounceKey

-- | Actions that can be scheduled to occur at some particular time in the
-- future.  Since periodic event handlers are responsible for re-scheduling
-- themselves, they are typically bootstrapped using 'scheduleImmediately' with
-- 'NewAnnouncement' which triggers the ordinary recurring scheduling of
-- 'Announce'.
-- The wrapped function is handed the announcer, the key the item was
-- scheduled under and a timestamp; it yields, inside STM, the IO action that
-- should then be run.
newtype ScheduledItem
    = ScheduledItem (Announcer -> AnnounceKey -> POSIXTime -> STM (IO ()))
    deriving Generics.Generic

-- | Priority queue of pending items, keyed by 'AnnounceKey' and prioritized
-- by the 'POSIXTime' at which each item is due.
newtype Schedule = Schedule
    { unSchedule :: PSQ' AnnounceKey POSIXTime ScheduledItem
    }

-- | A schedule with nothing in it.
emptySchedule :: Schedule
emptySchedule = Schedule PSQ.empty

-- | Key, due time and item: one entry of the schedule.
type KPS = (AnnounceKey, POSIXTime, ScheduledItem)

-- | Snapshot the current contents of the schedule as a list of entries.
scheduleToList :: Announcer -> STM [KPS]
scheduleToList announcer =
    fmap (PSQ.toList . unSchedule) (readTVar (scheduled announcer))

-- | Handle to a running scheduler.
data Announcer = Announcer
    { -- | The queue of scheduled events.  The priority is the time at which
      -- an event is supposed to occur.
      scheduled       :: TVar Schedule
      -- | This TVar is False when the Announcer thread has finished.
    , announcerActive :: TVar Bool
      -- | This channel is used to communicate instructions to the timing
      -- thread.  It can be used to schedule actions or shut down the thread.
    , commander       :: TChan SchedulerCommand
    }

-- | Schedule an action to happen immediately.  This only enqueues a
-- 'RunActionSTM' command for the scheduler thread.
scheduleImmediately :: Announcer -> AnnounceKey -> ScheduledItem -> STM ()
scheduleImmediately announcer k item =
    writeTChan (commander announcer) (RunActionSTM k item)

-- | Schedule an action to happen at a specific (absolute) time.
scheduleAbs :: Announcer -> AnnounceKey -> ScheduledItem -> POSIXTime -> STM ()
scheduleAbs announcer k item absTime =
    writeTChan (commander announcer) (ScheduleAction (k, absTime, item))

-- | Schedule an action to occur a specified delta into the future from now.
-- "Now" is sampled by the scheduler thread when it processes the command.
scheduleRel :: Announcer -> AnnounceKey -> ScheduledItem -> NominalDiffTime -> STM ()
scheduleRel announcer k item relTime =
    writeTChan (commander announcer) (DelayAction (k, relTime, item))

-- | Schedule an action to occur a specified delta into the future from now.
-- This is an alias for 'scheduleRel'.
delayAction :: Announcer -> AnnounceKey -> ScheduledItem -> NominalDiffTime -> STM ()
delayAction = scheduleRel

-- | Fork an action immediately.  The String argument is a thread label.
runAction :: Announcer -> String -> IO () -> STM ()
runAction announcer lbl io =
    writeTChan (commander announcer) (RunAction lbl io)

-- | Terminate the 'Announcer' thread.
-- Don't use the Announcer after this.
stopAnnouncer :: Announcer -> IO ()
stopAnnouncer announcer = do
    atomically $ writeTChan (commander announcer) ShutdownScheduler
    -- Block until the scheduler thread has flagged itself as inactive.
    atomically $ readTVar (announcerActive announcer) >>= check . not

-- | Cancel a scheduled action (STM).
unschedule :: Announcer -> AnnounceKey -> STM ()
unschedule announcer k = writeTChan (commander announcer) $ UnscheduleAction k

-- | Cancel a scheduled action (IO).
cancel :: Announcer -> AnnounceKey -> IO ()
cancel announcer k = atomically $ unschedule announcer k

-- | Construct an 'Announcer' object and fork a thread in which to perform the
-- Kademlia searches and announces.
forkAnnouncer :: IO Announcer
forkAnnouncer = do
    announcer <- atomically $ Announcer <$> newTVar emptySchedule
                                        <*> newTVar True
                                        <*> newTChan
    tid <- forkIO $ listener announcer
    labelThread tid "announcer"
    return announcer

-- Wait for an item to occur on the given channel, or for the given TVar to be
-- True.  The channel is tried first; 'Nothing' is returned only when the
-- channel is empty and the TVar holds True.
readTChanTimeout :: TVar Bool -> TChan a -> STM (Maybe a)
readTChanTimeout delay pktChannel =
    (Just <$> readTChan pktChannel)
        `orElse` (Nothing <$ (readTVar delay >>= check))

-- Instructions understood by the scheduler thread.
data SchedulerCommand
    = ShutdownScheduler
    | ScheduleAction KPS   -- run an action at an absolute time
    | DelayAction KPS      -- run an action at a time relative to the present
    | RunAction String (IO ())
    | RunActionSTM AnnounceKey ScheduledItem
    | UnscheduleAction AnnounceKey

-- This is the main scheduling thread that polls the commander TChan and forks
-- scheduled actions.
listener :: Announcer -> IO ()
listener announcer = relisten
  -- The listener is not the only writer of (scheduled announcer :: TVar
  -- Schedule): the thread forked for 'RunActionSTM' also deletes from it.
  -- Every update is therefore expressed as a modification of the TVar's
  -- current contents inside a transaction, never as a write-back of a
  -- snapshot read in an earlier transaction.
  where
    chan = commander announcer

    -- Debug tracing; flip the constant to enable.
    note :: String -> IO ()
    note = if False then print else const (return ())

    relisten = do
        queue <- fmap unSchedule $ atomically $ readTVar $ scheduled announcer
        case minView queue of
            Nothing -> do
                note "queue empty - listening indefinitely"
                atomically (readTChan chan) >>= handleCommand
            Just ((k, p, ScheduledItem f), _) -> do
                now <- getPOSIXTime
                note $ "queue non-empty - listening with timeout - " ++ show (p, now)
                delay <- registerDelay (toMicroseconds (p - now))
                join $ atomically $
                    readTChanTimeout delay chan >>= \case
                        Just cmd -> return $ handleCommand cmd
                        Nothing  -> do
                            -- Remove the fired item from the *current* queue.
                            -- Writing back the earlier snapshot would undo
                            -- deletions committed meanwhile by a thread
                            -- forked for 'RunActionSTM'.
                            modifyScheduled (PSQ.delete k)
                            -- NOTE(review): 'now' was sampled before the
                            -- wait, so the item sees the time the wait began
                            -- rather than the time it fired.
                            io <- f announcer k now
                            return $ do
                                forkLabeled ("announcer:item:" ++ unpackAnnounceKey announcer k) io
                                relisten
      where
        -- Strict modify so queue updates do not pile up as thunks.
        modifyScheduled g =
            modifyTVar' (scheduled announcer) (Schedule . g . unSchedule)

        declareInactive = writeTVar (announcerActive announcer) False

        -- Shutdown is the only command after which the loop does not resume.
        handleCommand ShutdownScheduler = atomically declareInactive
        handleCommand cmd = (>> relisten) $ case cmd of
            ScheduleAction (k, p, s) -> atomically $ modifyScheduled $ PSQ.insert' k s p
            UnscheduleAction k       -> atomically $ modifyScheduled $ PSQ.delete k
            RunAction lbl io         -> void $ forkLabeled lbl io
            RunActionSTM k (ScheduledItem f) -> do
                now <- getPOSIXTime
                -- Forked so that a blocking transaction cannot stall the
                -- scheduler; this is the second writer of the schedule TVar.
                void . fork . join . atomically
                    $ modifyScheduled (PSQ.delete k) >> f announcer k now
            DelayAction (k, p, s) -> do
                now <- getPOSIXTime
                atomically $ modifyScheduled $ PSQ.insert' k s (now + p)