From 2d6dae13be15b61778ed35b3501df94a8e9dd78f Mon Sep 17 00:00:00 2001 From: joe Date: Mon, 10 Feb 2014 22:38:46 -0500 Subject: more xmppServer work --- Control/Concurrent/STM/StatusCache.hs | 142 ++++++++++++++++++++++++++++ Control/Concurrent/STM/UpdateStream.hs | 165 +++++++++++++++++++++++++++++++++ 2 files changed, 307 insertions(+) create mode 100644 Control/Concurrent/STM/StatusCache.hs create mode 100644 Control/Concurrent/STM/UpdateStream.hs (limited to 'Control/Concurrent') diff --git a/Control/Concurrent/STM/StatusCache.hs b/Control/Concurrent/STM/StatusCache.hs new file mode 100644 index 00000000..601de14c --- /dev/null +++ b/Control/Concurrent/STM/StatusCache.hs @@ -0,0 +1,142 @@ +--------------------------------------------------------------------------- +-- | +-- Module : Control.Concurrent.STM.StatusCache +-- +-- +-- Maintainer : joe@jerkface.net +-- Stability : experimental +-- +-- Motivation: Suppose you must communicate state changes over a stream. If +-- the stream is consumed slowly, then it may occur that backlogged state +-- changes are obsoleted by newer changes in state. In this case, it is +-- desirable to discard the obsolete messages from the stream. +-- +-- A streamed status message might be very large, and it would be wasteful in +-- both time and space, to treat it as a monolithic blob that must be built +-- completely before it can be sent. Therefore, we require that each message +-- consist of a stream of smaller chunks of type @x@ and we require predicates +-- that indicate when a chunk starts a new message and when it ends a message. +-- +-- In the folowing example, our chunk type is Char and complete messages are +-- delimited by the characters '(' and ')'. We process the input stream +-- \"(aaaa)(bb)(ccccc)\" first with a delayed processor and then again with an +-- efficient dedicated thread. The result follows: +-- +-- > Backlogged consumer: (ccccc) +-- > Fast consumer: (aaaa)(bb)(ccccc) +-- +-- The complete source code: +-- +-- > import Control.Monad (when, forever, (>=>)) +-- > import Control.Monad.STM (atomically) +-- > import Control.Concurrent (forkIO, threadDelay) +-- > import System.IO (hFlush, stdout) +-- > import qualified Control.Concurrent.STM.StatusCache as Cache +-- > +-- > while pred body = +-- > pred >>= flip when (body >> while pred body) +-- > +-- > main = do +-- > q <- atomically $ Cache.new (== '(') (==')') +-- > +-- > putStr $ "Backlogged consumer: " +-- > mapM_ (atomically . Cache.push q) "(aaaa)(bb)(ccccc)" +-- > while (atomically $ fmap not $ Cache.isEmpty q) $ do +-- > c <- atomically $ Cache.pull q +-- > putChar c +-- > putStrLn "" +-- > hFlush stdout +-- > +-- > putStr "Fast consumer: " +-- > forkIO $ forever $ do +-- > c <- atomically $ Cache.pull q +-- > putChar c +-- > hFlush stdout +-- > mapM_ (atomically . Cache.push q >=> const (threadDelay 10000)) +-- > "(aaaa)(bb)(ccccc)" +-- > putStrLn "" +-- +-- As shown above, it is intended that this module be imported qualified. +-- +module Control.Concurrent.STM.StatusCache + ( StatusCache + , new + , push + , pull + , isStopper + , isStarter + , isEmpty + ) where +import Control.Monad +import Control.Monad.STM +import Control.Concurrent.STM.TVar +import Control.Concurrent.STM.TChan + +data StatusCache x = + StatusCache { feed :: TVar (TChan x) + , cache :: TVar (TChan x) + , feedFlag :: TVar Bool + , isStarter :: x -> Bool -- ^ True if the given chunk begins a message. + , isStopper :: x -> Bool -- ^ True if the given chunk ends a message. + } + +-- | @new isStart isStop@ +-- +-- The @isStart@ and @isStop@ predicates indicate when an element +-- begins or ends a message. +new :: (x -> Bool) -> (x -> Bool) -> STM (StatusCache x) +new isStart isStop = do + feed <- newTChan >>= newTVar + cache <- newTChan >>= newTVar + flag <- newTVar True + return StatusCache { feed = feed + , cache = cache + , feedFlag = flag + , isStarter = isStart + , isStopper = isStop + } + + +-- | Pull off a chunk from the 'StatusCache' for processing. +-- +-- If chunks are not pulled off quickly, they may be obsoleted +-- and discarded when new messages are 'push'ed. +pull :: StatusCache x -> STM x +pull q = do + hasCache <- readTVar (feedFlag q) + exhausted <- readTVar (feed q) >>= isEmptyTChan + when (hasCache && exhausted) $ do + next <- newTChan >>= swapTVar (cache q) + writeTVar (feedFlag q) False + writeTVar (feed q) next + chan <- readTVar $ feed q + exhausted <- isEmptyTChan chan + if exhausted then retry + else do + v <- readTChan chan + when (isStarter q v) + $ writeTVar (feedFlag q) False + return v + +-- | Enqueue a chunk into the 'StatusCache'. +push :: StatusCache a -> a -> STM () +push q v = do + shouldCache <- readTVar (feedFlag q) + chan <- + if shouldCache then do + when (isStarter q v) + $ newTChan + >>= writeTVar (cache q) + readTVar $ cache q + else do + when (isStopper q v) + $ writeTVar (feedFlag q) True + readTVar $ feed q + writeTChan chan v + +-- | True when the 'StatusCache' is completely exhuasted. +isEmpty :: StatusCache x -> STM Bool +isEmpty q = do + empty_feed <- readTVar (feed q) >>= isEmptyTChan + empty_cache <- readTVar (cache q) >>= isEmptyTChan + return $ empty_feed && empty_cache diff --git a/Control/Concurrent/STM/UpdateStream.hs b/Control/Concurrent/STM/UpdateStream.hs new file mode 100644 index 00000000..a92168c0 --- /dev/null +++ b/Control/Concurrent/STM/UpdateStream.hs @@ -0,0 +1,165 @@ +--------------------------------------------------------------------------- +-- | +-- Module : Control.Concurrent.STM.UpdateStream +-- +-- +-- Maintainer : joe@jerkface.net +-- Stability : experimental +-- +-- An UpdateSream consists of pass-through messages that are queued up and +-- slotted messages which might be obsoleted and discarded if they are not +-- consumed before newer slotted messages with the same slot value occur. +-- +-- Slots are implemented with "Control.Concurrent.STM.StatusCache" and it is +-- recommended that you read that documentation first. +-- +-- For example, the output +-- +-- > Backlogged consumer: (set x = 5)(set y = 100)(Hello)(Joe)(was)(here) +-- > Fast consumer: (set x = 2)(Hello)(set y = 23)(Joe)(set y = 100)(was)(set x = 5)(here) +-- +-- is produced by the following code: +-- +-- > import Control.Monad (when, forever, (>=>)) +-- > import Control.Monad.STM (atomically) +-- > import Control.Concurrent (forkIO, threadDelay) +-- > import System.IO (hFlush, stdout) +-- > import qualified Control.Concurrent.STM.UpdateStream as Cache +-- > +-- > messages :: [(Maybe String, Char)] +-- > messages = concat +-- > [ slot "x" "(set x = 2)" +-- > , message "(Hello)" +-- > , slot "y" "(set y = 23)" +-- > , message "(Joe)" +-- > , slot "y" "(set y = 100)" +-- > , message "(was)" +-- > , slot "x" "(set x = 5)" +-- > , message "(here)" +-- > ] +-- > where +-- > slot v cs = map ((,) (Just v)) cs +-- > message cs = map ((,) Nothing) cs +-- > +-- > main = do +-- > q <- atomically $ Cache.new (== '(') (==')') +-- > let go = mapM_ (atomically . (uncurry $ Cache.push q) +-- > >=> const (threadDelay 10000)) +-- > messages +-- > slowly = do +-- > while (atomically $ fmap not $ Cache.isEmpty q) $ do +-- > c <- atomically $ Cache.pull q +-- > putChar c +-- > putStrLn "" +-- > hFlush stdout +-- > where while pred body = +-- > pred >>= flip when (body >> while pred body) +-- > quickly = forkIO . forever $ do +-- > c <- atomically $ Cache.pull q +-- > putChar c +-- > hFlush stdout +-- > putStr $ "Backlogged consumer: " +-- > go >> slowly +-- > putStr "Fast consumer: " +-- > quickly >> go +-- > putStrLn "" +-- +module Control.Concurrent.STM.UpdateStream + ( UpdateStream + , new + , push + , pull + , isStopper + , isStarter + , isEmpty + ) where + +import Control.Monad +import Control.Monad.STM +import Control.Concurrent.STM.TVar +import Control.Concurrent.STM.TChan +import Control.Concurrent.STM.StatusCache (StatusCache) +import qualified Control.Concurrent.STM.StatusCache as Status +import Data.Map (Map) +import qualified Data.Map as Map +import Data.Maybe +import Data.Foldable (foldlM) + +data UpdateStream slot x = + UpdateStream { cache :: TVar (Map slot (StatusCache x)) + , events :: TChan x + , inMessage :: TVar (Maybe slot) + , isStarter :: x -> Bool -- ^ True if the given chunk begins a message. + , isStopper :: x -> Bool -- ^ True if the given chunk ends a message. + } + +-- | @new isStart isStop@ +-- +-- The @isStart@ and @isStop@ predicates indicate when an element +-- begins or ends a message. +new :: Ord slot => (x->Bool) -> (x->Bool) -> STM (UpdateStream slot x) +new isStart isStop = do + cache <- newTVar Map.empty + events <- newTChan + inMessage <- newTVar Nothing + return UpdateStream { cache = cache + , events = events + , inMessage = inMessage + , isStarter = isStart + , isStopper = isStop + } + +-- | Enqueue a chunk into the 'UpdateStream' +-- +-- If a slot is provided, then the message may be obsoleted when a new message +-- starts with the same slot value. Otherwise, the chunk is preserved. Note +-- that the same slot value must be passed again for every chunk of the message +-- as it is not remembered. +push :: Ord slot => UpdateStream slot x -> Maybe slot -> x -> STM () +push u Nothing x = writeTChan (events u) x +push u (Just n) x = do + smap <- readTVar (cache u) + scache <- + case Map.lookup n smap of + Just scache -> return scache + Nothing -> do + scache <- Status.new (isStarter u) (isStopper u) + modifyTVar (cache u) (Map.insert n scache) + return scache + + Status.push scache x + +-- | Pull off a chunk from the 'UpdateStream' for processing. +-- +-- If chunks are not pulled off quickly, they may be obsoleted +-- and discarded when new messages are 'push'ed. +pull :: Ord slot => UpdateStream slot x -> STM x +pull u = do + (inm, mbscache) <- do + mn <- readTVar $ inMessage u + map <- readTVar (cache u) + return $ (isJust mn, mn >>= flip Map.lookup map) + let action = + case (inm,mbscache) of + (True,Just scache) -> Status.pull scache + (True,Nothing) -> readTChan (events u) + (False,_) -> do + cs <- fmap (Map.toList) $ readTVar (cache u) + cs <- filterM (return . not <=< Status.isEmpty . snd) + cs + maybe (readTChan $ events u) + (\(n,scache) -> do + writeTVar (inMessage u) (Just n) + Status.pull scache) + (listToMaybe cs) + x <- action + when (isStopper u x) $ writeTVar (inMessage u) Nothing + return x + +-- | True when the 'UpdateStream' is completely exhuasted. +isEmpty :: UpdateStream slot x -> STM Bool +isEmpty q = do + e <- isEmptyTChan (events q) + qs <- readTVar (cache q) + d <- foldlM (\b s -> fmap (b &&) $ Status.isEmpty s) True qs + return $ e && d -- cgit v1.2.3