{-# OPTIONS_HADDOCK prune #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TupleSections #-}
#ifdef TEST
{-# LANGUAGE FlexibleInstances #-}
#endif
-----------------------------------------------------------------------------
-- |
-- Module      :  Server
--
-- Maintainer  :  joe@jerkface.net
-- Stability   :  experimental
--
-- A TCP client/server library.
--
-- TODO:
--
--  * interface tweaks
--
module Server where

import Data.ByteString (ByteString,hGetNonBlocking)
import qualified Data.ByteString.Char8 as S
    ( hPutStrLn
    , hPutStr
    , pack)
#if MIN_VERSION_containers(0,5,0)
import qualified Data.Map.Strict as Map
import Data.Map.Strict (Map)
#else
import qualified Data.Map as Map
import Data.Map (Map)
#endif
import Data.Monoid ( (<>) )
import Control.Concurrent
import Control.Concurrent.STM
-- import Control.Concurrent.STM.TMVar
-- import Control.Concurrent.STM.TChan
-- import Control.Concurrent.STM.Delay
import Control.Exception ({-evaluate,-}handle,SomeException(..),bracketOnError,ErrorCall(..))
import Control.Monad
import Control.Monad.Fix
-- import Control.Monad.STM
import Control.Monad.Trans.Resource
import Control.Monad.IO.Class (MonadIO (liftIO))
import System.IO.Error (ioeGetErrorType)
import System.IO
    ( IOMode(..)
    , hSetBuffering
    , BufferMode(..)
    , hWaitForInput
    , hClose
    , hIsEOF
    , stderr
    , stdout
    , Handle
    , hFlush
    )
import Network.Socket
import Network.BSD
    ( getProtocolNumber
    )
import Debug.Trace
import Data.Time.Clock (UTCTime,getCurrentTime,diffUTCTime)
import Data.Time.Format (formatTime)
import System.Locale (defaultTimeLocale)

-- | Placeholder for code paths that have not been written yet; evaluating
-- it raises an error with the message @unimplemented@.
todo = error "unimplemented"

-- | A duration expressed in microseconds.
type Microseconds = Int
-- | A duration expressed in milliseconds.
type Miliseconds = Int
-- | See the 'timeout' field of 'ConnectionParameters'.
type TimeOut = Miliseconds
-- | See the 'pingInterval' field of 'ConnectionParameters'.
type PingInterval = Miliseconds

-- | This object is passed with the 'Listen' and 'Connect'
-- instructions in order to control the behavior of the
-- connections that are established.  It is parameterized
-- by a user-supplied type @conkey@ that is used as a lookup
-- key for connections.
data ConnectionParameters conkey =
    ConnectionParameters
        { pingInterval :: PingInterval
          -- ^ The milliseconds of idle to allow before a 'RequiresPing'
          -- event is signaled.
        , timeout :: TimeOut
          -- ^ The milliseconds of idle after 'RequiresPing' is signaled
          -- that are necessary for the connection to be considered
          -- lost and signalling 'EOF'.
        , makeConnKey :: (Socket,SockAddr) -> IO conkey
          -- ^ This action creates a lookup key for a new connection.  If 'duplex'
          -- is 'True' and the result is already associated with an established
          -- connection, then an 'EOF' will be forced before the new
          -- connection becomes active.
          --
        , duplex :: Bool
          -- ^ If True, then the connection will be treated as a normal
          -- two-way socket.  Otherwise, a readable socket is established
          -- with 'Listen' and a writable socket is established with
          -- 'Connect' and they are associated when 'makeConnKey' yields
          -- the same value for each.
        }

-- | Use this function to select appropriate default values for
-- 'ConnectionParameters' other than 'makeConnKey'.
--
-- Current defaults:
--
-- * 'pingInterval' = 28000
--
-- * 'timeout' = 2000
--
-- * 'duplex' = True
--
connectionDefaults
  :: ((Socket, SockAddr) -> IO conkey) -> ConnectionParameters conkey
connectionDefaults f = ConnectionParameters
    { pingInterval = 28000
    , timeout      = 2000
    , makeConnKey  = f
    , duplex       = True
    }

-- | Instructions for a 'Server' object
--
-- To issue a command, put it into the 'serverCommand' TMVar.
data ServerInstruction conkey
    = Quit
      -- ^ kill the server.  This command is automatically issued when
      -- the server is released.
    | Listen PortNumber (ConnectionParameters conkey)
      -- ^ listen for incoming connections
    | Connect SockAddr (ConnectionParameters conkey)
      -- ^ connect to addresses
    | ConnectWithEndlessRetry SockAddr
                              (ConnectionParameters conkey)
                              Miliseconds
      -- ^ keep retrying the connection; the last argument is the minimum
      -- number of milliseconds between the starts of successive attempts
    | Ignore PortNumber
      -- ^ stop listening on specified port
    | Send conkey ByteString
      -- ^ send bytes to an established connection

#ifdef TEST
-- Test-only instances so that instructions can be shown in test output.
-- Functions have no useful textual form, so they are shown as "".
deriving instance Show conkey => Show (ServerInstruction conkey)
instance Show (a -> b) where show _ = ""
deriving instance Show conkey => Show (ConnectionParameters conkey)
#endif

-- | This type specifies which half of a half-duplex
-- connection is of interest.
data InOrOut = In | Out
 deriving (Enum,Eq,Ord,Show,Read)

-- | These events may be read from 'serverEvent' TChannel.
--
data ConnectionEvent b
    = Got b
      -- ^ Arrival of data from a socket
    | Connection (STM Bool) (IO (Maybe ByteString)) (ByteString -> IO Bool)
      -- ^ A new connection was established.  As built by 'conevent', the
      -- fields are, in order: an STM action that returns the connection's
      -- ping flag and resets it to 'False', an action that reads from the
      -- connection ('connRead'), and an action that writes to it
      -- ('connWrite').
    | ConnectFailure SockAddr
      -- ^ A 'Connect' command failed.
    | HalfConnection InOrOut
      -- ^ Half of a half-duplex connection is available.
    | EOF
      -- ^ A connection was terminated
    | RequiresPing
      -- ^ 'pingInterval' milliseconds of idle was experienced

#ifdef TEST
-- Test-only instances: actions are shown as "" and every pair of actions
-- compares equal, so that 'ConnectionEvent' values can be shown and
-- compared in tests.
instance Show (IO a) where show _ = ""
instance Show (STM a) where show _ = ""
instance Eq (ByteString -> IO Bool) where (==) _ _ = True
instance Eq (IO (Maybe ByteString)) where (==) _ _ = True
instance Eq (STM Bool) where (==) _ _ = True
deriving instance Show b => Show (ConnectionEvent b)
deriving instance Eq b => Eq (ConnectionEvent b)
#endif

-- | This object accepts commands and signals events and maintains
-- the list of currently listening ports and established connections.
data Server a = Server
    { serverCommand    :: TMVar (ServerInstruction a)
      -- ^ Command slot: put a 'ServerInstruction' here and the server's
      -- command thread will take and execute it.
    , serverEvent      :: TChan (a, ConnectionEvent ByteString)
      -- ^ Events, each paired with the key of the connection concerned.
    , serverReleaseKey :: ReleaseKey
      -- ^ Key of the cleanup action registered by 'server'; that action
      -- puts 'Quit' into 'serverCommand'.
    , conmap    :: TVar (Map a (TMVar (STM (IO ())), ConnectionState))
      -- ^ Established connections by key.  The 'TMVar' is a mailbox used
      -- to hand a replacement continuation to the thread currently
      -- supervising the connection (see 'newConnection').
    , listenmap :: TVar (Map PortNumber (ThreadId,Socket))
      -- ^ Listening ports with their accept thread and listening socket.
    , retrymap  :: TVar (Map SockAddr (TVar Bool,InterruptableDelay))
      -- ^ Active 'ConnectWithEndlessRetry' loops by address: a flag that
      -- is 'True' while retrying is still wanted, and the delay the loop
      -- sleeps on between attempts.
    }

-- | Construct a 'Server' object.  Use 'Control.Monad.Trans.Resource.ResourceT'
-- to ensure proper cleanup.  For example,
--
-- > import Server
-- > import Control.Monad.Trans.Resource (runResourceT)
-- > import Control.Monad.IO.Class (liftIO)
-- > import Control.Monad.STM (atomically)
-- > import Control.Concurrent.STM.TMVar (putTMVar)
-- > import Control.Concurrent.STM.TChan (readTChan)
-- >
-- > main = runResourceT $ do
-- >     sv <- server
-- >     let params = connectionDefaults (return . snd)
-- >     liftIO . atomically $ putTMVar (serverCommand sv) (Listen 2942 params)
-- >     let loop = do
-- >             (_,event) <- atomically $ readTChan (serverEvent sv)
-- >             case event of
-- >                 Got bytes -> putStrLn $ "got: " ++ show bytes
-- >                 _         -> return ()
-- >             case event of EOF -> return ()
-- >                           _   -> loop
-- >     liftIO loop
server :: (Show a,Ord a, MonadIO m, MonadResource m) => m (Server a)
server = do
    -- The release action issues 'Quit', so leaving the resource scope
    -- shuts the server down.
    (key,cmds) <- allocate (atomically newEmptyTMVar)
                           (atomically . flip putTMVar Quit)
    server <- liftIO . atomically $ do
        tchan     <- newTChan
        conmap    <- newTVar Map.empty
        listenmap <- newTVar Map.empty
        retrymap  <- newTVar Map.empty
        return Server { serverCommand    = cmds
                      , serverEvent      = tchan
                      , serverReleaseKey = key
                      , conmap           = conmap
                      , listenmap        = listenmap
                      , retrymap         = retrymap
                      }
    liftIO $ do
        -- Command thread: executes instructions one at a time until 'Quit'.
        forkIO $ fix $ \loop -> do
            instr <- atomically $ takeTMVar cmds
            -- warn $ "instr = " <> bshow instr
            case instr of
                Quit -> closeAll server
                _    -> do doit server instr
                           -- warn $ "finished " <> bshow instr
                           loop
        return server
  where
    -- Stop all listeners and retry loops, then close every connection and
    -- wait for each to finish before emptying the connection map.
    closeAll server = liftIO $ do
        listening <- atomically . readTVar $ listenmap server
        mapM_ killListener (Map.elems listening)
        let stopRetry (v,d) = do atomically $ writeTVar v False
                                 interruptDelay d
        retriers <- atomically $ do
            rmap <- readTVar $ retrymap server
            writeTVar (retrymap server) Map.empty
            return rmap
        mapM_ stopRetry (Map.elems retriers)
        cons <- atomically . readTVar $ conmap server
        atomically $ mapM_ (connClose . snd) (Map.elems cons)
        atomically $ mapM_ (connWait . snd) (Map.elems cons)
        atomically $ writeTVar (conmap server) Map.empty

    -- Start listening on a port unless a listener for it already exists.
    doit server (Listen port params) = liftIO $ do
        listening <- Map.member port
                        `fmap` atomically (readTVar $ listenmap server)
        when (not listening) $ do
            let family = AF_INET6
            sock <- socket family Stream 0
            setSocketOption sock ReuseAddr 1
            let address =
                    case family of
                        AF_INET  -> SockAddrInet port iNADDR_ANY
                        AF_INET6 -> SockAddrInet6 port 0 iN6ADDR_ANY 0
            -- Keep trying to bind, pausing five seconds after each failure.
            fix $ \loop ->
                handle (\(SomeException e) -> do
                            warn $ "BIND-ERROR:"<>bshow address <> " " <> bshow e
                            threadDelay 5000000
                            loop)
                       (bindSocket sock address)
            listen sock 2
            thread <- forkIO $ acceptLoop server params sock
            atomically $ listenmap server `modifyTVar'` Map.insert port (thread,sock)

    -- Forget the listener for a port and close its socket, if there is one.
    doit server (Ignore port) = liftIO $ do
        mb <- atomically $ do
            lmap <- readTVar $ listenmap server
            modifyTVar' (listenmap server) $ Map.delete port
            return $ Map.lookup port lmap
        maybe (return ()) killListener mb

    -- Write to an established connection; a missing connection or a failed
    -- write is only reported through 'trace'.
    doit server (Send con bs) = liftIO $ do -- . void . forkIO $ do
        cons <- atomically $ readTVar (conmap server)
        let post False = trace ("cant send: "++show bs) $ return ()
            post True  = return ()
        maybe (post False)
              (post <=< flip connWrite bs . snd)
              $ Map.lookup con cons

    -- One-shot connect.  If a retry loop exists for the address, its delay
    -- is interrupted; a fresh attempt is forked only when that loop's flag
    -- is already 'False' (or when no loop exists).
    doit server (Connect addr params) = liftIO $ do
        mb <- atomically $ do
            rmap <- readTVar (retrymap server)
            return $ Map.lookup addr rmap
        maybe forkit
              (\(v,d) -> do b <- atomically $ readTVar v
                            interruptDelay d
                            when (not b) forkit)
              mb
      where
        forkit = void . forkIO $ do
            proto <- getProtocolNumber "tcp"
            sock <- bracketOnError
                (socket (socketFamily addr) Stream proto)
                (\sock -> do -- only done if there's an error
                    -- Weird hack: putting the would-be peer address
                    -- instead of local socketName
                    conkey <- makeConnKey params (sock,addr) -- XXX: ?
                    sClose sock
                    atomically
                        $ writeTChan (serverEvent server)
                        $ (conkey,ConnectFailure addr))
                $ \sock -> do connect sock addr
                              return sock
            me <- getSocketName sock
            conkey <- makeConnKey params (sock,me)
            h <- socketToHandle sock ReadWriteMode
            newConnection server params conkey h Out
            return ()

    -- Connect and reconnect forever (until cancelled), starting attempts
    -- no more often than once every @interval@ milliseconds.
    doit server (ConnectWithEndlessRetry addr params interval) = liftIO $ do
        proto <- getProtocolNumber "tcp"
        void . forkIO $ do
            resultVar <- atomically newEmptyTMVar
            timer <- interruptableDelay
            -- Register this loop, cancelling any previous loop for the
            -- same address (clear its flag, then wake it from its delay).
            (retryVar,action) <- atomically $ do
                let noop = return ()
                rmap <- readTVar (retrymap server)
                let mb = Map.lookup addr rmap
                action <- maybe (return noop)
                                (\(v,d) -> do writeTVar v False
                                              return $ interruptDelay d)
                                mb
                v <- newTVar True
                writeTVar (retrymap server) (Map.insert addr (v,timer) rmap)
                return (v,action)
            action
            fix $ \retryLoop -> do
                utc <- getCurrentTime
                -- The attempt runs in its own thread and reports through
                -- resultVar whether another attempt is wanted, either
                -- right after a failed connect or once the established
                -- connection has ended.
                forkIO $ do
                    sock <- bracketOnError
                        (socket (socketFamily addr) Stream proto)
                        (\sock -> do -- only done if there's an error
                            -- Weird hack: putting the would-be peer address
                            -- instead of local socketName
                            conkey <- makeConnKey params (sock,addr) -- XXX: ?
                            sClose sock
                            atomically $ do
                                writeTChan (serverEvent server)
                                    $ (conkey,ConnectFailure addr)
                                retry <- readTVar retryVar
                                putTMVar resultVar retry)
                        $ \sock -> do connect sock addr
                                      return sock
                    me <- getSocketName sock
                    conkey <- makeConnKey params (sock,me)
                    h <- socketToHandle sock ReadWriteMode
                    threads <- newConnection server params conkey h Out
                    atomically $ do threadsWait threads
                                    retry <- readTVar retryVar
                                    putTMVar resultVar retry
                retry <- atomically $ takeTMVar resultVar
                fin_utc <- getCurrentTime
                when retry $ do
                    let elapsed  = 1000.0 * (fin_utc `diffUTCTime` utc)
                        expected = fromIntegral interval
                    when (elapsed < expected) $ do
                        warn $ "Waiting to retry " <> bshow addr
                        startDelay timer (round $ 1000 * (expected-elapsed))
                    -- The delay may have been interrupted precisely because
                    -- this loop was cancelled or replaced, so consult the
                    -- flag again instead of trusting the value read before
                    -- sleeping.
                    stillWanted <- atomically $ readTVar retryVar
                    warn $ "retry " <> bshow (stillWanted,addr)
                    when stillWanted retryLoop


-- INTERNAL ----------------------------------------------------------

{-
hWriteUntilNothing h outs =
    fix $ \loop -> do
        mb <- atomically $ takeTMVar outs
        case mb of
            Just bs -> do S.hPutStrLn h bs
                          warn $ "wrote " <> bs
                          loop
            Nothing -> do warn $ "wrote Nothing"
                          hClose h

-}

-- | Read from a connection.  Write-only connections never yield data;
-- every other kind reads from its read-side 'ConnectionThreads'.
connRead :: ConnectionState -> IO (Maybe ByteString)
connRead conn =
    case conn of
        WriteOnlyConnection _ -> return Nothing
        SaneConnection c      -> threadsRead c
        ReadOnlyConnection c  -> threadsRead c
        ConnectionPair c _    -> threadsRead c

-- | The socket family matching an address.
socketFamily :: SockAddr -> Family
socketFamily (SockAddrInet _ _) = AF_INET
socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6
socketFamily (SockAddrUnix _) = AF_UNIX

-- | Stop a listener by closing its socket.  The accept thread is not
-- killed directly.
killListener :: (ThreadId, Socket) -> IO ()
killListener (thread,sock) = do
    sClose sock
    -- killThread thread

-- | Build the 'Connection' event for a connection: an action that
-- returns-and-clears its ping flag, its reader and its writer.
conevent :: ConnectionState -> ConnectionEvent b
conevent con = Connection pingflag read write
  where
    pingflag = swapTVar (pingFlag (connPingTimer con)) False
    read     = connRead con
    write    = connWrite con

-- | Wrap a freshly connected or accepted 'Handle' in reader/writer
-- threads, register it in 'conmap' under @conkey@, announce it on
-- 'serverEvent', and fork a thread that supervises it for EOF and ping
-- events.  When the key is already registered, the existing entry is
-- replaced or completed into a pair via 'updateConMap', and the thread
-- supervising the old entry takes over the new state.
newConnection server params conkey h inout = do
    hSetBuffering h NoBuffering
    -- The outgoing side of a duplex connection gets no data forwarder and
    -- a disabled ping timer (idle interval 0).
    let (forward,idle_ms,timeout_ms) =
            case (inout,duplex params) of
                (Out,True) -> ( const $ return ()
                              , 0
                              , 0 )
                _          -> ( announce . (conkey,) . Got
                              , pingInterval params
                              , timeout params )

    new <- do pinglogic <- pingMachine idle_ms timeout_ms
              connectionThreads h pinglogic
    started <- atomically $ newEmptyTMVar
    kontvar <- atomically newEmptyTMVar
    -- Supervisor thread: waits until it is told (through kontvar) what
    -- to run, then runs it.
    forkIO $ do
        getkont <- atomically $ takeTMVar kontvar
        kont <- atomically getkont
        kont

    atomically $ do
        current <- fmap (Map.lookup conkey) $ readTVar (conmap server)
        case current of
            Nothing -> do
                let (newCon,e)
                        | duplex params =
                            let newcon = SaneConnection new
                            in ( newcon, (conkey, conevent newcon) )
                        | otherwise =
                            ( case inout of
                                  In  -> ReadOnlyConnection new
                                  Out -> WriteOnlyConnection new
                            , (conkey, HalfConnection inout) )
                modifyTVar' (conmap server) $ Map.insert conkey (kontvar,newCon)
                announce e
                putTMVar kontvar $ return $ do
                    atomically $ putTMVar started ()
                    handleEOF conkey kontvar newCon
            Just what@(mvar,_) -> do
                -- Our own supervisor has nothing to do; the thread already
                -- supervising this key is handed the update instead.
                putTMVar kontvar $ return $ return ()
                putTMVar mvar $ do
                    kont <- updateConMap conkey new what
                    putTMVar started ()
                    return kont

#ifdef TEST
    -- enable this for 'Got' events
    forkIO $ do -- inout==In || duplex params then forkIO $ do
        warn $ "waiting gots thread: " <> bshow (conkey,inout)
        atomically $ takeTMVar started
        -- pingBump pinglogic -- start the ping timer
        fix $ \loop -> do
            -- warn $ "read thread: " <> bshow (conkey,inout)
            mb <- threadsRead new
            -- pingBump pinglogic
            warn $ "got: " <> bshow (mb,(conkey,inout))
            maybe (return ())
                  (atomically . forward >=> const loop)
                  mb
        warn $ "quit-gots: " <> bshow (conkey,inout)
#endif
    return new
  where

    announce e = writeTChan (serverEvent server) e

    -- Supervise a connection: run whichever happens first of a
    -- continuation handed over through mvar, the connection finishing,
    -- or a ping event.
    handleEOF conkey mvar newCon = do
        action <- atomically . foldr1 orElse $
                    [ takeTMVar mvar >>= id -- passed continuation
                    , connWait newCon >> return eof
                    , connWaitPing newCon >>= return . sendPing
                    -- , pingWait pingTimer >>= return . sendPing
                    ]
        action :: IO ()
      where
        -- Cancel the ping timer, wait until buffered input has been
        -- consumed, then announce EOF and drop the map entry.
        eof = do
            -- warn $ "EOF " <>bshow conkey
            connCancelPing newCon
            atomically $ do connFlush newCon
                            announce (conkey,EOF)
                            modifyTVar' (conmap server)
                                        $ Map.delete conkey
            -- warn $ "fin-EOF "<>bshow conkey

        sendPing PingTimeOut = do
            {-
            let me = connPingTimer newCon
            utc <- getCurrentTime
            let utc' = formatTime defaultTimeLocale "%s" utc
            warn $ "TIMEOUT " <> bshow utc' <> " " <> bshow (pingIdle me, pingTimeOut me)
            -}
            atomically (connClose newCon)
            eof

        sendPing PingIdle = do
            {-
            let me = connPingTimer newCon
            utc <- getCurrentTime
            let utc' = formatTime defaultTimeLocale "%s" utc
            warn $ "IDLE" <> bshow utc' <> " " <> bshow (pingIdle me, pingTimeOut me)
            -}
            atomically $ announce (conkey,RequiresPing)
            handleEOF conkey mvar newCon

    -- Replace, or complete into a pair, the state registered for conkey.
    -- Returns the supervision action for the resulting state.
    updateConMap conkey new (mvar,replaced) = do
        new' <-
            if duplex params
                then do
                    announce (conkey,EOF)
                    connClose replaced
                    let newcon = SaneConnection new
                    announce (conkey,conevent newcon)
                    return newcon
                else
                    case replaced of
                        WriteOnlyConnection w | inout==In -> do
                            let newcon = ConnectionPair new w
                            announce (conkey,conevent newcon)
                            return newcon
                        ReadOnlyConnection r | inout==Out -> do
                            let newcon = ConnectionPair r new
                            announce (conkey,conevent newcon)
                            return newcon
                        _ -> do -- connFlush todo
                            announce (conkey, EOF)
                            connClose replaced
                            announce (conkey, HalfConnection inout)
                            return $ case inout of
                                In  -> ReadOnlyConnection new
                                Out -> WriteOnlyConnection new
        modifyTVar' (conmap server) $ Map.insert conkey (mvar,new')
        return $ handleEOF conkey mvar new'

-- | Accept connections on a listening socket forever, registering each
-- through 'newConnection'.  Errors are handled by 'acceptException'.
--
-- NOTE(review): the recursive call sits inside 'handle', so one handler
-- frame is added per accepted connection.
acceptLoop server params sock = handle (acceptException server params sock) $ do
    con <- accept sock
    conkey <- makeConnKey params con
    h <- socketToHandle (fst con) ReadWriteMode
    newConnection server params conkey h In
    acceptLoop server params sock

-- | Error handler for 'acceptLoop'.  Resource exhaustion is treated as
-- transient: the listening socket is kept open and accepting resumes after
-- half a second.  Any other error ends the listener and closes its socket.
acceptException server params sock ioerror =
    case show (ioeGetErrorType ioerror) of
      "resource exhausted" -> do -- try again
        -- The socket must stay open here; closing it first would make
        -- the retried accept fail immediately.
        warn ("acceptLoop: resource exhasted")
        threadDelay 500000
        acceptLoop server params sock
      "invalid argument"   -> do -- quit on closed socket
        sClose sock
      message              -> do -- unexpected exception
        warn ("acceptLoop: "<>bshow message)
        sClose sock

-- | Block until input is available on the handle, then return up to 1024
-- bytes of it.
getPacket :: Handle -> IO ByteString
getPacket h = do hWaitForInput h (-1)
                 hGetNonBlocking h 1024



-- | 'ConnectionThreads' is an interface to a pair of threads
-- that are reading and writing a 'Handle'.
data ConnectionThreads = ConnectionThreads
    { threadsWriter    :: TMVar (Maybe ByteString)
      -- ^ Outgoing slot: @Just bytes@ is a pending write, 'Nothing' tells
      -- the writer to quit.
    , threadsChannel   :: TChan ByteString
      -- ^ Packets read from the handle.
    , threadsWait      :: STM () -- ^ waits for a 'ConnectionThreads' object to close
    , threadsPing      :: PingMachine
      -- ^ Idle timer, bumped by the reader thread.
    }

-- | This spawns the reader and writer threads and returns a newly
-- constructed 'ConnectionThreads' object.
connectionThreads :: Handle -> PingMachine -> IO ConnectionThreads
connectionThreads h pinglogic = do

    (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar

    (doner,incomming) <- atomically $ liftM2 (,) newEmptyTMVar newTChan
    readerThread <- forkIO $ do
        -- Close the handle, tell the writer to quit, and flag the reader
        -- as done.  The argument is the exception that ended reading, if
        -- any; it is currently unused.
        let finished :: Maybe SomeException -> IO ()
            finished _ = do
                hClose h
                -- warn $ "finished read: " <> bshow (fmap ioeGetErrorType e)
                atomically $ do
                    tryTakeTMVar outs
                    putTMVar outs Nothing  -- quit writer
                    putTMVar doner ()
        handle (finished . Just) $ do
            pingBump pinglogic -- start the ping timer
            fix $ \loop -> do
                packet <- getPacket h
                atomically $ writeTChan incomming packet
                pingBump pinglogic
                isEof <- hIsEOF h
                if isEof then finished Nothing else loop

    writerThread <- forkIO . fix $ \loop -> do
        -- Interrupt the reader (which closes the handle) and flag the
        -- writer as done.
        let finished = do -- warn $ "finished write"
                          -- hClose h -- quit reader
                          throwTo readerThread (ErrorCall "EOF")
                          atomically $ putTMVar donew ()
        mb <- atomically $ readTMVar outs
        case mb of
            -- The slot is emptied only after the bytes were written, so a
            -- blocked 'threadsWrite' proceeds once the previous write is
            -- out.
            Just bs -> handle (\(SomeException _) -> finished)
                              (do S.hPutStr h bs
                                  atomically $ takeTMVar outs
                                  loop)
            Nothing -> finished

    let wait = do readTMVar donew
                  readTMVar doner
                  return ()
    return ConnectionThreads { threadsWriter    = outs
                             , threadsChannel   = incomming
                             , threadsWait      = wait
                             , threadsPing      = pinglogic }

-- | 'threadsWrite' hands the given 'ByteString' to the writer thread of
-- the 'ConnectionThreads' object.  It blocks until the outgoing slot
-- accepts the bytes and 'True' is returned, or the connection has closed
-- and 'False' is returned.
threadsWrite :: ConnectionThreads -> ByteString -> IO Bool
threadsWrite c bs = atomically $
    orElse (const False `fmap` threadsWait c)
           (const True `fmap` putTMVar (threadsWriter c) (Just bs))

-- | 'threadsClose' signals for the 'ConnectionThreads' object
-- to quit and close the associated 'Handle'.  It does not wait for the
-- shutdown itself; follow it with 'threadsWait' if you want to wait for
-- the operation to complete.  Note that while a write is still pending in
-- the outgoing slot, the transaction retries until the writer has
-- consumed it.
threadsClose :: ConnectionThreads -> STM ()
threadsClose c = do
    let mvar = threadsWriter c
    v <- tryReadTMVar mvar
    case v of
        Just Nothing -> return () -- already closed
        _            -> putTMVar mvar Nothing

-- | 'threadsRead' blocks until a 'ByteString' is available which
-- is returned to the caller, or the connection is interrupted and
-- 'Nothing' is returned.
threadsRead :: ConnectionThreads -> IO (Maybe ByteString)
threadsRead c = atomically $
    orElse (const Nothing `fmap` threadsWait c)
           (Just `fmap` readTChan (threadsChannel c))

-- | A 'ConnectionState' is an interface to a single 'ConnectionThreads'
-- or to a pair of 'ConnectionThreads' objects that are considered as one
-- connection.
data ConnectionState =
      SaneConnection ConnectionThreads
      -- ^ ordinary read/write connection
    | WriteOnlyConnection ConnectionThreads
      -- ^ only the write side of a half-duplex connection is present
    | ReadOnlyConnection ConnectionThreads
      -- ^ only the read side of a half-duplex connection is present
    | ConnectionPair ConnectionThreads ConnectionThreads
      -- ^ Two 'ConnectionThreads' objects, read operations use the
      -- first, write operations use the second.

-- | Write to a connection.  Read-only connections reject the write with
-- 'False'; every other kind writes through its write-side
-- 'ConnectionThreads' (see 'threadsWrite' for the meaning of the result).
connWrite :: ConnectionState -> ByteString -> IO Bool
connWrite conn bs =
    case conn of
        ReadOnlyConnection _  -> return False
        SaneConnection c      -> threadsWrite c bs
        WriteOnlyConnection c -> threadsWrite c bs
        ConnectionPair _ c    -> threadsWrite c bs

-- | Apply an STM action to the 'ConnectionThreads' of a connection.  For
-- a pair, the action is tried on the read side first and, should that
-- retry, on the write side; when @both@ is 'True' the action is then also
-- applied to the remaining side.
mapConn :: Bool ->
           (ConnectionThreads -> STM ()) -> ConnectionState -> STM ()
mapConn both action c =
    case c of
        SaneConnection rw     -> action rw
        ReadOnlyConnection r  -> action r
        WriteOnlyConnection w -> action w
        ConnectionPair r w    -> do
            other <- orElse (const w `fmap` action r)
                            (const r `fmap` action w)
            when both $ action other

-- | Signal every side of the connection to close (see 'threadsClose').
connClose :: ConnectionState -> STM ()
connClose c = mapConn True threadsClose c

-- | Wait for the connection to finish.  For a pair, this completes as
-- soon as either side has finished, and the other side is then told to
-- close.
connWait :: ConnectionState -> STM ()
connWait c = -- mapConn False threadsWait c
    case c of
        SaneConnection rw     -> threadsWait rw
        ReadOnlyConnection r  -> threadsWait r
        WriteOnlyConnection w -> threadsWait w
        ConnectionPair r w    -> do
            other <- orElse (const w `fmap` threadsWait r)
                            (const r `fmap` threadsWait w)
            threadsClose other

-- | The ping timer that governs a connection; for a pair this is the
-- timer of the read side.
connPingTimer :: ConnectionState -> PingMachine
connPingTimer c =
    case c of
        SaneConnection rw     -> threadsPing rw
        ReadOnlyConnection r  -> threadsPing r
        WriteOnlyConnection w -> threadsPing w -- should be disabled.
        ConnectionPair r w    -> threadsPing r

-- | Cancel the connection's ping timer (see 'pingCancel').
connCancelPing :: ConnectionState -> IO ()
connCancelPing c = pingCancel (connPingTimer c)

-- | Wait for the next event of the connection's ping timer (see
-- 'pingWait').
connWaitPing :: ConnectionState -> STM PingEvent
connWaitPing c = pingWait (connPingTimer c)

-- | Retry until every packet buffered on the read side has been consumed.
-- Write-only connections have nothing to flush.
connFlush :: ConnectionState -> STM ()
connFlush c =
    case c of
        SaneConnection rw     -> waitChan rw
        ReadOnlyConnection r  -> waitChan r
        WriteOnlyConnection w -> return ()
        ConnectionPair r w    -> waitChan r
  where
    waitChan t = do
        b <- isEmptyTChan (threadsChannel t)
        when (not b) retry

-- | 'show' a value as a 'ByteString'.
bshow :: Show a => a -> ByteString
bshow e = S.pack . show $ e

-- | Write a line to 'stderr' and flush it.
warn :: ByteString -> IO ()
warn str = S.hPutStrLn stderr str >> hFlush stderr


-- | Events produced by a 'PingMachine'.
data PingEvent
    = PingIdle     -- ^ the idle interval elapsed
    | PingTimeOut  -- ^ the timeout following an idle event elapsed

-- | An idle/timeout timer driven by a dedicated thread.
data PingMachine = PingMachine
    { pingIdle    :: PingInterval
      -- ^ idle interval in milliseconds; 0 disables the machine
    , pingTimeOut :: TimeOut
      -- ^ milliseconds allowed after an idle event before timing out
    , pingDelay   :: TMVar (Int,PingEvent)
      -- ^ next request for the timer thread: a delay in microseconds and
      -- the event to emit when it elapses.  A delay of 0 is the
      -- cancelled/fired marker and makes the timer thread stop.
    , pingEvent   :: TMVar PingEvent
      -- ^ the most recently fired event, consumed by 'pingWait'
    , pingStarted :: TMVar Bool -- True when a threadDelay is running
    , pingThread  :: ThreadId
      -- ^ the timer thread, interrupted with 'ErrorCall' to restart or
      -- cancel a running delay
    , pingFlag    :: TVar Bool
      -- ^ set to 'True' by 'pingWait' when an idle event is delivered
    }

-- | Create a 'PingMachine' and fork its timer thread.  With an idle
-- interval of 0 the thread exits immediately.  Otherwise it repeatedly
-- takes a request from 'pingDelay', sleeps for the requested time and
-- publishes the event in 'pingEvent'; an 'ErrorCall' thrown at it while
-- sleeping abandons the current delay and makes it fetch the next
-- request.
pingMachine :: PingInterval -> TimeOut -> IO PingMachine
pingMachine idle timeout = do
    me <- do
        (delayVar,eventVar,startedVar,flag) <- atomically $ do
            d <- newEmptyTMVar
            e <- newEmptyTMVar
            s <- newTMVar False
            f <- newTVar False
            return (d,e,s,f)
        return PingMachine
                { pingIdle    = idle
                , pingTimeOut = timeout
                , pingDelay   = delayVar
                , pingEvent   = eventVar
                , pingStarted = startedVar
                  -- Filled in below once the thread exists; the thread
                  -- itself never reads this field.
                , pingThread  = undefined
                , pingFlag    = flag
                }
    thread <- forkIO . when (pingIdle me /=0) . fix $ \loop -> do
        (delay,event) <- atomically $ takeTMVar (pingDelay me)
        when (delay /= 0) $ do
            handle (\(ErrorCall _)-> do
                        -- Interrupted: the interrupter holds pingStarted
                        -- until its throwTo returns, so this take waits
                        -- for it to finish.
                        atomically $ do takeTMVar (pingStarted me)
                                        putTMVar (pingStarted me) False
                        loop)
                   (do atomically $ do takeTMVar (pingStarted me)
                                       putTMVar (pingStarted me) True
                       threadDelay delay
                       atomically $ do takeTMVar (pingStarted me)
                                       putTMVar (pingStarted me) False
                       atomically $ putTMVar (pingEvent me) event
                       case event of PingTimeOut -> return ()
                                     PingIdle    -> loop)
    return me { pingThread = thread }

-- | Cancel the timer: leave the cancelled marker @(0,'PingTimeOut')@ in
-- 'pingDelay' and interrupt the timer thread if a delay is running.
pingCancel :: PingMachine -> IO ()
pingCancel me = do
    b <- atomically $ do
        tryTakeTMVar (pingDelay me) -- no hang
        putTMVar (pingDelay me) (0,PingTimeOut)
        takeTMVar (pingStarted me)
    when b $ throwTo (pingThread me) $ ErrorCall ""
    atomically $ putTMVar (pingStarted me) b

-- | Restart the idle countdown.  Does nothing to the request slot when
-- the machine is disabled or already cancelled/fired; retries while
-- another request is still pending.  A running delay is interrupted so
-- the timer thread picks up the new request.
pingBump :: PingMachine -> IO ()
pingBump me = do
    b <- atomically $ do
        when (pingIdle me /= 0) $ do
            e <- tryReadTMVar (pingDelay me)
            case e of
                Just (0,PingTimeOut) -> return () -- canceled/fired
                Just _               -> retry
                Nothing -> putTMVar (pingDelay me) (1000*pingIdle me,PingIdle)
        takeTMVar (pingStarted me)
    when b $ throwTo (pingThread me) $ ErrorCall ""
    {-
    utc <- getCurrentTime
    let utc' = formatTime defaultTimeLocale "%s" utc
    warn $ "BUMP " <> bshow utc' <> " " <> bshow (pingIdle me, pingTimeOut me)
    -}
    atomically $ putTMVar (pingStarted me) b

{-
pingBump me = do
    b <- atomically $ do
        when (pingIdle me /= 0) $
            putTMVar (pingDelay me) (1000*pingIdle me,PingIdle)
        readTMVar (pingStarted me)
    when b $ throwTo (pingThread me) $ ErrorCall ""
-}

-- | Wait for the next ping event.  After an idle event the ping flag is
-- set and the timeout countdown is requested; after a timeout the
-- cancelled/fired marker is left in 'pingDelay'.
pingWait :: PingMachine -> STM PingEvent
pingWait me = do
    e <- takeTMVar (pingEvent me)
    case e of
        PingIdle -> do
            writeTVar (pingFlag me) True
            putTMVar (pingDelay me) (1000*pingTimeOut me,PingTimeOut)
        PingTimeOut -> putTMVar (pingDelay me) (0,PingTimeOut)
    return e


-- | A delay that another thread can cut short.  The slot holds the id of
-- the thread currently sleeping in 'startDelay', if any.
data InterruptableDelay = InterruptableDelay
    { delayThread :: TMVar ThreadId
    }

-- | Create an 'InterruptableDelay' with nobody sleeping on it.
interruptableDelay :: IO InterruptableDelay
interruptableDelay = do
    fmap InterruptableDelay $ atomically newEmptyTMVar

-- | Sleep for the given number of microseconds, or until 'interruptDelay'
-- is called on the same object, whichever comes first.
--
-- The slot is filled and emptied under the 'ErrorCall' handler, and is
-- emptied on every exit path.  'interruptDelay' holds the slot while it
-- throws, so the sleeper cannot leave this function before the exception
-- has been delivered: an interrupt can neither escape into the caller nor
-- leave the slot occupied for the next 'startDelay'.
startDelay :: InterruptableDelay -> Microseconds -> IO ()
startDelay d interval = do
    thread <- myThreadId
    handle (\(ErrorCall _) -> do release
                                 warn $ "delay interrupted")
        $ do
            atomically $ putTMVar (delayThread d) thread
            threadDelay interval
            release
  where
    -- Empty the slot.  If an interrupt lands while we are waiting for the
    -- interrupter to hand the slot back, simply wait again.
    release = handle (\(ErrorCall _) -> release)
                     (void . atomically $ takeTMVar (delayThread d))

-- | Wake up the thread sleeping in 'startDelay', if there is one.  The
-- slot is taken for the duration of the 'throwTo' and handed back
-- afterwards, so the sleeper only proceeds once the exception has been
-- delivered.
interruptDelay :: InterruptableDelay -> IO ()
interruptDelay d = do
    mthread <- atomically $ tryTakeTMVar (delayThread d)
    case mthread of
        Nothing     -> return ()
        Just thread -> do
            throwTo thread (ErrorCall "Interrupted delay")
            atomically $ putTMVar (delayThread d) thread