summaryrefslogtreecommitdiff
path: root/Control/Concurrent/STM/UpdateStream.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Control/Concurrent/STM/UpdateStream.hs')
-rw-r--r--Control/Concurrent/STM/UpdateStream.hs169
1 files changed, 169 insertions, 0 deletions
diff --git a/Control/Concurrent/STM/UpdateStream.hs b/Control/Concurrent/STM/UpdateStream.hs
new file mode 100644
index 00000000..b01fc0bc
--- /dev/null
+++ b/Control/Concurrent/STM/UpdateStream.hs
@@ -0,0 +1,169 @@
1---------------------------------------------------------------------------
2-- |
3-- Module : Control.Concurrent.STM.UpdateStream
4--
5--
6-- Maintainer : joe@jerkface.net
7-- Stability : experimental
8--
9-- An UpdateSream consists of pass-through messages that are queued up and
10-- slotted messages which might be obsoleted and discarded if they are not
11-- consumed before newer slotted messages with the same slot value occur.
12--
13-- Slots are implemented with "Control.Concurrent.STM.StatusCache" and it is
14-- recommended that you read that documentation first.
15--
16-- For example, the output
17--
18-- > Backlogged consumer: (set x = 5)(set y = 100)(Hello)(Joe)(was)(here)
19-- > Fast consumer: (set x = 2)(Hello)(set y = 23)(Joe)(set y = 100)(was)(set x = 5)(here)
20--
21-- is produced by the following code:
22--
23-- > import Control.Monad (when, forever, (>=>))
24-- > import Control.Monad.STM (atomically)
25-- > import Control.Concurrent (forkIO, threadDelay)
26-- > import System.IO (hFlush, stdout)
27-- > import qualified Control.Concurrent.STM.UpdateStream as Cache
28-- >
29-- > messages :: [(Maybe String, Char)]
30-- > messages = concat
31-- > [ slot "x" "(set x = 2)"
32-- > , message "(Hello)"
33-- > , slot "y" "(set y = 23)"
34-- > , message "(Joe)"
35-- > , slot "y" "(set y = 100)"
36-- > , message "(was)"
37-- > , slot "x" "(set x = 5)"
38-- > , message "(here)"
39-- > ]
40-- > where
41-- > slot v cs = map ((,) (Just v)) cs
42-- > message cs = map ((,) Nothing) cs
43-- >
44-- > main = do
45-- > q <- atomically $ Cache.new (== '(') (==')')
46-- > let go = mapM_ (atomically . (uncurry $ Cache.push q)
47-- > >=> const (threadDelay 10000))
48-- > messages
49-- > slowly = do
50-- > while (atomically $ fmap not $ Cache.isEmpty q) $ do
51-- > c <- atomically $ Cache.pull q
52-- > putChar c
53-- > putStrLn ""
54-- > hFlush stdout
55-- > where while pred body =
56-- > pred >>= flip when (body >> while pred body)
57-- > quickly = forkIO . forever $ do
58-- > c <- atomically $ Cache.pull q
59-- > putChar c
60-- > hFlush stdout
61-- > putStr $ "Backlogged consumer: "
62-- > go >> slowly
63-- > putStr "Fast consumer: "
64-- > quickly >> go
65-- > putStrLn ""
66--
67module Control.Concurrent.STM.UpdateStream
68 ( UpdateStream
69 , new
70 , push
71 , pull
72 , isStopper
73 , isStarter
74 , isEmpty
75 ) where
76
77import Control.Monad
78import Control.Monad.STM
79import Control.Concurrent.STM.TVar
80import Control.Concurrent.STM.TChan
81import Control.Concurrent.STM.StatusCache (StatusCache)
82import qualified Control.Concurrent.STM.StatusCache as Status
83import Data.Map (Map)
84import qualified Data.Map as Map
85import Data.Maybe
86import Data.Foldable (foldlM)
87
88data UpdateStream slot x =
89 UpdateStream { cache :: TVar (Map slot (StatusCache x))
90 , events :: TChan x
91 , inMessage :: TVar (Maybe (StatusCache x))
92 , isStarter :: x -> Bool -- ^ True if the given chunk begins a message.
93 , isStopper :: x -> Bool -- ^ True if the given chunk ends a message.
94 }
95
96-- | @new isStart isStop@
97--
98-- The @isStart@ and @isStop@ predicates indicate when an element
99-- begins or ends a message.
100new :: Ord slot => (x->Bool) -> (x->Bool) -> STM (UpdateStream slot x)
101new isStart isStop = do
102 cache <- newTVar Map.empty
103 events <- newTChan
104 inMessage <- newTVar Nothing
105 return UpdateStream { cache = cache
106 , events = events
107 , inMessage = inMessage
108 , isStarter = isStart
109 , isStopper = isStop
110 }
111
112-- | Enqueue a chunk into the 'UpdateStream'
113--
114-- If a slot is provided, then the message may be obsoleted when a new message
115-- starts with the same slot value. Otherwise, the chunk is preserved. Note
116-- that the same slot value must be passed again for every chunk of the message
117-- as it is not remembered.
118--
119-- Note that although a slot value is provided on each push, it is assumed that
120-- messages will be pushed in contiguous streams. To change this, we would
121-- need to add a mutable integer to track the nesting level of the primary
122-- (non-slotted) stream.
123push :: Ord slot => UpdateStream slot x -> Maybe slot -> x -> STM ()
124push u Nothing x = writeTChan (events u) x
125push u (Just n) x = do
126 smap <- readTVar (cache u)
127 scache <-
128 case Map.lookup n smap of
129 Just scache -> return scache
130 Nothing -> do
131 scache <- Status.new (isStarter u) (isStopper u)
132 modifyTVar (cache u) (Map.insert n scache)
133 return scache
134
135 Status.push scache x
136
137-- | Pull off a chunk from the 'UpdateStream' for processing.
138--
139-- If chunks are not pulled off quickly, they may be obsoleted
140-- and discarded when new messages are 'push'ed.
141pull :: Ord slot => UpdateStream slot x -> STM x
142pull u = do
143 mbscache <- readTVar $ inMessage u
144 let action =
145 case mbscache of
146 Just scache -> do
147 x <- Status.pull scache
148 nesting <- readTVar (Status.pullDepth scache)
149 when (nesting==0) $ writeTVar (inMessage u) Nothing
150 return x
151 Nothing -> do
152 cs <- fmap (Map.toList) $ readTVar (cache u)
153 cs <- filterM (return . not <=< Status.isEmpty . snd)
154 cs
155 maybe (readTChan $ events u)
156 (\(n,scache) -> do
157 writeTVar (inMessage u) (Just scache) -- (Just n)
158 Status.pull scache)
159 (listToMaybe cs)
160 x <- action
161 return x
162
163-- | True when the 'UpdateStream' is completely exhuasted.
164isEmpty :: UpdateStream slot x -> STM Bool
165isEmpty q = do
166 e <- isEmptyTChan (events q)
167 qs <- readTVar (cache q)
168 d <- foldlM (\b s -> fmap (b &&) $ Status.isEmpty s) True qs
169 return $ e && d