summaryrefslogtreecommitdiff
path: root/src/Network/StreamServer.hs
blob: a6cead0ed0cc952964fb1eb525425faccdd37f43 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
-- | This module implements a bare-bones TCP or Unix socket server.
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
module Network.StreamServer
    ( streamServer
    , ServerHandle
    , ServerConfig(..)
    , withSession
    , quitListening
    , dummyServerHandle
    ) where

import Data.Monoid
import Network.Socket as Socket
import Data.ByteString.Char8
    ( hGetNonBlocking
    )
import qualified Data.ByteString.Char8 as S
    ( hPutStrLn
    )
import System.Directory (removeFile)
import System.IO
    ( IOMode(..)
    , hSetBuffering
    , BufferMode(..)
    , hWaitForInput
    , hClose
    , hIsEOF
    , hPutStrLn
    , stderr
    , hFlush
    )
import Control.Monad
import Control.Monad.Fix (fix)
import Control.Concurrent (forkIO, threadDelay, ThreadId, mkWeakThreadId)
import Control.Exception (catch,handle,try,finally)
import System.IO.Error (tryIOError)
import System.Mem.Weak
import System.IO.Error

-- import Data.Conduit
import Control.Monad.IO.Class (MonadIO (liftIO))
import qualified Data.ByteString as S (ByteString)
import System.IO (Handle)
import Control.Concurrent.MVar (newMVar)

import Network.SocketLike

-- | Handle to a server started by 'streamServer'.  It pairs the listening
-- 'Socket' with a weak reference to the id of the thread running the accept
-- loop.  Pass it to 'quitListening' to shut the server down.
data ServerHandle = ServerHandle Socket (Weak ThreadId)


-- | Build a placeholder 'ServerHandle' that is not backed by a running
-- server.  The socket value is constructed directly (descriptor 0,
-- 'AF_UNSPEC', 'NoSocketType') from an 'MVar' holding 'Closed', and the
-- thread reference belongs to a forked thread that returns immediately.
dummyServerHandle :: IO ServerHandle
dummyServerHandle = do
    status  <- newMVar Closed
    tid     <- forkIO (return ())
    weakTid <- mkWeakThreadId tid
    return $ ServerHandle (MkSocket 0 AF_UNSPEC NoSocketType 0 status) weakTid

-- | Delete the filesystem entry named by a Unix-domain socket address.
-- Every other kind of address is ignored.  Not exported.
removeSocketFile :: SockAddr -> IO ()
removeSocketFile addr = case addr of
    SockAddrUnix path -> removeFile path
    _                 -> return ()

-- | Shut the server down by closing its listening socket, which ends the
-- accept loop.  The socket's bound address is looked up first so that a
-- Unix-domain socket file can be removed; the socket is closed even when
-- that lookup or removal throws.
quitListening :: ServerHandle -> IO ()
quitListening (ServerHandle sock _) =
    (removeSocketFile =<< Socket.getSocketName sock)
        `finally` Socket.close sock


-- | Render a value for inclusion in a warning message.  This is a separate
-- name from 'show' so that a 'ByteString'-producing variant can be swapped
-- in later.  Not exported.
bshow :: Show a => a -> String
bshow = show

-- | Write one line to stderr and flush it immediately.  Not exported.
-- This is the 'serverWarn' action installed by 'withSession'.
warnStderr :: String -> IO ()
warnStderr msg = do
    hPutStrLn stderr msg
    hFlush stderr

-- | Callbacks that customise a server started with 'streamServer'.  Use
-- 'withSession' to build one with the default stderr warning action.
data ServerConfig = ServerConfig
    { serverWarn    :: String -> IO ()
    -- ^ Action to report warnings and errors.
    , serverSession :: RestrictedSocket -> Int -> Handle -> IO ()
    -- ^ Action to handle interaction with a client.  It is run in its own
    -- thread for each accepted connection and receives the restricted view
    -- of the connection socket, a connection number taken from the accept
    -- loop's counter, and the read/write 'Handle' made from the socket.
    }

-- | Build a 'ServerConfig' around the given session handler, reporting
-- warnings on stderr.
withSession :: (RestrictedSocket -> Int -> Handle -> IO ()) -> ServerConfig
withSession session = ServerConfig
    { serverWarn    = warnStderr
    , serverSession = session
    }

-- | Start listening on the given bind address and fork a thread that
-- dispatches every incoming connection to its own session-handler thread.
-- IPv4, IPv6 and Unix-domain stream sockets are supported.
--
-- Binding is retried indefinitely: before each attempt any existing
-- Unix socket file at the address is removed (failures are ignored), and a
-- failed bind is reported through 'serverWarn' and retried after five
-- seconds.  This call therefore does not return until the bind succeeds.
--
-- Pass the returned handle to 'quitListening' to stop the accept thread
-- and prevent new sessions from starting.  Session threads that are
-- already running are neither terminated nor signaled.
streamServer :: ServerConfig -> SockAddr -> IO ServerHandle
streamServer cfg addr = do
    sock <- socket family Stream 0
    setSocketOption sock ReuseAddr 1
    bindWithRetry sock
    listen sock maxListenQueue
    tid <- forkIO (acceptLoop cfg sock 0)
    weakTid <- mkWeakThreadId tid
    return (ServerHandle sock weakTid)
  where
    -- Address family matching the constructor of the bind address.
    family = case addr of
        SockAddrInet  {} -> AF_INET
        SockAddrInet6 {} -> AF_INET6
        SockAddrUnix  {} -> AF_UNIX

    -- Keep trying to bind until it works, warning and sleeping between
    -- attempts.
    bindWithRetry sock = do
        _ <- tryIOError (removeSocketFile addr)
        bind sock addr `catchIOError` \e -> do
            serverWarn cfg $ "bind-error: " <> bshow addr <> " " <> bshow e
            threadDelay 5000000
            bindWithRetry sock

-- | Not exported.  Together with 'acceptException' this forms a mutually
-- recursive loop that handles incoming connections.  Each accepted
-- connection is converted to a read/write 'Handle' and handed to
-- 'serverSession' in a freshly forked thread, numbered from the counter
-- @n@.  To quit the loop, the socket must be closed by 'quitListening'.
--
-- Only the accept-and-dispatch step runs under the exception handler; the
-- recursive call is made after 'try' has returned.  Recursing inside the
-- handler's scope would leave one more handler frame installed for every
-- accepted connection.
acceptLoop :: ServerConfig -> Socket -> Int -> IO ()
acceptLoop cfg sock n = do
    result <- try $ do
        con <- accept sock
        let conkey = n + 1
        h <- socketToHandle (fst con) ReadWriteMode
        forkIO $ serverSession cfg (restrictHandleSocket h (fst con)) conkey h
    case result of
        -- The error type is fixed to 'IOError' by 'acceptException'.
        Left ioerror -> acceptException cfg n sock ioerror
        Right _      -> acceptLoop cfg sock (n + 1)

-- | Not exported.  Decide what to do after an 'IOError' in 'acceptLoop',
-- based on the textual form of the error type:
--
-- * @"resource exhausted"@: warn, wait half a second, and resume
--   accepting on the same socket.  The socket is left open so the retry
--   can succeed.
--
-- * @"invalid argument"@: taken to mean the socket was closed (see
--   'quitListening'); close it and stop silently.
--
-- * anything else: close the socket, report the error type through
--   'serverWarn', and stop.
acceptException :: ServerConfig -> Int -> Socket -> IOError -> IO ()
acceptException cfg n sock ioerror =
    case show (ioeGetErrorType ioerror) of
      "resource exhausted" -> do -- try again
                                 serverWarn cfg $ ("acceptLoop: resource exhasted")
                                 threadDelay 500000
                                 acceptLoop cfg sock (n + 1)
      "invalid argument"   -> do -- quit on closed socket
                                 Socket.close sock
      message              -> do -- unexpected exception
                                 Socket.close sock
                                 serverWarn cfg $ ("acceptLoop: "<>bshow message)
                                 return ()