diff options
author | joe <joe@jerkface.net> | 2014-02-10 22:38:46 -0500 |
---|---|---|
committer | joe <joe@jerkface.net> | 2014-02-10 22:38:46 -0500 |
commit | 2d6dae13be15b61778ed35b3501df94a8e9dd78f (patch) | |
tree | 2ba13a1cc31886776614770a43ed18c60bfb39e3 /Control/Concurrent/STM/StatusCache.hs | |
parent | afff51ac877ce8807801334745f1679dbf6440d0 (diff) |
more xmppServer work
Diffstat (limited to 'Control/Concurrent/STM/StatusCache.hs')
-rw-r--r-- | Control/Concurrent/STM/StatusCache.hs | 142 |
1 files changed, 142 insertions, 0 deletions
diff --git a/Control/Concurrent/STM/StatusCache.hs b/Control/Concurrent/STM/StatusCache.hs new file mode 100644 index 00000000..601de14c --- /dev/null +++ b/Control/Concurrent/STM/StatusCache.hs | |||
@@ -0,0 +1,142 @@ | |||
1 | --------------------------------------------------------------------------- | ||
2 | -- | | ||
3 | -- Module : Control.Concurrent.STM.StatusCache | ||
4 | -- | ||
5 | -- | ||
6 | -- Maintainer : joe@jerkface.net | ||
7 | -- Stability : experimental | ||
8 | -- | ||
9 | -- Motivation: Suppose you must communicate state changes over a stream. If | ||
10 | -- the stream is consumed slowly, then it may occur that backlogged state | ||
11 | -- changes are obsoleted by newer changes in state. In this case, it is | ||
12 | -- desirable to discard the obsolete messages from the stream. | ||
13 | -- | ||
14 | -- A streamed status message might be very large, and it would be wasteful in | ||
15 | -- both time and space, to treat it as a monolithic blob that must be built | ||
16 | -- completely before it can be sent. Therefore, we require that each message | ||
17 | -- consist of a stream of smaller chunks of type @x@ and we require predicates | ||
18 | -- that indicate when a chunk starts a new message and when it ends a message. | ||
19 | -- | ||
20 | -- In the folowing example, our chunk type is Char and complete messages are | ||
21 | -- delimited by the characters '(' and ')'. We process the input stream | ||
22 | -- \"(aaaa)(bb)(ccccc)\" first with a delayed processor and then again with an | ||
23 | -- efficient dedicated thread. The result follows: | ||
24 | -- | ||
25 | -- > Backlogged consumer: (ccccc) | ||
26 | -- > Fast consumer: (aaaa)(bb)(ccccc) | ||
27 | -- | ||
28 | -- The complete source code: | ||
29 | -- | ||
30 | -- > import Control.Monad (when, forever, (>=>)) | ||
31 | -- > import Control.Monad.STM (atomically) | ||
32 | -- > import Control.Concurrent (forkIO, threadDelay) | ||
33 | -- > import System.IO (hFlush, stdout) | ||
34 | -- > import qualified Control.Concurrent.STM.StatusCache as Cache | ||
35 | -- > | ||
36 | -- > while pred body = | ||
37 | -- > pred >>= flip when (body >> while pred body) | ||
38 | -- > | ||
39 | -- > main = do | ||
40 | -- > q <- atomically $ Cache.new (== '(') (==')') | ||
41 | -- > | ||
42 | -- > putStr $ "Backlogged consumer: " | ||
43 | -- > mapM_ (atomically . Cache.push q) "(aaaa)(bb)(ccccc)" | ||
44 | -- > while (atomically $ fmap not $ Cache.isEmpty q) $ do | ||
45 | -- > c <- atomically $ Cache.pull q | ||
46 | -- > putChar c | ||
47 | -- > putStrLn "" | ||
48 | -- > hFlush stdout | ||
49 | -- > | ||
50 | -- > putStr "Fast consumer: " | ||
51 | -- > forkIO $ forever $ do | ||
52 | -- > c <- atomically $ Cache.pull q | ||
53 | -- > putChar c | ||
54 | -- > hFlush stdout | ||
55 | -- > mapM_ (atomically . Cache.push q >=> const (threadDelay 10000)) | ||
56 | -- > "(aaaa)(bb)(ccccc)" | ||
57 | -- > putStrLn "" | ||
58 | -- | ||
59 | -- As shown above, it is intended that this module be imported qualified. | ||
60 | -- | ||
61 | module Control.Concurrent.STM.StatusCache | ||
62 | ( StatusCache | ||
63 | , new | ||
64 | , push | ||
65 | , pull | ||
66 | , isStopper | ||
67 | , isStarter | ||
68 | , isEmpty | ||
69 | ) where | ||
70 | import Control.Monad | ||
71 | import Control.Monad.STM | ||
72 | import Control.Concurrent.STM.TVar | ||
73 | import Control.Concurrent.STM.TChan | ||
74 | |||
75 | data StatusCache x = | ||
76 | StatusCache { feed :: TVar (TChan x) | ||
77 | , cache :: TVar (TChan x) | ||
78 | , feedFlag :: TVar Bool | ||
79 | , isStarter :: x -> Bool -- ^ True if the given chunk begins a message. | ||
80 | , isStopper :: x -> Bool -- ^ True if the given chunk ends a message. | ||
81 | } | ||
82 | |||
83 | -- | @new isStart isStop@ | ||
84 | -- | ||
85 | -- The @isStart@ and @isStop@ predicates indicate when an element | ||
86 | -- begins or ends a message. | ||
87 | new :: (x -> Bool) -> (x -> Bool) -> STM (StatusCache x) | ||
88 | new isStart isStop = do | ||
89 | feed <- newTChan >>= newTVar | ||
90 | cache <- newTChan >>= newTVar | ||
91 | flag <- newTVar True | ||
92 | return StatusCache { feed = feed | ||
93 | , cache = cache | ||
94 | , feedFlag = flag | ||
95 | , isStarter = isStart | ||
96 | , isStopper = isStop | ||
97 | } | ||
98 | |||
99 | |||
100 | -- | Pull off a chunk from the 'StatusCache' for processing. | ||
101 | -- | ||
102 | -- If chunks are not pulled off quickly, they may be obsoleted | ||
103 | -- and discarded when new messages are 'push'ed. | ||
104 | pull :: StatusCache x -> STM x | ||
105 | pull q = do | ||
106 | hasCache <- readTVar (feedFlag q) | ||
107 | exhausted <- readTVar (feed q) >>= isEmptyTChan | ||
108 | when (hasCache && exhausted) $ do | ||
109 | next <- newTChan >>= swapTVar (cache q) | ||
110 | writeTVar (feedFlag q) False | ||
111 | writeTVar (feed q) next | ||
112 | chan <- readTVar $ feed q | ||
113 | exhausted <- isEmptyTChan chan | ||
114 | if exhausted then retry | ||
115 | else do | ||
116 | v <- readTChan chan | ||
117 | when (isStarter q v) | ||
118 | $ writeTVar (feedFlag q) False | ||
119 | return v | ||
120 | |||
121 | -- | Enqueue a chunk into the 'StatusCache'. | ||
122 | push :: StatusCache a -> a -> STM () | ||
123 | push q v = do | ||
124 | shouldCache <- readTVar (feedFlag q) | ||
125 | chan <- | ||
126 | if shouldCache then do | ||
127 | when (isStarter q v) | ||
128 | $ newTChan | ||
129 | >>= writeTVar (cache q) | ||
130 | readTVar $ cache q | ||
131 | else do | ||
132 | when (isStopper q v) | ||
133 | $ writeTVar (feedFlag q) True | ||
134 | readTVar $ feed q | ||
135 | writeTChan chan v | ||
136 | |||
137 | -- | True when the 'StatusCache' is completely exhuasted. | ||
138 | isEmpty :: StatusCache x -> STM Bool | ||
139 | isEmpty q = do | ||
140 | empty_feed <- readTVar (feed q) >>= isEmptyTChan | ||
141 | empty_cache <- readTVar (cache q) >>= isEmptyTChan | ||
142 | return $ empty_feed && empty_cache | ||