summaryrefslogtreecommitdiff
path: root/Control/Concurrent/STM/UpdateStream.hs
diff options
context:
space:
mode:
Diffstat (limited to 'Control/Concurrent/STM/UpdateStream.hs')
-rw-r--r--Control/Concurrent/STM/UpdateStream.hs165
1 files changed, 165 insertions, 0 deletions
diff --git a/Control/Concurrent/STM/UpdateStream.hs b/Control/Concurrent/STM/UpdateStream.hs
new file mode 100644
index 00000000..a92168c0
--- /dev/null
+++ b/Control/Concurrent/STM/UpdateStream.hs
@@ -0,0 +1,165 @@
1---------------------------------------------------------------------------
2-- |
3-- Module : Control.Concurrent.STM.UpdateStream
4--
5--
6-- Maintainer : joe@jerkface.net
7-- Stability : experimental
8--
9-- An UpdateSream consists of pass-through messages that are queued up and
10-- slotted messages which might be obsoleted and discarded if they are not
11-- consumed before newer slotted messages with the same slot value occur.
12--
13-- Slots are implemented with "Control.Concurrent.STM.StatusCache" and it is
14-- recommended that you read that documentation first.
15--
16-- For example, the output
17--
18-- > Backlogged consumer: (set x = 5)(set y = 100)(Hello)(Joe)(was)(here)
19-- > Fast consumer: (set x = 2)(Hello)(set y = 23)(Joe)(set y = 100)(was)(set x = 5)(here)
20--
21-- is produced by the following code:
22--
23-- > import Control.Monad (when, forever, (>=>))
24-- > import Control.Monad.STM (atomically)
25-- > import Control.Concurrent (forkIO, threadDelay)
26-- > import System.IO (hFlush, stdout)
27-- > import qualified Control.Concurrent.STM.UpdateStream as Cache
28-- >
29-- > messages :: [(Maybe String, Char)]
30-- > messages = concat
31-- > [ slot "x" "(set x = 2)"
32-- > , message "(Hello)"
33-- > , slot "y" "(set y = 23)"
34-- > , message "(Joe)"
35-- > , slot "y" "(set y = 100)"
36-- > , message "(was)"
37-- > , slot "x" "(set x = 5)"
38-- > , message "(here)"
39-- > ]
40-- > where
41-- > slot v cs = map ((,) (Just v)) cs
42-- > message cs = map ((,) Nothing) cs
43-- >
44-- > main = do
45-- > q <- atomically $ Cache.new (== '(') (==')')
46-- > let go = mapM_ (atomically . (uncurry $ Cache.push q)
47-- > >=> const (threadDelay 10000))
48-- > messages
49-- > slowly = do
50-- > while (atomically $ fmap not $ Cache.isEmpty q) $ do
51-- > c <- atomically $ Cache.pull q
52-- > putChar c
53-- > putStrLn ""
54-- > hFlush stdout
55-- > where while pred body =
56-- > pred >>= flip when (body >> while pred body)
57-- > quickly = forkIO . forever $ do
58-- > c <- atomically $ Cache.pull q
59-- > putChar c
60-- > hFlush stdout
61-- > putStr $ "Backlogged consumer: "
62-- > go >> slowly
63-- > putStr "Fast consumer: "
64-- > quickly >> go
65-- > putStrLn ""
66--
67module Control.Concurrent.STM.UpdateStream
68 ( UpdateStream
69 , new
70 , push
71 , pull
72 , isStopper
73 , isStarter
74 , isEmpty
75 ) where
76
77import Control.Monad
78import Control.Monad.STM
79import Control.Concurrent.STM.TVar
80import Control.Concurrent.STM.TChan
81import Control.Concurrent.STM.StatusCache (StatusCache)
82import qualified Control.Concurrent.STM.StatusCache as Status
83import Data.Map (Map)
84import qualified Data.Map as Map
85import Data.Maybe
86import Data.Foldable (foldlM)
87
88data UpdateStream slot x =
89 UpdateStream { cache :: TVar (Map slot (StatusCache x))
90 , events :: TChan x
91 , inMessage :: TVar (Maybe slot)
92 , isStarter :: x -> Bool -- ^ True if the given chunk begins a message.
93 , isStopper :: x -> Bool -- ^ True if the given chunk ends a message.
94 }
95
96-- | @new isStart isStop@
97--
98-- The @isStart@ and @isStop@ predicates indicate when an element
99-- begins or ends a message.
100new :: Ord slot => (x->Bool) -> (x->Bool) -> STM (UpdateStream slot x)
101new isStart isStop = do
102 cache <- newTVar Map.empty
103 events <- newTChan
104 inMessage <- newTVar Nothing
105 return UpdateStream { cache = cache
106 , events = events
107 , inMessage = inMessage
108 , isStarter = isStart
109 , isStopper = isStop
110 }
111
112-- | Enqueue a chunk into the 'UpdateStream'
113--
114-- If a slot is provided, then the message may be obsoleted when a new message
115-- starts with the same slot value. Otherwise, the chunk is preserved. Note
116-- that the same slot value must be passed again for every chunk of the message
117-- as it is not remembered.
118push :: Ord slot => UpdateStream slot x -> Maybe slot -> x -> STM ()
119push u Nothing x = writeTChan (events u) x
120push u (Just n) x = do
121 smap <- readTVar (cache u)
122 scache <-
123 case Map.lookup n smap of
124 Just scache -> return scache
125 Nothing -> do
126 scache <- Status.new (isStarter u) (isStopper u)
127 modifyTVar (cache u) (Map.insert n scache)
128 return scache
129
130 Status.push scache x
131
132-- | Pull off a chunk from the 'UpdateStream' for processing.
133--
134-- If chunks are not pulled off quickly, they may be obsoleted
135-- and discarded when new messages are 'push'ed.
136pull :: Ord slot => UpdateStream slot x -> STM x
137pull u = do
138 (inm, mbscache) <- do
139 mn <- readTVar $ inMessage u
140 map <- readTVar (cache u)
141 return $ (isJust mn, mn >>= flip Map.lookup map)
142 let action =
143 case (inm,mbscache) of
144 (True,Just scache) -> Status.pull scache
145 (True,Nothing) -> readTChan (events u)
146 (False,_) -> do
147 cs <- fmap (Map.toList) $ readTVar (cache u)
148 cs <- filterM (return . not <=< Status.isEmpty . snd)
149 cs
150 maybe (readTChan $ events u)
151 (\(n,scache) -> do
152 writeTVar (inMessage u) (Just n)
153 Status.pull scache)
154 (listToMaybe cs)
155 x <- action
156 when (isStopper u x) $ writeTVar (inMessage u) Nothing
157 return x
158
159-- | True when the 'UpdateStream' is completely exhuasted.
160isEmpty :: UpdateStream slot x -> STM Bool
161isEmpty q = do
162 e <- isEmptyTChan (events q)
163 qs <- readTVar (cache q)
164 d <- foldlM (\b s -> fmap (b &&) $ Status.isEmpty s) True qs
165 return $ e && d