---------------------------------------------------------------------------
-- |
-- Module      :  Control.Concurrent.STM.UpdateStream
--
--
-- Maintainer  :  joe@jerkface.net
-- Stability   :  experimental
--
-- An UpdateStream consists of pass-through messages that are queued up and
-- slotted messages which might be obsoleted and discarded if they are not
-- consumed before newer slotted messages with the same slot value occur.
--
-- Slots are implemented with "Control.Concurrent.STM.StatusCache" and it is
-- recommended that you read that documentation first.
--
-- For example, the output
--
-- > Backlogged consumer: (set x = 5)(set y = 100)(Hello)(Joe)(was)(here)
-- > Fast consumer: (set x = 2)(Hello)(set y = 23)(Joe)(set y = 100)(was)(set x = 5)(here)
--
-- is produced by the following code:
--
-- > import Control.Monad (when, forever, (>=>))
-- > import Control.Monad.STM (atomically)
-- > import Control.Concurrent (forkIO, threadDelay)
-- > import System.IO (hFlush, stdout)
-- > import qualified Control.Concurrent.STM.UpdateStream as Cache
-- >
-- > messages :: [(Maybe String, Char)]
-- > messages = concat
-- >     [ slot "x" "(set x = 2)"
-- >     , message "(Hello)"
-- >     , slot "y" "(set y = 23)"
-- >     , message "(Joe)"
-- >     , slot "y" "(set y = 100)"
-- >     , message "(was)"
-- >     , slot "x" "(set x = 5)"
-- >     , message "(here)"
-- >     ]
-- >   where
-- >     slot v cs = map ((,) (Just v)) cs
-- >     message cs = map ((,) Nothing) cs
-- >
-- > main = do
-- >     q <- atomically $ Cache.new (== '(') (==')')
-- >     let go = mapM_ (atomically . (uncurry $ Cache.push q)
-- >                       >=> const (threadDelay 10000))
-- >                    messages
-- >         slowly = do
-- >             while (atomically $ fmap not $ Cache.isEmpty q) $ do
-- >                 c <- atomically $ Cache.pull q
-- >                 putChar c
-- >             putStrLn ""
-- >             hFlush stdout
-- >           where while pred body =
-- >                   pred >>= flip when (body >> while pred body)
-- >         quickly = forkIO . forever $ do
-- >             c <- atomically $ Cache.pull q
-- >             putChar c
-- >             hFlush stdout
-- >     putStr $ "Backlogged consumer: "
-- >     go >> slowly
-- >     putStr "Fast consumer: "
-- >     quickly >> go
-- >     putStrLn ""
--
module Control.Concurrent.STM.UpdateStream
    ( UpdateStream
    , new
    , push
    , pull
    , isStopper
    , isStarter
    , isEmpty
    ) where

import Control.Monad
import Control.Monad.STM
import Control.Concurrent.STM.TVar
import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.StatusCache (StatusCache)
import qualified Control.Concurrent.STM.StatusCache as Status
import Data.Map (Map)
import qualified Data.Map as Map
import Data.Maybe
import Data.Foldable (foldlM)

-- | A stream of chunks combining a queue of pass-through chunks with a set
-- of per-slot caches for slotted chunks.
data UpdateStream slot x = UpdateStream
    { cache     :: TVar (Map slot (StatusCache x))
      -- ^ One 'StatusCache' per slot value, created the first time a chunk
      -- is pushed for that slot.
    , events    :: TChan x
      -- ^ Queue holding the chunks that were pushed without a slot.
    , inMessage :: TVar (Maybe (StatusCache x))
      -- ^ The slot cache that 'pull' is currently reading from, if any.
    , isStarter :: x -> Bool -- ^ True if the given chunk begins a message.
    , isStopper :: x -> Bool -- ^ True if the given chunk ends a message.
    }

-- | @new isStart isStop@
--
-- Create an empty 'UpdateStream': no slots, an empty pass-through queue and
-- no slotted message in progress.
--
-- The @isStart@ and @isStop@ predicates indicate when an element
-- begins or ends a message.
new :: Ord slot => (x->Bool) -> (x->Bool) -> STM (UpdateStream slot x)
new isStart isStop = do
    slotsVar   <- newTVar Map.empty
    queue      <- newTChan
    currentVar <- newTVar Nothing
    return UpdateStream
        { cache     = slotsVar
        , events    = queue
        , inMessage = currentVar
        , isStarter = isStart
        , isStopper = isStop
        }

-- | Enqueue a chunk into the 'UpdateStream'
--
-- If a slot is provided, then the message may be obsoleted when a new message
-- starts with the same slot value.  Otherwise, the chunk is preserved.  Note
-- that the same slot value must be passed again for every chunk of the message
-- as it is not remembered.
--
-- Note that although a slot value is provided on each push, it is assumed that
-- messages will be pushed in contiguous streams.  To change this, we would
-- need to add a mutable integer to track the nesting level of the primary
-- (non-slotted) stream.
push :: Ord slot => UpdateStream slot x -> Maybe slot -> x -> STM ()
-- Without a slot the chunk goes straight onto the pass-through queue.
push u Nothing x = writeTChan (events u) x
push u (Just n) x = do
    smap <- readTVar (cache u)
    scache <- case Map.lookup n smap of
        Just existing -> return existing
        Nothing -> do
            -- First chunk seen for this slot: create its cache and register it.
            fresh <- Status.new (isStarter u) (isStopper u)
            -- Strict modify so the insert is applied now instead of being
            -- left as a thunk inside the TVar.
            modifyTVar' (cache u) (Map.insert n fresh)
            return fresh
    Status.push scache x

-- | Pull off a chunk from the 'UpdateStream' for processing.
--
-- If chunks are not pulled off quickly, they may be obsoleted
-- and discarded when new messages are 'push'ed.
--
-- While a slotted message is in progress, chunks keep coming from that
-- slot's cache.  Otherwise the first non-empty slot cache (in ascending slot
-- order) is chosen, and only when every slot cache is empty is a chunk read
-- from the pass-through queue.
pull :: Ord slot => UpdateStream slot x -> STM x
pull u = readTVar (inMessage u) >>= maybe startNext continue
  where
    -- A slotted message is in progress: keep reading from its cache and
    -- release it once the cache reports a pull depth of zero.
    continue scache = do
        x <- Status.pull scache
        nesting <- readTVar (Status.pullDepth scache)
        when (nesting == 0) $ writeTVar (inMessage u) Nothing
        return x

    -- No message in progress: prefer a slot with pending chunks, falling
    -- back to the pass-through queue.
    startNext = do
        slots <- fmap Map.elems $ readTVar (cache u)
        ready <- filterM (fmap not . Status.isEmpty) slots
        case listToMaybe ready of
            Nothing -> readTChan (events u)
            Just scache -> do
                -- NOTE(review): the pull depth is not checked after this
                -- first pull.  If a message can complete in a single chunk,
                -- 'inMessage' presumably stays set until the next pull from
                -- this slot -- confirm against StatusCache's semantics.
                writeTVar (inMessage u) (Just scache)
                Status.pull scache

-- | True when the 'UpdateStream' is completely exhausted.
--
-- That is, the pass-through queue is empty and every slot's cache reports
-- empty.
isEmpty :: UpdateStream slot x -> STM Bool
isEmpty q = do
    noEvents <- isEmptyTChan (events q)
    slots <- readTVar (cache q)
    slotsDrained <- foldlM (\acc s -> fmap (acc &&) $ Status.isEmpty s) True slots
    return $ noEvents && slotsDrained