diff options
author | Andrew Cady <d@jerkface.net> | 2015-12-06 03:07:52 -0500 |
---|---|---|
committer | Andrew Cady <d@jerkface.net> | 2015-12-06 03:07:59 -0500 |
commit | 38b6cd686acd70b0d94271e8256cd573ecc52ced (patch) | |
tree | b8c87b4e23f7beaccbb79d06aef5044757fa0730 | |
parent | 3f83abf93d0870f3420b6efe900949fc3f6f74ba (diff) |
Document/tweak RealTimeQueue & its example program
-rw-r--r-- | RealTimeQueue.hs | 101 | ||||
-rw-r--r-- | rtq.hs | 66 |
2 files changed, 114 insertions, 53 deletions
diff --git a/RealTimeQueue.hs b/RealTimeQueue.hs index dc40c84..78314b1 100644 --- a/RealTimeQueue.hs +++ b/RealTimeQueue.hs | |||
@@ -1,16 +1,88 @@ | |||
1 | module RealTimeQueue where | 1 | module RealTimeQueue (Queue, createQueue, RealTimeQueue.null, scheduleEvent, extractScheduledEvents, scheduleEventIO, runScheduledIO) where |
2 | import BasePrelude hiding ((<>)) | 2 | import BasePrelude hiding ((<>)) |
3 | import System.Clock | 3 | import System.Clock |
4 | import Data.IntPSQ as Q | 4 | import Data.IntPSQ as Q |
5 | import Control.Monad.IO.Class | 5 | import Control.Monad.IO.Class |
6 | import Data.Semigroup | 6 | import Data.Semigroup |
7 | 7 | ||
8 | scheduleEvent :: (TimeSpec, event) -> Queue event -> Queue event | 8 | -- The intended interface of this library is mainly the three functions |
9 | scheduleEvent (ts, ev) (Queue q i) = Queue (Q.insert i ts ev q) (i+1) | 9 | -- ''createQueue'', ''scheduleEventIO'' and ''runScheduledIO''. |
10 | |||
11 | createQueue :: Queue event | ||
12 | createQueue = Queue Q.empty 0 | ||
13 | |||
14 | -- The IO commands wrap pure code alternatives (which are also exported), using | ||
15 | -- MonadIO to add management of the clock and the ability to run a user-supplied | ||
16 | -- handler. The ''Monotonic'' clock type from ''System.Clock'' is used to define | ||
17 | -- scheduling priorities internally, but these functions expect time to be | ||
18 | -- specified relative to the present. | ||
19 | |||
20 | scheduleEventIO :: MonadIO m => (TimeSpec, event) -> Queue event -> m (Queue event) | ||
21 | scheduleEventIO (ts, ev) queue = do | ||
22 | now <- liftIO $ getTime Monotonic | ||
23 | return $ scheduleEvent (ts + now, ev) queue | ||
24 | |||
25 | runScheduledIO :: MonadIO m => TimeSpec -> QueueRunner m event -> Queue event -> m (Queue event) | ||
26 | runScheduledIO timeSpan runner queue = do | ||
27 | now <- liftIO $ getTime Monotonic | ||
28 | let (events, queue') = extractScheduledEvents now timeSpan queue | ||
29 | forM_ events $ \(ts, ev) -> runner (ts - now, ev) | ||
30 | return queue' | ||
31 | |||
32 | null :: Queue event -> Bool | ||
33 | null (Queue q _) = Q.null q | ||
34 | |||
35 | |||
36 | |||
37 | |||
38 | -- The queue runner receives a time, relative to the present, at which the event | ||
39 | -- is scheduled to occur. Thus, it can wait for this amount of time if desired | ||
40 | -- (or, in the case of ALSA, push the event onto the kernel queue with this | ||
41 | -- amount of delay). | ||
42 | |||
43 | -- It is possible for this time to be negative, in which case the event was | ||
44 | -- scheduled to happen in the past. | ||
10 | 45 | ||
46 | type QueueRunner m event = (TimeSpec, event) -> m () | ||
47 | |||
48 | |||
49 | |||
50 | |||
51 | -- It is important that the keys are never re-used. Therefore the constructor is | ||
52 | -- not exported. | ||
53 | |||
54 | -- (Yes, the keys can wrap around, but if your usage is anything remotely like a | ||
55 | -- FIFO, the earlier keys will be removed before they get reused. In order to | ||
56 | -- ensure that the keys _never_ wrap, IntPSQ could be replaced with HashPSQ and | ||
57 | -- the key type made Integer, but then the size of the keys would increase over | ||
58 | -- time.) | ||
59 | |||
60 | data Queue event = Queue { | ||
61 | _intQueue :: IntPSQ TimeSpec event, | ||
62 | _nextKey :: Int | ||
63 | } | ||
11 | instance Semigroup (Queue event) where | 64 | instance Semigroup (Queue event) where |
12 | _ <> q = q | 65 | _ <> q = q |
13 | 66 | ||
67 | |||
68 | |||
69 | |||
70 | |||
71 | -- These three functions provide a pure interface to the schedule queue. They | ||
72 | -- are agnostic as to the definition of time, but the IO versions below expect | ||
73 | -- that time should always be defined by the ''Monotonic'' clock in | ||
74 | -- System.Clock. | ||
75 | |||
76 | -- Schedule an event using absolute time. | ||
77 | scheduleEvent :: (TimeSpec, event) -> Queue event -> Queue event | ||
78 | scheduleEvent (ts, ev) (Queue q i) = Queue (Q.insert i ts ev q) (i+1) | ||
79 | |||
80 | -- Given an absolute current time and a timespan (relative to current) into the | ||
81 | -- future, remove and return all events from the queue that are scheduled | ||
82 | -- earlier than the end of the timespan. | ||
83 | |||
84 | -- The returned events specify their time as relative to the supplied absolute | ||
85 | -- current time. | ||
14 | extractScheduledEvents :: TimeSpec -> TimeSpec -> Queue event -> ([(TimeSpec, event)], Queue event) | 86 | extractScheduledEvents :: TimeSpec -> TimeSpec -> Queue event -> ([(TimeSpec, event)], Queue event) |
15 | extractScheduledEvents currentTime timeSpan = getAllScheduled | 87 | extractScheduledEvents currentTime timeSpan = getAllScheduled |
16 | where | 88 | where |
@@ -27,26 +99,3 @@ extractScheduledEvents currentTime timeSpan = getAllScheduled | |||
27 | Nothing -> Nothing | 99 | Nothing -> Nothing |
28 | Just (_, ts, _) | ts > currentTime + timeSpan -> Nothing | 100 | Just (_, ts, _) | ts > currentTime + timeSpan -> Nothing |
29 | Just (_, ts, ev) -> Just (ts - currentTime, ev) | 101 | Just (_, ts, ev) -> Just (ts - currentTime, ev) |
30 | |||
31 | type QueueRunner m event = TimeSpec -> event -> m () | ||
32 | |||
33 | scheduleEventIO :: MonadIO m => (TimeSpec, event) -> Queue event -> m (Queue event) | ||
34 | scheduleEventIO (ts, ev) queue = do | ||
35 | now <- liftIO $ getTime Monotonic | ||
36 | return $ scheduleEvent (ts + now, ev) queue | ||
37 | |||
38 | runScheduledIO :: MonadIO m => TimeSpec -> QueueRunner m event -> Queue event -> m (Queue event) | ||
39 | runScheduledIO timeSpan runner queue = do | ||
40 | now <- liftIO $ getTime Monotonic | ||
41 | let (events, queue') = extractScheduledEvents now timeSpan queue | ||
42 | forM_ events $ \(ts, ev) -> runner (ts - now) ev | ||
43 | return queue' | ||
44 | |||
45 | |||
46 | createQueue :: Queue event | ||
47 | createQueue = Queue Q.empty 0 | ||
48 | |||
49 | data Queue event = Queue { | ||
50 | intQueue :: IntPSQ TimeSpec event, | ||
51 | nextKey :: Int | ||
52 | } | ||
@@ -1,46 +1,58 @@ | |||
1 | {-# LANGUAGE FlexibleContexts #-} | ||
2 | {-# LANGUAGE ExplicitForAll #-} | ||
3 | {-# LANGUAGE KindSignatures #-} | ||
4 | |||
5 | import BasePrelude hiding ((.)) | 1 | import BasePrelude hiding ((.)) |
6 | import RealTimeQueue | 2 | import RealTimeQueue as Q |
7 | import Control.Monad.IO.Class | 3 | import Control.Monad.IO.Class |
8 | import Control.Monad.State | 4 | import Control.Monad.State |
9 | import System.Clock | 5 | import System.Clock |
10 | 6 | ||
11 | data LoopState = LoopState { | 7 | data LoopState = LoopState { |
12 | _queue :: Queue String | 8 | _queue :: Queue String, |
9 | _lastAction :: TimeSpec, | ||
10 | _beforeActing :: TimeSpec | ||
13 | } | 11 | } |
14 | 12 | ||
13 | type Loop r = StateT LoopState IO r | ||
14 | |||
15 | main :: IO () | 15 | main :: IO () |
16 | main = void $ runStateT main' (LoopState createQueue) | 16 | main = void $ runStateT (queueSomeStuff >> mainLoop) (LoopState createQueue (TimeSpec 0 0) (TimeSpec 0 0)) |
17 | 17 | where | |
18 | main' :: StateT LoopState IO () | 18 | queueSomeStuff = do |
19 | main' = do | 19 | now <- liftIO $ getTime Monotonic |
20 | queueAction $ scheduleEventIO (TimeSpec 1 0, "hello world") | 20 | modify $ \s -> s { _beforeActing = now, _lastAction = now } |
21 | queueAction $ scheduleEventIO (TimeSpec 2 0, "hello world") | 21 | forM_ [1,2,4,8,9,10,11,12] $ \i -> queueAction $ scheduleEventIO (TimeSpec i 0, "hello world " ++ show i) |
22 | queueAction $ scheduleEventIO (TimeSpec 3 0, "hello world") | 22 | |
23 | queueAction $ scheduleEventIO (TimeSpec 4 0, "hello world") | 23 | mainLoop :: Loop () |
24 | mainLoop = do | ||
25 | queueAction $ runScheduledIO tickTime runner | ||
26 | unlessEmptyQueue $ do | ||
27 | liftIO $ threadDelay' tickTime | ||
24 | mainLoop | 28 | mainLoop |
25 | 29 | ||
26 | queueAction :: forall (m :: * -> *). MonadState LoopState m => (Queue String -> m (Queue String)) -> m () | 30 | queueAction :: (Queue String -> Loop (Queue String)) -> Loop () |
27 | queueAction act = do | 31 | queueAction act = do |
28 | q <- gets _queue | 32 | q <- gets _queue |
29 | act q >>= modify . const . LoopState | 33 | act q >>= \q' -> modify $ \s -> s { _queue = q' } |
30 | 34 | ||
31 | tickTime :: TimeSpec | 35 | tickTime :: TimeSpec |
32 | tickTime = TimeSpec 0 15000 | 36 | tickTime = TimeSpec 0 1000 |
33 | 37 | ||
34 | threadDelay' :: TimeSpec -> IO () | 38 | threadDelay' :: TimeSpec -> IO () |
35 | threadDelay' = threadDelay . fromIntegral . timeSpecAsNanoSecs | 39 | threadDelay' = threadDelay . fromIntegral . timeSpecAsNanoSecs |
36 | 40 | ||
37 | runner :: TimeSpec -> String -> StateT LoopState IO () | 41 | timeSpecAsDouble :: TimeSpec -> Double |
38 | runner delay str = liftIO $ do | 42 | timeSpecAsDouble ts = x / (10^(9::Int)) where x = fromIntegral $ timeSpecAsNanoSecs ts |
39 | threadDelay' delay | 43 | |
40 | putStrLn str | 44 | runner :: (TimeSpec, String) -> Loop () |
41 | 45 | runner (delay, str) = do | |
42 | mainLoop :: StateT LoopState IO () | 46 | before <- gets _lastAction |
43 | mainLoop = do | 47 | muchBefore <- gets _beforeActing |
44 | queueAction $ runScheduledIO tickTime runner | 48 | after <- liftIO $ do |
45 | liftIO $ threadDelay' tickTime | 49 | threadDelay' delay |
46 | mainLoop | 50 | now <- liftIO $ getTime Monotonic |
51 | print (now, str, timeSpecAsDouble $ now - before, timeSpecAsDouble $ now - muchBefore) | ||
52 | return now | ||
53 | modify $ \s -> s { _lastAction = after } | ||
54 | |||
55 | unlessEmptyQueue :: Loop () -> Loop () | ||
56 | unlessEmptyQueue f = do | ||
57 | q <- gets _queue | ||
58 | unless (Q.null q) f | ||