-- | This module implements a bare-bones TCP or Unix socket server.
{-# LANGUAGE CPP #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
module Network.StreamServer
    ( streamServer
    , ServerHandle
    , ServerConfig(..)
    , withSession
    , quitListening
    , dummyServerHandle
    ) where

import Data.Monoid
import Network.Socket as Socket
import Data.ByteString.Char8
    ( hGetNonBlocking
    )
import qualified Data.ByteString.Char8 as S
    ( hPutStrLn
    )
import System.Directory (removeFile)
import System.IO
    ( IOMode(..)
    , hSetBuffering
    , BufferMode(..)
    , hWaitForInput
    , hClose
    , hIsEOF
    , hPutStrLn
    , stderr
    , hFlush
    )
import Control.Monad
import Control.Monad.Fix (fix)
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
    ( forkIO, threadDelay, ThreadId, mkWeakThreadId, labelThread, myThreadId
    , killThread )
#else
import GHC.Conc (labelThread)
import Control.Concurrent
    ( forkIO, threadDelay, ThreadId, mkWeakThreadId, myThreadId
    , killThread )
#endif
import Control.Exception (catch,handle,try,finally)
import System.IO.Error (tryIOError)
import System.Mem.Weak
import System.IO.Error
-- import Data.Conduit
import Control.Monad.IO.Class (MonadIO (liftIO))
import qualified Data.ByteString as S (ByteString)
import System.IO (Handle)
import Control.Concurrent.MVar (newMVar)
import Network.SocketLike

-- | Handle on a running server: the listening 'Socket' paired with a weak
-- reference to the thread running the accept loop.
data ServerHandle = ServerHandle Socket (Weak ThreadId)

-- | Build a 'ServerHandle' that is not backed by any real listener.
--
-- The socket is constructed by hand in the 'Closed' state and the
-- referenced thread is one that returns immediately.
dummyServerHandle :: IO ServerHandle
dummyServerHandle = do
    status  <- newMVar Closed
    tid     <- forkIO (return ())
    weakTid <- mkWeakThreadId tid
    return $ ServerHandle (MkSocket 0 AF_UNSPEC NoSocketType 0 status) weakTid

-- | Delete the filesystem entry behind a unix-domain address.  Every other
-- kind of address is ignored.
removeSocketFile :: SockAddr -> IO ()
removeSocketFile addr = case addr of
    SockAddrUnix path -> removeFile path
    _                 -> return ()

-- | Terminate the server accept-loop.  Call this to shut down the server.
quitListening :: ServerHandle -> IO ()
quitListening (ServerHandle sock weakTid) =
    removeBoundFile `finally` stopAccepting
  where
    -- Look up the address the socket is bound to and delete its socket
    -- file when it is a unix-domain address.
    removeBoundFile = removeSocketFile =<< Socket.getSocketName sock
    -- Runs whether or not the removal above failed: kill the accept
    -- thread if the weak reference is still alive, then close the socket.
    stopAccepting = do
        deRefWeak weakTid >>= mapM_ killThread
        Socket.close sock

-- | Named 'bshow' rather than 'show' so that a 'ByteString' flavoured
-- variant can be substituted later.  (Not exported.)
bshow :: Show a => a -> String
bshow = show

-- | Write one line to stderr and flush it.  Not exported.  This is the
-- 'serverWarn' that 'withSession' installs.
warnStderr :: String -> IO ()
warnStderr msg = do
    hPutStrLn stderr msg
    hFlush stderr

data ServerConfig = ServerConfig
    { serverWarn :: String -> IO ()
      -- ^ Action to report warnings and errors.
    , serverSession :: RestrictedSocket -> Int -> Handle -> IO ()
      -- ^ Action to handle interaction with a client
    }

-- | Build a 'ServerConfig' around the given session handler, reporting
-- warnings on stderr.
withSession :: (RestrictedSocket -> Int -> Handle -> IO ()) -> ServerConfig
withSession session = ServerConfig
    { serverWarn    = warnStderr
    , serverSession = session
    }

-- | Launch a thread to listen at the given bind address and dispatch
-- to session handler threads on every incoming connection.  Supports
-- IPv4 and IPv6, TCP and unix sockets.
--
-- The returned handle can be used with 'quitListening' to terminate the
-- thread and prevent any new sessions from starting.  Currently active
-- session threads will not be terminated or signaled in any way.
streamServer :: ServerConfig -> SockAddr -> IO ServerHandle
streamServer cfg addr = do
    let warn = serverWarn cfg
        family = case addr of
                   SockAddrInet  {} -> AF_INET
                   SockAddrInet6 {} -> AF_INET6
                   SockAddrUnix  {} -> AF_UNIX
    sock <- socket family Stream 0
    setSocketOption sock ReuseAddr 1
    -- Keep trying to bind until it succeeds.  Before each attempt any
    -- stale unix socket file is removed (failure to remove is ignored);
    -- a failed bind is reported and retried after five seconds.
    fix $ \loop ->
        tryIOError (removeSocketFile addr) >> bind sock addr
            `catchIOError` \e -> do
                warn $ "bind-error: " <> bshow addr <> " " <> bshow e
                threadDelay 5000000
                loop
    listen sock maxListenQueue
    thread <- mkWeakThreadId <=< forkIO $ do
        myThreadId >>= flip labelThread "StreamServer.acceptLoop"
        acceptLoop cfg sock 0
    return (ServerHandle sock thread)

-- | Not exported.  This, combined with 'acceptException', forms a mutually
-- recursive loop that handles incoming connections.  To quit the loop, the
-- socket must be closed by 'quitListening'.
--
-- The 'Int' counts connections accepted so far; each session is handed
-- that count plus one as its connection key.
acceptLoop :: ServerConfig -> Socket -> Int -> IO ()
acceptLoop cfg sock n = do
    -- Only a single accept step runs under the exception guard.  The
    -- recursive calls happen outside of it, so handlers do not pile up
    -- with every connection and a retry is not run inside a handler.
    result <- tryIOError acceptOne
    case result of
        Left ioerror -> acceptException cfg n sock ioerror
        Right ()     -> acceptLoop cfg sock (n + 1)
  where
    -- Accept one connection and hand it off to its own session thread.
    acceptOne = do
        con <- accept sock
        let conkey = n + 1
        h <- socketToHandle (fst con) ReadWriteMode
        _ <- forkIO $ do
            myThreadId >>= flip labelThread "StreamServer.session"
            serverSession cfg (restrictHandleSocket h (fst con)) conkey h
        return ()

-- | Not exported.  Decide what to do after an 'IOError' in 'acceptLoop'.
--
-- * \"resource exhausted\": warn, wait half a second and resume accepting.
--   The listening socket is left open so the retry can actually succeed.
--
-- * \"invalid argument\": treated as the socket having been closed; the
--   loop ends quietly.
--
-- * anything else: the error type is reported and the loop ends.
--
-- In the two terminating cases the listening socket is closed.
acceptException :: ServerConfig -> Int -> Socket -> IOError -> IO ()
acceptException cfg n sock ioerror =
    case show (ioeGetErrorType ioerror) of
      "resource exhausted" -> do -- try again
        serverWarn cfg $ ("acceptLoop: resource exhasted")
        threadDelay 500000
        acceptLoop cfg sock (n + 1)
      "invalid argument"   -> do -- quit on closed socket
        Socket.close sock
      message -> do -- unexpected exception
        Socket.close sock
        serverWarn cfg $ ("acceptLoop: "<>bshow message)