From 6cb062187e94a322863363acc34e58d9d4070de1 Mon Sep 17 00:00:00 2001 From: joe Date: Fri, 17 Nov 2017 09:21:18 -0500 Subject: Factored slotsToSource out of forkConnection --- Presence/XMPPServer.hs | 71 +++++++++++++++++++++++++++----------------------- 1 file changed, 39 insertions(+), 32 deletions(-) (limited to 'Presence/XMPPServer.hs') diff --git a/Presence/XMPPServer.hs b/Presence/XMPPServer.hs index b63f04c3..3505a0a2 100644 --- a/Presence/XMPPServer.hs +++ b/Presence/XMPPServer.hs @@ -1181,6 +1181,44 @@ presenceStanza stanza_type type_attr me jid = ] , EventEndElement "{jabber:server}presence" ] +slotsToSource :: + Slotted.UpdateStream XMPPState (Either (StanzaWrap XML.Event) XML.Event) + -> TVar Int + -> TVar (Maybe (StanzaWrap XML.Event)) + -> TVar Bool + -> TMVar () + -> Source IO (Flush XML.Event) +slotsToSource slots nesting lastStanza needsFlush rdone = + fix $ \slot_src -> join . lift . atomically $ foldr1 orElse + [Slotted.pull slots >>= \x -> do + x <- case x of + Left wrapped -> do + writeTVar nesting 1 + writeTVar lastStanza (Just wrapped) + return $ stanzaChan wrapped + Right x -> do + when (isEventBeginElement x) + $ modifyTVar' nesting (+1) + when (isEventEndElement x) $ do + n <- readTVar nesting + when (n==1) $ writeTVar lastStanza Nothing + modifyTVar' nesting (subtract 1) + return x + writeTVar needsFlush True + return $ do + -- liftIO $ wlog $ "yielding Chunk: " ++ show x + yield (Chunk x) + slot_src + ,do Slotted.isEmpty slots >>= check + readTVar needsFlush >>= check + writeTVar needsFlush False + return $ do + -- liftIO $ wlog "yielding Flush" + yield Flush + slot_src + ,readTMVar rdone >> return (return ()) + ] + forkConnection :: Server ConnectionKey SockAddr ReleaseKey XML.Event -> XMPPServerParameters -> ConnectionKey @@ -1210,37 +1248,7 @@ forkConnection sv xmpp k laddr pingflag src snk stanzas = do let greet_src = do CL.sourceList (greet' namespace me) =$= CL.map Chunk yield Flush - slot_src = do - what <- lift . atomically $ foldr1 orElse - [Slotted.pull slots >>= \x -> do - x <- case x of - Left wrapped -> do - writeTVar nesting 1 - writeTVar lastStanza (Just wrapped) - return $ stanzaChan wrapped - Right x -> do - when (isEventBeginElement x) - $ modifyTVar' nesting (+1) - when (isEventEndElement x) $ do - n <- readTVar nesting - when (n==1) $ writeTVar lastStanza Nothing - modifyTVar' nesting (subtract 1) - return x - writeTVar needsFlush True - return $ do - -- liftIO $ wlog $ "yielding Chunk: " ++ show x - yield (Chunk x) - slot_src - ,do Slotted.isEmpty slots >>= check - readTVar needsFlush >>= check - writeTVar needsFlush False - return $ do - -- liftIO $ wlog "yielding Flush" - yield Flush - slot_src - ,readTMVar rdone >> return (return ()) - ] - what + slot_src = slotsToSource slots nesting lastStanza needsFlush rdone forkIO $ do myThreadId >>= flip labelThread ("post-queue."++show k) (greet_src >> slot_src) $$ snk last <- atomically $ readTVar lastStanza @@ -1641,7 +1649,6 @@ monitor sv params xmpp = do control sv (Connect (callBackAddress k) params) HalfConnection Out -> wlog $ tomsg k "WriteOnly" RequiresPing -> return () -- wlog $ tomsg k "RequiresPing" - _ -> return () , readTChan stanzas >>= \stanza -> return $ do {- dup <- case stanzaType stanza of -- cgit v1.2.3