diff options
-rw-r--r-- | Control/Concurrent/STM/UpdateStream.hs | 17 |
1 files changed, 9 insertions, 8 deletions
diff --git a/Control/Concurrent/STM/UpdateStream.hs b/Control/Concurrent/STM/UpdateStream.hs index 502abefb..b01fc0bc 100644 --- a/Control/Concurrent/STM/UpdateStream.hs +++ b/Control/Concurrent/STM/UpdateStream.hs | |||
@@ -115,6 +115,11 @@ new isStart isStop = do | |||
115 | -- starts with the same slot value. Otherwise, the chunk is preserved. Note | 115 | -- starts with the same slot value. Otherwise, the chunk is preserved. Note |
116 | -- that the same slot value must be passed again for every chunk of the message | 116 | -- that the same slot value must be passed again for every chunk of the message |
117 | -- as it is not remembered. | 117 | -- as it is not remembered. |
118 | -- | ||
119 | -- Note that although a slot value is provided on each push, it is assumed that | ||
120 | -- messages will be pushed in contiguous streams. To change this, we would | ||
121 | -- need to add a mutable integer to track the nesting level of the primary | ||
122 | -- (non-slotted) stream. | ||
118 | push :: Ord slot => UpdateStream slot x -> Maybe slot -> x -> STM () | 123 | push :: Ord slot => UpdateStream slot x -> Maybe slot -> x -> STM () |
119 | push u Nothing x = writeTChan (events u) x | 124 | push u Nothing x = writeTChan (events u) x |
120 | push u (Just n) x = do | 125 | push u (Just n) x = do |
@@ -135,19 +140,15 @@ push u (Just n) x = do | |||
135 | -- and discarded when new messages are 'push'ed. | 140 | -- and discarded when new messages are 'push'ed. |
136 | pull :: Ord slot => UpdateStream slot x -> STM x | 141 | pull :: Ord slot => UpdateStream slot x -> STM x |
137 | pull u = do | 142 | pull u = do |
138 | (inm, mbscache) <- do | 143 | mbscache <- readTVar $ inMessage u |
139 | mn <- readTVar $ inMessage u | ||
140 | -- map <- readTVar (cache u) | ||
141 | return $ (isJust mn, mn) -- mn >>= flip Map.lookup map) | ||
142 | let action = | 144 | let action = |
143 | case (inm,mbscache) of | 145 | case mbscache of |
144 | (True,Just scache) -> do | 146 | Just scache -> do |
145 | x <- Status.pull scache | 147 | x <- Status.pull scache |
146 | nesting <- readTVar (Status.pullDepth scache) | 148 | nesting <- readTVar (Status.pullDepth scache) |
147 | when (nesting==0) $ writeTVar (inMessage u) Nothing | 149 | when (nesting==0) $ writeTVar (inMessage u) Nothing |
148 | return x | 150 | return x |
149 | (True,Nothing) -> readTChan (events u) | 151 | Nothing -> do |
150 | (False,_) -> do | ||
151 | cs <- fmap (Map.toList) $ readTVar (cache u) | 152 | cs <- fmap (Map.toList) $ readTVar (cache u) |
152 | cs <- filterM (return . not <=< Status.isEmpty . snd) | 153 | cs <- filterM (return . not <=< Status.isEmpty . snd) |
153 | cs | 154 | cs |