summaryrefslogtreecommitdiff
path: root/Control/Concurrent
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2014-02-11 17:47:06 -0500
committerjoe <joe@jerkface.net>2014-02-11 17:47:06 -0500
commitb10d32b7235b48870ff0bdb2d14edbe7a4aeb80d (patch)
tree19e95f09a72c52af26d5c775504c4487287e461b /Control/Concurrent
parent2d6dae13be15b61778ed35b3501df94a8e9dd78f (diff)
Nesting support for StatusCache
Diffstat (limited to 'Control/Concurrent')
-rw-r--r--Control/Concurrent/STM/StatusCache.hs26
1 files changed, 21 insertions, 5 deletions
diff --git a/Control/Concurrent/STM/StatusCache.hs b/Control/Concurrent/STM/StatusCache.hs
index 601de14c..e08be33c 100644
--- a/Control/Concurrent/STM/StatusCache.hs
+++ b/Control/Concurrent/STM/StatusCache.hs
@@ -76,6 +76,8 @@ data StatusCache x =
76 StatusCache { feed :: TVar (TChan x) 76 StatusCache { feed :: TVar (TChan x)
77 , cache :: TVar (TChan x) 77 , cache :: TVar (TChan x)
78 , feedFlag :: TVar Bool 78 , feedFlag :: TVar Bool
79 , pushDepth :: TVar Int
80 , pullDepth :: TVar Int
79 , isStarter :: x -> Bool -- ^ True if the given chunk begins a message. 81 , isStarter :: x -> Bool -- ^ True if the given chunk begins a message.
80 , isStopper :: x -> Bool -- ^ True if the given chunk ends a message. 82 , isStopper :: x -> Bool -- ^ True if the given chunk ends a message.
81 } 83 }
@@ -89,9 +91,13 @@ new isStart isStop = do
89 feed <- newTChan >>= newTVar 91 feed <- newTChan >>= newTVar
90 cache <- newTChan >>= newTVar 92 cache <- newTChan >>= newTVar
91 flag <- newTVar True 93 flag <- newTVar True
94 pushd <- newTVar 0
95 pulld <- newTVar 0
92 return StatusCache { feed = feed 96 return StatusCache { feed = feed
93 , cache = cache 97 , cache = cache
94 , feedFlag = flag 98 , feedFlag = flag
99 , pushDepth = pushd
100 , pullDepth = pulld
95 , isStarter = isStart 101 , isStarter = isStart
96 , isStopper = isStop 102 , isStopper = isStop
97 } 103 }
@@ -114,23 +120,33 @@ pull q = do
114 if exhausted then retry 120 if exhausted then retry
115 else do 121 else do
116 v <- readTChan chan 122 v <- readTChan chan
117 when (isStarter q v) 123 when (isStarter q v) $ do
118 $ writeTVar (feedFlag q) False 124 depth <- readTVar (pullDepth q)
125 modifyTVar' (pullDepth q) (+1)
126 when (depth==0)
127 $ writeTVar (feedFlag q) False
128 when (isStopper q v)
129 $ modifyTVar' (pullDepth q) (subtract 1)
119 return v 130 return v
120 131
121-- | Enqueue a chunk into the 'StatusCache'. 132-- | Enqueue a chunk into the 'StatusCache'.
122push :: StatusCache a -> a -> STM () 133push :: StatusCache a -> a -> STM ()
123push q v = do 134push q v = do
124 shouldCache <- readTVar (feedFlag q) 135 shouldCache <- readTVar (feedFlag q)
136 depth <- readTVar (pushDepth q)
137 when (isStopper q v) $ do
138 modifyTVar' (pushDepth q) (subtract 1)
139 when (depth==0)
140 $ writeTVar (feedFlag q) True
141 when (isStarter q v)
142 $ modifyTVar' (pushDepth q) (+1)
125 chan <- 143 chan <-
126 if shouldCache then do 144 if shouldCache then do
127 when (isStarter q v) 145 when (depth==0 && isStarter q v)
128 $ newTChan 146 $ newTChan
129 >>= writeTVar (cache q) 147 >>= writeTVar (cache q)
130 readTVar $ cache q 148 readTVar $ cache q
131 else do 149 else do
132 when (isStopper q v)
133 $ writeTVar (feedFlag q) True
134 readTVar $ feed q 150 readTVar $ feed q
135 writeTChan chan v 151 writeTChan chan v
136 152