1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
|
---------------------------------------------------------------------------
-- |
-- Module : Control.Concurrent.STM.UpdateStream
--
--
-- Maintainer : joe@jerkface.net
-- Stability : experimental
--
-- An UpdateStream consists of pass-through messages that are queued up and
-- slotted messages which might be obsoleted and discarded if they are not
-- consumed before newer slotted messages with the same slot value occur.
--
-- Slots are implemented with "Control.Concurrent.STM.StatusCache" and it is
-- recommended that you read that documentation first.
--
-- For example, the output
--
-- > Backlogged consumer: (set x = 5)(set y = 100)(Hello)(Joe)(was)(here)
-- > Fast consumer: (set x = 2)(Hello)(set y = 23)(Joe)(set y = 100)(was)(set x = 5)(here)
--
-- is produced by the following code:
--
-- > import Control.Monad (when, forever, (>=>))
-- > import Control.Monad.STM (atomically)
-- > import Control.Concurrent (forkIO, threadDelay)
-- > import System.IO (hFlush, stdout)
-- > import qualified Control.Concurrent.STM.UpdateStream as Cache
-- >
-- > messages :: [(Maybe String, Char)]
-- > messages = concat
-- >     [ slot "x" "(set x = 2)"
-- >     , message "(Hello)"
-- >     , slot "y" "(set y = 23)"
-- >     , message "(Joe)"
-- >     , slot "y" "(set y = 100)"
-- >     , message "(was)"
-- >     , slot "x" "(set x = 5)"
-- >     , message "(here)"
-- >     ]
-- >   where
-- >     slot v cs = map ((,) (Just v)) cs
-- >     message cs = map ((,) Nothing) cs
-- >
-- > main = do
-- >     q <- atomically $ Cache.new (== '(') (==')')
-- >     let go = mapM_ (atomically . (uncurry $ Cache.push q)
-- >                      >=> const (threadDelay 10000))
-- >                    messages
-- >         slowly = do
-- >             while (atomically $ fmap not $ Cache.isEmpty q) $ do
-- >                 c <- atomically $ Cache.pull q
-- >                 putChar c
-- >             putStrLn ""
-- >             hFlush stdout
-- >           where while pred body =
-- >                     pred >>= flip when (body >> while pred body)
-- >         quickly = forkIO . forever $ do
-- >             c <- atomically $ Cache.pull q
-- >             putChar c
-- >             hFlush stdout
-- >     putStr $ "Backlogged consumer: "
-- >     go >> slowly
-- >     putStr "Fast consumer: "
-- >     quickly >> go
-- >     putStrLn ""
--
module Control.Concurrent.STM.UpdateStream
( UpdateStream
, new
, push
, pull
, isStopper
, isStarter
, isEmpty
) where
import Control.Monad
import Control.Monad.STM
import Control.Concurrent.STM.TVar
import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.StatusCache (StatusCache)
import qualified Control.Concurrent.STM.StatusCache as Status
import Data.Map (Map)
import qualified Data.Map as Map
import Data.Maybe
import Data.Foldable (foldlM)
-- | A stream that combines a plain FIFO queue of pass-through chunks with
-- one 'StatusCache' per slot for slotted chunks.
data UpdateStream slot x = UpdateStream
    { cache     :: TVar (Map slot (StatusCache x))
      -- ^ Slot caches, keyed by slot value; entries are added by 'push'.
    , events    :: TChan x
      -- ^ Queue holding every chunk that was pushed without a slot.
    , inMessage :: TVar (Maybe (StatusCache x))
      -- ^ The slot cache 'pull' is currently reading a message from, or
      -- 'Nothing' when it is not in the middle of a slotted message.
    , isStarter :: x -> Bool
      -- ^ True if the given chunk begins a message.
    , isStopper :: x -> Bool
      -- ^ True if the given chunk ends a message.
    }
-- | @new isStart isStop@ creates an 'UpdateStream' with no slots, an empty
-- pass-through queue and no message in progress.
--
-- @isStart@ and @isStop@ decide whether a chunk begins or ends a message.
-- They are stored in the stream and passed on to each slot cache that
-- 'push' creates.
new :: Ord slot => (x->Bool) -> (x->Bool) -> STM (UpdateStream slot x)
new isStart isStop = do
    slots   <- newTVar Map.empty
    queue   <- newTChan
    current <- newTVar Nothing
    return UpdateStream
        { cache     = slots
        , events    = queue
        , inMessage = current
        , isStarter = isStart
        , isStopper = isStop
        }
-- | Enqueue a chunk into the 'UpdateStream'.
--
-- With 'Nothing' as the slot the chunk is appended to the pass-through
-- queue and is always preserved.
--
-- With @'Just' slot@ the chunk is pushed into that slot's 'StatusCache'
-- (created on first use with the stream's 'isStarter' and 'isStopper'
-- predicates), so the message may be obsoleted when a new message starts
-- with the same slot value.  Note that the same slot value must be passed
-- again for every chunk of the message as it is not remembered.
--
-- Note that although a slot value is provided on each push, it is assumed
-- that messages will be pushed in contiguous streams.  To change this, we
-- would need to add a mutable integer to track the nesting level of the
-- primary (non-slotted) stream.
push :: Ord slot => UpdateStream slot x -> Maybe slot -> x -> STM ()
push u Nothing x = writeTChan (events u) x
push u (Just n) x = do
    smap <- readTVar (cache u)
    scache <- case Map.lookup n smap of
        Just existing -> return existing
        Nothing -> do
            fresh <- Status.new (isStarter u) (isStopper u)
            -- Strict variant: store the updated map itself rather than
            -- leaving an unevaluated 'Map.insert' thunk inside the TVar.
            modifyTVar' (cache u) (Map.insert n fresh)
            return fresh
    Status.push scache x
-- | Pull off a chunk from the 'UpdateStream' for processing.
--
-- If chunks are not pulled off quickly, they may be obsoleted
-- and discarded when new messages are 'push'ed.
--
-- Selection order:
--
-- 1. If a slotted message is in progress, the next chunk comes from that
--    slot's cache.
--
-- 2. Otherwise the first slot cache (in key order) that is not empty is
--    used.
--
-- 3. If every slot cache is empty, the chunk is read from the pass-through
--    queue.
--
-- After every chunk taken from a slot cache the cache's pull depth is
-- consulted, and the stream stays attached to that cache only while the
-- depth is non-zero.
pull :: Ord slot => UpdateStream slot x -> STM x
pull u = do
    current <- readTVar (inMessage u)
    case current of
        Just scache -> do
            (x, open) <- pullSlot scache
            unless open $ writeTVar (inMessage u) Nothing
            return x
        Nothing -> do
            slots   <- fmap Map.elems $ readTVar (cache u)
            pending <- filterM (fmap not . Status.isEmpty) slots
            case listToMaybe pending of
                Nothing     -> readTChan (events u)
                Just scache -> do
                    (x, open) <- pullSlot scache
                    -- Only attach to this cache if its message is still
                    -- open.  Attaching unconditionally would force the
                    -- next 'pull' onto this slot even after a one-chunk
                    -- message, instead of rescanning the slots and the
                    -- pass-through queue.
                    -- NOTE(review): assumes 'Status.pullDepth' is zero
                    -- exactly when no message is partially pulled, which
                    -- is what the in-progress branch above already relies
                    -- on -- confirm against StatusCache.
                    when open $ writeTVar (inMessage u) (Just scache)
                    return x
  where
    -- Take one chunk from a slot cache and report whether the cache is
    -- still in the middle of a message (non-zero pull depth) afterwards.
    pullSlot scache = do
        x <- Status.pull scache
        nesting <- readTVar (Status.pullDepth scache)
        return (x, nesting /= 0)
-- | True when the 'UpdateStream' is completely exhausted: the pass-through
-- queue holds no chunks and every slot cache reports that it is empty.
isEmpty :: UpdateStream slot x -> STM Bool
isEmpty q = do
    noEvents   <- isEmptyTChan (events q)
    slotCaches <- readTVar (cache q)
    slotsEmpty <- mapM Status.isEmpty (Map.elems slotCaches)
    return (noEvents && and slotsEmpty)
|