From d4288f5a9f87e3889a50a347ebad0a812f52938c Mon Sep 17 00:00:00 2001 From: joe Date: Sun, 12 Nov 2017 18:33:51 -0500 Subject: Updated Server module that layers on StreamServer. --- Presence/Server.hs | 141 ++++++++++++++++++++++++----------------------------- 1 file changed, 65 insertions(+), 76 deletions(-) (limited to 'Presence/Server.hs') diff --git a/Presence/Server.hs b/Presence/Server.hs index f7f99907..a621134d 100644 --- a/Presence/Server.hs +++ b/Presence/Server.hs @@ -4,16 +4,17 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE TupleSections #-} {-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE RankNTypes #-} ----------------------------------------------------------------------------- -- | -- Module : Server -- -- Maintainer : joe@jerkface.net -- Stability : experimental --- +-- -- A TCP client/server library. -- --- TODO: XXX: A newer version of this code is in the server.git repo. XXX +-- TODO: -- -- * interface tweaks -- @@ -39,7 +40,7 @@ import Control.Exception ({-evaluate,-}handle,SomeException(..),bracketOnError,E import Control.Monad import Control.Monad.Fix -- import Control.Monad.STM -import Control.Monad.Trans.Resource +-- import Control.Monad.Trans.Resource import Control.Monad.IO.Class (MonadIO (liftIO)) import System.IO.Error (ioeGetErrorType,isDoesNotExistError) import System.IO @@ -53,18 +54,20 @@ import System.IO , stdout , Handle , hFlush + , hPutStrLn ) -import Network.Socket -import Network.BSD +import Network.Socket as Socket +import Network.BSD ( getProtocolNumber - ) + ) import Debug.Trace import Data.Time.Clock (UTCTime,getCurrentTime,diffUTCTime) import Data.Time.Format (formatTime) -import SockAddr () +-- import SockAddr () -- import System.Locale (defaultTimeLocale) -todo = error "unimplemented" +import Network.StreamServer +import Network.SocketLike hiding (sClose) type Microseconds = Int type Miliseconds = Int @@ -72,12 +75,12 @@ type TimeOut = Miliseconds type PingInterval = Miliseconds -- | This object is passed with the 'Listen' and 'Connect' --- instructions in order to control the behavior of the +-- instructions in order to control the behavior of the -- connections that are established. It is parameterized -- by a user-suplied type @conkey@ that is used as a lookup -- key for connections. data ConnectionParameters conkey u = - ConnectionParameters + ConnectionParameters { pingInterval :: PingInterval -- ^ The miliseconds of idle to allow before a 'RequiresPing' -- event is signaled. @@ -85,7 +88,7 @@ data ConnectionParameters conkey u = -- ^ The miliseconds of idle after 'RequiresPing' is signaled -- that are necessary for the connection to be considered -- lost and signalling 'EOF'. - , makeConnKey :: (Socket,SockAddr) -> IO (conkey,u) + , makeConnKey :: RestrictedSocket -> IO (conkey,u) -- ^ This action creates a lookup key for a new connection. If 'duplex' -- is 'True' and the result is already assocatied with an established -- connection, then an 'EOF' will be forced before the the new @@ -111,7 +114,7 @@ data ConnectionParameters conkey u = -- * 'duplex' = True -- connectionDefaults - :: ((Socket, SockAddr) -> IO (conkey,u)) -> ConnectionParameters conkey u + :: (RestrictedSocket -> IO (conkey,u)) -> ConnectionParameters conkey u connectionDefaults f = ConnectionParameters { pingInterval = 28000 , timeout = 2000 @@ -140,9 +143,9 @@ data ServerInstruction conkey u -- ^ send bytes to an established connection #ifdef TEST -deriving instance Show conkey => Show (ServerInstruction conkey) +deriving instance Show conkey => Show (ServerInstruction conkey u) instance Show (a -> b) where show _ = "" -deriving instance Show conkey => Show (ConnectionParameters conkey) +deriving instance Show conkey => Show (ConnectionParameters conkey u) #endif -- | This type specifies which which half of a half-duplex @@ -185,17 +188,22 @@ data ConnectionRecord u -- | This object accepts commands and signals events and maintains -- the list of currently listening ports and established connections. -data Server a u +data Server a u releaseKey = Server { serverCommand :: TMVar (ServerInstruction a u) , serverEvent :: TChan ((a,u), ConnectionEvent ByteString) - , serverReleaseKey :: ReleaseKey + , serverReleaseKey :: releaseKey , conmap :: TVar (Map a (ConnectionRecord u)) - , listenmap :: TVar (Map PortNumber (ThreadId,Socket)) + , listenmap :: TVar (Map PortNumber ServerHandle) , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay)) } control sv = atomically . putTMVar (serverCommand sv) +type Allocate releaseKey m = forall b. IO b -> (b -> IO ()) -> m (releaseKey, b) + +noCleanUp :: MonadIO m => Allocate () m +noCleanUp io _ = ( (,) () ) `liftM` liftIO io + -- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT' -- to ensure proper cleanup. For example, -- @@ -207,7 +215,7 @@ control sv = atomically . putTMVar (serverCommand sv) -- > import Control.Concurrent.STM.TChan (readTChan) -- > -- > main = runResourceT $ do --- > sv <- server +-- > sv <- server allocate -- > let params = connectionDefaults (return . snd) -- > liftIO . atomically $ putTMVar (serverCommand sv) (Listen 2942 params) -- > let loop = do @@ -218,8 +226,15 @@ control sv = atomically . putTMVar (serverCommand sv) -- > case event of EOF -> return () -- > _ -> loop -- > liftIO loop -server :: (Show a,Ord a, MonadIO m, MonadResource m) => m (Server a u) -server = do +-- +-- Using 'Control.Monad.Trans.Resource.ResourceT' is optional. Pass 'noCleanUp' +-- to do without automatic cleanup and be sure to remember to write 'Quit' to +-- the 'serverCommand' variable. +server :: + -- forall (m :: * -> *) a u conkey releaseKey. + (Show conkey, MonadIO m, Ord conkey) => + Allocate releaseKey m -> m (Server conkey u releaseKey) +server allocate = do (key,cmds) <- allocate (atomically newEmptyTMVar) (atomically . flip putTMVar Quit) server <- liftIO . atomically $ do @@ -245,9 +260,9 @@ server = do _ -> again return server where - closeAll server = liftIO $ do + closeAll server = do listening <- atomically . readTVar $ listenmap server - mapM_ killListener (Map.elems listening) + mapM_ quitListening (Map.elems listening) let stopRetry (v,d) = do atomically $ writeTVar v False interruptDelay d retriers <- atomically $ do @@ -261,38 +276,36 @@ server = do atomically $ writeTVar (conmap server) Map.empty - doit server (Listen port params) = liftIO $ do + doit server (Listen port params) = do listening <- Map.member port `fmap` atomically (readTVar $ listenmap server) when (not listening) $ do let family = AF_INET6 - - sock <- socket family Stream 0 - setSocketOption sock ReuseAddr 1 let address = case family of AF_INET -> SockAddrInet port iNADDR_ANY AF_INET6 -> SockAddrInet6 port 0 iN6ADDR_ANY 0 - fix $ \loop -> do - handle (\(SomeException e)-> do - warn $ "BIND-ERROR:"<>bshow address <> " " <> bshow e - threadDelay 5000000 - loop) - $ bindSocket sock address - listen sock 2 - thread <- forkIO $ acceptLoop server params sock - atomically $ listenmap server `modifyTVar'` Map.insert port (thread,sock) - - doit server (Ignore port) = liftIO $ do + + sserv <- flip streamServer address ServerConfig + { serverWarn = hPutStrLn stderr + , serverSession = \sock _ h -> do + (conkey,u) <- makeConnKey params sock + _ <- newConnection server params conkey u h In + return () + } + + atomically $ listenmap server `modifyTVar'` Map.insert port sserv + + doit server (Ignore port) = do mb <- atomically $ do map <- readTVar $ listenmap server modifyTVar' (listenmap server) $ Map.delete port return $ Map.lookup port map - maybe (return ()) killListener $ mb + maybe (return ()) quitListening $ mb - doit server (Send con bs) = liftIO $ do -- . void . forkIO $ do + doit server (Send con bs) = do -- . void . forkIO $ do map <- atomically $ readTVar (conmap server) let post False = (trace ("cant send: "++show bs) $ return ()) post True = return () @@ -300,7 +313,7 @@ server = do (post <=< flip connWrite bs . cstate) $ Map.lookup con map - doit server (Connect addr params) = liftIO $ do + doit server (Connect addr params) = do mb <- atomically $ do rmap <- readTVar (retrymap server) return $ Map.lookup addr rmap @@ -316,20 +329,19 @@ server = do handle (\e -> do -- let t = ioeGetErrorType e when (isDoesNotExistError e) $ return () -- warn "GOTCHA" -- warn $ "connect-error: " <> bshow e - (conkey,u) <- makeConnKey params (sock,addr) -- XXX: ? - sClose sock + (conkey,u) <- makeConnKey params (restrictSocket sock) -- XXX: ? + Socket.close sock atomically $ writeTChan (serverEvent server) $ ((conkey,u),ConnectFailure addr)) $ do connect sock addr - me <- getSocketName sock - (conkey,u) <- makeConnKey params (sock,me) + (conkey,u) <- makeConnKey params (restrictSocket sock) h <- socketToHandle sock ReadWriteMode newConnection server params conkey u h Out return () - doit server (ConnectWithEndlessRetry addr params interval) = liftIO $ do + doit server (ConnectWithEndlessRetry addr params interval) = do proto <- getProtocolNumber "tcp" void . forkIO $ do resultVar <- atomically newEmptyTMVar @@ -356,8 +368,8 @@ server = do -- warn $ "connect-error: " <> bshow e -- Weird hack: puting the would-be peer address -- instead of local socketName - (conkey,u) <- makeConnKey params (sock,addr) -- XXX: ? - sClose sock + (conkey,u) <- makeConnKey params (restrictSocket sock) -- XXX: ? + Socket.close sock atomically $ do writeTChan (serverEvent server) $ ((conkey,u),ConnectFailure addr) @@ -365,8 +377,7 @@ server = do putTMVar resultVar retry) $ do connect sock addr - me <- getSocketName sock - (conkey,u) <- makeConnKey params (sock,me) + (conkey,u) <- makeConnKey params (restrictSocket sock) h <- socketToHandle sock ReadWriteMode threads <- newConnection server params conkey u h Out atomically $ do threadsWait threads @@ -416,8 +427,6 @@ socketFamily (SockAddrInet _ _) = AF_INET socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6 socketFamily (SockAddrUnix _) = AF_UNIX -killListener (thread,sock) = do sClose sock - -- killThread thread conevent con = Connection pingflag read write @@ -561,26 +570,6 @@ newConnection server params conkey u h inout = do , cdata = u } return $ handleEOF conkey u mvar new' -acceptLoop server params sock = handle (acceptException server params sock) $ do - con <- accept sock - (conkey,u) <- makeConnKey params con - h <- socketToHandle (fst con) ReadWriteMode - newConnection server params conkey u h In - acceptLoop server params sock - -acceptException server params sock ioerror = do - sClose sock - case show (ioeGetErrorType ioerror) of - "resource exhausted" -> do -- try again - warn ("acceptLoop: resource exhasted") - threadDelay 500000 - acceptLoop server params sock - "invalid argument" -> do -- quit on closed socket - return () - message -> do -- unexpected exception - warn ("acceptLoop: "<>bshow message) - return () - getPacket h = do hWaitForInput h (-1) @@ -622,7 +611,7 @@ connectionThreads h pinglogic = do atomically $ writeTChan incomming packet pingBump pinglogic -- warn $ "bumped: " <> S.take 60 packet - isEof <- liftIO $ hIsEOF h + isEof <- hIsEOF h if isEof then finished Nothing else loop writerThread <- forkIO . fix $ \loop -> do @@ -710,7 +699,7 @@ mapConn both action c = ConnectionPair r w -> do rem <- orElse (const w `fmap` action r) (const r `fmap` action w) - when both $ action rem + when both $ action rem connClose :: ConnectionState -> STM () connClose c = mapConn True threadsClose c @@ -719,7 +708,7 @@ connWait :: ConnectionState -> STM () connWait c = doit -- mapConn False threadsWait c where action = threadsWait - doit = + doit = case c of SaneConnection rw -> action rw ReadOnlyConnection r -> action r @@ -821,7 +810,7 @@ pingWait :: PingMachine -> STM PingEvent pingWait me = takeTMVar (pingEvent me) -data InterruptableDelay = InterruptableDelay +data InterruptableDelay = InterruptableDelay { delayThread :: TMVar ThreadId } @@ -835,7 +824,7 @@ startDelay d interval = do thread <- myThreadId handle (\(ErrorCall _)-> do debugNoise $ "delay interrupted" - return False) $ do + return False) $ do atomically $ putTMVar (delayThread d) thread threadDelay interval void . atomically $ takeTMVar (delayThread d) -- cgit v1.2.3