---------------------------------------------------------------------------
-- |
-- Module : Control.Concurrent.STM.StatusCache
--
--
-- Maintainer : joe@jerkface.net
-- Stability : experimental
--
-- Motivation: Suppose you must communicate state changes over a stream. If
-- the stream is consumed slowly, then it may occur that backlogged state
-- changes are obsoleted by newer changes in state. In this case, it is
-- desirable to discard the obsolete messages from the stream.
--
-- A streamed status message might be very large, and it would be wasteful in
-- both time and space, to treat it as a monolithic blob that must be built
-- completely before it can be sent. Therefore, we require that each message
-- consist of a stream of smaller chunks of type @x@ and we require predicates
-- that indicate when a chunk starts a new message and when it ends a message.
--
-- In the following example, our chunk type is Char and complete messages are
-- delimited by the characters '(' and ')'. We process the input stream
-- \"(aaaa)(bb)(ccccc)\" first with a delayed processor and then again with an
-- efficient dedicated thread. The result follows:
--
-- > Backlogged consumer: (ccccc)
-- > Fast consumer: (aaaa)(bb)(ccccc)
--
-- The complete source code:
--
-- > import Control.Monad (when, forever, (>=>))
-- > import Control.Monad.STM (atomically)
-- > import Control.Concurrent (forkIO, threadDelay)
-- > import System.IO (hFlush, stdout)
-- > import qualified Control.Concurrent.STM.StatusCache as Cache
-- >
-- > while pred body =
-- >     pred >>= flip when (body >> while pred body)
-- >
-- > main = do
-- >     q <- atomically $ Cache.new (== '(') (==')')
-- >
-- >     putStr $ "Backlogged consumer: "
-- >     mapM_ (atomically .
-- >               Cache.push q) "(aaaa)(bb)(ccccc)"
-- >     while (atomically $ fmap not $ Cache.isEmpty q) $ do
-- >         c <- atomically $ Cache.pull q
-- >         putChar c
-- >     putStrLn ""
-- >     hFlush stdout
-- >
-- >     putStr "Fast consumer: "
-- >     forkIO $ forever $ do
-- >         c <- atomically $ Cache.pull q
-- >         putChar c
-- >         hFlush stdout
-- >     mapM_ (atomically . Cache.push q >=> const (threadDelay 10000))
-- >           "(aaaa)(bb)(ccccc)"
-- >     putStrLn ""
--
-- As shown above, it is intended that this module be imported qualified.
--
module Control.Concurrent.STM.StatusCache
    ( StatusCache
    , new
    , push
    , pull
    , isStopper
    , isStarter
    , isEmpty
    ) where

import Control.Monad
import Control.Monad.STM
import Control.Concurrent.STM.TVar
import Control.Concurrent.STM.TChan

-- | A queue of message chunks of type @x@ in which a backlogged message can
-- be replaced by a newer one before the consumer gets to it.
data StatusCache x = StatusCache
    { feed      :: TVar (TChan x)
      -- ^ Channel that 'pull' reads chunks from.
    , cache     :: TVar (TChan x)
      -- ^ Channel that 'push' writes to while 'feedFlag' is set; it is
      -- replaced by a fresh channel whenever a starter chunk arrives.
    , feedFlag  :: TVar Bool
      -- ^ When True, 'push' writes to 'cache'; when False, to 'feed'.
    , isStarter :: x -> Bool
      -- ^ True if the given chunk begins a message.
    , isStopper :: x -> Bool
      -- ^ True if the given chunk ends a message.
    }

-- | @new isStart isStop@
--
-- Create an empty 'StatusCache'.  The @isStart@ and @isStop@ predicates
-- indicate when an element begins or ends a message.  Both internal channels
-- start out empty and pushes are initially directed at the cache.
new :: (x -> Bool) -> (x -> Bool) -> STM (StatusCache x)
new isStart isStop = do
    feedVar  <- newTVar =<< newTChan
    cacheVar <- newTVar =<< newTChan
    flagVar  <- newTVar True
    return StatusCache
        { feed      = feedVar
        , cache     = cacheVar
        , feedFlag  = flagVar
        , isStarter = isStart
        , isStopper = isStop
        }

-- | Pull off a chunk from the 'StatusCache' for processing.
--
-- If chunks are not pulled off quickly, they may be obsoleted
-- and discarded when new messages are 'push'ed.
pull :: StatusCache x -> STM x
pull q = do
    caching   <- readTVar (feedFlag q)
    feedEmpty <- readTVar (feed q) >>= isEmptyTChan
    -- Promote the cached message only once the feed has been drained, so
    -- everything already in the feed is delivered before it.  Clearing
    -- 'feedFlag' makes 'push' append the remainder of that message (if it
    -- is still being written) directly to the feed.
    when (caching && feedEmpty) $ do
        pending <- newTChan >>= swapTVar (cache q)
        writeTVar (feed q) pending
        writeTVar (feedFlag q) False
    -- 'feedFlag' is deliberately NOT cleared when a starter is read from the
    -- feed.  The flag is only True here after a stopper has been pushed to
    -- the feed, i.e. the message being read is already complete.  Clearing
    -- it sent later pushes past the cache and onto the feed, which could
    -- leave an older message stranded in the cache and deliver it after
    -- newer ones.
    --
    -- 'readTChan' retries the transaction while the channel is empty, which
    -- also rolls back the swap above when there was nothing cached.
    readTVar (feed q) >>= readTChan

-- | Enqueue a chunk into the 'StatusCache'.
--
-- While 'feedFlag' is set the chunk is written to the cache, and a chunk
-- that starts a message first replaces the cache with a fresh channel,
-- dropping whatever was cached.  Otherwise the chunk is appended to the
-- feed, and a chunk that ends a message sets 'feedFlag' again.
push :: StatusCache a -> a -> STM ()
push q v = do
    caching <- readTVar (feedFlag q)
    target <-
        if caching
            then do
                when (isStarter q v) $ newTChan >>= writeTVar (cache q)
                readTVar (cache q)
            else do
                when (isStopper q v) $ writeTVar (feedFlag q) True
                readTVar (feed q)
    writeTChan target v

-- | True when the 'StatusCache' is completely exhausted, i.e. both the
-- feed and the cache channels are empty.
isEmpty :: StatusCache x -> STM Bool
isEmpty q = liftM2 (&&) (drained feed) (drained cache)
  where
    -- Is the channel currently held in the given field empty?
    drained chanVar = readTVar (chanVar q) >>= isEmptyTChan