summaryrefslogtreecommitdiff
path: root/Control/Concurrent/STM/StatusCache.hs
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2014-02-10 22:38:46 -0500
committerjoe <joe@jerkface.net>2014-02-10 22:38:46 -0500
commit2d6dae13be15b61778ed35b3501df94a8e9dd78f (patch)
tree2ba13a1cc31886776614770a43ed18c60bfb39e3 /Control/Concurrent/STM/StatusCache.hs
parentafff51ac877ce8807801334745f1679dbf6440d0 (diff)
more xmppServer work
Diffstat (limited to 'Control/Concurrent/STM/StatusCache.hs')
-rw-r--r--Control/Concurrent/STM/StatusCache.hs142
1 files changed, 142 insertions, 0 deletions
diff --git a/Control/Concurrent/STM/StatusCache.hs b/Control/Concurrent/STM/StatusCache.hs
new file mode 100644
index 00000000..601de14c
--- /dev/null
+++ b/Control/Concurrent/STM/StatusCache.hs
@@ -0,0 +1,142 @@
1---------------------------------------------------------------------------
2-- |
3-- Module : Control.Concurrent.STM.StatusCache
4--
5--
6-- Maintainer : joe@jerkface.net
7-- Stability : experimental
8--
9-- Motivation: Suppose you must communicate state changes over a stream. If
10-- the stream is consumed slowly, then it may occur that backlogged state
11-- changes are obsoleted by newer changes in state. In this case, it is
12-- desirable to discard the obsolete messages from the stream.
13--
14-- A streamed status message might be very large, and it would be wasteful in
15-- both time and space, to treat it as a monolithic blob that must be built
16-- completely before it can be sent. Therefore, we require that each message
17-- consist of a stream of smaller chunks of type @x@ and we require predicates
18-- that indicate when a chunk starts a new message and when it ends a message.
19--
20-- In the folowing example, our chunk type is Char and complete messages are
21-- delimited by the characters '(' and ')'. We process the input stream
22-- \"(aaaa)(bb)(ccccc)\" first with a delayed processor and then again with an
23-- efficient dedicated thread. The result follows:
24--
25-- > Backlogged consumer: (ccccc)
26-- > Fast consumer: (aaaa)(bb)(ccccc)
27--
28-- The complete source code:
29--
30-- > import Control.Monad (when, forever, (>=>))
31-- > import Control.Monad.STM (atomically)
32-- > import Control.Concurrent (forkIO, threadDelay)
33-- > import System.IO (hFlush, stdout)
34-- > import qualified Control.Concurrent.STM.StatusCache as Cache
35-- >
36-- > while pred body =
37-- > pred >>= flip when (body >> while pred body)
38-- >
39-- > main = do
40-- > q <- atomically $ Cache.new (== '(') (==')')
41-- >
42-- > putStr $ "Backlogged consumer: "
43-- > mapM_ (atomically . Cache.push q) "(aaaa)(bb)(ccccc)"
44-- > while (atomically $ fmap not $ Cache.isEmpty q) $ do
45-- > c <- atomically $ Cache.pull q
46-- > putChar c
47-- > putStrLn ""
48-- > hFlush stdout
49-- >
50-- > putStr "Fast consumer: "
51-- > forkIO $ forever $ do
52-- > c <- atomically $ Cache.pull q
53-- > putChar c
54-- > hFlush stdout
55-- > mapM_ (atomically . Cache.push q >=> const (threadDelay 10000))
56-- > "(aaaa)(bb)(ccccc)"
57-- > putStrLn ""
58--
59-- As shown above, it is intended that this module be imported qualified.
60--
61module Control.Concurrent.STM.StatusCache
62 ( StatusCache
63 , new
64 , push
65 , pull
66 , isStopper
67 , isStarter
68 , isEmpty
69 ) where
70import Control.Monad
71import Control.Monad.STM
72import Control.Concurrent.STM.TVar
73import Control.Concurrent.STM.TChan
74
75data StatusCache x =
76 StatusCache { feed :: TVar (TChan x)
77 , cache :: TVar (TChan x)
78 , feedFlag :: TVar Bool
79 , isStarter :: x -> Bool -- ^ True if the given chunk begins a message.
80 , isStopper :: x -> Bool -- ^ True if the given chunk ends a message.
81 }
82
83-- | @new isStart isStop@
84--
85-- The @isStart@ and @isStop@ predicates indicate when an element
86-- begins or ends a message.
87new :: (x -> Bool) -> (x -> Bool) -> STM (StatusCache x)
88new isStart isStop = do
89 feed <- newTChan >>= newTVar
90 cache <- newTChan >>= newTVar
91 flag <- newTVar True
92 return StatusCache { feed = feed
93 , cache = cache
94 , feedFlag = flag
95 , isStarter = isStart
96 , isStopper = isStop
97 }
98
99
100-- | Pull off a chunk from the 'StatusCache' for processing.
101--
102-- If chunks are not pulled off quickly, they may be obsoleted
103-- and discarded when new messages are 'push'ed.
104pull :: StatusCache x -> STM x
105pull q = do
106 hasCache <- readTVar (feedFlag q)
107 exhausted <- readTVar (feed q) >>= isEmptyTChan
108 when (hasCache && exhausted) $ do
109 next <- newTChan >>= swapTVar (cache q)
110 writeTVar (feedFlag q) False
111 writeTVar (feed q) next
112 chan <- readTVar $ feed q
113 exhausted <- isEmptyTChan chan
114 if exhausted then retry
115 else do
116 v <- readTChan chan
117 when (isStarter q v)
118 $ writeTVar (feedFlag q) False
119 return v
120
121-- | Enqueue a chunk into the 'StatusCache'.
122push :: StatusCache a -> a -> STM ()
123push q v = do
124 shouldCache <- readTVar (feedFlag q)
125 chan <-
126 if shouldCache then do
127 when (isStarter q v)
128 $ newTChan
129 >>= writeTVar (cache q)
130 readTVar $ cache q
131 else do
132 when (isStopper q v)
133 $ writeTVar (feedFlag q) True
134 readTVar $ feed q
135 writeTChan chan v
136
137-- | True when the 'StatusCache' is completely exhuasted.
138isEmpty :: StatusCache x -> STM Bool
139isEmpty q = do
140 empty_feed <- readTVar (feed q) >>= isEmptyTChan
141 empty_cache <- readTVar (cache q) >>= isEmptyTChan
142 return $ empty_feed && empty_cache