From 2d6dae13be15b61778ed35b3501df94a8e9dd78f Mon Sep 17 00:00:00 2001 From: joe Date: Mon, 10 Feb 2014 22:38:46 -0500 Subject: more xmppServer work --- Control/Concurrent/STM/StatusCache.hs | 142 ++++++++++++++++++++++++++++ Control/Concurrent/STM/UpdateStream.hs | 165 +++++++++++++++++++++++++++++++++ Presence/NestingXML.hs | 10 ++ xmppServer.hs | 160 ++++++++++++++++++++++++++++---- 4 files changed, 457 insertions(+), 20 deletions(-) create mode 100644 Control/Concurrent/STM/StatusCache.hs create mode 100644 Control/Concurrent/STM/UpdateStream.hs diff --git a/Control/Concurrent/STM/StatusCache.hs b/Control/Concurrent/STM/StatusCache.hs new file mode 100644 index 00000000..601de14c --- /dev/null +++ b/Control/Concurrent/STM/StatusCache.hs @@ -0,0 +1,142 @@ +--------------------------------------------------------------------------- +-- | +-- Module : Control.Concurrent.STM.StatusCache +-- +-- +-- Maintainer : joe@jerkface.net +-- Stability : experimental +-- +-- Motivation: Suppose you must communicate state changes over a stream. If +-- the stream is consumed slowly, then it may occur that backlogged state +-- changes are obsoleted by newer changes in state. In this case, it is +-- desirable to discard the obsolete messages from the stream. +-- +-- A streamed status message might be very large, and it would be wasteful in +-- both time and space, to treat it as a monolithic blob that must be built +-- completely before it can be sent. Therefore, we require that each message +-- consist of a stream of smaller chunks of type @x@ and we require predicates +-- that indicate when a chunk starts a new message and when it ends a message. +-- +-- In the folowing example, our chunk type is Char and complete messages are +-- delimited by the characters '(' and ')'. We process the input stream +-- \"(aaaa)(bb)(ccccc)\" first with a delayed processor and then again with an +-- efficient dedicated thread. The result follows: +-- +-- > Backlogged consumer: (ccccc) +-- > Fast consumer: (aaaa)(bb)(ccccc) +-- +-- The complete source code: +-- +-- > import Control.Monad (when, forever, (>=>)) +-- > import Control.Monad.STM (atomically) +-- > import Control.Concurrent (forkIO, threadDelay) +-- > import System.IO (hFlush, stdout) +-- > import qualified Control.Concurrent.STM.StatusCache as Cache +-- > +-- > while pred body = +-- > pred >>= flip when (body >> while pred body) +-- > +-- > main = do +-- > q <- atomically $ Cache.new (== '(') (==')') +-- > +-- > putStr $ "Backlogged consumer: " +-- > mapM_ (atomically . Cache.push q) "(aaaa)(bb)(ccccc)" +-- > while (atomically $ fmap not $ Cache.isEmpty q) $ do +-- > c <- atomically $ Cache.pull q +-- > putChar c +-- > putStrLn "" +-- > hFlush stdout +-- > +-- > putStr "Fast consumer: " +-- > forkIO $ forever $ do +-- > c <- atomically $ Cache.pull q +-- > putChar c +-- > hFlush stdout +-- > mapM_ (atomically . Cache.push q >=> const (threadDelay 10000)) +-- > "(aaaa)(bb)(ccccc)" +-- > putStrLn "" +-- +-- As shown above, it is intended that this module be imported qualified. +-- +module Control.Concurrent.STM.StatusCache + ( StatusCache + , new + , push + , pull + , isStopper + , isStarter + , isEmpty + ) where +import Control.Monad +import Control.Monad.STM +import Control.Concurrent.STM.TVar +import Control.Concurrent.STM.TChan + +data StatusCache x = + StatusCache { feed :: TVar (TChan x) + , cache :: TVar (TChan x) + , feedFlag :: TVar Bool + , isStarter :: x -> Bool -- ^ True if the given chunk begins a message. + , isStopper :: x -> Bool -- ^ True if the given chunk ends a message. + } + +-- | @new isStart isStop@ +-- +-- The @isStart@ and @isStop@ predicates indicate when an element +-- begins or ends a message. +new :: (x -> Bool) -> (x -> Bool) -> STM (StatusCache x) +new isStart isStop = do + feed <- newTChan >>= newTVar + cache <- newTChan >>= newTVar + flag <- newTVar True + return StatusCache { feed = feed + , cache = cache + , feedFlag = flag + , isStarter = isStart + , isStopper = isStop + } + + +-- | Pull off a chunk from the 'StatusCache' for processing. +-- +-- If chunks are not pulled off quickly, they may be obsoleted +-- and discarded when new messages are 'push'ed. +pull :: StatusCache x -> STM x +pull q = do + hasCache <- readTVar (feedFlag q) + exhausted <- readTVar (feed q) >>= isEmptyTChan + when (hasCache && exhausted) $ do + next <- newTChan >>= swapTVar (cache q) + writeTVar (feedFlag q) False + writeTVar (feed q) next + chan <- readTVar $ feed q + exhausted <- isEmptyTChan chan + if exhausted then retry + else do + v <- readTChan chan + when (isStarter q v) + $ writeTVar (feedFlag q) False + return v + +-- | Enqueue a chunk into the 'StatusCache'. +push :: StatusCache a -> a -> STM () +push q v = do + shouldCache <- readTVar (feedFlag q) + chan <- + if shouldCache then do + when (isStarter q v) + $ newTChan + >>= writeTVar (cache q) + readTVar $ cache q + else do + when (isStopper q v) + $ writeTVar (feedFlag q) True + readTVar $ feed q + writeTChan chan v + +-- | True when the 'StatusCache' is completely exhuasted. +isEmpty :: StatusCache x -> STM Bool +isEmpty q = do + empty_feed <- readTVar (feed q) >>= isEmptyTChan + empty_cache <- readTVar (cache q) >>= isEmptyTChan + return $ empty_feed && empty_cache diff --git a/Control/Concurrent/STM/UpdateStream.hs b/Control/Concurrent/STM/UpdateStream.hs new file mode 100644 index 00000000..a92168c0 --- /dev/null +++ b/Control/Concurrent/STM/UpdateStream.hs @@ -0,0 +1,165 @@ +--------------------------------------------------------------------------- +-- | +-- Module : Control.Concurrent.STM.UpdateStream +-- +-- +-- Maintainer : joe@jerkface.net +-- Stability : experimental +-- +-- An UpdateSream consists of pass-through messages that are queued up and +-- slotted messages which might be obsoleted and discarded if they are not +-- consumed before newer slotted messages with the same slot value occur. +-- +-- Slots are implemented with "Control.Concurrent.STM.StatusCache" and it is +-- recommended that you read that documentation first. +-- +-- For example, the output +-- +-- > Backlogged consumer: (set x = 5)(set y = 100)(Hello)(Joe)(was)(here) +-- > Fast consumer: (set x = 2)(Hello)(set y = 23)(Joe)(set y = 100)(was)(set x = 5)(here) +-- +-- is produced by the following code: +-- +-- > import Control.Monad (when, forever, (>=>)) +-- > import Control.Monad.STM (atomically) +-- > import Control.Concurrent (forkIO, threadDelay) +-- > import System.IO (hFlush, stdout) +-- > import qualified Control.Concurrent.STM.UpdateStream as Cache +-- > +-- > messages :: [(Maybe String, Char)] +-- > messages = concat +-- > [ slot "x" "(set x = 2)" +-- > , message "(Hello)" +-- > , slot "y" "(set y = 23)" +-- > , message "(Joe)" +-- > , slot "y" "(set y = 100)" +-- > , message "(was)" +-- > , slot "x" "(set x = 5)" +-- > , message "(here)" +-- > ] +-- > where +-- > slot v cs = map ((,) (Just v)) cs +-- > message cs = map ((,) Nothing) cs +-- > +-- > main = do +-- > q <- atomically $ Cache.new (== '(') (==')') +-- > let go = mapM_ (atomically . (uncurry $ Cache.push q) +-- > >=> const (threadDelay 10000)) +-- > messages +-- > slowly = do +-- > while (atomically $ fmap not $ Cache.isEmpty q) $ do +-- > c <- atomically $ Cache.pull q +-- > putChar c +-- > putStrLn "" +-- > hFlush stdout +-- > where while pred body = +-- > pred >>= flip when (body >> while pred body) +-- > quickly = forkIO . forever $ do +-- > c <- atomically $ Cache.pull q +-- > putChar c +-- > hFlush stdout +-- > putStr $ "Backlogged consumer: " +-- > go >> slowly +-- > putStr "Fast consumer: " +-- > quickly >> go +-- > putStrLn "" +-- +module Control.Concurrent.STM.UpdateStream + ( UpdateStream + , new + , push + , pull + , isStopper + , isStarter + , isEmpty + ) where + +import Control.Monad +import Control.Monad.STM +import Control.Concurrent.STM.TVar +import Control.Concurrent.STM.TChan +import Control.Concurrent.STM.StatusCache (StatusCache) +import qualified Control.Concurrent.STM.StatusCache as Status +import Data.Map (Map) +import qualified Data.Map as Map +import Data.Maybe +import Data.Foldable (foldlM) + +data UpdateStream slot x = + UpdateStream { cache :: TVar (Map slot (StatusCache x)) + , events :: TChan x + , inMessage :: TVar (Maybe slot) + , isStarter :: x -> Bool -- ^ True if the given chunk begins a message. + , isStopper :: x -> Bool -- ^ True if the given chunk ends a message. + } + +-- | @new isStart isStop@ +-- +-- The @isStart@ and @isStop@ predicates indicate when an element +-- begins or ends a message. +new :: Ord slot => (x->Bool) -> (x->Bool) -> STM (UpdateStream slot x) +new isStart isStop = do + cache <- newTVar Map.empty + events <- newTChan + inMessage <- newTVar Nothing + return UpdateStream { cache = cache + , events = events + , inMessage = inMessage + , isStarter = isStart + , isStopper = isStop + } + +-- | Enqueue a chunk into the 'UpdateStream' +-- +-- If a slot is provided, then the message may be obsoleted when a new message +-- starts with the same slot value. Otherwise, the chunk is preserved. Note +-- that the same slot value must be passed again for every chunk of the message +-- as it is not remembered. +push :: Ord slot => UpdateStream slot x -> Maybe slot -> x -> STM () +push u Nothing x = writeTChan (events u) x +push u (Just n) x = do + smap <- readTVar (cache u) + scache <- + case Map.lookup n smap of + Just scache -> return scache + Nothing -> do + scache <- Status.new (isStarter u) (isStopper u) + modifyTVar (cache u) (Map.insert n scache) + return scache + + Status.push scache x + +-- | Pull off a chunk from the 'UpdateStream' for processing. +-- +-- If chunks are not pulled off quickly, they may be obsoleted +-- and discarded when new messages are 'push'ed. +pull :: Ord slot => UpdateStream slot x -> STM x +pull u = do + (inm, mbscache) <- do + mn <- readTVar $ inMessage u + map <- readTVar (cache u) + return $ (isJust mn, mn >>= flip Map.lookup map) + let action = + case (inm,mbscache) of + (True,Just scache) -> Status.pull scache + (True,Nothing) -> readTChan (events u) + (False,_) -> do + cs <- fmap (Map.toList) $ readTVar (cache u) + cs <- filterM (return . not <=< Status.isEmpty . snd) + cs + maybe (readTChan $ events u) + (\(n,scache) -> do + writeTVar (inMessage u) (Just n) + Status.pull scache) + (listToMaybe cs) + x <- action + when (isStopper u x) $ writeTVar (inMessage u) Nothing + return x + +-- | True when the 'UpdateStream' is completely exhuasted. +isEmpty :: UpdateStream slot x -> STM Bool +isEmpty q = do + e <- isEmptyTChan (events q) + qs <- readTVar (cache q) + d <- foldlM (\b s -> fmap (b &&) $ Status.isEmpty s) True qs + return $ e && d diff --git a/Presence/NestingXML.hs b/Presence/NestingXML.hs index bf12c9ae..c26e3d5c 100644 --- a/Presence/NestingXML.hs +++ b/Presence/NestingXML.hs @@ -79,6 +79,16 @@ awaitCloser lvl = do withXML $ \xml -> do loop +doUntilCloser :: Monad m + => Int -> (Event -> NestingXML o m ()) -> NestingXML o m () +doUntilCloser lvl thunk = do + fix $ \loop -> do + lvl' <- nesting + when (lvl' >= lvl) $ do + withXML $ \xml -> do + thunk xml + loop + nextElement :: Monad m => NestingXML o m (Maybe Event) nextElement = do lvl <- nesting diff --git a/xmppServer.hs b/xmppServer.hs index c720ea5f..459d8f8d 100644 --- a/xmppServer.hs +++ b/xmppServer.hs @@ -1,6 +1,9 @@ +{-# LANGUAGE OverloadedStrings #-} import Control.Monad.Trans.Resource (runResourceT) import Control.Monad.Trans (lift) +import Control.Monad.IO.Class (liftIO) import Control.Monad.Fix (fix) +import Control.Monad import Control.Concurrent (forkIO) import Control.Concurrent.STM -- import Control.Concurrent.STM.TChan @@ -8,18 +11,42 @@ import Network.Socket import XMPPTypes (withPort) import Text.Printf import System.Posix.Signals +import Data.ByteString (ByteString) +import qualified Data.ByteString.Char8 as Strict8 +-- import qualified Data.ByteString.Lazy.Char8 as Lazy8 import Data.Conduit +import qualified Data.Conduit.List as CL +import qualified Data.Conduit.Binary as CB import qualified Text.XML.Stream.Render as XML import qualified Text.XML.Stream.Parse as XML +import Data.XML.Types as XML +import Data.Maybe (catMaybes) +import Data.Monoid ( (<>) ) +import qualified Control.Concurrent.STM.UpdateStream as Slotted +import ControlMaybe +import NestingXML import Server + wlog s = putStrLn s + where _ = s :: String +wlogb s = Strict8.putStrLn s control sv = atomically . putTMVar (serverCommand sv) +-- Note: This function ignores name space qualification +elementAttrs expected (EventBeginElement name attrs) + | nameLocalName name==expected + = return attrs +elementAttrs _ _ = mzero + +getStreamName (EventBeginElement name _) = name + +xmlStream :: ReadCommand -> WriteCommand -> ( Source IO XML.Event + , Sink XML.Event IO () ) xmlStream conread conwrite = (xsrc,xsnk) where xsrc = src $= XML.parseBytes XML.def @@ -30,34 +57,125 @@ xmlStream conread conwrite = (xsrc,xsnk) maybe (return ()) -- lift . wlog $ "conread: Nothing") (\v -> yield v >> src) v - snk = awaitForever $ lift . conwrite + snk = awaitForever $ liftIO . conwrite -forkConnection k pingflag src snk = do +type FlagCommand = IO Bool +type ReadCommand = IO (Maybe ByteString) +type WriteCommand = ByteString -> IO Bool + +data Stanza + = UnrecognizedStanza { stanzaChan :: TChan (Maybe XML.Event) } + +prettyPrint prefix xs = + liftIO $ + CL.sourceList xs + $= XML.renderBytes (XML.def { XML.rsPretty=True }) + =$= CB.lines + $$ CL.mapM_ (wlogb . (prefix <>)) + +xmppInbound :: ConnectionKey -> FlagCommand + -> Source IO XML.Event + -> TChan Stanza + -> Sink XML.Event IO () +xmppInbound k pingflag src stanzas = doNestingXML $ do + withXML $ \begindoc -> do + when (begindoc==EventBeginDocument) $ do + -- liftIO . wlog $ "begin-doc" + withXML $ \xml -> do + withJust (elementAttrs "stream" xml) $ \stream_attrs -> do + -- liftIO . wlog $ "stream: " ++ show (getStreamName xml) + -- liftIO . wlog $ "stream atributes: " ++ show stream_attrs + fix $ \loop -> do + -- liftIO . wlog $ "waiting for stanza." + chan <- liftIO $ atomically newTChan + whenJust nextElement $ \stanza -> do + stanza_lvl <- nesting + liftIO . atomically $ writeTChan chan (Just stanza) + -- liftIO . wlog $ "stanza: "++show stanza + + liftIO . atomically $ writeTChan stanzas $ + UnrecognizedStanza chan + doUntilCloser stanza_lvl $ \xml -> do + liftIO . atomically $ writeTChan chan (Just xml) + -- liftIO . wlog $ "-stanza: " ++ show xml + liftIO . atomically $ writeTChan chan Nothing + loop + + +chanContents :: TChan x -> IO [x] +chanContents ch = do + x <- atomically $ do + bempty <- isEmptyTChan ch + if bempty + then return Nothing + else fmap Just $ readTChan ch + maybe (return []) + (\x -> do + xs <- chanContents ch + return (x:xs)) + x + + +isEventBeginElement (EventBeginElement {}) = True +isEventBeginElement _ = False + +isEventEndElement (EventEndElement {}) = True +isEventEndElement _ = False + +forkConnection :: ConnectionKey + -> FlagCommand + -> Source IO XML.Event + -> Sink XML.Event IO () + -> TChan Stanza + -> IO (Slotted.UpdateStream () XML.Event) +forkConnection k pingflag src snk stanzas = do + rdone <- atomically newEmptyTMVar forkIO $ do - src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show) - wlog $ "end fork: " ++ show k - return () + -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show) + src $$ xmppInbound k pingflag src stanzas + atomically $ putTMVar rdone () + wlog $ "end reader fork: " ++ show k + output <- atomically $ Slotted.new isEventBeginElement isEventEndElement + let slot_src = do + what <- lift . atomically $ orElse + (Slotted.pull output >>= \x -> return $ do + yield x + slot_src) + (takeTMVar rdone >> return (return ())) + what + forkIO $ do slot_src $$ snk + wlog $ "end writer fork: " ++ show k + return output monitor sv params = do chan <- return $ serverEvent sv + stanzas <- atomically newTChan fix $ \loop -> do - (k,e) <- atomically $ readTChan chan - case e of - Connection pingflag conread conwrite -> do - let (xsrc,xsnk) = xmlStream conread conwrite - forkConnection k pingflag xsrc xsnk - wlog $ tomsg k "Connection" - EOF -> wlog $ tomsg k "EOF" - HalfConnection In -> do - wlog $ tomsg k "ReadOnly" - control sv (Connect (callBackAddress k) params) - HalfConnection Out -> wlog $ tomsg k "WriteOnly" - RequiresPing -> wlog $ tomsg k "RequiresPing" - _ -> return () + action <- atomically $ foldr1 orElse + [ readTChan chan >>= \(k,e) -> return $ do + case e of + Connection pingflag conread conwrite -> do + let (xsrc,xsnk) = xmlStream conread conwrite + forkConnection k pingflag xsrc xsnk stanzas + wlog $ tomsg k "Connection" + EOF -> wlog $ tomsg k "EOF" + HalfConnection In -> do + wlog $ tomsg k "ReadOnly" + control sv (Connect (callBackAddress k) params) + HalfConnection Out -> wlog $ tomsg k "WriteOnly" + RequiresPing -> wlog $ tomsg k "RequiresPing" + _ -> return () + , readTChan stanzas >>= \stanza -> return $ do + xs <- chanContents (stanzaChan stanza) + prettyPrint "STANZA: " (catMaybes xs) ---- wlog $ "STANZA: "++ show (catMaybes xs) + ] + action loop where tomsg k str = printf "%12s %s" str (show k) + where + _ = str :: String data ConnectionKey = PeerKey { callBackAddress :: SockAddr } @@ -77,11 +195,13 @@ main = runResourceT $ do sv <- server lift $ do peer_params <- return (connectionDefaults peerKey) - { pingInterval = 2000, duplex = False } + { pingInterval = 0 + , timeout = 0 + , duplex = False } client_params <- return $ connectionDefaults clientKey forkIO $ monitor sv peer_params control sv (Listen peerport peer_params) - control sv (Listen clientport client_params) + -- control sv (Listen clientport client_params) -- atomically $ newEmptyTMVar >>= readTMVar -- Wait for control-c quitVar <- newEmptyTMVarIO -- cgit v1.2.3