diff options
Diffstat (limited to 'Control/Concurrent')
-rw-r--r-- | Control/Concurrent/STM/StatusCache.hs | 3 | ||||
-rw-r--r-- | Control/Concurrent/STM/UpdateStream.hs | 15 |
2 files changed, 11 insertions, 7 deletions
diff --git a/Control/Concurrent/STM/StatusCache.hs b/Control/Concurrent/STM/StatusCache.hs index db77429f..3226978b 100644 --- a/Control/Concurrent/STM/StatusCache.hs +++ b/Control/Concurrent/STM/StatusCache.hs | |||
@@ -63,6 +63,7 @@ module Control.Concurrent.STM.StatusCache | |||
63 | , new | 63 | , new |
64 | , push | 64 | , push |
65 | , pull | 65 | , pull |
66 | , pullDepth | ||
66 | , isStopper | 67 | , isStopper |
67 | , isStarter | 68 | , isStarter |
68 | , isEmpty | 69 | , isEmpty |
@@ -77,7 +78,7 @@ data StatusCache x = | |||
77 | , cache :: TVar (TChan x) | 78 | , cache :: TVar (TChan x) |
78 | , feedFlag :: TVar Bool | 79 | , feedFlag :: TVar Bool |
79 | , pushDepth :: TVar Int | 80 | , pushDepth :: TVar Int |
80 | , pullDepth :: TVar Int | 81 | , pullDepth :: TVar Int -- ^ current nesting depth of next-to-pull data |
81 | , isStarter :: x -> Bool -- ^ True if the given chunk begins a message. | 82 | , isStarter :: x -> Bool -- ^ True if the given chunk begins a message. |
82 | , isStopper :: x -> Bool -- ^ True if the given chunk ends a message. | 83 | , isStopper :: x -> Bool -- ^ True if the given chunk ends a message. |
83 | } | 84 | } |
diff --git a/Control/Concurrent/STM/UpdateStream.hs b/Control/Concurrent/STM/UpdateStream.hs index a92168c0..502abefb 100644 --- a/Control/Concurrent/STM/UpdateStream.hs +++ b/Control/Concurrent/STM/UpdateStream.hs | |||
@@ -88,7 +88,7 @@ import Data.Foldable (foldlM) | |||
88 | data UpdateStream slot x = | 88 | data UpdateStream slot x = |
89 | UpdateStream { cache :: TVar (Map slot (StatusCache x)) | 89 | UpdateStream { cache :: TVar (Map slot (StatusCache x)) |
90 | , events :: TChan x | 90 | , events :: TChan x |
91 | , inMessage :: TVar (Maybe slot) | 91 | , inMessage :: TVar (Maybe (StatusCache x)) |
92 | , isStarter :: x -> Bool -- ^ True if the given chunk begins a message. | 92 | , isStarter :: x -> Bool -- ^ True if the given chunk begins a message. |
93 | , isStopper :: x -> Bool -- ^ True if the given chunk ends a message. | 93 | , isStopper :: x -> Bool -- ^ True if the given chunk ends a message. |
94 | } | 94 | } |
@@ -137,11 +137,15 @@ pull :: Ord slot => UpdateStream slot x -> STM x | |||
137 | pull u = do | 137 | pull u = do |
138 | (inm, mbscache) <- do | 138 | (inm, mbscache) <- do |
139 | mn <- readTVar $ inMessage u | 139 | mn <- readTVar $ inMessage u |
140 | map <- readTVar (cache u) | 140 | -- map <- readTVar (cache u) |
141 | return $ (isJust mn, mn >>= flip Map.lookup map) | 141 | return $ (isJust mn, mn) -- mn >>= flip Map.lookup map) |
142 | let action = | 142 | let action = |
143 | case (inm,mbscache) of | 143 | case (inm,mbscache) of |
144 | (True,Just scache) -> Status.pull scache | 144 | (True,Just scache) -> do |
145 | x <- Status.pull scache | ||
146 | nesting <- readTVar (Status.pullDepth scache) | ||
147 | when (nesting==0) $ writeTVar (inMessage u) Nothing | ||
148 | return x | ||
145 | (True,Nothing) -> readTChan (events u) | 149 | (True,Nothing) -> readTChan (events u) |
146 | (False,_) -> do | 150 | (False,_) -> do |
147 | cs <- fmap (Map.toList) $ readTVar (cache u) | 151 | cs <- fmap (Map.toList) $ readTVar (cache u) |
@@ -149,11 +153,10 @@ pull u = do | |||
149 | cs | 153 | cs |
150 | maybe (readTChan $ events u) | 154 | maybe (readTChan $ events u) |
151 | (\(n,scache) -> do | 155 | (\(n,scache) -> do |
152 | writeTVar (inMessage u) (Just n) | 156 | writeTVar (inMessage u) (Just scache) -- (Just n) |
153 | Status.pull scache) | 157 | Status.pull scache) |
154 | (listToMaybe cs) | 158 | (listToMaybe cs) |
155 | x <- action | 159 | x <- action |
156 | when (isStopper u x) $ writeTVar (inMessage u) Nothing | ||
157 | return x | 160 | return x |
158 | 161 | ||
159 | -- | True when the 'UpdateStream' is completely exhuasted. | 162 | -- | True when the 'UpdateStream' is completely exhuasted. |