From 66b7039f00faa00055353f99b8dfeee77694ae1c Mon Sep 17 00:00:00 2001 From: joe Date: Mon, 13 Nov 2017 10:14:57 -0500 Subject: Consolidated redundant implementations of InterruptibleDelay. --- Presence/Server.hs | 123 ++++++++++++++++++++++++----------------------------- 1 file changed, 56 insertions(+), 67 deletions(-) (limited to 'Presence/Server.hs') diff --git a/Presence/Server.hs b/Presence/Server.hs index a621134d..f82a93c5 100644 --- a/Presence/Server.hs +++ b/Presence/Server.hs @@ -1,10 +1,11 @@ {-# OPTIONS_HADDOCK prune #-} {-# LANGUAGE CPP #-} -{-# LANGUAGE StandaloneDeriving #-} +{-# LANGUAGE DoAndIfThenElse #-} +{-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE TupleSections #-} -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE RankNTypes #-} ----------------------------------------------------------------------------- -- | -- Module : Server @@ -18,7 +19,6 @@ -- -- * interface tweaks -- -{-# LANGUAGE DoAndIfThenElse #-} module Server where import Data.ByteString (ByteString,hGetNonBlocking) @@ -36,14 +36,14 @@ import Control.Concurrent.STM -- import Control.Concurrent.STM.TMVar -- import Control.Concurrent.STM.TChan -- import Control.Concurrent.STM.Delay -import Control.Exception ({-evaluate,-}handle,SomeException(..),bracketOnError,ErrorCall(..)) +import Control.Exception ({-evaluate,-}handle,SomeException(..),ErrorCall(..)) import Control.Monad import Control.Monad.Fix -- import Control.Monad.STM -- import Control.Monad.Trans.Resource import Control.Monad.IO.Class (MonadIO (liftIO)) -import System.IO.Error (ioeGetErrorType,isDoesNotExistError) -import System.IO +import System.IO.Error (isDoesNotExistError) +import System.IO ( IOMode(..) , hSetBuffering , BufferMode(..) @@ -51,7 +51,6 @@ import System.IO , hClose , hIsEOF , stderr - , stdout , Handle , hFlush , hPutStrLn @@ -61,11 +60,11 @@ import Network.BSD ( getProtocolNumber ) import Debug.Trace -import Data.Time.Clock (UTCTime,getCurrentTime,diffUTCTime) -import Data.Time.Format (formatTime) +import Data.Time.Clock (getCurrentTime,diffUTCTime) -- import SockAddr () -- import System.Locale (defaultTimeLocale) +import InterruptibleDelay import Network.StreamServer import Network.SocketLike hiding (sClose) @@ -150,7 +149,7 @@ deriving instance Show conkey => Show (ConnectionParameters conkey u) -- | This type specifies which which half of a half-duplex -- connection is of interest. -data InOrOut = In | Out +data InOrOut = In | Out deriving (Enum,Eq,Ord,Show,Read) -- | These events may be read from 'serverEvent' TChannel. @@ -181,22 +180,23 @@ deriving instance Eq b => Eq (ConnectionEvent b) -- | This is the per-connection state. data ConnectionRecord u - = ConnectionRecord { ckont :: TMVar (STM (IO ())) -- ^ used to pass a continuation to update the eof-handler - , cstate :: ConnectionState -- ^ used to send/receive data to the connection - , cdata :: u -- ^ user data, stored in the connection map for convenience + = ConnectionRecord { ckont :: TMVar (STM (IO ())) -- ^ used to pass a continuation to update the eof-handler + , cstate :: ConnectionState -- ^ used to send/receive data to the connection + , cdata :: u -- ^ user data, stored in the connection map for convenience } -- | This object accepts commands and signals events and maintains -- the list of currently listening ports and established connections. data Server a u releaseKey - = Server { serverCommand :: TMVar (ServerInstruction a u) - , serverEvent :: TChan ((a,u), ConnectionEvent ByteString) + = Server { serverCommand :: TMVar (ServerInstruction a u) + , serverEvent :: TChan ((a,u), ConnectionEvent ByteString) , serverReleaseKey :: releaseKey - , conmap :: TVar (Map a (ConnectionRecord u)) - , listenmap :: TVar (Map PortNumber ServerHandle) - , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay)) + , conmap :: TVar (Map a (ConnectionRecord u)) + , listenmap :: TVar (Map PortNumber ServerHandle) + , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptibleDelay)) } +control :: Server a u releaseKey -> ServerInstruction a u -> IO () control sv = atomically . putTMVar (serverCommand sv) type Allocate releaseKey m = forall b. IO b -> (b -> IO ()) -> m (releaseKey, b) @@ -213,7 +213,7 @@ noCleanUp io _ = ( (,) () ) `liftM` liftIO io -- > import Control.Monad.STM (atomically) -- > import Control.Concurrent.STM.TMVar (putTMVar) -- > import Control.Concurrent.STM.TChan (readTChan) --- > +-- > -- > main = runResourceT $ do -- > sv <- server allocate -- > let params = connectionDefaults (return . snd) @@ -299,7 +299,7 @@ server allocate = do atomically $ listenmap server `modifyTVar'` Map.insert port sserv doit server (Ignore port) = do - mb <- atomically $ do + mb <- atomically $ do map <- readTVar $ listenmap server modifyTVar' (listenmap server) $ Map.delete port return $ Map.lookup port map @@ -322,7 +322,7 @@ server allocate = do interruptDelay d when (not b) forkit) mb - where + where forkit = void . forkIO $ do proto <- getProtocolNumber "tcp" sock <- socket (socketFamily addr) Stream proto @@ -331,8 +331,8 @@ server allocate = do -- warn $ "connect-error: " <> bshow e (conkey,u) <- makeConnKey params (restrictSocket sock) -- XXX: ? Socket.close sock - atomically - $ writeTChan (serverEvent server) + atomically + $ writeTChan (serverEvent server) $ ((conkey,u),ConnectFailure addr)) $ do connect sock addr @@ -345,7 +345,7 @@ server allocate = do proto <- getProtocolNumber "tcp" void . forkIO $ do resultVar <- atomically newEmptyTMVar - timer <- interruptableDelay + timer <- interruptibleDelay (retryVar,action) <- atomically $ do let noop = return () map <- readTVar (retrymap server) @@ -371,7 +371,7 @@ server allocate = do (conkey,u) <- makeConnKey params (restrictSocket sock) -- XXX: ? Socket.close sock atomically $ do - writeTChan (serverEvent server) + writeTChan (serverEvent server) $ ((conkey,u),ConnectFailure addr) retry <- readTVar retryVar putTMVar resultVar retry) @@ -396,7 +396,7 @@ server allocate = do -- INTERNAL ---------------------------------------------------------- - + {- hWriteUntilNothing h outs = fix $ \loop -> do @@ -423,18 +423,27 @@ connRead conn = do -- discardContents (threadsChannel w) return c +socketFamily :: SockAddr -> Family socketFamily (SockAddrInet _ _) = AF_INET socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6 socketFamily (SockAddrUnix _) = AF_UNIX - +conevent :: ConnectionState -> ConnectionEvent b conevent con = Connection pingflag read write where pingflag = swapTVar (pingFlag (connPingTimer con)) False read = connRead con write = connWrite con +newConnection :: Ord a + => Server a u1 releaseKey + -> ConnectionParameters conkey u + -> a + -> u1 + -> Handle + -> InOrOut + -> IO ConnectionThreads newConnection server params conkey u h inout = do hSetBuffering h NoBuffering let (forward,idle_ms,timeout_ms) = @@ -571,7 +580,7 @@ newConnection server params conkey u h inout = do return $ handleEOF conkey u mvar new' - +getPacket :: Handle -> IO ByteString getPacket h = do hWaitForInput h (-1) hGetNonBlocking h 1024 @@ -718,6 +727,7 @@ connWait c = doit -- mapConn False threadsWait c (const r `fmap` action w) threadsClose rem +connPingTimer :: ConnectionState -> PingMachine connPingTimer c = case c of SaneConnection rw -> threadsPing rw @@ -725,10 +735,13 @@ connPingTimer c = WriteOnlyConnection w -> threadsPing w -- should be disabled. ConnectionPair r w -> threadsPing r +connCancelPing :: ConnectionState -> IO () connCancelPing c = pingCancel (connPingTimer c) -connWaitPing c = pingWait (connPingTimer c) +connWaitPing :: ConnectionState -> STM PingEvent +connWaitPing c = pingWait (connPingTimer c) +connFlush :: ConnectionState -> STM () connFlush c = case c of SaneConnection rw -> waitChan rw @@ -740,23 +753,28 @@ connFlush c = b <- isEmptyTChan (threadsChannel t) when (not b) retry +bshow :: Show a => a -> ByteString bshow e = S.pack . show $ e + +warn :: ByteString -> IO () warn str = S.hPutStrLn stderr str >> hFlush stderr + +debugNoise :: Monad m => t -> m () debugNoise str = return () data PingEvent = PingIdle | PingTimeOut data PingMachine = PingMachine - { pingFlag :: TVar Bool - , pingInterruptable :: InterruptableDelay - , pingEvent :: TMVar PingEvent - , pingStarted :: TMVar Bool + { pingFlag :: TVar Bool + , pingInterruptible :: InterruptibleDelay + , pingEvent :: TMVar PingEvent + , pingStarted :: TMVar Bool } pingMachine :: PingInterval -> TimeOut -> IO PingMachine pingMachine idle timeout = do - d <- interruptableDelay + d <- interruptibleDelay flag <- atomically $ newTVar False canceled <- atomically $ newTVar False event <- atomically newEmptyTMVar @@ -786,7 +804,7 @@ pingMachine idle timeout = do putTMVar event PingTimeOut return PingMachine { pingFlag = flag - , pingInterruptable = d + , pingInterruptible = d , pingEvent = event , pingStarted = started } @@ -795,7 +813,7 @@ pingCancel :: PingMachine -> IO () pingCancel me = do atomically $ do tryTakeTMVar (pingStarted me) putTMVar (pingStarted me) False - interruptDelay (pingInterruptable me) + interruptDelay (pingInterruptible me) pingBump :: PingMachine -> IO () pingBump me = do @@ -804,37 +822,8 @@ pingBump me = do when (b/=Just False) $ do tryTakeTMVar (pingStarted me) putTMVar (pingStarted me) True - interruptDelay (pingInterruptable me) + interruptDelay (pingInterruptible me) pingWait :: PingMachine -> STM PingEvent pingWait me = takeTMVar (pingEvent me) - -data InterruptableDelay = InterruptableDelay - { delayThread :: TMVar ThreadId - } - -interruptableDelay :: IO InterruptableDelay -interruptableDelay = do - fmap InterruptableDelay - $ atomically newEmptyTMVar - -startDelay :: InterruptableDelay -> Microseconds -> IO Bool -startDelay d interval = do - thread <- myThreadId - handle (\(ErrorCall _)-> do - debugNoise $ "delay interrupted" - return False) $ do - atomically $ putTMVar (delayThread d) thread - threadDelay interval - void . atomically $ takeTMVar (delayThread d) - return True - -interruptDelay :: InterruptableDelay -> IO () -interruptDelay d = do - mthread <- atomically $ do - tryTakeTMVar (delayThread d) - flip (maybe $ return ()) mthread $ \thread -> do - throwTo thread (ErrorCall "Interrupted delay") - - -- cgit v1.2.3