From afff51ac877ce8807801334745f1679dbf6440d0 Mon Sep 17 00:00:00 2001 From: joe Date: Mon, 10 Feb 2014 02:52:23 -0500 Subject: Fixed ping bug, forked connection thread in xmppServer --- Presence/Server.hs | 113 ++++++++++++++++++++++++++++++++++++----------------- xmppServer.hs | 39 ++++++++++++++++-- 2 files changed, 113 insertions(+), 39 deletions(-) diff --git a/Presence/Server.hs b/Presence/Server.hs index 530b6669..4ee55821 100644 --- a/Presence/Server.hs +++ b/Presence/Server.hs @@ -48,6 +48,7 @@ import System.IO , hClose , hIsEOF , stderr + , stdout , Handle , hFlush ) @@ -142,7 +143,7 @@ data InOrOut = In | Out data ConnectionEvent b = Got b -- ^ Arrival of data from a socket - | Connection + | Connection (IO Bool) (IO (Maybe ByteString)) (ByteString -> IO Bool) -- ^ A new connection was established | HalfConnection InOrOut -- ^ Half of a half-duplex connection is avaliable. @@ -151,8 +152,10 @@ data ConnectionEvent b | RequiresPing -- ^ 'pingInterval' miliseconds of idle was experienced +{- deriving instance Show b => Show (ConnectionEvent b) deriving instance Eq b => Eq (ConnectionEvent b) +-} -- | This object accepts commands and signals events and maintains -- the list of currently listening ports and established connections. @@ -285,9 +288,10 @@ hWriteUntilNothing h outs = Nothing -> do warn $ "wrote Nothing" hClose h +-} connRead :: ConnectionState -> IO (Maybe ByteString) connRead (WriteOnlyConnection w) = do - atomically $ discardContents (threadsChannel w) + -- atomically $ discardContents (threadsChannel w) return Nothing connRead conn = do c <- atomically $ getThreads @@ -297,9 +301,8 @@ connRead conn = do case conn of SaneConnection c -> return c ReadOnlyConnection c -> return c ConnectionPair c w -> do - discardContents (threadsChannel w) + -- discardContents (threadsChannel w) return c --} socketFamily (SockAddrInet _ _) = AF_INET socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6 @@ -309,6 +312,12 @@ killListener (thread,sock) = do sClose sock -- killThread thread +conevent con = Connection pingflag read write + where + pingflag = atomically $ swapTVar (pingFlag (connPingTimer con)) False + read = connRead con + write = connWrite con + newConnection server params conkey h inout = do hSetBuffering h NoBuffering let (forward,idle_ms,timeout_ms) = @@ -335,7 +344,8 @@ newConnection server params conkey h inout = do Nothing -> do (newCon,e) <- return $ if duplex params - then ( SaneConnection new, (conkey, Connection) ) + then let newcon = SaneConnection new + in ( newcon, (conkey, conevent newcon) ) else ( case inout of In -> ReadOnlyConnection new Out -> WriteOnlyConnection new @@ -351,6 +361,8 @@ newConnection server params conkey h inout = do kont <- updateConMap conkey new what putTMVar started () return kont + {- + -- enable this for 'Got' events forkIO $ do -- inout==In || duplex params then forkIO $ do -- warn $ "waiting read thread: " <> bshow (conkey,inout) atomically $ takeTMVar started @@ -363,6 +375,7 @@ newConnection server params conkey h inout = do maybe (return ()) (atomically . forward >=> const loop) mb + -} return () where @@ -389,7 +402,7 @@ newConnection server params conkey h inout = do sendPing PingTimeOut = do atomically (connClose newCon) eof sendPing PingIdle = do - atomically . announce $ (conkey,RequiresPing) + atomically $ announce (conkey,RequiresPing) handleEOF conkey mvar newCon @@ -398,16 +411,19 @@ newConnection server params conkey h inout = do if duplex params then do announce (conkey,EOF) connClose replaced - announce $ (conkey,Connection) - return $ SaneConnection new + let newcon = SaneConnection new + announce $ (conkey,conevent newcon) + return $ newcon else case replaced of WriteOnlyConnection w | inout==In -> - do announce (conkey,Connection) - return $ ConnectionPair new w + do let newcon = ConnectionPair new w + announce (conkey,conevent newcon) + return newcon ReadOnlyConnection r | inout==Out -> - do announce (conkey,Connection) - return $ ConnectionPair r new + do let newcon = ConnectionPair r new + announce (conkey,conevent newcon) + return newcon _ -> do -- connFlush todo announce (conkey, EOF) connClose replaced @@ -460,23 +476,16 @@ connectionThreads :: Handle -> PingMachine -> IO ConnectionThreads connectionThreads h pinglogic = do (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar - writerThread <- forkIO . fix $ \loop -> do - let finished = do -- warn $ "finished write" - hClose h -- quit reader - atomically $ putTMVar donew () - mb <- atomically $ readTMVar outs - case mb of Just bs -> handle (\(SomeException e)->finished) - (do S.hPutStr h bs - atomically $ takeTMVar outs - loop) - Nothing -> finished (doner,incomming) <- atomically $ liftM2 (,) newEmptyTMVar newTChan readerThread <- forkIO $ do let finished e = do + hClose h -- warn $ "finished read: " <> bshow (fmap ioeGetErrorType e) let _ = fmap ioeGetErrorType e -- type hint - atomically $ do putTMVar outs Nothing -- quit writer + -- let _ = fmap what e where what (SomeException _) = undefined + atomically $ do tryTakeTMVar outs + putTMVar outs Nothing -- quit writer putTMVar doner () handle (finished . Just) $ do pingBump pinglogic -- start the ping timer @@ -487,6 +496,18 @@ connectionThreads h pinglogic = do isEof <- liftIO $ hIsEOF h if isEof then finished Nothing else loop + writerThread <- forkIO . fix $ \loop -> do + let finished = do -- warn $ "finished write" + hClose h -- quit reader + -- throwTo readerThread (ErrorCall "EOF") + atomically $ putTMVar donew () + mb <- atomically $ readTMVar outs + case mb of Just bs -> handle (\(SomeException e)->finished) + (do S.hPutStr h bs + atomically $ takeTMVar outs + loop) + Nothing -> finished + let wait = do readTMVar donew readTMVar doner return () @@ -564,7 +585,18 @@ connClose :: ConnectionState -> STM () connClose c = mapConn True threadsClose c connWait :: ConnectionState -> STM () -connWait c = mapConn False threadsWait c +connWait c = doit -- mapConn False threadsWait c + where + action = threadsWait + doit = + case c of + SaneConnection rw -> action rw + ReadOnlyConnection r -> action r + WriteOnlyConnection w -> action w + ConnectionPair r w -> do + rem <- orElse (const w `fmap` action r) + (const r `fmap` action w) + threadsClose rem connPingTimer c = case c of @@ -599,34 +631,40 @@ data PingMachine = PingMachine , pingTimeOut :: TimeOut , pingDelay :: TMVar (Int,PingEvent) , pingEvent :: TMVar PingEvent - , pingStarted :: TVar Bool -- True when a threadDelay is running + , pingStarted :: TMVar Bool -- True when a threadDelay is running , pingThread :: ThreadId + , pingFlag :: TVar Bool } pingMachine :: PingInterval -> TimeOut -> IO PingMachine pingMachine idle timeout = do me <- do - (delayVar,eventVar,startedVar) <- atomically $ do + (delayVar,eventVar,startedVar,flag) <- atomically $ do d <- newEmptyTMVar e <- newEmptyTMVar - s <- newTVar False - return (d,e,s) + s <- newTMVar False + f <- newTVar False + return (d,e,s,f) return PingMachine { pingIdle = idle , pingTimeOut = timeout , pingDelay = delayVar , pingEvent = eventVar , pingStarted = startedVar - , pingThread = undefined } + , pingThread = undefined + , pingFlag = flag } thread <- forkIO . when (pingIdle me /=0) . fix $ \loop -> do (delay,event) <- atomically $ takeTMVar (pingDelay me) when (delay /= 0) $ do handle (\(ErrorCall _)-> do - atomically $ writeTVar (pingStarted me) False + atomically $ do takeTMVar (pingStarted me) + putTMVar (pingStarted me) False loop) - (do atomically $ writeTVar (pingStarted me) True + (do atomically $ do takeTMVar (pingStarted me) + putTMVar (pingStarted me) True threadDelay delay - atomically $ writeTVar (pingStarted me) False + atomically $ do takeTMVar (pingStarted me) + putTMVar (pingStarted me) False atomically $ putTMVar (pingEvent me) event case event of PingTimeOut -> return () PingIdle -> loop) @@ -638,8 +676,9 @@ pingCancel me = do b <- atomically $ do tryTakeTMVar (pingDelay me) -- no hang putTMVar (pingDelay me) (0,PingTimeOut) - readTVar (pingStarted me) + takeTMVar (pingStarted me) when b $ throwTo (pingThread me) $ ErrorCall "" + atomically $ putTMVar (pingStarted me) b pingBump :: PingMachine -> IO () pingBump me = do @@ -651,15 +690,17 @@ pingBump me = do Just _ -> retry Nothing -> putTMVar (pingDelay me) (1000*pingIdle me,PingIdle) - readTVar (pingStarted me) + takeTMVar (pingStarted me) when b $ throwTo (pingThread me) $ ErrorCall "" + atomically $ putTMVar (pingStarted me) b pingWait :: PingMachine -> STM PingEvent pingWait me = do e <- takeTMVar (pingEvent me) case e of - PingIdle -> putTMVar (pingDelay me) - (1000*pingTimeOut me,PingTimeOut) + PingIdle -> do writeTVar (pingFlag me) True + putTMVar (pingDelay me) + (1000*pingTimeOut me,PingTimeOut) PingTimeOut -> putTMVar (pingDelay me) (0,PingTimeOut) return e diff --git a/xmppServer.hs b/xmppServer.hs index 50818dd6..c720ea5f 100644 --- a/xmppServer.hs +++ b/xmppServer.hs @@ -7,6 +7,12 @@ import Control.Concurrent.STM import Network.Socket import XMPPTypes (withPort) import Text.Printf +import System.Posix.Signals + +import Data.Conduit + +import qualified Text.XML.Stream.Render as XML +import qualified Text.XML.Stream.Parse as XML import Server @@ -14,12 +20,34 @@ wlog s = putStrLn s control sv = atomically . putTMVar (serverCommand sv) +xmlStream conread conwrite = (xsrc,xsnk) + where + xsrc = src $= XML.parseBytes XML.def + xsnk = XML.renderBytes XML.def =$ snk + + src = do + v <- lift conread + maybe (return ()) -- lift . wlog $ "conread: Nothing") + (\v -> yield v >> src) + v + snk = awaitForever $ lift . conwrite + + +forkConnection k pingflag src snk = do + forkIO $ do + src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show) + wlog $ "end fork: " ++ show k + return () + monitor sv params = do chan <- return $ serverEvent sv fix $ \loop -> do (k,e) <- atomically $ readTChan chan case e of - Connection -> wlog $ tomsg k "Connection" + Connection pingflag conread conwrite -> do + let (xsrc,xsnk) = xmlStream conread conwrite + forkConnection k pingflag xsrc xsnk + wlog $ tomsg k "Connection" EOF -> wlog $ tomsg k "EOF" HalfConnection In -> do wlog $ tomsg k "ReadOnly" @@ -49,12 +77,17 @@ main = runResourceT $ do sv <- server lift $ do peer_params <- return (connectionDefaults peerKey) - { duplex = False } + { pingInterval = 2000, duplex = False } client_params <- return $ connectionDefaults clientKey forkIO $ monitor sv peer_params control sv (Listen peerport peer_params) control sv (Listen clientport client_params) - atomically $ newEmptyTMVar >>= readTMVar -- Wait for control-c + -- atomically $ newEmptyTMVar >>= readTMVar -- Wait for control-c + quitVar <- newEmptyTMVarIO + installHandler sigTERM (CatchOnce (atomically $ putTMVar quitVar True)) Nothing + installHandler sigINT (CatchOnce (atomically $ putTMVar quitVar True)) Nothing + quitMessage <- atomically $ takeTMVar quitVar + wlog "goodbye." return () -- cgit v1.2.3