diff options
Diffstat (limited to 'Control/Concurrent')
-rw-r--r-- | Control/Concurrent/STM/StatusCache.hs | 22 |
1 files changed, 11 insertions, 11 deletions
diff --git a/Control/Concurrent/STM/StatusCache.hs b/Control/Concurrent/STM/StatusCache.hs index f6295aa0..d1c977ae 100644 --- a/Control/Concurrent/STM/StatusCache.hs +++ b/Control/Concurrent/STM/StatusCache.hs | |||
@@ -32,13 +32,13 @@ | |||
32 | -- > import Control.Concurrent (forkIO, threadDelay) | 32 | -- > import Control.Concurrent (forkIO, threadDelay) |
33 | -- > import System.IO (hFlush, stdout) | 33 | -- > import System.IO (hFlush, stdout) |
34 | -- > import qualified Control.Concurrent.STM.StatusCache as Cache | 34 | -- > import qualified Control.Concurrent.STM.StatusCache as Cache |
35 | -- > | 35 | -- > |
36 | -- > main = do q <- atomically $ Cache.new (== '(') (==')') | 36 | -- > main = do q <- atomically $ Cache.new (== '(') (==')') |
37 | -- > backlog q "(aaa(a))(bb)(cc(c)cc)" | 37 | -- > backlog q "(aaa(a))(bb)(cc(c)cc)" |
38 | -- > fast q "(aaa(a))(bb)(cc(c)cc)" | 38 | -- > fast q "(aaa(a))(bb)(cc(c)cc)" |
39 | -- > | 39 | -- > |
40 | -- > while pred body = pred >>= flip when (body >> while pred body) | 40 | -- > while pred body = pred >>= flip when (body >> while pred body) |
41 | -- > | 41 | -- > |
42 | -- > backlog q xs = do putStr $ "Backlogged consumer: " | 42 | -- > backlog q xs = do putStr $ "Backlogged consumer: " |
43 | -- > mapM_ (atomically . Cache.push q) xs | 43 | -- > mapM_ (atomically . Cache.push q) xs |
44 | -- > while (atomically $ fmap not $ Cache.isEmpty q) $ do | 44 | -- > while (atomically $ fmap not $ Cache.isEmpty q) $ do |
@@ -46,7 +46,7 @@ | |||
46 | -- > putChar c | 46 | -- > putChar c |
47 | -- > putStrLn "" | 47 | -- > putStrLn "" |
48 | -- > hFlush stdout | 48 | -- > hFlush stdout |
49 | -- > | 49 | -- > |
50 | -- > fast q xs = do putStr "Fast consumer: " | 50 | -- > fast q xs = do putStr "Fast consumer: " |
51 | -- > forkIO $ forever $ do | 51 | -- > forkIO $ forever $ do |
52 | -- > c <- atomically $ Cache.pull q | 52 | -- > c <- atomically $ Cache.pull q |
@@ -90,7 +90,7 @@ data StatusCache x = | |||
90 | -- begins or ends a message. | 90 | -- begins or ends a message. |
91 | new :: (x -> Bool) -> (x -> Bool) -> STM (StatusCache x) | 91 | new :: (x -> Bool) -> (x -> Bool) -> STM (StatusCache x) |
92 | new isStart isStop = do | 92 | new isStart isStop = do |
93 | feed <- newTChan >>= newTVar | 93 | feed <- newTChan >>= newTVar |
94 | cache <- newTChan >>= newTVar | 94 | cache <- newTChan >>= newTVar |
95 | flag <- newTVar True | 95 | flag <- newTVar True |
96 | pushd <- newTVar 0 | 96 | pushd <- newTVar 0 |
@@ -103,10 +103,10 @@ new isStart isStop = do | |||
103 | , isStarter = isStart | 103 | , isStarter = isStart |
104 | , isStopper = isStop | 104 | , isStopper = isStop |
105 | } | 105 | } |
106 | 106 | ||
107 | 107 | ||
108 | -- | Pull off a chunk from the 'StatusCache' for processing. | 108 | -- | Pull off a chunk from the 'StatusCache' for processing. |
109 | -- | 109 | -- |
110 | -- If chunks are not pulled off quickly, they may be obsoleted | 110 | -- If chunks are not pulled off quickly, they may be obsoleted |
111 | -- and discarded when new messages are 'push'ed. | 111 | -- and discarded when new messages are 'push'ed. |
112 | pull :: StatusCache x -> STM x | 112 | pull :: StatusCache x -> STM x |
@@ -127,7 +127,7 @@ pull q = do | |||
127 | modifyTVar' (pullDepth q) (+1) | 127 | modifyTVar' (pullDepth q) (+1) |
128 | when (depth==0) | 128 | when (depth==0) |
129 | $ writeTVar (feedFlag q) False | 129 | $ writeTVar (feedFlag q) False |
130 | when (isStopper q v) | 130 | when (isStopper q v) |
131 | $ modifyTVar' (pullDepth q) (subtract 1) | 131 | $ modifyTVar' (pullDepth q) (subtract 1) |
132 | return v | 132 | return v |
133 | 133 | ||
@@ -140,13 +140,13 @@ push q v = do | |||
140 | modifyTVar' (pushDepth q) (subtract 1) | 140 | modifyTVar' (pushDepth q) (subtract 1) |
141 | when (depth==0) | 141 | when (depth==0) |
142 | $ writeTVar (feedFlag q) True | 142 | $ writeTVar (feedFlag q) True |
143 | when (isStarter q v) | 143 | when (isStarter q v) |
144 | $ modifyTVar' (pushDepth q) (+1) | 144 | $ modifyTVar' (pushDepth q) (+1) |
145 | chan <- | 145 | chan <- |
146 | if shouldCache then do | 146 | if shouldCache then do |
147 | when (depth==0 && isStarter q v) | 147 | when (depth==0 && isStarter q v) |
148 | $ newTChan | 148 | $ newTChan |
149 | >>= writeTVar (cache q) | 149 | >>= writeTVar (cache q) |
150 | readTVar $ cache q | 150 | readTVar $ cache q |
151 | else do | 151 | else do |
152 | readTVar $ feed q | 152 | readTVar $ feed q |