From 44cc78636b564d479d76be989fbb36fd1e720e10 Mon Sep 17 00:00:00 2001 From: joe Date: Tue, 11 Feb 2014 20:38:16 -0500 Subject: send-ping framework for xmppServer --- Presence/Server.hs | 4 ++-- xmppServer.hs | 45 ++++++++++++++++++++++++++++++++++++--------- 2 files changed, 38 insertions(+), 11 deletions(-) diff --git a/Presence/Server.hs b/Presence/Server.hs index 4ee55821..1d586900 100644 --- a/Presence/Server.hs +++ b/Presence/Server.hs @@ -143,7 +143,7 @@ data InOrOut = In | Out data ConnectionEvent b = Got b -- ^ Arrival of data from a socket - | Connection (IO Bool) (IO (Maybe ByteString)) (ByteString -> IO Bool) + | Connection (STM Bool) (IO (Maybe ByteString)) (ByteString -> IO Bool) -- ^ A new connection was established | HalfConnection InOrOut -- ^ Half of a half-duplex connection is avaliable. @@ -314,7 +314,7 @@ killListener (thread,sock) = do sClose sock conevent con = Connection pingflag read write where - pingflag = atomically $ swapTVar (pingFlag (connPingTimer con)) False + pingflag = swapTVar (pingFlag (connPingTimer con)) False read = connRead con write = connWrite con diff --git a/xmppServer.hs b/xmppServer.hs index 459d8f8d..48745e51 100644 --- a/xmppServer.hs +++ b/xmppServer.hs @@ -60,7 +60,7 @@ xmlStream conread conwrite = (xsrc,xsnk) snk = awaitForever $ liftIO . conwrite -type FlagCommand = IO Bool +type FlagCommand = STM Bool type ReadCommand = IO (Maybe ByteString) type WriteCommand = ByteString -> IO Bool @@ -128,7 +128,7 @@ forkConnection :: ConnectionKey -> Source IO XML.Event -> Sink XML.Event IO () -> TChan Stanza - -> IO (Slotted.UpdateStream () XML.Event) + -> IO (TChan Stanza) forkConnection k pingflag src snk stanzas = do rdone <- atomically newEmptyTMVar forkIO $ do @@ -136,16 +136,42 @@ forkConnection k pingflag src snk stanzas = do src $$ xmppInbound k pingflag src stanzas atomically $ putTMVar rdone () wlog $ "end reader fork: " ++ show k - output <- atomically $ Slotted.new isEventBeginElement isEventEndElement + slots <- atomically $ Slotted.new isEventBeginElement isEventEndElement + let _ = slots :: Slotted.UpdateStream () XML.Event let slot_src = do what <- lift . atomically $ orElse - (Slotted.pull output >>= \x -> return $ do + (Slotted.pull slots >>= \x -> return $ do yield x slot_src) - (takeTMVar rdone >> return (return ())) + (readTMVar rdone >> return (return ())) what forkIO $ do slot_src $$ snk - wlog $ "end writer fork: " ++ show k + wlog $ "end post-queue fork: " ++ show k + output <- atomically newTChan + forkIO $ do + fix $ \loop -> do + what <- atomically $ foldr1 orElse + [readTChan output >>= \stanza -> return $ do + let xchan = stanzaChan stanza + fix $ \inner -> do + what <- atomically $ orElse + (readTChan xchan >>= \mxml -> return $ do + case mxml of + Just xml -> do + atomically $ Slotted.push slots Nothing xml + inner + Nothing -> return ()) + (readTMVar rdone >> return (return ())) + what + loop + ,do pingflag >>= check + return $ do + wlog $ "TODO: send ping" + loop + ,readTMVar rdone >> return (return ()) + ] + what + wlog $ "end pre-queue fork: " ++ show k return output monitor sv params = do @@ -168,7 +194,8 @@ monitor sv params = do _ -> return () , readTChan stanzas >>= \stanza -> return $ do xs <- chanContents (stanzaChan stanza) - prettyPrint "STANZA: " (catMaybes xs) ---- wlog $ "STANZA: "++ show (catMaybes xs) + wlog "" + prettyPrint "STANZA: " (catMaybes xs) ] action loop @@ -195,8 +222,8 @@ main = runResourceT $ do sv <- server lift $ do peer_params <- return (connectionDefaults peerKey) - { pingInterval = 0 - , timeout = 0 + { pingInterval = 5000 + , timeout = 5000 , duplex = False } client_params <- return $ connectionDefaults clientKey forkIO $ monitor sv peer_params -- cgit v1.2.3