---------------------------------------------------------------------------
-- |
-- Module      :  Control.Concurrent.STM.UpdateStream
--
--
-- Maintainer  :  joe@jerkface.net
-- Stability   :  experimental
--
-- An UpdateStream consists of pass-through messages that are queued up and
-- slotted messages which might be obsoleted and discarded if they are not
-- consumed before newer slotted messages with the same slot value occur.
--
-- Slots are implemented with "Control.Concurrent.STM.StatusCache" and it is
-- recommended that you read that documentation first.
--
-- For example, the output
--
-- > Backlogged consumer: (set x = 5)(set y = 100)(Hello)(Joe)(was)(here)
-- > Fast consumer: (set x = 2)(Hello)(set y = 23)(Joe)(set y = 100)(was)(set x = 5)(here)
--
-- is produced by the following code:
--
-- > import Control.Monad (when, forever, (>=>))
-- > import Control.Monad.STM (atomically)
-- > import Control.Concurrent (forkIO, threadDelay)
-- > import System.IO (hFlush, stdout)
-- > import qualified Control.Concurrent.STM.UpdateStream as Cache
-- >
-- > messages :: [(Maybe String, Char)]
-- > messages = concat
-- >     [ slot "x" "(set x = 2)"
-- >     , message  "(Hello)"
-- >     , slot "y" "(set y = 23)"
-- >     , message  "(Joe)"
-- >     , slot "y" "(set y = 100)"
-- >     , message  "(was)"
-- >     , slot "x" "(set x = 5)"
-- >     , message  "(here)"
-- >     ]
-- >  where
-- >     slot v cs  = map ((,) (Just v)) cs
-- >     message cs = map ((,) Nothing) cs
-- >
-- > main = do
-- >     q <- atomically $ Cache.new (== '(') (==')')
-- >     let go = mapM_ (atomically . (uncurry $ Cache.push q)
-- >                      >=> const (threadDelay 10000))
-- >                    messages
-- >         slowly = do
-- >             while (atomically $ fmap not $ Cache.isEmpty q) $ do
-- >                 c <- atomically $ Cache.pull q
-- >                 putChar c
-- >             putStrLn ""
-- >             hFlush stdout
-- >           where while pred body =
-- >                   pred >>= flip when (body >> while pred body)
-- >         quickly = forkIO .
-- >             forever $ do
-- >                 c <- atomically $ Cache.pull q
-- >                 putChar c
-- >                 hFlush stdout
-- >     putStr $ "Backlogged consumer: "
-- >     go >> slowly
-- >     putStr "Fast consumer: "
-- >     quickly >> go
-- >     putStrLn ""
--
module Control.Concurrent.STM.UpdateStream
    ( UpdateStream
    , new
    , push
    , pull
    , isStopper
    , isStarter
    , isEmpty
    ) where

import Control.Monad
import Control.Monad.STM
import Control.Concurrent.STM.TVar
import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.StatusCache (StatusCache)
import qualified Control.Concurrent.STM.StatusCache as Status
import Data.Map (Map)
import qualified Data.Map as Map
import Data.Maybe
import Data.Foldable (foldlM)

-- | A stream of chunks that mixes pass-through messages with slotted
-- messages.  Pass-through chunks go through a 'TChan'; slotted chunks go
-- through one 'StatusCache' per slot value.
data UpdateStream slot x = UpdateStream
    { cache     :: TVar (Map slot (StatusCache x))
      -- ^ One 'StatusCache' per slot value; entries are created on the
      -- first 'push' that names the slot.
    , events    :: TChan x
      -- ^ Queue holding the chunks that were pushed without a slot.
    , inMessage :: TVar (Maybe slot)
      -- ^ Slot whose message 'pull' is currently delivering, if any.  It is
      -- set when 'pull' starts reading from a slot and cleared once a chunk
      -- satisfying 'isStopper' has been pulled.
    , isStarter :: x -> Bool -- ^ True if the given chunk begins a message.
    , isStopper :: x -> Bool -- ^ True if the given chunk ends a message.
    }

-- | @new isStart isStop@
--
-- The @isStart@ and @isStop@ predicates indicate when an element
-- begins or ends a message.
--
-- The returned stream starts with no slots, an empty pass-through queue and
-- no message in progress.
new :: Ord slot => (x->Bool) -> (x->Bool) -> STM (UpdateStream slot x)
new isStart isStop = do
    -- Local names deliberately differ from the record selectors so that the
    -- selectors are not shadowed inside this definition.
    slots  <- newTVar Map.empty
    chan   <- newTChan
    active <- newTVar Nothing
    return UpdateStream
        { cache     = slots
        , events    = chan
        , inMessage = active
        , isStarter = isStart
        , isStopper = isStop
        }

-- | Enqueue a chunk into the 'UpdateStream'
--
-- If a slot is provided, then the message may be obsoleted when a new message
-- starts with the same slot value.  Otherwise, the chunk is preserved.  Note
-- that the same slot value must be passed again for every chunk of the message
-- as it is not remembered.
push :: Ord slot => UpdateStream slot x -> Maybe slot -> x -> STM ()
-- No slot: the chunk is simply appended to the pass-through queue.
push u Nothing  x = writeTChan (events u) x
-- With a slot: route the chunk to that slot's cache, creating the cache the
-- first time the slot value is seen.
push u (Just n) x = do
    smap <- readTVar (cache u)
    scache <- case Map.lookup n smap of
        Just existing -> return existing
        Nothing -> do
            fresh <- Status.new (isStarter u) (isStopper u)
            -- Strict update so no chain of unevaluated inserts builds up
            -- inside the TVar.
            modifyTVar' (cache u) (Map.insert n fresh)
            return fresh
    Status.push scache x

-- | Pull off a chunk from the 'UpdateStream' for processing.
--
-- If chunks are not pulled off quickly, they may be obsoleted
-- and discarded when new messages are 'push'ed.
--
-- While a slotted message is being delivered, chunks keep coming from that
-- slot until a chunk satisfying 'isStopper' is returned.  When no slotted
-- message is in progress, the first slot (in key order) whose cache is not
-- empty is chosen; if every cache is empty, the next pass-through chunk is
-- read instead.
--
-- NOTE(review): pass-through chunks never mark a message as in progress, so
-- a multi-chunk pass-through message looks like it can be interleaved with
-- slotted chunks -- confirm whether that is intended.
pull :: Ord slot => UpdateStream slot x -> STM x
pull u = do
    current <- readTVar (inMessage u)
    caches  <- readTVar (cache u)
    x <- case current of
        -- A slotted message is in progress: keep reading from its cache.
        -- The fallback to the pass-through queue only applies when the
        -- recorded slot has no cache entry, which this module never seems
        -- to produce since entries are only ever inserted.
        Just n  -> maybe (readTChan (events u)) Status.pull (Map.lookup n caches)
        Nothing -> do
            ready <- filterM (fmap not . Status.isEmpty . snd) (Map.toList caches)
            case listToMaybe ready of
                Just (n, scache) -> do
                    -- Remember the slot so later pulls stay on this message.
                    writeTVar (inMessage u) (Just n)
                    Status.pull scache
                Nothing -> readTChan (events u)
    -- A stopper chunk ends the message in progress.
    when (isStopper u x) $ writeTVar (inMessage u) Nothing
    return x

-- | True when the 'UpdateStream' is completely exhausted.
--
-- That is, the pass-through queue is empty and every slot's cache reports
-- itself empty.
isEmpty :: UpdateStream slot x -> STM Bool
isEmpty q = do
    queueEmpty  <- isEmptyTChan (events q)
    caches      <- readTVar (cache q)
    cachesEmpty <- foldlM (\acc s -> fmap (acc &&) (Status.isEmpty s)) True caches
    return (queueEmpty && cachesEmpty)