1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
|
---------------------------------------------------------------------------
-- |
-- Module : Control.Concurrent.STM.StatusCache
--
--
-- Maintainer : joe@jerkface.net
-- Stability : experimental
--
-- Motivation: Suppose you must communicate state changes over a stream. If
-- the stream is consumed slowly, then it may occur that backlogged state
-- changes are obsoleted by newer changes in state. In this case, it is
-- desirable to discard the obsolete messages from the stream.
--
-- A streamed status message might be very large, and it would be wasteful in
-- both time and space, to treat it as a monolithic blob that must be built
-- completely before it can be sent. Therefore, we require that each message
-- consist of a stream of smaller chunks of type @x@ and we require predicates
-- that indicate when a chunk starts a new message and when it ends a message.
--
-- In the following example, our chunk type is Char and complete messages are
-- delimited by the characters '(' and ')'. We process the input stream
-- \"(aaa(a))(bb)(cc(c)cc)\" first with a delayed processor and then again with
-- an efficient dedicated thread. The result follows:
--
-- > Backlogged consumer: (cc(c)cc)
-- > Fast consumer: (aaa(a))(bb)(cc(c)cc)
--
-- The complete source code:
--
-- > import Control.Monad (when, forever, (>=>))
-- > import Control.Monad.STM (atomically)
-- > import Control.Concurrent (forkIO, threadDelay)
-- > import System.IO (hFlush, stdout)
-- > import qualified Control.Concurrent.STM.StatusCache as Cache
-- >
-- > main = do q <- atomically $ Cache.new (== '(') (==')')
-- > backlog q "(aaa(a))(bb)(cc(c)cc)"
-- > fast q "(aaa(a))(bb)(cc(c)cc)"
-- >
-- > while pred body = pred >>= flip when (body >> while pred body)
-- >
-- > backlog q xs = do putStr $ "Backlogged consumer: "
-- > mapM_ (atomically . Cache.push q) xs
-- > while (atomically $ fmap not $ Cache.isEmpty q) $ do
-- > c <- atomically $ Cache.pull q
-- > putChar c
-- > putStrLn ""
-- > hFlush stdout
-- >
-- > fast q xs = do putStr "Fast consumer: "
-- > forkIO $ forever $ do
-- > c <- atomically $ Cache.pull q
-- > putChar c
-- > hFlush stdout
-- > mapM_ (atomically . Cache.push q >=> const (threadDelay 10000))
-- > xs
-- > putStrLn ""
--
-- As shown above, it is intended that this module be imported qualified.
--
{-# LANGUAGE DoAndIfThenElse #-}
module Control.Concurrent.STM.StatusCache
( StatusCache
, new
, push
, pull
, pullDepth
, isStopper
, isStarter
, isEmpty
) where
import Control.Monad
import Control.Monad.STM
import Control.Concurrent.STM.TVar
import Control.Concurrent.STM.TChan
-- | A two-stage queue of message chunks of type @x@.
--
-- 'push' routes each chunk either to the 'feed' channel or to the 'cache'
-- channel depending on 'feedFlag'; 'pull' always reads from 'feed' and moves
-- the 'cache' channel into 'feed' once 'feed' has run dry.
data StatusCache x =
    StatusCache { feed :: TVar (TChan x)
                  -- ^ channel that 'pull' reads chunks from; 'push' writes
                  -- here while 'feedFlag' is False
                , cache :: TVar (TChan x)
                  -- ^ channel that 'push' writes to while 'feedFlag' is True;
                  -- 'push' replaces it with a fresh channel when a starter
                  -- chunk arrives at push depth 0
                , feedFlag :: TVar Bool
                  -- ^ when True, 'push' targets 'cache' and 'pull' may move
                  -- 'cache' into an empty 'feed'
                , pushDepth :: TVar Int
                  -- ^ nesting depth as seen by 'push': incremented for each
                  -- starter chunk and decremented for each stopper chunk
                , pullDepth :: TVar Int -- ^ current nesting depth of next-to-pull data
                , isStarter :: x -> Bool -- ^ True if the given chunk begins a message.
                , isStopper :: x -> Bool -- ^ True if the given chunk ends a message.
                }
-- | @new isStart isStop@
--
-- Build an empty 'StatusCache'.  The @isStart@ and @isStop@ predicates
-- tell the cache which chunks open and close a message.
--
-- The result starts with two fresh, empty channels, both nesting depths at
-- zero, and the feed flag set, so the first pushed chunks go to the cache
-- channel.
new :: (x -> Bool) -> (x -> Bool) -> STM (StatusCache x)
new isStart isStop = do
    -- Locals are deliberately not named after the record selectors.
    feedVar  <- newTVar =<< newTChan
    cacheVar <- newTVar =<< newTChan
    flagVar  <- newTVar True
    pushVar  <- newTVar 0
    pullVar  <- newTVar 0
    return StatusCache
        { feed      = feedVar
        , cache     = cacheVar
        , feedFlag  = flagVar
        , pushDepth = pushVar
        , pullDepth = pullVar
        , isStarter = isStart
        , isStopper = isStop
        }
-- | Take the next chunk off the 'StatusCache' for processing.
--
-- Chunks that are not pulled promptly may be superseded: 'push' can replace
-- a cached message that has not been pulled yet, in which case it is never
-- delivered.  The transaction retries while no chunk is available.
pull :: StatusCache x -> STM x
pull q = do
    promoteCache
    chunk <- readTVar (feed q) >>= takeChunk
    when (isStarter q chunk) enterMessage
    when (isStopper q chunk) leaveMessage
    return chunk
  where
    -- If the feed has run dry and the flag is set, the cache channel becomes
    -- the new feed and a fresh empty channel takes its place as the cache.
    promoteCache = do
        cacheArmed  <- readTVar (feedFlag q)
        feedDrained <- readTVar (feed q) >>= isEmptyTChan
        when (cacheArmed && feedDrained) $ do
            fresh  <- newTChan
            cached <- swapTVar (cache q) fresh
            writeTVar (feedFlag q) False
            writeTVar (feed q) cached

    -- Block (retry) until the given channel has something to read.
    takeChunk chan = do
        drained <- isEmptyTChan chan
        if drained then retry else readTChan chan

    -- A starter chunk deepens the pull-side nesting; a starter seen at depth
    -- zero also clears the feed flag.
    enterMessage = do
        outer <- readTVar (pullDepth q)
        modifyTVar' (pullDepth q) (+ 1)
        when (outer == 0) $ writeTVar (feedFlag q) False

    -- A stopper chunk reduces the pull-side nesting by one.
    leaveMessage = modifyTVar' (pullDepth q) (subtract 1)
-- | Enqueue a chunk into the 'StatusCache'.
--
-- The chunk is written to the cache channel when the feed flag was set at
-- the start of the transaction, and to the feed channel otherwise.  When
-- caching, a starter chunk arriving at push depth zero first replaces the
-- cache channel with a fresh one, dropping whatever it held.
push :: StatusCache a -> a -> STM ()
push q v = do
    -- Both values are sampled before this chunk updates any state.
    caching <- readTVar (feedFlag q)
    outer   <- readTVar (pushDepth q)
    when stops $ do
        modifyTVar' (pushDepth q) (subtract 1)
        -- NOTE(review): 'outer' is the depth before the decrement above, so
        -- a stopper that closes a depth-1 message does not satisfy this
        -- test; confirm that re-arming the flag only at depth 0 is intended.
        when (outer == 0) $ writeTVar (feedFlag q) True
    when starts $ modifyTVar' (pushDepth q) (+ 1)
    target <- destination caching outer
    writeTChan target v
  where
    starts = isStarter q v
    stops  = isStopper q v

    -- Choose the channel that receives this chunk.
    destination False _ = readTVar (feed q)
    destination True depth = do
        when (depth == 0 && starts) $ newTChan >>= writeTVar (cache q)
        readTVar (cache q)
-- | True when the 'StatusCache' is completely exhausted, i.e. both the feed
-- channel and the cache channel are currently empty.
isEmpty :: StatusCache x -> STM Bool
isEmpty q = liftM2 (&&) (drained (feed q)) (drained (cache q))
  where
    -- Is the channel currently stored in the given variable empty?
    drained var = readTVar var >>= isEmptyTChan
|