summaryrefslogtreecommitdiff
path: root/Control
diff options
context:
space:
mode:
Diffstat (limited to 'Control')
-rw-r--r--Control/Concurrent/STM/StatusCache.hs160
-rw-r--r--Control/Concurrent/STM/UpdateStream.hs169
2 files changed, 329 insertions, 0 deletions
diff --git a/Control/Concurrent/STM/StatusCache.hs b/Control/Concurrent/STM/StatusCache.hs
new file mode 100644
index 00000000..d1c977ae
--- /dev/null
+++ b/Control/Concurrent/STM/StatusCache.hs
@@ -0,0 +1,160 @@
1---------------------------------------------------------------------------
2-- |
3-- Module : Control.Concurrent.STM.StatusCache
4--
5--
6-- Maintainer : joe@jerkface.net
7-- Stability : experimental
8--
9-- Motivation: Suppose you must communicate state changes over a stream. If
10-- the stream is consumed slowly, then it may occur that backlogged state
11-- changes are obsoleted by newer changes in state. In this case, it is
12-- desirable to discard the obsolete messages from the stream.
13--
14-- A streamed status message might be very large, and it would be wasteful in
15-- both time and space, to treat it as a monolithic blob that must be built
16-- completely before it can be sent. Therefore, we require that each message
17-- consist of a stream of smaller chunks of type @x@ and we require predicates
18-- that indicate when a chunk starts a new message and when it ends a message.
19--
20-- In the folowing example, our chunk type is Char and complete messages are
21-- delimited by the characters '(' and ')'. We process the input stream
22-- \"(aaa(a))(bb)(cc(c)cc)\" first with a delayed processor and then again with
23-- an efficient dedicated thread. The result follows:
24--
25-- > Backlogged consumer: (cc(c)cc)
26-- > Fast consumer: (aaa(a))(bb)(cc(c)cc)
27--
28-- The complete source code:
29--
30-- > import Control.Monad (when, forever, (>=>))
31-- > import Control.Monad.STM (atomically)
32-- > import Control.Concurrent (forkIO, threadDelay)
33-- > import System.IO (hFlush, stdout)
34-- > import qualified Control.Concurrent.STM.StatusCache as Cache
35-- >
36-- > main = do q <- atomically $ Cache.new (== '(') (==')')
37-- > backlog q "(aaa(a))(bb)(cc(c)cc)"
38-- > fast q "(aaa(a))(bb)(cc(c)cc)"
39-- >
40-- > while pred body = pred >>= flip when (body >> while pred body)
41-- >
42-- > backlog q xs = do putStr $ "Backlogged consumer: "
43-- > mapM_ (atomically . Cache.push q) xs
44-- > while (atomically $ fmap not $ Cache.isEmpty q) $ do
45-- > c <- atomically $ Cache.pull q
46-- > putChar c
47-- > putStrLn ""
48-- > hFlush stdout
49-- >
50-- > fast q xs = do putStr "Fast consumer: "
51-- > forkIO $ forever $ do
52-- > c <- atomically $ Cache.pull q
53-- > putChar c
54-- > hFlush stdout
55-- > mapM_ (atomically . Cache.push q >=> const (threadDelay 10000))
56-- > xs
57-- > putStrLn ""
58--
59-- As shown above, it is intended that this module be imported qualified.
60--
61{-# LANGUAGE DoAndIfThenElse #-}
62module Control.Concurrent.STM.StatusCache
63 ( StatusCache
64 , new
65 , push
66 , pull
67 , pullDepth
68 , isStopper
69 , isStarter
70 , isEmpty
71 ) where
72import Control.Monad
73import Control.Monad.STM
74import Control.Concurrent.STM.TVar
75import Control.Concurrent.STM.TChan
76
77data StatusCache x =
78 StatusCache { feed :: TVar (TChan x)
79 , cache :: TVar (TChan x)
80 , feedFlag :: TVar Bool
81 , pushDepth :: TVar Int
82 , pullDepth :: TVar Int -- ^ current nesting depth of next-to-pull data
83 , isStarter :: x -> Bool -- ^ True if the given chunk begins a message.
84 , isStopper :: x -> Bool -- ^ True if the given chunk ends a message.
85 }
86
87-- | @new isStart isStop@
88--
89-- The @isStart@ and @isStop@ predicates indicate when an element
90-- begins or ends a message.
91new :: (x -> Bool) -> (x -> Bool) -> STM (StatusCache x)
92new isStart isStop = do
93 feed <- newTChan >>= newTVar
94 cache <- newTChan >>= newTVar
95 flag <- newTVar True
96 pushd <- newTVar 0
97 pulld <- newTVar 0
98 return StatusCache { feed = feed
99 , cache = cache
100 , feedFlag = flag
101 , pushDepth = pushd
102 , pullDepth = pulld
103 , isStarter = isStart
104 , isStopper = isStop
105 }
106
107
108-- | Pull off a chunk from the 'StatusCache' for processing.
109--
110-- If chunks are not pulled off quickly, they may be obsoleted
111-- and discarded when new messages are 'push'ed.
112pull :: StatusCache x -> STM x
113pull q = do
114 hasCache <- readTVar (feedFlag q)
115 exhausted <- readTVar (feed q) >>= isEmptyTChan
116 when (hasCache && exhausted) $ do
117 next <- newTChan >>= swapTVar (cache q)
118 writeTVar (feedFlag q) False
119 writeTVar (feed q) next
120 chan <- readTVar $ feed q
121 exhausted <- isEmptyTChan chan
122 if exhausted then retry
123 else do
124 v <- readTChan chan
125 when (isStarter q v) $ do
126 depth <- readTVar (pullDepth q)
127 modifyTVar' (pullDepth q) (+1)
128 when (depth==0)
129 $ writeTVar (feedFlag q) False
130 when (isStopper q v)
131 $ modifyTVar' (pullDepth q) (subtract 1)
132 return v
133
134-- | Enqueue a chunk into the 'StatusCache'.
135push :: StatusCache a -> a -> STM ()
136push q v = do
137 shouldCache <- readTVar (feedFlag q)
138 depth <- readTVar (pushDepth q)
139 when (isStopper q v) $ do
140 modifyTVar' (pushDepth q) (subtract 1)
141 when (depth==0)
142 $ writeTVar (feedFlag q) True
143 when (isStarter q v)
144 $ modifyTVar' (pushDepth q) (+1)
145 chan <-
146 if shouldCache then do
147 when (depth==0 && isStarter q v)
148 $ newTChan
149 >>= writeTVar (cache q)
150 readTVar $ cache q
151 else do
152 readTVar $ feed q
153 writeTChan chan v
154
155-- | True when the 'StatusCache' is completely exhuasted.
156isEmpty :: StatusCache x -> STM Bool
157isEmpty q = do
158 empty_feed <- readTVar (feed q) >>= isEmptyTChan
159 empty_cache <- readTVar (cache q) >>= isEmptyTChan
160 return $ empty_feed && empty_cache
diff --git a/Control/Concurrent/STM/UpdateStream.hs b/Control/Concurrent/STM/UpdateStream.hs
new file mode 100644
index 00000000..b01fc0bc
--- /dev/null
+++ b/Control/Concurrent/STM/UpdateStream.hs
@@ -0,0 +1,169 @@
1---------------------------------------------------------------------------
2-- |
3-- Module : Control.Concurrent.STM.UpdateStream
4--
5--
6-- Maintainer : joe@jerkface.net
7-- Stability : experimental
8--
9-- An UpdateSream consists of pass-through messages that are queued up and
10-- slotted messages which might be obsoleted and discarded if they are not
11-- consumed before newer slotted messages with the same slot value occur.
12--
13-- Slots are implemented with "Control.Concurrent.STM.StatusCache" and it is
14-- recommended that you read that documentation first.
15--
16-- For example, the output
17--
18-- > Backlogged consumer: (set x = 5)(set y = 100)(Hello)(Joe)(was)(here)
19-- > Fast consumer: (set x = 2)(Hello)(set y = 23)(Joe)(set y = 100)(was)(set x = 5)(here)
20--
21-- is produced by the following code:
22--
23-- > import Control.Monad (when, forever, (>=>))
24-- > import Control.Monad.STM (atomically)
25-- > import Control.Concurrent (forkIO, threadDelay)
26-- > import System.IO (hFlush, stdout)
27-- > import qualified Control.Concurrent.STM.UpdateStream as Cache
28-- >
29-- > messages :: [(Maybe String, Char)]
30-- > messages = concat
31-- > [ slot "x" "(set x = 2)"
32-- > , message "(Hello)"
33-- > , slot "y" "(set y = 23)"
34-- > , message "(Joe)"
35-- > , slot "y" "(set y = 100)"
36-- > , message "(was)"
37-- > , slot "x" "(set x = 5)"
38-- > , message "(here)"
39-- > ]
40-- > where
41-- > slot v cs = map ((,) (Just v)) cs
42-- > message cs = map ((,) Nothing) cs
43-- >
44-- > main = do
45-- > q <- atomically $ Cache.new (== '(') (==')')
46-- > let go = mapM_ (atomically . (uncurry $ Cache.push q)
47-- > >=> const (threadDelay 10000))
48-- > messages
49-- > slowly = do
50-- > while (atomically $ fmap not $ Cache.isEmpty q) $ do
51-- > c <- atomically $ Cache.pull q
52-- > putChar c
53-- > putStrLn ""
54-- > hFlush stdout
55-- > where while pred body =
56-- > pred >>= flip when (body >> while pred body)
57-- > quickly = forkIO . forever $ do
58-- > c <- atomically $ Cache.pull q
59-- > putChar c
60-- > hFlush stdout
61-- > putStr $ "Backlogged consumer: "
62-- > go >> slowly
63-- > putStr "Fast consumer: "
64-- > quickly >> go
65-- > putStrLn ""
66--
67module Control.Concurrent.STM.UpdateStream
68 ( UpdateStream
69 , new
70 , push
71 , pull
72 , isStopper
73 , isStarter
74 , isEmpty
75 ) where
76
77import Control.Monad
78import Control.Monad.STM
79import Control.Concurrent.STM.TVar
80import Control.Concurrent.STM.TChan
81import Control.Concurrent.STM.StatusCache (StatusCache)
82import qualified Control.Concurrent.STM.StatusCache as Status
83import Data.Map (Map)
84import qualified Data.Map as Map
85import Data.Maybe
86import Data.Foldable (foldlM)
87
88data UpdateStream slot x =
89 UpdateStream { cache :: TVar (Map slot (StatusCache x))
90 , events :: TChan x
91 , inMessage :: TVar (Maybe (StatusCache x))
92 , isStarter :: x -> Bool -- ^ True if the given chunk begins a message.
93 , isStopper :: x -> Bool -- ^ True if the given chunk ends a message.
94 }
95
96-- | @new isStart isStop@
97--
98-- The @isStart@ and @isStop@ predicates indicate when an element
99-- begins or ends a message.
100new :: Ord slot => (x->Bool) -> (x->Bool) -> STM (UpdateStream slot x)
101new isStart isStop = do
102 cache <- newTVar Map.empty
103 events <- newTChan
104 inMessage <- newTVar Nothing
105 return UpdateStream { cache = cache
106 , events = events
107 , inMessage = inMessage
108 , isStarter = isStart
109 , isStopper = isStop
110 }
111
112-- | Enqueue a chunk into the 'UpdateStream'
113--
114-- If a slot is provided, then the message may be obsoleted when a new message
115-- starts with the same slot value. Otherwise, the chunk is preserved. Note
116-- that the same slot value must be passed again for every chunk of the message
117-- as it is not remembered.
118--
119-- Note that although a slot value is provided on each push, it is assumed that
120-- messages will be pushed in contiguous streams. To change this, we would
121-- need to add a mutable integer to track the nesting level of the primary
122-- (non-slotted) stream.
123push :: Ord slot => UpdateStream slot x -> Maybe slot -> x -> STM ()
124push u Nothing x = writeTChan (events u) x
125push u (Just n) x = do
126 smap <- readTVar (cache u)
127 scache <-
128 case Map.lookup n smap of
129 Just scache -> return scache
130 Nothing -> do
131 scache <- Status.new (isStarter u) (isStopper u)
132 modifyTVar (cache u) (Map.insert n scache)
133 return scache
134
135 Status.push scache x
136
137-- | Pull off a chunk from the 'UpdateStream' for processing.
138--
139-- If chunks are not pulled off quickly, they may be obsoleted
140-- and discarded when new messages are 'push'ed.
141pull :: Ord slot => UpdateStream slot x -> STM x
142pull u = do
143 mbscache <- readTVar $ inMessage u
144 let action =
145 case mbscache of
146 Just scache -> do
147 x <- Status.pull scache
148 nesting <- readTVar (Status.pullDepth scache)
149 when (nesting==0) $ writeTVar (inMessage u) Nothing
150 return x
151 Nothing -> do
152 cs <- fmap (Map.toList) $ readTVar (cache u)
153 cs <- filterM (return . not <=< Status.isEmpty . snd)
154 cs
155 maybe (readTChan $ events u)
156 (\(n,scache) -> do
157 writeTVar (inMessage u) (Just scache) -- (Just n)
158 Status.pull scache)
159 (listToMaybe cs)
160 x <- action
161 return x
162
163-- | True when the 'UpdateStream' is completely exhuasted.
164isEmpty :: UpdateStream slot x -> STM Bool
165isEmpty q = do
166 e <- isEmptyTChan (events q)
167 qs <- readTVar (cache q)
168 d <- foldlM (\b s -> fmap (b &&) $ Status.isEmpty s) True qs
169 return $ e && d