From 82f0f040bb204add2fe40f10dba259f2a524b824 Mon Sep 17 00:00:00 2001 From: joe Date: Wed, 12 Feb 2014 00:42:08 -0500 Subject: updated UpdateStream for nesting --- Control/Concurrent/STM/StatusCache.hs | 3 ++- Control/Concurrent/STM/UpdateStream.hs | 15 +++++++++------ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/Control/Concurrent/STM/StatusCache.hs b/Control/Concurrent/STM/StatusCache.hs index db77429f..3226978b 100644 --- a/Control/Concurrent/STM/StatusCache.hs +++ b/Control/Concurrent/STM/StatusCache.hs @@ -63,6 +63,7 @@ module Control.Concurrent.STM.StatusCache , new , push , pull + , pullDepth , isStopper , isStarter , isEmpty @@ -77,7 +78,7 @@ data StatusCache x = , cache :: TVar (TChan x) , feedFlag :: TVar Bool , pushDepth :: TVar Int - , pullDepth :: TVar Int + , pullDepth :: TVar Int -- ^ current nesting depth of next-to-pull data , isStarter :: x -> Bool -- ^ True if the given chunk begins a message. , isStopper :: x -> Bool -- ^ True if the given chunk ends a message. } diff --git a/Control/Concurrent/STM/UpdateStream.hs b/Control/Concurrent/STM/UpdateStream.hs index a92168c0..502abefb 100644 --- a/Control/Concurrent/STM/UpdateStream.hs +++ b/Control/Concurrent/STM/UpdateStream.hs @@ -88,7 +88,7 @@ import Data.Foldable (foldlM) data UpdateStream slot x = UpdateStream { cache :: TVar (Map slot (StatusCache x)) , events :: TChan x - , inMessage :: TVar (Maybe slot) + , inMessage :: TVar (Maybe (StatusCache x)) , isStarter :: x -> Bool -- ^ True if the given chunk begins a message. , isStopper :: x -> Bool -- ^ True if the given chunk ends a message. } @@ -137,11 +137,15 @@ pull :: Ord slot => UpdateStream slot x -> STM x pull u = do (inm, mbscache) <- do mn <- readTVar $ inMessage u - map <- readTVar (cache u) - return $ (isJust mn, mn >>= flip Map.lookup map) + -- map <- readTVar (cache u) + return $ (isJust mn, mn) -- mn >>= flip Map.lookup map) let action = case (inm,mbscache) of - (True,Just scache) -> Status.pull scache + (True,Just scache) -> do + x <- Status.pull scache + nesting <- readTVar (Status.pullDepth scache) + when (nesting==0) $ writeTVar (inMessage u) Nothing + return x (True,Nothing) -> readTChan (events u) (False,_) -> do cs <- fmap (Map.toList) $ readTVar (cache u) @@ -149,11 +153,10 @@ pull u = do cs maybe (readTChan $ events u) (\(n,scache) -> do - writeTVar (inMessage u) (Just n) + writeTVar (inMessage u) (Just scache) -- (Just n) Status.pull scache) (listToMaybe cs) x <- action - when (isStopper u x) $ writeTVar (inMessage u) Nothing return x -- | True when the 'UpdateStream' is completely exhuasted. -- cgit v1.2.3