diff options
Diffstat (limited to 'Control/Concurrent')
-rw-r--r-- | Control/Concurrent/STM/StatusCache.hs | 26 |
1 files changed, 21 insertions, 5 deletions
diff --git a/Control/Concurrent/STM/StatusCache.hs b/Control/Concurrent/STM/StatusCache.hs index 601de14c..e08be33c 100644 --- a/Control/Concurrent/STM/StatusCache.hs +++ b/Control/Concurrent/STM/StatusCache.hs | |||
@@ -76,6 +76,8 @@ data StatusCache x = | |||
76 | StatusCache { feed :: TVar (TChan x) | 76 | StatusCache { feed :: TVar (TChan x) |
77 | , cache :: TVar (TChan x) | 77 | , cache :: TVar (TChan x) |
78 | , feedFlag :: TVar Bool | 78 | , feedFlag :: TVar Bool |
79 | , pushDepth :: TVar Int | ||
80 | , pullDepth :: TVar Int | ||
79 | , isStarter :: x -> Bool -- ^ True if the given chunk begins a message. | 81 | , isStarter :: x -> Bool -- ^ True if the given chunk begins a message. |
80 | , isStopper :: x -> Bool -- ^ True if the given chunk ends a message. | 82 | , isStopper :: x -> Bool -- ^ True if the given chunk ends a message. |
81 | } | 83 | } |
@@ -89,9 +91,13 @@ new isStart isStop = do | |||
89 | feed <- newTChan >>= newTVar | 91 | feed <- newTChan >>= newTVar |
90 | cache <- newTChan >>= newTVar | 92 | cache <- newTChan >>= newTVar |
91 | flag <- newTVar True | 93 | flag <- newTVar True |
94 | pushd <- newTVar 0 | ||
95 | pulld <- newTVar 0 | ||
92 | return StatusCache { feed = feed | 96 | return StatusCache { feed = feed |
93 | , cache = cache | 97 | , cache = cache |
94 | , feedFlag = flag | 98 | , feedFlag = flag |
99 | , pushDepth = pushd | ||
100 | , pullDepth = pulld | ||
95 | , isStarter = isStart | 101 | , isStarter = isStart |
96 | , isStopper = isStop | 102 | , isStopper = isStop |
97 | } | 103 | } |
@@ -114,23 +120,33 @@ pull q = do | |||
114 | if exhausted then retry | 120 | if exhausted then retry |
115 | else do | 121 | else do |
116 | v <- readTChan chan | 122 | v <- readTChan chan |
117 | when (isStarter q v) | 123 | when (isStarter q v) $ do |
118 | $ writeTVar (feedFlag q) False | 124 | depth <- readTVar (pullDepth q) |
125 | modifyTVar' (pullDepth q) (+1) | ||
126 | when (depth==0) | ||
127 | $ writeTVar (feedFlag q) False | ||
128 | when (isStopper q v) | ||
129 | $ modifyTVar' (pullDepth q) (subtract 1) | ||
119 | return v | 130 | return v |
120 | 131 | ||
121 | -- | Enqueue a chunk into the 'StatusCache'. | 132 | -- | Enqueue a chunk into the 'StatusCache'. |
122 | push :: StatusCache a -> a -> STM () | 133 | push :: StatusCache a -> a -> STM () |
123 | push q v = do | 134 | push q v = do |
124 | shouldCache <- readTVar (feedFlag q) | 135 | shouldCache <- readTVar (feedFlag q) |
136 | depth <- readTVar (pushDepth q) | ||
137 | when (isStopper q v) $ do | ||
138 | modifyTVar' (pushDepth q) (subtract 1) | ||
139 | when (depth==0) | ||
140 | $ writeTVar (feedFlag q) True | ||
141 | when (isStarter q v) | ||
142 | $ modifyTVar' (pushDepth q) (+1) | ||
125 | chan <- | 143 | chan <- |
126 | if shouldCache then do | 144 | if shouldCache then do |
127 | when (isStarter q v) | 145 | when (depth==0 && isStarter q v) |
128 | $ newTChan | 146 | $ newTChan |
129 | >>= writeTVar (cache q) | 147 | >>= writeTVar (cache q) |
130 | readTVar $ cache q | 148 | readTVar $ cache q |
131 | else do | 149 | else do |
132 | when (isStopper q v) | ||
133 | $ writeTVar (feedFlag q) True | ||
134 | readTVar $ feed q | 150 | readTVar $ feed q |
135 | writeTChan chan v | 151 | writeTChan chan v |
136 | 152 | ||