summaryrefslogtreecommitdiff
path: root/src/Network/StreamServer.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/StreamServer.hs')
-rw-r--r--src/Network/StreamServer.hs154
1 files changed, 0 insertions, 154 deletions
diff --git a/src/Network/StreamServer.hs b/src/Network/StreamServer.hs
deleted file mode 100644
index 80ed4ee2..00000000
--- a/src/Network/StreamServer.hs
+++ /dev/null
@@ -1,154 +0,0 @@
1-- | This module implements a bare-bones TCP or Unix socket server.
2{-# LANGUAGE CPP #-}
3{-# LANGUAGE TypeFamilies #-}
4{-# LANGUAGE TypeOperators #-}
5{-# LANGUAGE OverloadedStrings #-}
6{-# LANGUAGE RankNTypes #-}
7module Network.StreamServer
8 ( streamServer
9 , ServerHandle
10 , ServerConfig(..)
11 , withSession
12 , quitListening
13 , dummyServerHandle
14 , listenSocket
15 ) where
16
17import Data.Monoid
18import Network.Socket as Socket
19import System.Directory (removeFile)
20import System.IO
21 ( IOMode(..)
22 , stderr
23 , hFlush
24 )
25import Control.Monad
26import Control.Monad.Fix (fix)
27#ifdef THREAD_DEBUG
28import Control.Concurrent.Lifted.Instrument
29 ( forkIO, threadDelay, ThreadId, mkWeakThreadId, labelThread, myThreadId
30 , killThread )
31#else
32import GHC.Conc (labelThread)
33import Control.Concurrent
34 ( forkIO, threadDelay, ThreadId, mkWeakThreadId, myThreadId
35 , killThread )
36#endif
37import Control.Exception (handle,finally)
38import System.IO.Error (tryIOError)
39import System.Mem.Weak
40import System.IO.Error
41
42-- import Data.Conduit
43import System.IO (Handle)
44import Control.Concurrent.MVar (newMVar)
45
46import Network.SocketLike
47import DPut
48import DebugTag
49
50data ServerHandle = ServerHandle Socket (Weak ThreadId)
51
52listenSocket :: ServerHandle -> RestrictedSocket
53listenSocket (ServerHandle sock _) = restrictSocket sock
54
55-- | Create a useless do-nothing 'ServerHandle'.
56dummyServerHandle :: IO ServerHandle
57dummyServerHandle = do
58 mvar <- newMVar Closed
59 let sock = MkSocket 0 AF_UNSPEC NoSocketType 0 mvar
60 thread <- mkWeakThreadId <=< forkIO $ return ()
61 return (ServerHandle sock thread)
62
63removeSocketFile :: SockAddr -> IO ()
64removeSocketFile (SockAddrUnix fname) = removeFile fname
65removeSocketFile _ = return ()
66
67-- | Terminate the server accept-loop. Call this to shut down the server.
68quitListening :: ServerHandle -> IO ()
69quitListening (ServerHandle socket acceptThread) =
70 finally (Socket.getSocketName socket >>= removeSocketFile)
71 (do mapM_ killThread =<< deRefWeak acceptThread
72 Socket.close socket)
73
74
75-- | It's 'bshow' instead of 'show' to enable swapping in a 'ByteString'
76-- variation. (This is not exported.)
77bshow :: Show a => a -> String
78bshow e = show e
79
80-- | Send a string to stderr. Not exported. Default 'serverWarn' when
81-- 'withSession' is used to configure the server.
82warnStderr :: String -> IO ()
83warnStderr str = dput XMisc str >> hFlush stderr
84
85data ServerConfig = ServerConfig
86 { serverWarn :: String -> IO ()
87 -- ^ Action to report warnings and errors.
88 , serverSession :: RestrictedSocket -> Int -> Handle -> IO ()
89 -- ^ Action to handle interaction with a client
90 }
91
92-- | Initialize a 'ServerConfig' using the provided session handler.
93withSession :: (RestrictedSocket -> Int -> Handle -> IO ()) -> ServerConfig
94withSession session = ServerConfig warnStderr session
95
96-- | Launch a thread to listen at the given bind address and dispatch
97-- to session handler threads on every incoming connection. Supports
98-- IPv4 and IPv6, TCP and unix sockets.
99--
100-- The returned handle can be used with 'quitListening' to terminate the
101-- thread and prevent any new sessions from starting. Currently active
102-- session threads will not be terminated or signaled in any way.
103streamServer :: ServerConfig -> [SockAddr] -> IO ServerHandle
104streamServer cfg addrs = do
105 let warn = serverWarn cfg
106 family = case addrs of
107 SockAddrInet {}:_ -> AF_INET
108 SockAddrInet6 {}:_ -> AF_INET6
109 SockAddrUnix {}:_ -> AF_UNIX
110 [] -> AF_INET6
111 sock <- socket family Stream 0
112 setSocketOption sock ReuseAddr 1
113 let tryBind addr next _ = do
114 tryIOError (removeSocketFile addr)
115 bind sock addr
116 `catchIOError` \e -> next (Just e)
117 fix $ \loop -> let again mbe = do
118 forM_ mbe $ \e -> warn $ "bind-error: " <> bshow addrs <> " " <> bshow e
119 threadDelay 5000000
120 loop
121 in foldr tryBind again addrs Nothing
122 listen sock maxListenQueue
123 thread <- mkWeakThreadId <=< forkIO $ do
124 myThreadId >>= flip labelThread "StreamServer.acceptLoop"
125 acceptLoop cfg sock 0
126 return (ServerHandle sock thread)
127
128-- | Not exported. This, combined with 'acceptException' form a mutually
129-- recursive loop that handles incoming connections. To quit the loop, the
130-- socket must be closed by 'quitListening'.
131acceptLoop :: ServerConfig -> Socket -> Int -> IO ()
132acceptLoop cfg sock n = handle (acceptException cfg n sock) $ do
133 con <- accept sock
134 let conkey = n + 1
135 h <- socketToHandle (fst con) ReadWriteMode
136 forkIO $ do
137 myThreadId >>= flip labelThread "StreamServer.session"
138 serverSession cfg (restrictHandleSocket h (fst con)) conkey h
139 acceptLoop cfg sock (n + 1)
140
141acceptException :: ServerConfig -> Int -> Socket -> IOError -> IO ()
142acceptException cfg n sock ioerror = do
143 Socket.close sock
144 case show (ioeGetErrorType ioerror) of
145 "resource exhausted" -> do -- try again
146 serverWarn cfg $ ("acceptLoop: resource exhasted")
147 threadDelay 500000
148 acceptLoop cfg sock (n + 1)
149 "invalid argument" -> do -- quit on closed socket
150 return ()
151 message -> do -- unexpected exception
152 serverWarn cfg $ ("acceptLoop: "<>bshow message)
153 return ()
154