summaryrefslogtreecommitdiff
path: root/dht/src/Network/StreamServer.hs
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2020-01-03 15:35:23 -0500
committerJoe Crayne <joe@jerkface.net>2020-01-03 17:26:06 -0500
commit31b799222cb76cd0002d9a3cc5b340a7b6fed139 (patch)
tree8b834e455529fb270375e4967d1acad56553544f /dht/src/Network/StreamServer.hs
parent1e03ed3670a8386ede93a09fa0c67785e7da6478 (diff)
server library.
Diffstat (limited to 'dht/src/Network/StreamServer.hs')
-rw-r--r--dht/src/Network/StreamServer.hs167
1 files changed, 0 insertions, 167 deletions
diff --git a/dht/src/Network/StreamServer.hs b/dht/src/Network/StreamServer.hs
deleted file mode 100644
index 1da612ce..00000000
--- a/dht/src/Network/StreamServer.hs
+++ /dev/null
@@ -1,167 +0,0 @@
1-- | This module implements a bare-bones TCP or Unix socket server.
2{-# LANGUAGE CPP #-}
3{-# LANGUAGE TypeFamilies #-}
4{-# LANGUAGE TypeOperators #-}
5{-# LANGUAGE OverloadedStrings #-}
6{-# LANGUAGE RankNTypes #-}
7module Network.StreamServer
8 ( streamServer
9 , ServerHandle
10 , getAcceptLoopThreadId
11 , ServerConfig(..)
12 , withSession
13 , quitListening
14 --, dummyServerHandle
15 , listenSocket
16 , Local(..)
17 , Remote(..)
18 ) where
19
20import Data.Monoid
21import Network.Socket as Socket
22import System.Directory (removeFile)
23import System.IO
24 ( IOMode(..)
25 , stderr
26 , hFlush
27 )
28import Control.Monad
29import Control.Monad.Fix (fix)
30#ifdef THREAD_DEBUG
31import Control.Concurrent.Lifted.Instrument
32 ( forkIO, threadDelay, ThreadId, mkWeakThreadId, labelThread, myThreadId
33 , killThread )
34#else
35import GHC.Conc (labelThread)
36import Control.Concurrent
37 ( forkIO, threadDelay, ThreadId, mkWeakThreadId, myThreadId
38 , killThread )
39#endif
40import Control.Exception (handle,finally)
41import System.IO.Error (tryIOError)
42import System.Mem.Weak
43import System.IO.Error
44
45-- import Data.Conduit
46import System.IO (Handle)
47import Control.Concurrent.MVar (newMVar)
48
49import Network.SocketLike
50import DPut
51import DebugTag
52
53data ServerHandle = ServerHandle Socket (Weak ThreadId)
54
55-- | Useful for testing.
56getAcceptLoopThreadId :: ServerHandle -> IO (Weak ThreadId)
57getAcceptLoopThreadId (ServerHandle _ t) = return t
58
59listenSocket :: ServerHandle -> RestrictedSocket
60listenSocket (ServerHandle sock _) = restrictSocket sock
61
62{- // Removed, bit-rotted and there are no call sites
63-- | Create a useless do-nothing 'ServerHandle'.
64dummyServerHandle :: IO ServerHandle
65dummyServerHandle = do
66 mvar <- newMVar Closed
67 let sock = MkSocket 0 AF_UNSPEC NoSocketType 0 mvar
68 thread <- mkWeakThreadId <=< forkIO $ return ()
69 return (ServerHandle sock thread)
70-}
71
72removeSocketFile :: SockAddr -> IO ()
73removeSocketFile (SockAddrUnix fname) = removeFile fname
74removeSocketFile _ = return ()
75
76-- | Terminate the server accept-loop. Call this to shut down the server.
77quitListening :: ServerHandle -> IO ()
78quitListening (ServerHandle socket acceptThread) =
79 finally (Socket.getSocketName socket >>= removeSocketFile)
80 (do mapM_ killThread =<< deRefWeak acceptThread
81 Socket.close socket)
82
83
84-- | It's 'bshow' instead of 'show' to enable swapping in a 'ByteString'
85-- variation. (This is not exported.)
86bshow :: Show a => a -> String
87bshow e = show e
88
89-- | Send a string to stderr. Not exported. Default 'serverWarn' when
90-- 'withSession' is used to configure the server.
91warnStderr :: String -> IO ()
92warnStderr str = dput XMisc str >> hFlush stderr
93
94newtype Local a = Local a deriving (Eq,Ord,Show)
95newtype Remote a = Remote a deriving (Eq,Ord,Show)
96
97data ServerConfig = ServerConfig
98 { serverWarn :: String -> IO ()
99 -- ^ Action to report warnings and errors.
100 , serverSession :: ( RestrictedSocket, (Local SockAddr, Remote SockAddr)) -> Int -> Handle -> IO ()
101 -- ^ Action to handle interaction with a client
102 }
103
104-- | Initialize a 'ServerConfig' using the provided session handler.
105withSession :: ((RestrictedSocket,(Local SockAddr,Remote SockAddr)) -> Int -> Handle -> IO ()) -> ServerConfig
106withSession session = ServerConfig warnStderr session
107
108-- | Launch a thread to listen at the given bind address and dispatch
109-- to session handler threads on every incoming connection. Supports
110-- IPv4 and IPv6, TCP and unix sockets.
111--
112-- The returned handle can be used with 'quitListening' to terminate the
113-- thread and prevent any new sessions from starting. Currently active
114-- session threads will not be terminated or signaled in any way.
115streamServer :: ServerConfig -> [SockAddr] -> IO ServerHandle
116streamServer cfg addrs = do
117 let warn = serverWarn cfg
118 family = case addrs of
119 SockAddrInet {}:_ -> AF_INET
120 SockAddrInet6 {}:_ -> AF_INET6
121 SockAddrUnix {}:_ -> AF_UNIX
122 [] -> AF_INET6
123 sock <- socket family Stream 0
124 setSocketOption sock ReuseAddr 1
125 let tryBind addr next _ = do
126 tryIOError (removeSocketFile addr)
127 bind sock addr
128 `catchIOError` \e -> next (Just e)
129 fix $ \loop -> let again mbe = do
130 forM_ mbe $ \e -> warn $ "bind-error: " <> bshow addrs <> " " <> bshow e
131 threadDelay 5000000
132 loop
133 in foldr tryBind again addrs Nothing
134 listen sock maxListenQueue
135 thread <- mkWeakThreadId <=< forkIO $ do
136 bindaddr <- Socket.getSocketName sock
137 myThreadId >>= flip labelThread ("StreamServer.acceptLoop." <> bshow bindaddr)
138 acceptLoop cfg sock 0
139 return (ServerHandle sock thread)
140
141-- | Not exported. This, combined with 'acceptException' form a mutually
142-- recursive loop that handles incoming connections. To quit the loop, the
143-- socket must be closed by 'quitListening'.
144acceptLoop :: ServerConfig -> Socket -> Int -> IO ()
145acceptLoop cfg sock n = handle (acceptException cfg n sock) $ do
146 (con,raddr) <- accept sock
147 let conkey = n + 1
148 laddr <- Socket.getSocketName con
149 h <- socketToHandle con ReadWriteMode
150 forkIO $ do
151 myThreadId >>= flip labelThread "StreamServer.session"
152 serverSession cfg (restrictHandleSocket h con, (Local laddr, Remote raddr)) conkey h
153 acceptLoop cfg sock (n + 1)
154
155acceptException :: ServerConfig -> Int -> Socket -> IOError -> IO ()
156acceptException cfg n sock ioerror = do
157 case show (ioeGetErrorType ioerror) of
158 "resource exhausted" -> do -- try again (ioeGetErrorType ioerror == fullErrorType)
159 serverWarn cfg $ ("acceptLoop: resource exhasted")
160 threadDelay 500000
161 acceptLoop cfg sock (n + 1)
162 "invalid argument" -> do -- quit on closed socket
163 Socket.close sock
164 message -> do -- unexpected exception
165 serverWarn cfg $ ("acceptLoop: "<>bshow message)
166 Socket.close sock
167