summaryrefslogtreecommitdiff
path: root/Control/Concurrent/STM/StatusCache.hs
diff options
context:
space:
mode:
authorJames Crayne <jim.crayne@gmail.com>2019-09-28 13:43:29 -0400
committerJoe Crayne <joe@jerkface.net>2020-01-01 19:27:53 -0500
commit11987749fc6e6d3e53ea737d46d5ab13a16faeb8 (patch)
tree5716463275c2d3e902889db619908ded2a73971c /Control/Concurrent/STM/StatusCache.hs
parentadd2c76bced51fde5e9917e7449ef52be70faf87 (diff)
Factor out some new libraries
word64-map: Data.Word64Map network-addr: Network.Address tox-crypto: Crypto.Tox lifted-concurrent: Control.Concurrent.Lifted.Instrument Control.Concurrent.Async.Lifted.Instrument psq-wrap: Data.Wrapper.PSQInt Data.Wrapper.PSQ minmax-psq: Data.MinMaxPSQ tasks: Control.Concurrent.Tasks kad: Network.Kademlia Network.Kademlia.Bootstrap Network.Kademlia.Routing Network.Kademlia.CommonAPI Network.Kademlia.Persistence Network.Kademlia.Search
Diffstat (limited to 'Control/Concurrent/STM/StatusCache.hs')
-rw-r--r--Control/Concurrent/STM/StatusCache.hs160
1 files changed, 0 insertions, 160 deletions
diff --git a/Control/Concurrent/STM/StatusCache.hs b/Control/Concurrent/STM/StatusCache.hs
deleted file mode 100644
index d1c977ae..00000000
--- a/Control/Concurrent/STM/StatusCache.hs
+++ /dev/null
@@ -1,160 +0,0 @@
1---------------------------------------------------------------------------
2-- |
3-- Module : Control.Concurrent.STM.StatusCache
4--
5--
6-- Maintainer : joe@jerkface.net
7-- Stability : experimental
8--
9-- Motivation: Suppose you must communicate state changes over a stream. If
10-- the stream is consumed slowly, then it may occur that backlogged state
11-- changes are obsoleted by newer changes in state. In this case, it is
12-- desirable to discard the obsolete messages from the stream.
13--
14-- A streamed status message might be very large, and it would be wasteful in
15-- both time and space, to treat it as a monolithic blob that must be built
16-- completely before it can be sent. Therefore, we require that each message
17-- consist of a stream of smaller chunks of type @x@ and we require predicates
18-- that indicate when a chunk starts a new message and when it ends a message.
19--
20-- In the folowing example, our chunk type is Char and complete messages are
21-- delimited by the characters '(' and ')'. We process the input stream
22-- \"(aaa(a))(bb)(cc(c)cc)\" first with a delayed processor and then again with
23-- an efficient dedicated thread. The result follows:
24--
25-- > Backlogged consumer: (cc(c)cc)
26-- > Fast consumer: (aaa(a))(bb)(cc(c)cc)
27--
28-- The complete source code:
29--
30-- > import Control.Monad (when, forever, (>=>))
31-- > import Control.Monad.STM (atomically)
32-- > import Control.Concurrent (forkIO, threadDelay)
33-- > import System.IO (hFlush, stdout)
34-- > import qualified Control.Concurrent.STM.StatusCache as Cache
35-- >
36-- > main = do q <- atomically $ Cache.new (== '(') (==')')
37-- > backlog q "(aaa(a))(bb)(cc(c)cc)"
38-- > fast q "(aaa(a))(bb)(cc(c)cc)"
39-- >
40-- > while pred body = pred >>= flip when (body >> while pred body)
41-- >
42-- > backlog q xs = do putStr $ "Backlogged consumer: "
43-- > mapM_ (atomically . Cache.push q) xs
44-- > while (atomically $ fmap not $ Cache.isEmpty q) $ do
45-- > c <- atomically $ Cache.pull q
46-- > putChar c
47-- > putStrLn ""
48-- > hFlush stdout
49-- >
50-- > fast q xs = do putStr "Fast consumer: "
51-- > forkIO $ forever $ do
52-- > c <- atomically $ Cache.pull q
53-- > putChar c
54-- > hFlush stdout
55-- > mapM_ (atomically . Cache.push q >=> const (threadDelay 10000))
56-- > xs
57-- > putStrLn ""
58--
59-- As shown above, it is intended that this module be imported qualified.
60--
61{-# LANGUAGE DoAndIfThenElse #-}
62module Control.Concurrent.STM.StatusCache
63 ( StatusCache
64 , new
65 , push
66 , pull
67 , pullDepth
68 , isStopper
69 , isStarter
70 , isEmpty
71 ) where
72import Control.Monad
73import Control.Monad.STM
74import Control.Concurrent.STM.TVar
75import Control.Concurrent.STM.TChan
76
77data StatusCache x =
78 StatusCache { feed :: TVar (TChan x)
79 , cache :: TVar (TChan x)
80 , feedFlag :: TVar Bool
81 , pushDepth :: TVar Int
82 , pullDepth :: TVar Int -- ^ current nesting depth of next-to-pull data
83 , isStarter :: x -> Bool -- ^ True if the given chunk begins a message.
84 , isStopper :: x -> Bool -- ^ True if the given chunk ends a message.
85 }
86
87-- | @new isStart isStop@
88--
89-- The @isStart@ and @isStop@ predicates indicate when an element
90-- begins or ends a message.
91new :: (x -> Bool) -> (x -> Bool) -> STM (StatusCache x)
92new isStart isStop = do
93 feed <- newTChan >>= newTVar
94 cache <- newTChan >>= newTVar
95 flag <- newTVar True
96 pushd <- newTVar 0
97 pulld <- newTVar 0
98 return StatusCache { feed = feed
99 , cache = cache
100 , feedFlag = flag
101 , pushDepth = pushd
102 , pullDepth = pulld
103 , isStarter = isStart
104 , isStopper = isStop
105 }
106
107
108-- | Pull off a chunk from the 'StatusCache' for processing.
109--
110-- If chunks are not pulled off quickly, they may be obsoleted
111-- and discarded when new messages are 'push'ed.
112pull :: StatusCache x -> STM x
113pull q = do
114 hasCache <- readTVar (feedFlag q)
115 exhausted <- readTVar (feed q) >>= isEmptyTChan
116 when (hasCache && exhausted) $ do
117 next <- newTChan >>= swapTVar (cache q)
118 writeTVar (feedFlag q) False
119 writeTVar (feed q) next
120 chan <- readTVar $ feed q
121 exhausted <- isEmptyTChan chan
122 if exhausted then retry
123 else do
124 v <- readTChan chan
125 when (isStarter q v) $ do
126 depth <- readTVar (pullDepth q)
127 modifyTVar' (pullDepth q) (+1)
128 when (depth==0)
129 $ writeTVar (feedFlag q) False
130 when (isStopper q v)
131 $ modifyTVar' (pullDepth q) (subtract 1)
132 return v
133
134-- | Enqueue a chunk into the 'StatusCache'.
135push :: StatusCache a -> a -> STM ()
136push q v = do
137 shouldCache <- readTVar (feedFlag q)
138 depth <- readTVar (pushDepth q)
139 when (isStopper q v) $ do
140 modifyTVar' (pushDepth q) (subtract 1)
141 when (depth==0)
142 $ writeTVar (feedFlag q) True
143 when (isStarter q v)
144 $ modifyTVar' (pushDepth q) (+1)
145 chan <-
146 if shouldCache then do
147 when (depth==0 && isStarter q v)
148 $ newTChan
149 >>= writeTVar (cache q)
150 readTVar $ cache q
151 else do
152 readTVar $ feed q
153 writeTChan chan v
154
155-- | True when the 'StatusCache' is completely exhuasted.
156isEmpty :: StatusCache x -> STM Bool
157isEmpty q = do
158 empty_feed <- readTVar (feed q) >>= isEmptyTChan
159 empty_cache <- readTVar (cache q) >>= isEmptyTChan
160 return $ empty_feed && empty_cache