--------------------------------------------------------------------------- -- | -- Module : Control.Concurrent.STM.StatusCache -- -- -- Maintainer : joe@jerkface.net -- Stability : experimental -- -- Motivation: Suppose you must communicate state changes over a stream. If -- the stream is consumed slowly, then it may occur that backlogged state -- changes are obsoleted by newer changes in state. In this case, it is -- desirable to discard the obsolete messages from the stream. -- -- A streamed status message might be very large, and it would be wasteful in -- both time and space, to treat it as a monolithic blob that must be built -- completely before it can be sent. Therefore, we require that each message -- consist of a stream of smaller chunks of type @x@ and we require predicates -- that indicate when a chunk starts a new message and when it ends a message. -- -- In the folowing example, our chunk type is Char and complete messages are -- delimited by the characters '(' and ')'. We process the input stream -- \"(aaa(a))(bb)(cc(c)cc)\" first with a delayed processor and then again with -- an efficient dedicated thread. The result follows: -- -- > Backlogged consumer: (cc(c)cc) -- > Fast consumer: (aaa(a))(bb)(cc(c)cc) -- -- The complete source code: -- -- > import Control.Monad (when, forever, (>=>)) -- > import Control.Monad.STM (atomically) -- > import Control.Concurrent (forkIO, threadDelay) -- > import System.IO (hFlush, stdout) -- > import qualified Control.Concurrent.STM.StatusCache as Cache -- > -- > main = do q <- atomically $ Cache.new (== '(') (==')') -- > backlog q "(aaa(a))(bb)(cc(c)cc)" -- > fast q "(aaa(a))(bb)(cc(c)cc)" -- > -- > while pred body = pred >>= flip when (body >> while pred body) -- > -- > backlog q xs = do putStr $ "Backlogged consumer: " -- > mapM_ (atomically . Cache.push q) xs -- > while (atomically $ fmap not $ Cache.isEmpty q) $ do -- > c <- atomically $ Cache.pull q -- > putChar c -- > putStrLn "" -- > hFlush stdout -- > -- > fast q xs = do putStr "Fast consumer: " -- > forkIO $ forever $ do -- > c <- atomically $ Cache.pull q -- > putChar c -- > hFlush stdout -- > mapM_ (atomically . Cache.push q >=> const (threadDelay 10000)) -- > xs -- > putStrLn "" -- -- As shown above, it is intended that this module be imported qualified. -- {-# LANGUAGE DoAndIfThenElse #-} module Control.Concurrent.STM.StatusCache ( StatusCache , new , push , pull , pullDepth , isStopper , isStarter , isEmpty ) where import Control.Monad import Control.Monad.STM import Control.Concurrent.STM.TVar import Control.Concurrent.STM.TChan data StatusCache x = StatusCache { feed :: TVar (TChan x) , cache :: TVar (TChan x) , feedFlag :: TVar Bool , pushDepth :: TVar Int , pullDepth :: TVar Int -- ^ current nesting depth of next-to-pull data , isStarter :: x -> Bool -- ^ True if the given chunk begins a message. , isStopper :: x -> Bool -- ^ True if the given chunk ends a message. } -- | @new isStart isStop@ -- -- The @isStart@ and @isStop@ predicates indicate when an element -- begins or ends a message. new :: (x -> Bool) -> (x -> Bool) -> STM (StatusCache x) new isStart isStop = do feed <- newTChan >>= newTVar cache <- newTChan >>= newTVar flag <- newTVar True pushd <- newTVar 0 pulld <- newTVar 0 return StatusCache { feed = feed , cache = cache , feedFlag = flag , pushDepth = pushd , pullDepth = pulld , isStarter = isStart , isStopper = isStop } -- | Pull off a chunk from the 'StatusCache' for processing. -- -- If chunks are not pulled off quickly, they may be obsoleted -- and discarded when new messages are 'push'ed. pull :: StatusCache x -> STM x pull q = do hasCache <- readTVar (feedFlag q) exhausted <- readTVar (feed q) >>= isEmptyTChan when (hasCache && exhausted) $ do next <- newTChan >>= swapTVar (cache q) writeTVar (feedFlag q) False writeTVar (feed q) next chan <- readTVar $ feed q exhausted <- isEmptyTChan chan if exhausted then retry else do v <- readTChan chan when (isStarter q v) $ do depth <- readTVar (pullDepth q) modifyTVar' (pullDepth q) (+1) when (depth==0) $ writeTVar (feedFlag q) False when (isStopper q v) $ modifyTVar' (pullDepth q) (subtract 1) return v -- | Enqueue a chunk into the 'StatusCache'. push :: StatusCache a -> a -> STM () push q v = do shouldCache <- readTVar (feedFlag q) depth <- readTVar (pushDepth q) when (isStopper q v) $ do modifyTVar' (pushDepth q) (subtract 1) when (depth==0) $ writeTVar (feedFlag q) True when (isStarter q v) $ modifyTVar' (pushDepth q) (+1) chan <- if shouldCache then do when (depth==0 && isStarter q v) $ newTChan >>= writeTVar (cache q) readTVar $ cache q else do readTVar $ feed q writeTChan chan v -- | True when the 'StatusCache' is completely exhuasted. isEmpty :: StatusCache x -> STM Bool isEmpty q = do empty_feed <- readTVar (feed q) >>= isEmptyTChan empty_cache <- readTVar (cache q) >>= isEmptyTChan return $ empty_feed && empty_cache