summaryrefslogtreecommitdiff
path: root/Control/Concurrent
diff options
context:
space:
mode:
Diffstat (limited to 'Control/Concurrent')
-rw-r--r--Control/Concurrent/STM/StatusCache.hs3
-rw-r--r--Control/Concurrent/STM/UpdateStream.hs15
2 files changed, 11 insertions, 7 deletions
diff --git a/Control/Concurrent/STM/StatusCache.hs b/Control/Concurrent/STM/StatusCache.hs
index db77429f..3226978b 100644
--- a/Control/Concurrent/STM/StatusCache.hs
+++ b/Control/Concurrent/STM/StatusCache.hs
@@ -63,6 +63,7 @@ module Control.Concurrent.STM.StatusCache
63 , new 63 , new
64 , push 64 , push
65 , pull 65 , pull
66 , pullDepth
66 , isStopper 67 , isStopper
67 , isStarter 68 , isStarter
68 , isEmpty 69 , isEmpty
@@ -77,7 +78,7 @@ data StatusCache x =
77 , cache :: TVar (TChan x) 78 , cache :: TVar (TChan x)
78 , feedFlag :: TVar Bool 79 , feedFlag :: TVar Bool
79 , pushDepth :: TVar Int 80 , pushDepth :: TVar Int
80 , pullDepth :: TVar Int 81 , pullDepth :: TVar Int -- ^ current nesting depth of next-to-pull data
81 , isStarter :: x -> Bool -- ^ True if the given chunk begins a message. 82 , isStarter :: x -> Bool -- ^ True if the given chunk begins a message.
82 , isStopper :: x -> Bool -- ^ True if the given chunk ends a message. 83 , isStopper :: x -> Bool -- ^ True if the given chunk ends a message.
83 } 84 }
diff --git a/Control/Concurrent/STM/UpdateStream.hs b/Control/Concurrent/STM/UpdateStream.hs
index a92168c0..502abefb 100644
--- a/Control/Concurrent/STM/UpdateStream.hs
+++ b/Control/Concurrent/STM/UpdateStream.hs
@@ -88,7 +88,7 @@ import Data.Foldable (foldlM)
88data UpdateStream slot x = 88data UpdateStream slot x =
89 UpdateStream { cache :: TVar (Map slot (StatusCache x)) 89 UpdateStream { cache :: TVar (Map slot (StatusCache x))
90 , events :: TChan x 90 , events :: TChan x
91 , inMessage :: TVar (Maybe slot) 91 , inMessage :: TVar (Maybe (StatusCache x))
92 , isStarter :: x -> Bool -- ^ True if the given chunk begins a message. 92 , isStarter :: x -> Bool -- ^ True if the given chunk begins a message.
93 , isStopper :: x -> Bool -- ^ True if the given chunk ends a message. 93 , isStopper :: x -> Bool -- ^ True if the given chunk ends a message.
94 } 94 }
@@ -137,11 +137,15 @@ pull :: Ord slot => UpdateStream slot x -> STM x
137pull u = do 137pull u = do
138 (inm, mbscache) <- do 138 (inm, mbscache) <- do
139 mn <- readTVar $ inMessage u 139 mn <- readTVar $ inMessage u
140 map <- readTVar (cache u) 140 -- map <- readTVar (cache u)
141 return $ (isJust mn, mn >>= flip Map.lookup map) 141 return $ (isJust mn, mn) -- mn >>= flip Map.lookup map)
142 let action = 142 let action =
143 case (inm,mbscache) of 143 case (inm,mbscache) of
144 (True,Just scache) -> Status.pull scache 144 (True,Just scache) -> do
145 x <- Status.pull scache
146 nesting <- readTVar (Status.pullDepth scache)
147 when (nesting==0) $ writeTVar (inMessage u) Nothing
148 return x
145 (True,Nothing) -> readTChan (events u) 149 (True,Nothing) -> readTChan (events u)
146 (False,_) -> do 150 (False,_) -> do
147 cs <- fmap (Map.toList) $ readTVar (cache u) 151 cs <- fmap (Map.toList) $ readTVar (cache u)
@@ -149,11 +153,10 @@ pull u = do
149 cs 153 cs
150 maybe (readTChan $ events u) 154 maybe (readTChan $ events u)
151 (\(n,scache) -> do 155 (\(n,scache) -> do
152 writeTVar (inMessage u) (Just n) 156 writeTVar (inMessage u) (Just scache) -- (Just n)
153 Status.pull scache) 157 Status.pull scache)
154 (listToMaybe cs) 158 (listToMaybe cs)
155 x <- action 159 x <- action
156 when (isStopper u x) $ writeTVar (inMessage u) Nothing
157 return x 160 return x
158 161
159-- | True when the 'UpdateStream' is completely exhuasted. 162-- | True when the 'UpdateStream' is completely exhuasted.