summaryrefslogtreecommitdiff
path: root/Control/Concurrent/STM
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2014-02-10 22:38:46 -0500
committerjoe <joe@jerkface.net>2014-02-10 22:38:46 -0500
commit2d6dae13be15b61778ed35b3501df94a8e9dd78f (patch)
tree2ba13a1cc31886776614770a43ed18c60bfb39e3 /Control/Concurrent/STM
parentafff51ac877ce8807801334745f1679dbf6440d0 (diff)
more xmppServer work
Diffstat (limited to 'Control/Concurrent/STM')
-rw-r--r--Control/Concurrent/STM/StatusCache.hs142
-rw-r--r--Control/Concurrent/STM/UpdateStream.hs165
2 files changed, 307 insertions, 0 deletions
diff --git a/Control/Concurrent/STM/StatusCache.hs b/Control/Concurrent/STM/StatusCache.hs
new file mode 100644
index 00000000..601de14c
--- /dev/null
+++ b/Control/Concurrent/STM/StatusCache.hs
@@ -0,0 +1,142 @@
1---------------------------------------------------------------------------
2-- |
3-- Module : Control.Concurrent.STM.StatusCache
4--
5--
6-- Maintainer : joe@jerkface.net
7-- Stability : experimental
8--
9-- Motivation: Suppose you must communicate state changes over a stream. If
10-- the stream is consumed slowly, then it may occur that backlogged state
11-- changes are obsoleted by newer changes in state. In this case, it is
12-- desirable to discard the obsolete messages from the stream.
13--
14-- A streamed status message might be very large, and it would be wasteful in
15-- both time and space, to treat it as a monolithic blob that must be built
16-- completely before it can be sent. Therefore, we require that each message
17-- consist of a stream of smaller chunks of type @x@ and we require predicates
18-- that indicate when a chunk starts a new message and when it ends a message.
19--
20-- In the folowing example, our chunk type is Char and complete messages are
21-- delimited by the characters '(' and ')'. We process the input stream
22-- \"(aaaa)(bb)(ccccc)\" first with a delayed processor and then again with an
23-- efficient dedicated thread. The result follows:
24--
25-- > Backlogged consumer: (ccccc)
26-- > Fast consumer: (aaaa)(bb)(ccccc)
27--
28-- The complete source code:
29--
30-- > import Control.Monad (when, forever, (>=>))
31-- > import Control.Monad.STM (atomically)
32-- > import Control.Concurrent (forkIO, threadDelay)
33-- > import System.IO (hFlush, stdout)
34-- > import qualified Control.Concurrent.STM.StatusCache as Cache
35-- >
36-- > while pred body =
37-- > pred >>= flip when (body >> while pred body)
38-- >
39-- > main = do
40-- > q <- atomically $ Cache.new (== '(') (==')')
41-- >
42-- > putStr $ "Backlogged consumer: "
43-- > mapM_ (atomically . Cache.push q) "(aaaa)(bb)(ccccc)"
44-- > while (atomically $ fmap not $ Cache.isEmpty q) $ do
45-- > c <- atomically $ Cache.pull q
46-- > putChar c
47-- > putStrLn ""
48-- > hFlush stdout
49-- >
50-- > putStr "Fast consumer: "
51-- > forkIO $ forever $ do
52-- > c <- atomically $ Cache.pull q
53-- > putChar c
54-- > hFlush stdout
55-- > mapM_ (atomically . Cache.push q >=> const (threadDelay 10000))
56-- > "(aaaa)(bb)(ccccc)"
57-- > putStrLn ""
58--
59-- As shown above, it is intended that this module be imported qualified.
60--
61module Control.Concurrent.STM.StatusCache
62 ( StatusCache
63 , new
64 , push
65 , pull
66 , isStopper
67 , isStarter
68 , isEmpty
69 ) where
70import Control.Monad
71import Control.Monad.STM
72import Control.Concurrent.STM.TVar
73import Control.Concurrent.STM.TChan
74
75data StatusCache x =
76 StatusCache { feed :: TVar (TChan x)
77 , cache :: TVar (TChan x)
78 , feedFlag :: TVar Bool
79 , isStarter :: x -> Bool -- ^ True if the given chunk begins a message.
80 , isStopper :: x -> Bool -- ^ True if the given chunk ends a message.
81 }
82
83-- | @new isStart isStop@
84--
85-- The @isStart@ and @isStop@ predicates indicate when an element
86-- begins or ends a message.
87new :: (x -> Bool) -> (x -> Bool) -> STM (StatusCache x)
88new isStart isStop = do
89 feed <- newTChan >>= newTVar
90 cache <- newTChan >>= newTVar
91 flag <- newTVar True
92 return StatusCache { feed = feed
93 , cache = cache
94 , feedFlag = flag
95 , isStarter = isStart
96 , isStopper = isStop
97 }
98
99
100-- | Pull off a chunk from the 'StatusCache' for processing.
101--
102-- If chunks are not pulled off quickly, they may be obsoleted
103-- and discarded when new messages are 'push'ed.
104pull :: StatusCache x -> STM x
105pull q = do
106 hasCache <- readTVar (feedFlag q)
107 exhausted <- readTVar (feed q) >>= isEmptyTChan
108 when (hasCache && exhausted) $ do
109 next <- newTChan >>= swapTVar (cache q)
110 writeTVar (feedFlag q) False
111 writeTVar (feed q) next
112 chan <- readTVar $ feed q
113 exhausted <- isEmptyTChan chan
114 if exhausted then retry
115 else do
116 v <- readTChan chan
117 when (isStarter q v)
118 $ writeTVar (feedFlag q) False
119 return v
120
121-- | Enqueue a chunk into the 'StatusCache'.
122push :: StatusCache a -> a -> STM ()
123push q v = do
124 shouldCache <- readTVar (feedFlag q)
125 chan <-
126 if shouldCache then do
127 when (isStarter q v)
128 $ newTChan
129 >>= writeTVar (cache q)
130 readTVar $ cache q
131 else do
132 when (isStopper q v)
133 $ writeTVar (feedFlag q) True
134 readTVar $ feed q
135 writeTChan chan v
136
137-- | True when the 'StatusCache' is completely exhuasted.
138isEmpty :: StatusCache x -> STM Bool
139isEmpty q = do
140 empty_feed <- readTVar (feed q) >>= isEmptyTChan
141 empty_cache <- readTVar (cache q) >>= isEmptyTChan
142 return $ empty_feed && empty_cache
diff --git a/Control/Concurrent/STM/UpdateStream.hs b/Control/Concurrent/STM/UpdateStream.hs
new file mode 100644
index 00000000..a92168c0
--- /dev/null
+++ b/Control/Concurrent/STM/UpdateStream.hs
@@ -0,0 +1,165 @@
1---------------------------------------------------------------------------
2-- |
3-- Module : Control.Concurrent.STM.UpdateStream
4--
5--
6-- Maintainer : joe@jerkface.net
7-- Stability : experimental
8--
9-- An UpdateSream consists of pass-through messages that are queued up and
10-- slotted messages which might be obsoleted and discarded if they are not
11-- consumed before newer slotted messages with the same slot value occur.
12--
13-- Slots are implemented with "Control.Concurrent.STM.StatusCache" and it is
14-- recommended that you read that documentation first.
15--
16-- For example, the output
17--
18-- > Backlogged consumer: (set x = 5)(set y = 100)(Hello)(Joe)(was)(here)
19-- > Fast consumer: (set x = 2)(Hello)(set y = 23)(Joe)(set y = 100)(was)(set x = 5)(here)
20--
21-- is produced by the following code:
22--
23-- > import Control.Monad (when, forever, (>=>))
24-- > import Control.Monad.STM (atomically)
25-- > import Control.Concurrent (forkIO, threadDelay)
26-- > import System.IO (hFlush, stdout)
27-- > import qualified Control.Concurrent.STM.UpdateStream as Cache
28-- >
29-- > messages :: [(Maybe String, Char)]
30-- > messages = concat
31-- > [ slot "x" "(set x = 2)"
32-- > , message "(Hello)"
33-- > , slot "y" "(set y = 23)"
34-- > , message "(Joe)"
35-- > , slot "y" "(set y = 100)"
36-- > , message "(was)"
37-- > , slot "x" "(set x = 5)"
38-- > , message "(here)"
39-- > ]
40-- > where
41-- > slot v cs = map ((,) (Just v)) cs
42-- > message cs = map ((,) Nothing) cs
43-- >
44-- > main = do
45-- > q <- atomically $ Cache.new (== '(') (==')')
46-- > let go = mapM_ (atomically . (uncurry $ Cache.push q)
47-- > >=> const (threadDelay 10000))
48-- > messages
49-- > slowly = do
50-- > while (atomically $ fmap not $ Cache.isEmpty q) $ do
51-- > c <- atomically $ Cache.pull q
52-- > putChar c
53-- > putStrLn ""
54-- > hFlush stdout
55-- > where while pred body =
56-- > pred >>= flip when (body >> while pred body)
57-- > quickly = forkIO . forever $ do
58-- > c <- atomically $ Cache.pull q
59-- > putChar c
60-- > hFlush stdout
61-- > putStr $ "Backlogged consumer: "
62-- > go >> slowly
63-- > putStr "Fast consumer: "
64-- > quickly >> go
65-- > putStrLn ""
66--
67module Control.Concurrent.STM.UpdateStream
68 ( UpdateStream
69 , new
70 , push
71 , pull
72 , isStopper
73 , isStarter
74 , isEmpty
75 ) where
76
77import Control.Monad
78import Control.Monad.STM
79import Control.Concurrent.STM.TVar
80import Control.Concurrent.STM.TChan
81import Control.Concurrent.STM.StatusCache (StatusCache)
82import qualified Control.Concurrent.STM.StatusCache as Status
83import Data.Map (Map)
84import qualified Data.Map as Map
85import Data.Maybe
86import Data.Foldable (foldlM)
87
88data UpdateStream slot x =
89 UpdateStream { cache :: TVar (Map slot (StatusCache x))
90 , events :: TChan x
91 , inMessage :: TVar (Maybe slot)
92 , isStarter :: x -> Bool -- ^ True if the given chunk begins a message.
93 , isStopper :: x -> Bool -- ^ True if the given chunk ends a message.
94 }
95
96-- | @new isStart isStop@
97--
98-- The @isStart@ and @isStop@ predicates indicate when an element
99-- begins or ends a message.
100new :: Ord slot => (x->Bool) -> (x->Bool) -> STM (UpdateStream slot x)
101new isStart isStop = do
102 cache <- newTVar Map.empty
103 events <- newTChan
104 inMessage <- newTVar Nothing
105 return UpdateStream { cache = cache
106 , events = events
107 , inMessage = inMessage
108 , isStarter = isStart
109 , isStopper = isStop
110 }
111
112-- | Enqueue a chunk into the 'UpdateStream'
113--
114-- If a slot is provided, then the message may be obsoleted when a new message
115-- starts with the same slot value. Otherwise, the chunk is preserved. Note
116-- that the same slot value must be passed again for every chunk of the message
117-- as it is not remembered.
118push :: Ord slot => UpdateStream slot x -> Maybe slot -> x -> STM ()
119push u Nothing x = writeTChan (events u) x
120push u (Just n) x = do
121 smap <- readTVar (cache u)
122 scache <-
123 case Map.lookup n smap of
124 Just scache -> return scache
125 Nothing -> do
126 scache <- Status.new (isStarter u) (isStopper u)
127 modifyTVar (cache u) (Map.insert n scache)
128 return scache
129
130 Status.push scache x
131
132-- | Pull off a chunk from the 'UpdateStream' for processing.
133--
134-- If chunks are not pulled off quickly, they may be obsoleted
135-- and discarded when new messages are 'push'ed.
136pull :: Ord slot => UpdateStream slot x -> STM x
137pull u = do
138 (inm, mbscache) <- do
139 mn <- readTVar $ inMessage u
140 map <- readTVar (cache u)
141 return $ (isJust mn, mn >>= flip Map.lookup map)
142 let action =
143 case (inm,mbscache) of
144 (True,Just scache) -> Status.pull scache
145 (True,Nothing) -> readTChan (events u)
146 (False,_) -> do
147 cs <- fmap (Map.toList) $ readTVar (cache u)
148 cs <- filterM (return . not <=< Status.isEmpty . snd)
149 cs
150 maybe (readTChan $ events u)
151 (\(n,scache) -> do
152 writeTVar (inMessage u) (Just n)
153 Status.pull scache)
154 (listToMaybe cs)
155 x <- action
156 when (isStopper u x) $ writeTVar (inMessage u) Nothing
157 return x
158
159-- | True when the 'UpdateStream' is completely exhuasted.
160isEmpty :: UpdateStream slot x -> STM Bool
161isEmpty q = do
162 e <- isEmptyTChan (events q)
163 qs <- readTVar (cache q)
164 d <- foldlM (\b s -> fmap (b &&) $ Status.isEmpty s) True qs
165 return $ e && d