{-# OPTIONS_HADDOCK prune #-} {-# LANGUAGE CPP #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE TupleSections #-} #ifdef TEST {-# LANGUAGE FlexibleInstances #-} #endif ----------------------------------------------------------------------------- -- | -- Module : Server -- -- Maintainer : joe@jerkface.net -- Stability : experimental -- -- A TCP client/server library. -- -- TODO: -- -- * interface tweaks -- module Server where import Data.ByteString (ByteString,hGetNonBlocking) import qualified Data.ByteString.Char8 as S -- ( hPutStrLn, hPutStr, pack) #if MIN_VERSION_containers(0,5,0) import qualified Data.Map.Strict as Map import Data.Map.Strict (Map) #else import qualified Data.Map as Map import Data.Map (Map) #endif import Data.Monoid ( (<>) ) import Control.Concurrent import Control.Concurrent.STM -- import Control.Concurrent.STM.TMVar -- import Control.Concurrent.STM.TChan -- import Control.Concurrent.STM.Delay import Control.Exception ({-evaluate,-}handle,SomeException(..),bracketOnError,ErrorCall(..)) import Control.Monad import Control.Monad.Fix -- import Control.Monad.STM import Control.Monad.Trans.Resource import Control.Monad.IO.Class (MonadIO (liftIO)) import System.IO.Error (ioeGetErrorType,isDoesNotExistError) import System.IO ( IOMode(..) , hSetBuffering , BufferMode(..) , hWaitForInput , hClose , hIsEOF , stderr , stdout , Handle , hFlush ) import Network.Socket import Network.BSD ( getProtocolNumber ) import Debug.Trace import Data.Time.Clock (UTCTime,getCurrentTime,diffUTCTime) import Data.Time.Format (formatTime) import System.Locale (defaultTimeLocale) todo = error "unimplemented" #if MIN_VERSION_network(2,4,0) #else deriving instance Ord SockAddr #endif type Microseconds = Int type Miliseconds = Int type TimeOut = Miliseconds type PingInterval = Miliseconds -- | This object is passed with the 'Listen' and 'Connect' -- instructions in order to control the behavior of the -- connections that are established. It is parameterized -- by a user-suplied type @conkey@ that is used as a lookup -- key for connections. data ConnectionParameters conkey u = ConnectionParameters { pingInterval :: PingInterval -- ^ The miliseconds of idle to allow before a 'RequiresPing' -- event is signaled. , timeout :: TimeOut -- ^ The miliseconds of idle after 'RequiresPing' is signaled -- that are necessary for the connection to be considered -- lost and signalling 'EOF'. , makeConnKey :: (Socket,SockAddr) -> IO (conkey,u) -- ^ This action creates a lookup key for a new connection. If 'duplex' -- is 'True' and the result is already assocatied with an established -- connection, then an 'EOF' will be forced before the the new -- connection becomes active. -- , duplex :: Bool -- ^ If True, then the connection will be treated as a normal -- two-way socket. Otherwise, a readable socket is established -- with 'Listen' and a writable socket is established with -- 'Connect' and they are associated when 'makeConnKey' yields -- same value for each. } -- | Use this function to select appropriate default values for -- 'ConnectionParameters' other than 'makeConnKey'. -- -- Current defaults: -- -- * 'pingInterval' = 28000 -- -- * 'timeout' = 2000 -- -- * 'duplex' = True -- connectionDefaults :: ((Socket, SockAddr) -> IO (conkey,u)) -> ConnectionParameters conkey u connectionDefaults f = ConnectionParameters { pingInterval = 28000 , timeout = 2000 , makeConnKey = f , duplex = True } -- | Instructions for a 'Server' object -- -- To issue a command, put it into the 'serverCommand' TMVar. data ServerInstruction conkey u = Quit -- ^ kill the server. This command is automatically issued when -- the server is released. | Listen PortNumber (ConnectionParameters conkey u) -- ^ listen for incomming connections | Connect SockAddr (ConnectionParameters conkey u) -- ^ connect to addresses | ConnectWithEndlessRetry SockAddr (ConnectionParameters conkey u) Miliseconds -- ^ keep retrying the connection | Ignore PortNumber -- ^ stop listening on specified port | Send conkey ByteString -- ^ send bytes to an established connection #ifdef TEST deriving instance Show conkey => Show (ServerInstruction conkey) instance Show (a -> b) where show _ = "" deriving instance Show conkey => Show (ConnectionParameters conkey) #endif -- | This type specifies which which half of a half-duplex -- connection is of interest. data InOrOut = In | Out deriving (Enum,Eq,Ord,Show,Read) -- | These events may be read from 'serverEvent' TChannel. -- data ConnectionEvent b = Got b -- ^ Arrival of data from a socket | Connection (STM Bool) (IO (Maybe ByteString)) (ByteString -> IO Bool) -- ^ A new connection was established | ConnectFailure SockAddr -- ^ A 'Connect' command failed. | HalfConnection InOrOut -- ^ Half of a half-duplex connection is avaliable. | EOF -- ^ A connection was terminated | RequiresPing -- ^ 'pingInterval' miliseconds of idle was experienced #ifdef TEST instance Show (IO a) where show _ = "" instance Show (STM a) where show _ = "" instance Eq (ByteString -> IO Bool) where (==) _ _ = True instance Eq (IO (Maybe ByteString)) where (==) _ _ = True instance Eq (STM Bool) where (==) _ _ = True deriving instance Show b => Show (ConnectionEvent b) deriving instance Eq b => Eq (ConnectionEvent b) #endif -- | This is the per-connection state. data ConnectionRecord u = ConnectionRecord { ckont :: TMVar (STM (IO ())) -- ^ used to pass a continuation to update the eof-handler , cstate :: ConnectionState -- ^ used to send/receive data to the connection , cdata :: u -- ^ user data, stored in the connection map for convenience } -- | This object accepts commands and signals events and maintains -- the list of currently listening ports and established connections. data Server a u = Server { serverCommand :: TMVar (ServerInstruction a u) , serverEvent :: TChan ((a,u), ConnectionEvent ByteString) , serverReleaseKey :: ReleaseKey , conmap :: TVar (Map a (ConnectionRecord u)) , listenmap :: TVar (Map PortNumber (ThreadId,Socket)) , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay)) } control sv = atomically . putTMVar (serverCommand sv) -- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT' -- to ensure proper cleanup. For example, -- -- > import Server -- > import Control.Monad.Trans.Resource (runResourceT) -- > import Control.Monad.IO.Class (liftIO) -- > import Control.Monad.STM (atomically) -- > import Control.Concurrent.STM.TMVar (putTMVar) -- > import Control.Concurrent.STM.TChan (readTChan) -- > -- > main = runResourceT $ do -- > sv <- server -- > let params = connectionDefaults (return . snd) -- > liftIO . atomically $ putTMVar (serverCommand sv) (Listen 2942 params) -- > let loop = do -- > (_,event) <- atomically $ readTChan (serverEvent sv) -- > case event of -- > Got bytes -> putStrLn $ "got: " ++ show bytes -- > _ -> return () -- > case event of EOF -> return () -- > _ -> loop -- > liftIO loop server :: (Show a,Ord a, MonadIO m, MonadResource m) => m (Server a u) server = do (key,cmds) <- allocate (atomically newEmptyTMVar) (atomically . flip putTMVar Quit) server <- liftIO . atomically $ do tchan <- newTChan conmap <- newTVar Map.empty listenmap<- newTVar Map.empty retrymap <- newTVar Map.empty return Server { serverCommand = cmds , serverEvent = tchan , serverReleaseKey = key , conmap = conmap , listenmap = listenmap , retrymap = retrymap } liftIO $ do forkIO $ fix $ \loop -> do instr <- atomically $ takeTMVar cmds -- warn $ "instr = " <> bshow instr let again = do doit server instr -- warn $ "finished " <> bshow instr loop case instr of Quit -> closeAll server _ -> again return server where closeAll server = liftIO $ do listening <- atomically . readTVar $ listenmap server mapM_ killListener (Map.elems listening) let stopRetry (v,d) = do atomically $ writeTVar v False interruptDelay d retriers <- atomically $ do rmap <- readTVar $ retrymap server writeTVar (retrymap server) Map.empty return rmap mapM_ stopRetry (Map.elems retriers) cons <- atomically . readTVar $ conmap server atomically $ mapM_ (connClose . cstate) (Map.elems cons) atomically $ mapM_ (connWait . cstate) (Map.elems cons) atomically $ writeTVar (conmap server) Map.empty doit server (Listen port params) = liftIO $ do listening <- Map.member port `fmap` atomically (readTVar $ listenmap server) when (not listening) $ do let family = AF_INET6 sock <- socket family Stream 0 setSocketOption sock ReuseAddr 1 let address = case family of AF_INET -> SockAddrInet port iNADDR_ANY AF_INET6 -> SockAddrInet6 port 0 iN6ADDR_ANY 0 fix $ \loop -> do handle (\(SomeException e)-> do warn $ "BIND-ERROR:"<>bshow address <> " " <> bshow e threadDelay 5000000 loop) $ bindSocket sock address listen sock 2 thread <- forkIO $ acceptLoop server params sock atomically $ listenmap server `modifyTVar'` Map.insert port (thread,sock) doit server (Ignore port) = liftIO $ do mb <- atomically $ do map <- readTVar $ listenmap server modifyTVar' (listenmap server) $ Map.delete port return $ Map.lookup port map maybe (return ()) killListener $ mb doit server (Send con bs) = liftIO $ do -- . void . forkIO $ do map <- atomically $ readTVar (conmap server) let post False = (trace ("cant send: "++show bs) $ return ()) post True = return () maybe (post False) (post <=< flip connWrite bs . cstate) $ Map.lookup con map doit server (Connect addr params) = liftIO $ do mb <- atomically $ do rmap <- readTVar (retrymap server) return $ Map.lookup addr rmap maybe forkit (\(v,d) -> do b <- atomically $ readTVar v interruptDelay d when (not b) forkit) mb where forkit = void . forkIO $ do proto <- getProtocolNumber "tcp" sock <- socket (socketFamily addr) Stream proto handle (\e -> do -- let t = ioeGetErrorType e when (isDoesNotExistError e) $ return () -- warn "GOTCHA" -- warn $ "connect-error: " <> bshow e (conkey,u) <- makeConnKey params (sock,addr) -- XXX: ? sClose sock atomically $ writeTChan (serverEvent server) $ ((conkey,u),ConnectFailure addr)) $ do connect sock addr me <- getSocketName sock (conkey,u) <- makeConnKey params (sock,me) h <- socketToHandle sock ReadWriteMode newConnection server params conkey u h Out return () doit server (ConnectWithEndlessRetry addr params interval) = liftIO $ do proto <- getProtocolNumber "tcp" void . forkIO $ do resultVar <- atomically newEmptyTMVar timer <- interruptableDelay (retryVar,action) <- atomically $ do let noop = return () map <- readTVar (retrymap server) let mb = Map.lookup addr map action <- maybe (return noop) (\(v,d) -> do writeTVar v False return $ interruptDelay d) mb v <- newTVar True writeTVar (retrymap server) (Map.insert addr (v,timer) map) return (v,action) action fix $ \retryLoop -> do utc <- getCurrentTime forkIO $ do sock <- socket (socketFamily addr) Stream proto handle (\e -> do -- let t = ioeGetErrorType e when (isDoesNotExistError e) $ return () -- warn "GOTCHA" -- warn $ "connect-error: " <> bshow e -- Weird hack: puting the would-be peer address -- instead of local socketName (conkey,u) <- makeConnKey params (sock,addr) -- XXX: ? sClose sock atomically $ do writeTChan (serverEvent server) $ ((conkey,u),ConnectFailure addr) retry <- readTVar retryVar putTMVar resultVar retry) $ do connect sock addr me <- getSocketName sock (conkey,u) <- makeConnKey params (sock,me) h <- socketToHandle sock ReadWriteMode threads <- newConnection server params conkey u h Out atomically $ do threadsWait threads retry <- readTVar retryVar putTMVar resultVar retry retry <- atomically $ takeTMVar resultVar fin_utc <- getCurrentTime when retry $ do let elapsed = 1000.0 * (fin_utc `diffUTCTime` utc) expected = fromIntegral interval when (retry && elapsed < expected) $ do debugNoise $ "Waiting to retry " <> bshow addr void $ startDelay timer (round $ 1000 * (expected-elapsed)) debugNoise $ "retry " <> bshow (retry,addr) when retry $ retryLoop -- INTERNAL ---------------------------------------------------------- {- hWriteUntilNothing h outs = fix $ \loop -> do mb <- atomically $ takeTMVar outs case mb of Just bs -> do S.hPutStrLn h bs warn $ "wrote " <> bs loop Nothing -> do warn $ "wrote Nothing" hClose h -} connRead :: ConnectionState -> IO (Maybe ByteString) connRead (WriteOnlyConnection w) = do -- atomically $ discardContents (threadsChannel w) return Nothing connRead conn = do c <- atomically $ getThreads threadsRead c where getThreads = case conn of SaneConnection c -> return c ReadOnlyConnection c -> return c ConnectionPair c w -> do -- discardContents (threadsChannel w) return c socketFamily (SockAddrInet _ _) = AF_INET socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6 socketFamily (SockAddrUnix _) = AF_UNIX killListener (thread,sock) = do sClose sock -- killThread thread conevent con = Connection pingflag read write where pingflag = swapTVar (pingFlag (connPingTimer con)) False read = connRead con write = connWrite con newConnection server params conkey u h inout = do hSetBuffering h NoBuffering let (forward,idle_ms,timeout_ms) = case (inout,duplex params) of (Out,False) -> ( const $ return () , 0 , 0 ) _ -> ( announce . ((conkey,u),) . Got , pingInterval params , timeout params ) new <- do pinglogic <- pingMachine idle_ms timeout_ms connectionThreads h pinglogic started <- atomically $ newEmptyTMVar kontvar <- atomically newEmptyTMVar forkIO $ do getkont <- atomically $ takeTMVar kontvar kont <- atomically getkont kont atomically $ do current <- fmap (Map.lookup conkey) $ readTVar (conmap server) case current of Nothing -> do (newCon,e) <- return $ if duplex params then let newcon = SaneConnection new in ( newcon, ((conkey,u), conevent newcon) ) else ( case inout of In -> ReadOnlyConnection new Out -> WriteOnlyConnection new , ((conkey,u), HalfConnection inout) ) modifyTVar' (conmap server) $ Map.insert conkey ConnectionRecord { ckont = kontvar , cstate = newCon , cdata = u } announce e putTMVar kontvar $ return $ do atomically $ putTMVar started () handleEOF conkey u kontvar newCon Just what@ConnectionRecord { ckont =mvar }-> do putTMVar kontvar $ return $ return () putTMVar mvar $ do kont <- updateConMap conkey u new what putTMVar started () return kont #ifdef TEST -- enable this for 'Got' events forkIO $ do -- inout==In || duplex params then forkIO $ do warn $ "waiting gots thread: " <> bshow (conkey,inout) atomically $ takeTMVar started -- pingBump pinglogic -- start the ping timer fix $ \loop -> do -- warn $ "read thread: " <> bshow (conkey,inout) mb <- threadsRead new -- pingBump pinglogic warn $ "got: " <> bshow (mb,(conkey,inout)) maybe (return ()) (atomically . forward >=> const loop) mb warn $ "quit-gots: " <> bshow (conkey,inout) #endif return new where announce e = writeTChan (serverEvent server) e handleEOF conkey u mvar newCon = do action <- atomically . foldr1 orElse $ [ takeTMVar mvar >>= id -- passed continuation , connWait newCon >> return eof , connWaitPing newCon >>= return . sendPing -- , pingWait pingTimer >>= return . sendPing ] action :: IO () where eof = do -- warn $ "EOF " <>bshow conkey connCancelPing newCon atomically $ do connFlush newCon announce ((conkey,u),EOF) modifyTVar' (conmap server) $ Map.delete conkey -- warn $ "fin-EOF "<>bshow conkey sendPing PingTimeOut = do {- let me = connPingTimer newCon utc <- getCurrentTime let utc' = formatTime defaultTimeLocale "%s" utc warn $ "TIMEOUT " <> bshow utc' <> " " <> bshow (pingIdle me, pingTimeOut me) -} atomically (connClose newCon) eof sendPing PingIdle = do {- let me = connPingTimer newCon utc <- getCurrentTime let utc' = formatTime defaultTimeLocale "%s" utc warn $ "IDLE" <> bshow utc' <> " " <> bshow (pingIdle me, pingTimeOut me) -} atomically $ announce ((conkey,u),RequiresPing) handleEOF conkey u mvar newCon updateConMap conkey u new (ConnectionRecord { ckont=mvar, cstate=replaced, cdata=u0 }) = do new' <- if duplex params then do announce ((conkey,u),EOF) connClose replaced let newcon = SaneConnection new announce $ ((conkey,u),conevent newcon) return $ newcon else case replaced of WriteOnlyConnection w | inout==In -> do let newcon = ConnectionPair new w announce ((conkey,u),conevent newcon) return newcon ReadOnlyConnection r | inout==Out -> do let newcon = ConnectionPair r new announce ((conkey,u),conevent newcon) return newcon _ -> do -- connFlush todo announce ((conkey,u0), EOF) connClose replaced announce ((conkey,u), HalfConnection inout) return $ case inout of In -> ReadOnlyConnection new Out -> WriteOnlyConnection new modifyTVar' (conmap server) $ Map.insert conkey ConnectionRecord { ckont = mvar , cstate = new' , cdata = u } return $ handleEOF conkey u mvar new' acceptLoop server params sock = handle (acceptException server params sock) $ do con <- accept sock (conkey,u) <- makeConnKey params con h <- socketToHandle (fst con) ReadWriteMode newConnection server params conkey u h In acceptLoop server params sock acceptException server params sock ioerror = do sClose sock case show (ioeGetErrorType ioerror) of "resource exhausted" -> do -- try again warn ("acceptLoop: resource exhasted") threadDelay 500000 acceptLoop server params sock "invalid argument" -> do -- quit on closed socket return () message -> do -- unexpected exception warn ("acceptLoop: "<>bshow message) return () getPacket h = do hWaitForInput h (-1) hGetNonBlocking h 1024 -- | 'ConnectionThreads' is an interface to a pair of threads -- that are reading and writing a 'Handle'. data ConnectionThreads = ConnectionThreads { threadsWriter :: TMVar (Maybe ByteString) , threadsChannel :: TChan ByteString , threadsWait :: STM () -- ^ waits for a 'ConnectionThreads' object to close , threadsPing :: PingMachine } -- | This spawns the reader and writer threads and returns a newly -- constructed 'ConnectionThreads' object. connectionThreads :: Handle -> PingMachine -> IO ConnectionThreads connectionThreads h pinglogic = do (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar (doner,incomming) <- atomically $ liftM2 (,) newEmptyTMVar newTChan readerThread <- forkIO $ do let finished e = do hClose h -- warn $ "finished read: " <> bshow (fmap ioeGetErrorType e) -- let _ = fmap ioeGetErrorType e -- type hint let _ = fmap what e where what (SomeException _) = undefined atomically $ do tryTakeTMVar outs putTMVar outs Nothing -- quit writer putTMVar doner () handle (finished . Just) $ do pingBump pinglogic -- start the ping timer fix $ \loop -> do packet <- getPacket h -- warn $ "read: " <> S.take 60 packet atomically $ writeTChan incomming packet pingBump pinglogic -- warn $ "bumped: " <> S.take 60 packet isEof <- liftIO $ hIsEOF h if isEof then finished Nothing else loop writerThread <- forkIO . fix $ \loop -> do let finished = do -- warn $ "finished write" -- hClose h -- quit reader throwTo readerThread (ErrorCall "EOF") atomically $ putTMVar donew () mb <- atomically $ readTMVar outs case mb of Just bs -> handle (\(SomeException e)->finished) (do -- warn $ "writing: " <> S.take 60 bs S.hPutStr h bs -- warn $ "wrote: " <> S.take 60 bs atomically $ takeTMVar outs loop) Nothing -> finished let wait = do readTMVar donew readTMVar doner return () return ConnectionThreads { threadsWriter = outs , threadsChannel = incomming , threadsWait = wait , threadsPing = pinglogic } -- | 'threadsWrite' writes the given 'ByteString' to the -- 'ConnectionThreads' object. It blocks until the ByteString -- is written and 'True' is returned, or the connection is -- interrupted and 'False' is returned. threadsWrite :: ConnectionThreads -> ByteString -> IO Bool threadsWrite c bs = atomically $ orElse (const False `fmap` threadsWait c) (const True `fmap` putTMVar (threadsWriter c) (Just bs)) -- | 'threadsClose' signals for the 'ConnectionThreads' object -- to quit and close the associated 'Handle'. This operation -- is non-blocking, follow it with 'threadsWait' if you want -- to wait for the operation to complete. threadsClose :: ConnectionThreads -> STM () threadsClose c = do let mvar = threadsWriter c v <- tryReadTMVar mvar case v of Just Nothing -> return () -- already closed _ -> putTMVar mvar Nothing -- | 'threadsRead' blocks until a 'ByteString' is available which -- is returned to the caller, or the connection is interrupted and -- 'Nothing' is returned. threadsRead :: ConnectionThreads -> IO (Maybe ByteString) threadsRead c = atomically $ orElse (const Nothing `fmap` threadsWait c) (Just `fmap` readTChan (threadsChannel c)) -- | A 'ConnectionState' is an interface to a single 'ConnectionThreads' -- or to a pair of 'ConnectionThreads' objects that are considered as one -- connection. data ConnectionState = SaneConnection ConnectionThreads -- ^ ordinary read/write connection | WriteOnlyConnection ConnectionThreads | ReadOnlyConnection ConnectionThreads | ConnectionPair ConnectionThreads ConnectionThreads -- ^ Two 'ConnectionThreads' objects, read operations use the -- first, write operations use the second. connWrite :: ConnectionState -> ByteString -> IO Bool connWrite (ReadOnlyConnection _) bs = return False connWrite conn bs = threadsWrite c bs where c = case conn of SaneConnection c -> c WriteOnlyConnection c -> c ConnectionPair _ c -> c mapConn :: Bool -> (ConnectionThreads -> STM ()) -> ConnectionState -> STM () mapConn both action c = case c of SaneConnection rw -> action rw ReadOnlyConnection r -> action r WriteOnlyConnection w -> action w ConnectionPair r w -> do rem <- orElse (const w `fmap` action r) (const r `fmap` action w) when both $ action rem connClose :: ConnectionState -> STM () connClose c = mapConn True threadsClose c connWait :: ConnectionState -> STM () connWait c = doit -- mapConn False threadsWait c where action = threadsWait doit = case c of SaneConnection rw -> action rw ReadOnlyConnection r -> action r WriteOnlyConnection w -> action w ConnectionPair r w -> do rem <- orElse (const w `fmap` action r) (const r `fmap` action w) threadsClose rem connPingTimer c = case c of SaneConnection rw -> threadsPing rw ReadOnlyConnection r -> threadsPing r WriteOnlyConnection w -> threadsPing w -- should be disabled. ConnectionPair r w -> threadsPing r connCancelPing c = pingCancel (connPingTimer c) connWaitPing c = pingWait (connPingTimer c) connFlush c = case c of SaneConnection rw -> waitChan rw ReadOnlyConnection r -> waitChan r WriteOnlyConnection w -> return () ConnectionPair r w -> waitChan r where waitChan t = do b <- isEmptyTChan (threadsChannel t) when (not b) retry bshow e = S.pack . show $ e warn str = S.hPutStrLn stderr str >> hFlush stderr debugNoise str = return () data PingEvent = PingIdle | PingTimeOut data PingMachine = PingMachine { pingFlag :: TVar Bool , pingInterruptable :: InterruptableDelay , pingEvent :: TMVar PingEvent , pingStarted :: TMVar Bool } pingMachine :: PingInterval -> TimeOut -> IO PingMachine pingMachine idle timeout = do d <- interruptableDelay flag <- atomically $ newTVar False canceled <- atomically $ newTVar False event <- atomically newEmptyTMVar started <- atomically $ newEmptyTMVar when (idle/=0) $ void . forkIO $ do (>>=) (atomically (readTMVar started)) $ flip when $ do fix $ \loop -> do atomically $ writeTVar flag False fin <- startDelay d (1000*idle) (>>=) (atomically (readTMVar started)) $ flip when $ do if (not fin) then loop else do -- Idle event atomically $ do tryTakeTMVar event putTMVar event PingIdle writeTVar flag True fin <- startDelay d (1000*timeout) (>>=) (atomically (readTMVar started)) $ flip when $ do me <- myThreadId if (not fin) then loop else do -- Timeout event atomically $ do tryTakeTMVar event writeTVar flag False putTMVar event PingTimeOut return PingMachine { pingFlag = flag , pingInterruptable = d , pingEvent = event , pingStarted = started } pingCancel :: PingMachine -> IO () pingCancel me = do atomically $ do tryTakeTMVar (pingStarted me) putTMVar (pingStarted me) False interruptDelay (pingInterruptable me) pingBump :: PingMachine -> IO () pingBump me = do atomically $ do b <- tryReadTMVar (pingStarted me) when (b/=Just False) $ do tryTakeTMVar (pingStarted me) putTMVar (pingStarted me) True interruptDelay (pingInterruptable me) pingWait :: PingMachine -> STM PingEvent pingWait me = takeTMVar (pingEvent me) data InterruptableDelay = InterruptableDelay { delayThread :: TMVar ThreadId } interruptableDelay :: IO InterruptableDelay interruptableDelay = do fmap InterruptableDelay $ atomically newEmptyTMVar startDelay :: InterruptableDelay -> Microseconds -> IO Bool startDelay d interval = do thread <- myThreadId handle (\(ErrorCall _)-> do debugNoise $ "delay interrupted" return False) $ do atomically $ putTMVar (delayThread d) thread threadDelay interval void . atomically $ takeTMVar (delayThread d) return True interruptDelay :: InterruptableDelay -> IO () interruptDelay d = do mthread <- atomically $ do tryTakeTMVar (delayThread d) flip (maybe $ return ()) mthread $ \thread -> do throwTo thread (ErrorCall "Interrupted delay")