diff options
Diffstat (limited to 'Control')
-rw-r--r-- | Control/Concurrent/STM/StatusCache.hs | 160 | ||||
-rw-r--r-- | Control/Concurrent/STM/UpdateStream.hs | 169 |
2 files changed, 329 insertions, 0 deletions
diff --git a/Control/Concurrent/STM/StatusCache.hs b/Control/Concurrent/STM/StatusCache.hs new file mode 100644 index 00000000..d1c977ae --- /dev/null +++ b/Control/Concurrent/STM/StatusCache.hs | |||
@@ -0,0 +1,160 @@ | |||
1 | --------------------------------------------------------------------------- | ||
2 | -- | | ||
3 | -- Module : Control.Concurrent.STM.StatusCache | ||
4 | -- | ||
5 | -- | ||
6 | -- Maintainer : joe@jerkface.net | ||
7 | -- Stability : experimental | ||
8 | -- | ||
9 | -- Motivation: Suppose you must communicate state changes over a stream. If | ||
10 | -- the stream is consumed slowly, then it may occur that backlogged state | ||
11 | -- changes are obsoleted by newer changes in state. In this case, it is | ||
12 | -- desirable to discard the obsolete messages from the stream. | ||
13 | -- | ||
14 | -- A streamed status message might be very large, and it would be wasteful in | ||
15 | -- both time and space, to treat it as a monolithic blob that must be built | ||
16 | -- completely before it can be sent. Therefore, we require that each message | ||
17 | -- consist of a stream of smaller chunks of type @x@ and we require predicates | ||
18 | -- that indicate when a chunk starts a new message and when it ends a message. | ||
19 | -- | ||
20 | -- In the folowing example, our chunk type is Char and complete messages are | ||
21 | -- delimited by the characters '(' and ')'. We process the input stream | ||
22 | -- \"(aaa(a))(bb)(cc(c)cc)\" first with a delayed processor and then again with | ||
23 | -- an efficient dedicated thread. The result follows: | ||
24 | -- | ||
25 | -- > Backlogged consumer: (cc(c)cc) | ||
26 | -- > Fast consumer: (aaa(a))(bb)(cc(c)cc) | ||
27 | -- | ||
28 | -- The complete source code: | ||
29 | -- | ||
30 | -- > import Control.Monad (when, forever, (>=>)) | ||
31 | -- > import Control.Monad.STM (atomically) | ||
32 | -- > import Control.Concurrent (forkIO, threadDelay) | ||
33 | -- > import System.IO (hFlush, stdout) | ||
34 | -- > import qualified Control.Concurrent.STM.StatusCache as Cache | ||
35 | -- > | ||
36 | -- > main = do q <- atomically $ Cache.new (== '(') (==')') | ||
37 | -- > backlog q "(aaa(a))(bb)(cc(c)cc)" | ||
38 | -- > fast q "(aaa(a))(bb)(cc(c)cc)" | ||
39 | -- > | ||
40 | -- > while pred body = pred >>= flip when (body >> while pred body) | ||
41 | -- > | ||
42 | -- > backlog q xs = do putStr $ "Backlogged consumer: " | ||
43 | -- > mapM_ (atomically . Cache.push q) xs | ||
44 | -- > while (atomically $ fmap not $ Cache.isEmpty q) $ do | ||
45 | -- > c <- atomically $ Cache.pull q | ||
46 | -- > putChar c | ||
47 | -- > putStrLn "" | ||
48 | -- > hFlush stdout | ||
49 | -- > | ||
50 | -- > fast q xs = do putStr "Fast consumer: " | ||
51 | -- > forkIO $ forever $ do | ||
52 | -- > c <- atomically $ Cache.pull q | ||
53 | -- > putChar c | ||
54 | -- > hFlush stdout | ||
55 | -- > mapM_ (atomically . Cache.push q >=> const (threadDelay 10000)) | ||
56 | -- > xs | ||
57 | -- > putStrLn "" | ||
58 | -- | ||
59 | -- As shown above, it is intended that this module be imported qualified. | ||
60 | -- | ||
61 | {-# LANGUAGE DoAndIfThenElse #-} | ||
62 | module Control.Concurrent.STM.StatusCache | ||
63 | ( StatusCache | ||
64 | , new | ||
65 | , push | ||
66 | , pull | ||
67 | , pullDepth | ||
68 | , isStopper | ||
69 | , isStarter | ||
70 | , isEmpty | ||
71 | ) where | ||
72 | import Control.Monad | ||
73 | import Control.Monad.STM | ||
74 | import Control.Concurrent.STM.TVar | ||
75 | import Control.Concurrent.STM.TChan | ||
76 | |||
77 | data StatusCache x = | ||
78 | StatusCache { feed :: TVar (TChan x) | ||
79 | , cache :: TVar (TChan x) | ||
80 | , feedFlag :: TVar Bool | ||
81 | , pushDepth :: TVar Int | ||
82 | , pullDepth :: TVar Int -- ^ current nesting depth of next-to-pull data | ||
83 | , isStarter :: x -> Bool -- ^ True if the given chunk begins a message. | ||
84 | , isStopper :: x -> Bool -- ^ True if the given chunk ends a message. | ||
85 | } | ||
86 | |||
87 | -- | @new isStart isStop@ | ||
88 | -- | ||
89 | -- The @isStart@ and @isStop@ predicates indicate when an element | ||
90 | -- begins or ends a message. | ||
91 | new :: (x -> Bool) -> (x -> Bool) -> STM (StatusCache x) | ||
92 | new isStart isStop = do | ||
93 | feed <- newTChan >>= newTVar | ||
94 | cache <- newTChan >>= newTVar | ||
95 | flag <- newTVar True | ||
96 | pushd <- newTVar 0 | ||
97 | pulld <- newTVar 0 | ||
98 | return StatusCache { feed = feed | ||
99 | , cache = cache | ||
100 | , feedFlag = flag | ||
101 | , pushDepth = pushd | ||
102 | , pullDepth = pulld | ||
103 | , isStarter = isStart | ||
104 | , isStopper = isStop | ||
105 | } | ||
106 | |||
107 | |||
108 | -- | Pull off a chunk from the 'StatusCache' for processing. | ||
109 | -- | ||
110 | -- If chunks are not pulled off quickly, they may be obsoleted | ||
111 | -- and discarded when new messages are 'push'ed. | ||
112 | pull :: StatusCache x -> STM x | ||
113 | pull q = do | ||
114 | hasCache <- readTVar (feedFlag q) | ||
115 | exhausted <- readTVar (feed q) >>= isEmptyTChan | ||
116 | when (hasCache && exhausted) $ do | ||
117 | next <- newTChan >>= swapTVar (cache q) | ||
118 | writeTVar (feedFlag q) False | ||
119 | writeTVar (feed q) next | ||
120 | chan <- readTVar $ feed q | ||
121 | exhausted <- isEmptyTChan chan | ||
122 | if exhausted then retry | ||
123 | else do | ||
124 | v <- readTChan chan | ||
125 | when (isStarter q v) $ do | ||
126 | depth <- readTVar (pullDepth q) | ||
127 | modifyTVar' (pullDepth q) (+1) | ||
128 | when (depth==0) | ||
129 | $ writeTVar (feedFlag q) False | ||
130 | when (isStopper q v) | ||
131 | $ modifyTVar' (pullDepth q) (subtract 1) | ||
132 | return v | ||
133 | |||
134 | -- | Enqueue a chunk into the 'StatusCache'. | ||
135 | push :: StatusCache a -> a -> STM () | ||
136 | push q v = do | ||
137 | shouldCache <- readTVar (feedFlag q) | ||
138 | depth <- readTVar (pushDepth q) | ||
139 | when (isStopper q v) $ do | ||
140 | modifyTVar' (pushDepth q) (subtract 1) | ||
141 | when (depth==0) | ||
142 | $ writeTVar (feedFlag q) True | ||
143 | when (isStarter q v) | ||
144 | $ modifyTVar' (pushDepth q) (+1) | ||
145 | chan <- | ||
146 | if shouldCache then do | ||
147 | when (depth==0 && isStarter q v) | ||
148 | $ newTChan | ||
149 | >>= writeTVar (cache q) | ||
150 | readTVar $ cache q | ||
151 | else do | ||
152 | readTVar $ feed q | ||
153 | writeTChan chan v | ||
154 | |||
155 | -- | True when the 'StatusCache' is completely exhuasted. | ||
156 | isEmpty :: StatusCache x -> STM Bool | ||
157 | isEmpty q = do | ||
158 | empty_feed <- readTVar (feed q) >>= isEmptyTChan | ||
159 | empty_cache <- readTVar (cache q) >>= isEmptyTChan | ||
160 | return $ empty_feed && empty_cache | ||
diff --git a/Control/Concurrent/STM/UpdateStream.hs b/Control/Concurrent/STM/UpdateStream.hs new file mode 100644 index 00000000..b01fc0bc --- /dev/null +++ b/Control/Concurrent/STM/UpdateStream.hs | |||
@@ -0,0 +1,169 @@ | |||
1 | --------------------------------------------------------------------------- | ||
2 | -- | | ||
3 | -- Module : Control.Concurrent.STM.UpdateStream | ||
4 | -- | ||
5 | -- | ||
6 | -- Maintainer : joe@jerkface.net | ||
7 | -- Stability : experimental | ||
8 | -- | ||
9 | -- An UpdateSream consists of pass-through messages that are queued up and | ||
10 | -- slotted messages which might be obsoleted and discarded if they are not | ||
11 | -- consumed before newer slotted messages with the same slot value occur. | ||
12 | -- | ||
13 | -- Slots are implemented with "Control.Concurrent.STM.StatusCache" and it is | ||
14 | -- recommended that you read that documentation first. | ||
15 | -- | ||
16 | -- For example, the output | ||
17 | -- | ||
18 | -- > Backlogged consumer: (set x = 5)(set y = 100)(Hello)(Joe)(was)(here) | ||
19 | -- > Fast consumer: (set x = 2)(Hello)(set y = 23)(Joe)(set y = 100)(was)(set x = 5)(here) | ||
20 | -- | ||
21 | -- is produced by the following code: | ||
22 | -- | ||
23 | -- > import Control.Monad (when, forever, (>=>)) | ||
24 | -- > import Control.Monad.STM (atomically) | ||
25 | -- > import Control.Concurrent (forkIO, threadDelay) | ||
26 | -- > import System.IO (hFlush, stdout) | ||
27 | -- > import qualified Control.Concurrent.STM.UpdateStream as Cache | ||
28 | -- > | ||
29 | -- > messages :: [(Maybe String, Char)] | ||
30 | -- > messages = concat | ||
31 | -- > [ slot "x" "(set x = 2)" | ||
32 | -- > , message "(Hello)" | ||
33 | -- > , slot "y" "(set y = 23)" | ||
34 | -- > , message "(Joe)" | ||
35 | -- > , slot "y" "(set y = 100)" | ||
36 | -- > , message "(was)" | ||
37 | -- > , slot "x" "(set x = 5)" | ||
38 | -- > , message "(here)" | ||
39 | -- > ] | ||
40 | -- > where | ||
41 | -- > slot v cs = map ((,) (Just v)) cs | ||
42 | -- > message cs = map ((,) Nothing) cs | ||
43 | -- > | ||
44 | -- > main = do | ||
45 | -- > q <- atomically $ Cache.new (== '(') (==')') | ||
46 | -- > let go = mapM_ (atomically . (uncurry $ Cache.push q) | ||
47 | -- > >=> const (threadDelay 10000)) | ||
48 | -- > messages | ||
49 | -- > slowly = do | ||
50 | -- > while (atomically $ fmap not $ Cache.isEmpty q) $ do | ||
51 | -- > c <- atomically $ Cache.pull q | ||
52 | -- > putChar c | ||
53 | -- > putStrLn "" | ||
54 | -- > hFlush stdout | ||
55 | -- > where while pred body = | ||
56 | -- > pred >>= flip when (body >> while pred body) | ||
57 | -- > quickly = forkIO . forever $ do | ||
58 | -- > c <- atomically $ Cache.pull q | ||
59 | -- > putChar c | ||
60 | -- > hFlush stdout | ||
61 | -- > putStr $ "Backlogged consumer: " | ||
62 | -- > go >> slowly | ||
63 | -- > putStr "Fast consumer: " | ||
64 | -- > quickly >> go | ||
65 | -- > putStrLn "" | ||
66 | -- | ||
67 | module Control.Concurrent.STM.UpdateStream | ||
68 | ( UpdateStream | ||
69 | , new | ||
70 | , push | ||
71 | , pull | ||
72 | , isStopper | ||
73 | , isStarter | ||
74 | , isEmpty | ||
75 | ) where | ||
76 | |||
77 | import Control.Monad | ||
78 | import Control.Monad.STM | ||
79 | import Control.Concurrent.STM.TVar | ||
80 | import Control.Concurrent.STM.TChan | ||
81 | import Control.Concurrent.STM.StatusCache (StatusCache) | ||
82 | import qualified Control.Concurrent.STM.StatusCache as Status | ||
83 | import Data.Map (Map) | ||
84 | import qualified Data.Map as Map | ||
85 | import Data.Maybe | ||
86 | import Data.Foldable (foldlM) | ||
87 | |||
88 | data UpdateStream slot x = | ||
89 | UpdateStream { cache :: TVar (Map slot (StatusCache x)) | ||
90 | , events :: TChan x | ||
91 | , inMessage :: TVar (Maybe (StatusCache x)) | ||
92 | , isStarter :: x -> Bool -- ^ True if the given chunk begins a message. | ||
93 | , isStopper :: x -> Bool -- ^ True if the given chunk ends a message. | ||
94 | } | ||
95 | |||
96 | -- | @new isStart isStop@ | ||
97 | -- | ||
98 | -- The @isStart@ and @isStop@ predicates indicate when an element | ||
99 | -- begins or ends a message. | ||
100 | new :: Ord slot => (x->Bool) -> (x->Bool) -> STM (UpdateStream slot x) | ||
101 | new isStart isStop = do | ||
102 | cache <- newTVar Map.empty | ||
103 | events <- newTChan | ||
104 | inMessage <- newTVar Nothing | ||
105 | return UpdateStream { cache = cache | ||
106 | , events = events | ||
107 | , inMessage = inMessage | ||
108 | , isStarter = isStart | ||
109 | , isStopper = isStop | ||
110 | } | ||
111 | |||
112 | -- | Enqueue a chunk into the 'UpdateStream' | ||
113 | -- | ||
114 | -- If a slot is provided, then the message may be obsoleted when a new message | ||
115 | -- starts with the same slot value. Otherwise, the chunk is preserved. Note | ||
116 | -- that the same slot value must be passed again for every chunk of the message | ||
117 | -- as it is not remembered. | ||
118 | -- | ||
119 | -- Note that although a slot value is provided on each push, it is assumed that | ||
120 | -- messages will be pushed in contiguous streams. To change this, we would | ||
121 | -- need to add a mutable integer to track the nesting level of the primary | ||
122 | -- (non-slotted) stream. | ||
123 | push :: Ord slot => UpdateStream slot x -> Maybe slot -> x -> STM () | ||
124 | push u Nothing x = writeTChan (events u) x | ||
125 | push u (Just n) x = do | ||
126 | smap <- readTVar (cache u) | ||
127 | scache <- | ||
128 | case Map.lookup n smap of | ||
129 | Just scache -> return scache | ||
130 | Nothing -> do | ||
131 | scache <- Status.new (isStarter u) (isStopper u) | ||
132 | modifyTVar (cache u) (Map.insert n scache) | ||
133 | return scache | ||
134 | |||
135 | Status.push scache x | ||
136 | |||
137 | -- | Pull off a chunk from the 'UpdateStream' for processing. | ||
138 | -- | ||
139 | -- If chunks are not pulled off quickly, they may be obsoleted | ||
140 | -- and discarded when new messages are 'push'ed. | ||
141 | pull :: Ord slot => UpdateStream slot x -> STM x | ||
142 | pull u = do | ||
143 | mbscache <- readTVar $ inMessage u | ||
144 | let action = | ||
145 | case mbscache of | ||
146 | Just scache -> do | ||
147 | x <- Status.pull scache | ||
148 | nesting <- readTVar (Status.pullDepth scache) | ||
149 | when (nesting==0) $ writeTVar (inMessage u) Nothing | ||
150 | return x | ||
151 | Nothing -> do | ||
152 | cs <- fmap (Map.toList) $ readTVar (cache u) | ||
153 | cs <- filterM (return . not <=< Status.isEmpty . snd) | ||
154 | cs | ||
155 | maybe (readTChan $ events u) | ||
156 | (\(n,scache) -> do | ||
157 | writeTVar (inMessage u) (Just scache) -- (Just n) | ||
158 | Status.pull scache) | ||
159 | (listToMaybe cs) | ||
160 | x <- action | ||
161 | return x | ||
162 | |||
163 | -- | True when the 'UpdateStream' is completely exhuasted. | ||
164 | isEmpty :: UpdateStream slot x -> STM Bool | ||
165 | isEmpty q = do | ||
166 | e <- isEmptyTChan (events q) | ||
167 | qs <- readTVar (cache q) | ||
168 | d <- foldlM (\b s -> fmap (b &&) $ Status.isEmpty s) True qs | ||
169 | return $ e && d | ||