{-# LANGUAGE CPP #-}
{-# LANGUAGE TupleSections #-}
-- | A small watchdog that signals when a connection has been idle long
-- enough to warrant a ping, and when a ping has gone unanswered for too long.
module PingMachine where

import Control.Monad
import Data.Function
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
import GHC.Conc (labelThread)
#endif
import Control.Concurrent.STM

import InterruptibleDelay

-- | A duration in milliseconds.  (The spelling is kept because the name is
-- part of the public interface of this module.)
type Miliseconds = Int

-- | Milliseconds to wait after 'PingIdle' before signaling 'PingTimeOut'.
type TimeOut = Miliseconds

-- | Milliseconds of idle time before a ping is considered necessary.
type PingInterval = Miliseconds

-- | Events that occur as a result of the 'PingMachine' watchdog.
--
-- Use 'pingWait' to wait for one of these to occur.
data PingEvent
    = PingIdle    -- ^ You should send a ping if you observe this event.
    | PingTimeOut -- ^ You should give up on the connection in case of this event.

-- | Shared state between the watchdog thread and its clients.
data PingMachine = PingMachine
    { pingFlag          :: TVar Bool
      -- ^ 'True' between a 'PingIdle' event and the next bump or timeout.
    , pingInterruptible :: InterruptibleDelay
      -- ^ The delay the watchdog sleeps on; 'pingBump' and 'pingCancel'
      -- interrupt it.
    , pingEvent         :: TMVar PingEvent
      -- ^ Holds the most recent event not yet consumed by 'pingWait'.
    , pingStarted       :: TMVar Bool
      -- ^ Empty until the first 'pingBump' or 'pingCancel'; afterwards
      -- 'True' while running and 'False' once canceled.
    }

-- | Fork a thread to monitor a connection for a ping timeout.
--
-- The watchdog does not begin timing until 'pingBump' has been called once.
--
-- If 'pingBump' is not invoked after an idle is signaled, a timeout event will
-- occur.  When that happens, even if the caller chooses to ignore this event,
-- the watchdog thread will be terminated and no more ping events will be
-- signaled.
--
-- An idle connection will be signaled by:
--
--     (1) 'pingFlag' is set 'True'
--
--     (2) 'pingWait' returns 'PingIdle'
--
-- Either may be tested to determine whether a ping should be sent but
-- 'pingFlag' is difficult to use properly because it is up to the caller to
-- remember that the ping is already in progress.
--
-- When the idle interval is @0@ no watchdog thread is forked at all, so no
-- event is ever signaled and 'pingWait' retries indefinitely.
forkPingMachine
    :: String
       -- ^ Suffix for the thread label; the thread is labeled @Ping.\<label\>@.
    -> PingInterval
       -- ^ Milliseconds of idle before a ping is considered necessary.
    -> TimeOut
       -- ^ Milliseconds after 'PingIdle' before we signal 'PingTimeOut'.
    -> IO PingMachine
forkPingMachine label idle timeout = do
    pm <- newPingMachineState
    when (idle /= 0) $ forkPingWatchdog label (return (idle, timeout)) pm
    return pm

-- | Like 'forkPingMachine' but the timeout and idle parameters can be changed
-- dynamically.  Both variables are re-read at the start of every idle period.
--
-- Unlike 'forkPingMachine', 'forkPingMachineDynamic' always launches a thread
-- regardless of idle value.
forkPingMachineDynamic
    :: String
       -- ^ Suffix for the thread label; the thread is labeled @Ping.\<label\>@.
    -> TVar PingInterval
       -- ^ Milliseconds of idle before a ping is considered necessary.
    -> TVar TimeOut
       -- ^ Milliseconds after 'PingIdle' before we signal 'PingTimeOut'.
    -> IO PingMachine
forkPingMachineDynamic label idleV timeoutV = do
    pm <- newPingMachineState
    forkPingWatchdog label readIntervals pm
    return pm
  where
    -- Both variables are sampled in a single transaction.
    readIntervals = atomically $ (,) <$> readTVar idleV <*> readTVar timeoutV

-- | Allocate the state of a 'PingMachine' without forking a watchdog.
--
-- The flag starts out 'False' and both 'TMVar's start out empty.
newPingMachineState :: IO PingMachine
newPingMachineState = do
    d       <- interruptibleDelay
    flag    <- newTVarIO False
    event   <- newEmptyTMVarIO
    started <- newEmptyTMVarIO
    return PingMachine
        { pingFlag          = flag
        , pingInterruptible = d
        , pingEvent         = event
        , pingStarted       = started
        }

-- | Fork the watchdog thread shared by 'forkPingMachine' and
-- 'forkPingMachineDynamic'.
--
-- The thread blocks until 'pingStarted' is filled and exits immediately if
-- the value is 'False'.  Otherwise it repeats the following cycle, checking
-- 'pingStarted' again after every delay and exiting when it reads 'False':
--
--     (1) clear 'pingFlag', fetch the intervals and sleep for the idle
--         interval;
--
--     (2) if that sleep did not run to completion, start over; otherwise
--         post 'PingIdle', set 'pingFlag' and sleep for the timeout;
--
--     (3) if that sleep did not run to completion, start over; otherwise
--         post 'PingTimeOut', clear 'pingFlag' and terminate.
--
-- NOTE(review): 'startDelay' is defined in "InterruptibleDelay"; this code
-- treats its result as "the delay ran to completion" ('False' presumably
-- meaning it was interrupted) -- confirm against that module.
forkPingWatchdog
    :: String
       -- ^ Suffix for the thread label.
    -> IO (PingInterval, TimeOut)
       -- ^ Fetches the idle and timeout intervals (milliseconds); run once
       -- per cycle.
    -> PingMachine
       -- ^ State shared with the clients of the watchdog.
    -> IO ()
forkPingWatchdog label readIntervals pm = void . forkIO $ do
    myThreadId >>= flip labelThread ("Ping." ++ label)
    whenRunning . fix $ \loop -> do
        atomically $ writeTVar flag False
        (idle, timeout) <- readIntervals
        -- Intervals are given in milliseconds; they are scaled by 1000
        -- before being handed to 'startDelay'.
        idleElapsed <- startDelay timer (1000 * idle)
        whenRunning $
            if not idleElapsed
                then loop
                else do
                    postEvent PingIdle True
                    timeoutElapsed <- startDelay timer (1000 * timeout)
                    whenRunning $
                        if not timeoutElapsed
                            then loop
                            else postEvent PingTimeOut False
  where
    flag  = pingFlag pm
    event = pingEvent pm
    timer = pingInterruptible pm

    -- Run the action only if the machine has been started and not canceled.
    -- Blocks while 'pingStarted' is still empty.
    whenRunning :: IO () -> IO ()
    whenRunning act = atomically (readTMVar (pingStarted pm)) >>= flip when act

    -- Replace any unconsumed event with the given one and update the flag,
    -- all in one transaction.
    postEvent :: PingEvent -> Bool -> IO ()
    postEvent ev flagValue = atomically $ do
        void $ tryTakeTMVar event
        putTMVar event ev
        writeTVar flag flagValue

-- | Terminate the watchdog thread.  Call this upon connection close.
--
-- You should ensure no threads are waiting on 'pingWait' because there is no
-- 'PingEvent' signaling termination.
--
-- This stores 'False' in 'pingStarted' and interrupts the current delay so
-- that the watchdog notices the cancellation.
pingCancel :: PingMachine -> IO ()
pingCancel me = do
    atomically $ do
        void $ tryTakeTMVar (pingStarted me)
        putTMVar (pingStarted me) False
    interruptDelay (pingInterruptible me)

-- | Reset the ping timer.  Call this regularly to prevent 'PingTimeOut'.
--
-- The first call also starts the watchdog, which waits for 'pingStarted' to
-- be filled before timing anything.  A machine that has been canceled with
-- 'pingCancel' is not restarted: 'pingStarted' is left at 'False', although
-- the delay is still interrupted.
pingBump :: PingMachine -> IO ()
pingBump me = do
    atomically $ do
        b <- tryReadTMVar (pingStarted me)
        when (b /= Just False) $ do
            void $ tryTakeTMVar (pingStarted me)
            putTMVar (pingStarted me) True
    interruptDelay (pingInterruptible me)

-- | Retries until a 'PingEvent' occurs.
--
-- The event is removed from 'pingEvent', so each event is delivered to at
-- most one waiter.
pingWait :: PingMachine -> STM PingEvent
pingWait me = takeTMVar (pingEvent me)