diff options
author | joe <joe@jerkface.net> | 2017-01-22 05:31:14 -0500 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-01-22 05:31:14 -0500 |
commit | 8cf4de73d77197032fd8ebfc4e4f3a00b287e0e7 (patch) | |
tree | 6ee5d529caf714851223d2da9f22eb1510d5cfee | |
parent | 1c8cbe8fc66466936b4f889b3893ca3c23478631 (diff) |
New: DHT deamon and command-line interface.
-rw-r--r-- | bittorrent.cabal | 20 | ||||
-rw-r--r-- | examples/dht.hs | 80 | ||||
-rw-r--r-- | examples/dhtd.hs | 192 | ||||
-rw-r--r-- | src/Network/SocketLike.hs | 104 | ||||
-rw-r--r-- | src/Network/StreamServer.hs | 143 |
5 files changed, 539 insertions, 0 deletions
diff --git a/bittorrent.cabal b/bittorrent.cabal index 0fddf5d0..fbd17397 100644 --- a/bittorrent.cabal +++ b/bittorrent.cabal | |||
@@ -83,6 +83,8 @@ library | |||
83 | Network.BitTorrent.DHT.Routing | 83 | Network.BitTorrent.DHT.Routing |
84 | Network.BitTorrent.DHT.Session | 84 | Network.BitTorrent.DHT.Session |
85 | Network.BitTorrent.DHT.Token | 85 | Network.BitTorrent.DHT.Token |
86 | Network.StreamServer | ||
87 | Network.SocketLike | ||
86 | other-modules: Paths_bittorrent | 88 | other-modules: Paths_bittorrent |
87 | if !flag(dht-only) | 89 | if !flag(dht-only) |
88 | exposed-modules: Network.BitTorrent | 90 | exposed-modules: Network.BitTorrent |
@@ -336,6 +338,24 @@ benchmark bench | |||
336 | , criterion | 338 | , criterion |
337 | ghc-options: -O2 -fforce-recomp | 339 | ghc-options: -O2 -fforce-recomp |
338 | 340 | ||
341 | executable dht | ||
342 | hs-source-dirs: examples | ||
343 | main-is: dht.hs | ||
344 | default-language: Haskell2010 | ||
345 | build-depends: base, haskeline, network, bytestring | ||
346 | |||
347 | executable dhtd | ||
348 | hs-source-dirs: examples | ||
349 | main-is: dhtd.hs | ||
350 | default-language: Haskell2010 | ||
351 | build-depends: base, network, bytestring | ||
352 | , mtl | ||
353 | , lifted-base | ||
354 | , pretty | ||
355 | , data-default | ||
356 | , monad-logger | ||
357 | , bittorrent | ||
358 | |||
339 | -- Utility to work with torrent files. | 359 | -- Utility to work with torrent files. |
340 | executable mktorrent | 360 | executable mktorrent |
341 | if !flag(examples) | 361 | if !flag(examples) |
diff --git a/examples/dht.hs b/examples/dht.hs new file mode 100644 index 00000000..feeee9ff --- /dev/null +++ b/examples/dht.hs | |||
@@ -0,0 +1,80 @@ | |||
1 | {-# LANGUAGE NondecreasingIndentation #-} | ||
2 | import Control.Monad | ||
3 | import Control.Monad.Fix | ||
4 | import Control.Monad.IO.Class | ||
5 | import Data.Char | ||
6 | import Network.Socket as Socket | ||
7 | import System.Console.Haskeline | ||
8 | import System.Environment | ||
9 | import System.Exit | ||
10 | import System.IO | ||
11 | import System.IO.Unsafe | ||
12 | import qualified Data.ByteString as B | ||
13 | |||
14 | -- | Reads one character. If it is not a digit, | ||
15 | -- then it is discarded and 'Nothing' is returned. | ||
16 | hReadDigit :: Handle -> IO (Maybe Char) | ||
17 | hReadDigit h = do c <- hGetChar h | ||
18 | return $ guard (isDigit c) >> pure c | ||
19 | |||
20 | -- | Expected input: "nnn:..." | ||
21 | -- Here we read the digit sequence "nnn" and drop the colon | ||
22 | -- as it is the first non-digit. | ||
23 | hReadInt :: Handle -> IO Int | ||
24 | hReadInt h = do | ||
25 | nstr <- fix $ \readDigits -> | ||
26 | maybe (return []) -- dropped non-digit character | ||
27 | (($ unsafeInterleaveIO readDigits) . fmap . (:)) | ||
28 | =<< hReadDigit h | ||
29 | readIO nstr :: IO Int | ||
30 | |||
31 | |||
32 | -- | Read a length prefixed string from a handle. | ||
33 | -- The format is "nnn:..." where /nnn/ is an ascii-encoded character count | ||
34 | -- and /.../ is the sequence of characters | ||
35 | -- | ||
36 | -- Note: The first byte after the count is ignored and discarded. | ||
37 | readResponse :: Handle -> IO String | ||
38 | readResponse h = do | ||
39 | n <- hReadInt h | ||
40 | sequence $ replicate n (hGetChar h) | ||
41 | |||
42 | -- | Send a command to the dhtd daemon and then print the response. | ||
43 | sendCommand :: Handle -> String -> InputT IO () | ||
44 | sendCommand h cmd = do resp <- liftIO $ do hPutStrLn h cmd | ||
45 | readResponse h | ||
46 | outputStrLn resp | ||
47 | |||
48 | -- | Get one line of input and send it to the daemon, then run the | ||
49 | -- passed continuation if it wasn't "quit". | ||
50 | interactiveMode :: Handle -> InputT IO () -> InputT IO () | ||
51 | interactiveMode h repl = do | ||
52 | minput <- getInputLine "dht> " | ||
53 | case minput of | ||
54 | Nothing -> return () | ||
55 | Just "quit" -> sendCommand h "quit" >> return () | ||
56 | Just cmd -> sendCommand h cmd >> repl | ||
57 | |||
58 | main :: IO () | ||
59 | main = do | ||
60 | -- Open the control socket to the daemon. | ||
61 | h <- liftIO $ handle (\e -> do hPutStrLn stderr (show (e ::IOError)) | ||
62 | exitFailure) | ||
63 | $ do sock <- socket AF_UNIX Stream defaultProtocol | ||
64 | connect sock (SockAddrUnix "dht.sock") | ||
65 | socketToHandle sock ReadWriteMode | ||
66 | |||
67 | -- Haskeline's default looks only at our stdin and not our stdout. | ||
68 | -- That's a bad idea because we can take input from the command line. | ||
69 | behavior <- do | ||
70 | useTerminal <- and <$> mapM hIsTerminalDevice [stdin,stdout] | ||
71 | return $ if useTerminal then preferTerm else useFileHandle stdin | ||
72 | |||
73 | runInputTBehaviorWithPrefs behavior defaultPrefs defaultSettings $ do | ||
74 | |||
75 | -- A command may be specified on the command line | ||
76 | -- or else we enter an interactive shell. | ||
77 | args <- dropWhile isSpace . unwords <$> liftIO getArgs | ||
78 | case args of | ||
79 | (_:_) -> sendCommand h args >> sendCommand h "quit" | ||
80 | _ -> fix $ interactiveMode h | ||
diff --git a/examples/dhtd.hs b/examples/dhtd.hs new file mode 100644 index 00000000..6bf48595 --- /dev/null +++ b/examples/dhtd.hs | |||
@@ -0,0 +1,192 @@ | |||
1 | {-# LANGUAGE NondecreasingIndentation #-} | ||
2 | {-# LANGUAGE FlexibleContexts #-} | ||
3 | {-# LANGUAGE LambdaCase #-} | ||
4 | {-# LANGUAGE OverloadedStrings #-} | ||
5 | {-# LANGUAGE ScopedTypeVariables #-} | ||
6 | {-# LANGUAGE TupleSections #-} | ||
7 | |||
8 | import Control.Arrow; | ||
9 | import Control.Concurrent | ||
10 | import Control.Exception.Lifted as Lifted | ||
11 | import Control.Monad | ||
12 | import Control.Monad.Logger | ||
13 | import Control.Monad.Reader | ||
14 | import Data.Char | ||
15 | import Data.Default | ||
16 | import Data.List as L | ||
17 | import Data.Maybe | ||
18 | import qualified Data.ByteString as B (ByteString,writeFile,readFile) | ||
19 | ; import Data.ByteString (ByteString) | ||
20 | import System.IO | ||
21 | import System.IO.Error | ||
22 | import Text.PrettyPrint.HughesPJClass | ||
23 | import Text.Printf | ||
24 | import Control.Monad.Reader.Class | ||
25 | |||
26 | import Network.BitTorrent.Address | ||
27 | import Network.BitTorrent.DHT | ||
28 | import qualified Network.BitTorrent.DHT.Routing as R | ||
29 | import Network.BitTorrent.DHT.Session | ||
30 | import Network.SocketLike | ||
31 | import Network.StreamServer | ||
32 | |||
33 | mkNodeAddr :: SockAddr -> NodeAddr IPv4 | ||
34 | mkNodeAddr addr = NodeAddr (fromJust $ fromSockAddr addr) | ||
35 | (fromMaybe 0 $ sockAddrPort addr) -- FIXME | ||
36 | |||
37 | btBindAddr :: String -> Bool -> IO (NodeAddr IPv4) | ||
38 | btBindAddr s b = mkNodeAddr <$> getBindAddress s b | ||
39 | |||
40 | printReport :: MonadIO m => [(String,String)] -> m () | ||
41 | printReport kvs = liftIO $ do | ||
42 | putStrLn (showReport kvs) | ||
43 | hFlush stdout | ||
44 | |||
45 | showReport :: [(String,String)] -> String | ||
46 | showReport kvs = do | ||
47 | let colwidth = maximum $ map (length . fst) kvs | ||
48 | (k,v) <- kvs | ||
49 | concat [ printf " %-*s" (colwidth+1) k, v, "\n" ] | ||
50 | |||
51 | showEnry :: Show a => (NodeInfo a, t) -> [Char] | ||
52 | showEnry (n,_) = intercalate " " | ||
53 | [ show $ pPrint (nodeId n) | ||
54 | , show $ nodeAddr n | ||
55 | ] | ||
56 | |||
57 | printTable :: DHT IPv4 () | ||
58 | printTable = do | ||
59 | t <- showTable | ||
60 | liftIO $ do | ||
61 | putStrLn t | ||
62 | hFlush stdout | ||
63 | |||
64 | showTable :: DHT IPv4 String | ||
65 | showTable = do | ||
66 | nodes <- R.toList <$> getTable | ||
67 | return $ showReport | ||
68 | $ map (show *** showEnry) | ||
69 | $ concat $ zipWith map (map (,) [0::Int ..]) nodes | ||
70 | |||
71 | bootstrapNodes :: IO [NodeAddr IPv4] | ||
72 | bootstrapNodes = mapMaybe fromAddr | ||
73 | <$> mapM resolveHostName defaultBootstrapNodes | ||
74 | |||
75 | -- ExtendedCaps (Map.singleton | ||
76 | |||
77 | noDebugPrints :: LogSource -> LogLevel -> Bool | ||
78 | noDebugPrints _ = \case LevelDebug -> False | ||
79 | _ -> True | ||
80 | |||
81 | noLogging :: LogSource -> LogLevel -> Bool | ||
82 | noLogging _ _ = False | ||
83 | |||
84 | |||
85 | resume :: DHT IPv4 (Maybe B.ByteString) | ||
86 | resume = do | ||
87 | restore_attempt <- liftIO $ tryIOError $ B.readFile "dht-nodes.dat" | ||
88 | saved_nodes <- | ||
89 | either (const $ do liftIO $ putStrLn "Error reading dht-nodes.dat" | ||
90 | return Nothing) | ||
91 | (return . Just) | ||
92 | restore_attempt | ||
93 | return saved_nodes | ||
94 | |||
95 | godht :: forall b. (NodeAddr IPv4 -> NodeId -> DHT IPv4 b) -> IO b | ||
96 | godht f = do | ||
97 | a <- btBindAddr "8008" False | ||
98 | dht def { optTimeout = 5 } a (const $ const True) $ do | ||
99 | me0 <- asks tentativeNodeId | ||
100 | printReport [("tentative node-id",show $ pPrint me0) | ||
101 | ,("listen-address", show a) | ||
102 | ] | ||
103 | f a me0 | ||
104 | |||
105 | marshalForClient :: String -> String | ||
106 | marshalForClient s = show (length s) ++ ":" ++ s | ||
107 | |||
108 | hPutClient :: Handle -> String -> IO () | ||
109 | hPutClient h s = hPutStr h (marshalForClient s) | ||
110 | |||
111 | clientSession :: Node IPv4 -> MVar () -> RestrictedSocket -> Int -> Handle -> IO () | ||
112 | clientSession st signalQuit sock n h = do | ||
113 | line <- map toLower . dropWhile isSpace <$> hGetLine h | ||
114 | let cmd action = action >> clientSession st signalQuit sock n h | ||
115 | case line of | ||
116 | |||
117 | "quit" -> hPutClient h "goodbye." >> hClose h | ||
118 | |||
119 | "stop" -> do hPutClient h "Terminating DHT Daemon." | ||
120 | hClose h | ||
121 | putMVar signalQuit () | ||
122 | |||
123 | "ls" -> cmd $ join $ runDHT st $ do | ||
124 | tbl <- getTable | ||
125 | t <- showTable | ||
126 | me <- myNodeIdAccordingTo (read "8.8.8.8:6881") | ||
127 | ip <- routableAddress | ||
128 | return $ do | ||
129 | hPutClient h $ unlines | ||
130 | [ t | ||
131 | , showReport | ||
132 | [ ("node-id", show $ pPrint me) | ||
133 | , ("internet address", show ip) | ||
134 | , ("buckets", show $ R.shape tbl)] | ||
135 | ] | ||
136 | |||
137 | _ -> cmd $ hPutClient h "error." | ||
138 | |||
139 | main :: IO () | ||
140 | main = do | ||
141 | godht $ \a me0 -> do | ||
142 | printTable | ||
143 | bs <- liftIO bootstrapNodes | ||
144 | `onException` | ||
145 | (Lifted.ioError $ userError "unable to resolve bootstrap nodes") | ||
146 | saved_nodes <- resume | ||
147 | |||
148 | when (isJust saved_nodes) $ do | ||
149 | b <- isBootstrapped | ||
150 | tbl <- getTable | ||
151 | bc <- optBucketCount <$> asks options | ||
152 | printTable | ||
153 | me <- case concat $ R.toList tbl of | ||
154 | (n,_):_ -> myNodeIdAccordingTo (nodeAddr n) | ||
155 | _ -> return me0 | ||
156 | printReport [("node-id",show $ pPrint me) | ||
157 | ,("listen-address", show a) | ||
158 | ,("bootstrapped", show b) | ||
159 | ,("buckets", show $ R.shape tbl) | ||
160 | ,("optBucketCount", show bc) | ||
161 | ,("dht-nodes.dat", "Running bootstrap...") | ||
162 | ] | ||
163 | |||
164 | st <- ask | ||
165 | waitForSignal <- liftIO $ do | ||
166 | signalQuit <- newEmptyMVar | ||
167 | srv <- streamServer (withSession $ clientSession st signalQuit) (SockAddrUnix "dht.sock") | ||
168 | return $ liftIO $ do | ||
169 | () <- takeMVar signalQuit | ||
170 | quitListening srv | ||
171 | |||
172 | bootstrap saved_nodes bs | ||
173 | |||
174 | b <- isBootstrapped | ||
175 | tbl <- getTable | ||
176 | bc <- optBucketCount <$> asks options | ||
177 | printTable | ||
178 | ip <- routableAddress | ||
179 | me <- case concat $ R.toList tbl of | ||
180 | (n,_):_ -> myNodeIdAccordingTo (nodeAddr n) | ||
181 | _ -> return me0 | ||
182 | printReport [("node-id",show $ pPrint me) | ||
183 | ,("internet address", show ip) | ||
184 | ,("listen-address", show a) | ||
185 | ,("bootstrapped", show b) | ||
186 | ,("buckets", show $ R.shape tbl) | ||
187 | ,("optBucketCount", show bc) | ||
188 | ] | ||
189 | snapshot >>= liftIO . B.writeFile "dht-nodes.dat" | ||
190 | |||
191 | waitForSignal | ||
192 | |||
diff --git a/src/Network/SocketLike.hs b/src/Network/SocketLike.hs new file mode 100644 index 00000000..2aa78e3e --- /dev/null +++ b/src/Network/SocketLike.hs | |||
@@ -0,0 +1,104 @@ | |||
1 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
2 | -- | | ||
3 | -- | ||
4 | -- A socket could be used indirectly via a 'System.IO.Handle' or a conduit from | ||
5 | -- Michael Snoyman's conduit package. But doing so presents an encapsulation | ||
6 | -- problem. Do we allow access to the underlying socket and trust that it wont | ||
7 | -- be used in an unsafe way? Or do we protect it at the higher level and deny | ||
8 | -- access to various state information? | ||
9 | -- | ||
10 | -- The 'SocketLike' class enables the approach that provides a safe wrapper to | ||
11 | -- the underlying socket and gives access to various state information without | ||
12 | -- enabling direct reads or writes. | ||
13 | module Network.SocketLike | ||
14 | ( SocketLike(..) | ||
15 | , RestrictedSocket | ||
16 | , restrictSocket | ||
17 | , restrictHandleSocket | ||
18 | -- * Re-exports | ||
19 | -- | ||
20 | -- | To make the 'SocketLike' methods less awkward to use, the types | ||
21 | -- 'CUInt', 'SockAddr', and 'PortNumber' are re-exported. | ||
22 | , CUInt | ||
23 | , PortNumber | ||
24 | , SockAddr(..) | ||
25 | ) where | ||
26 | |||
27 | import Network.Socket | ||
28 | ( PortNumber | ||
29 | , SockAddr | ||
30 | ) | ||
31 | import Foreign.C.Types ( CUInt ) | ||
32 | |||
33 | import qualified Network.Socket as NS | ||
34 | import System.IO (Handle,hClose,hIsOpen) | ||
35 | |||
36 | -- | A safe (mostly read-only) interface to a 'NS.Socket'. Note that despite | ||
37 | -- how this class is named, it provides no access to typical 'NS.Socket' uses | ||
38 | -- like sending or receiving network packets. | ||
39 | class SocketLike sock where | ||
40 | -- | See 'NS.getSocketName' | ||
41 | getSocketName :: sock -> IO SockAddr | ||
42 | -- | See 'NS.getPeerName' | ||
43 | getPeerName :: sock -> IO SockAddr | ||
44 | -- | See 'NS.getPeerCred' | ||
45 | getPeerCred :: sock -> IO (CUInt, CUInt, CUInt) | ||
46 | -- | See 'NS.socketPort' | ||
47 | socketPort :: sock -> IO PortNumber | ||
48 | -- | See 'NS.sIsConnected' | ||
49 | -- | ||
50 | -- __Warning__: Don't rely on this method if it's possible the socket was | ||
51 | -- converted into a 'Handle'. | ||
52 | sIsConnected :: sock -> IO Bool | ||
53 | -- | See 'NS.sIsBound' | ||
54 | sIsBound :: sock -> IO Bool | ||
55 | -- | See 'NS.sIsListening' | ||
56 | sIsListening :: sock -> IO Bool | ||
57 | -- | See 'NS.sIsReadable' | ||
58 | sIsReadable :: sock -> IO Bool | ||
59 | -- | See 'NS.sIsWritable' | ||
60 | sIsWritable :: sock -> IO Bool | ||
61 | |||
62 | -- | This is the only exposed write-access method to the | ||
63 | -- underlying state. Usually implemented by 'NS.close' | ||
64 | sClose :: sock -> IO () | ||
65 | |||
66 | instance SocketLike NS.Socket where | ||
67 | getSocketName = NS.getSocketName | ||
68 | getPeerName = NS.getPeerName | ||
69 | getPeerCred = NS.getPeerCred | ||
70 | socketPort = NS.socketPort | ||
71 | sIsConnected = NS.sIsConnected -- warning: this is always False if the socket | ||
72 | -- was converted to a Handle | ||
73 | sIsBound = NS.sIsBound | ||
74 | sIsListening = NS.sIsListening | ||
75 | sIsReadable = NS.sIsReadable | ||
76 | sIsWritable = NS.sIsWritable | ||
77 | |||
78 | sClose = NS.sClose | ||
79 | |||
80 | -- | An encapsulated socket. Data reads and writes are not possible. | ||
81 | data RestrictedSocket = Restricted (Maybe Handle) NS.Socket deriving Show | ||
82 | |||
83 | instance SocketLike RestrictedSocket where | ||
84 | getSocketName (Restricted mb sock) = NS.getSocketName sock | ||
85 | getPeerName (Restricted mb sock) = NS.getPeerName sock | ||
86 | getPeerCred (Restricted mb sock) = NS.getPeerCred sock | ||
87 | socketPort (Restricted mb sock) = NS.socketPort sock | ||
88 | sIsConnected (Restricted mb sock) = maybe (NS.sIsConnected sock) (hIsOpen) mb | ||
89 | sIsBound (Restricted mb sock) = NS.sIsBound sock | ||
90 | sIsListening (Restricted mb sock) = NS.sIsListening sock | ||
91 | sIsReadable (Restricted mb sock) = NS.sIsReadable sock | ||
92 | sIsWritable (Restricted mb sock) = NS.sIsWritable sock | ||
93 | sClose (Restricted mb sock) = maybe (NS.sClose sock) (\h -> hClose h >> NS.sClose sock) mb | ||
94 | |||
95 | -- | Create a 'RestrictedSocket' that explicitly disallows sending or | ||
96 | -- receiving data. | ||
97 | restrictSocket :: NS.Socket -> RestrictedSocket | ||
98 | restrictSocket socket = Restricted Nothing socket | ||
99 | |||
100 | -- | Build a 'RestrictedSocket' for which 'sClose' will close the given | ||
101 | -- 'Handle'. It is intended that this 'Handle' was obtained via | ||
102 | -- 'NS.socketToHandle'. | ||
103 | restrictHandleSocket :: Handle -> NS.Socket -> RestrictedSocket | ||
104 | restrictHandleSocket h socket = Restricted (Just h) socket | ||
diff --git a/src/Network/StreamServer.hs b/src/Network/StreamServer.hs new file mode 100644 index 00000000..a6cead0e --- /dev/null +++ b/src/Network/StreamServer.hs | |||
@@ -0,0 +1,143 @@ | |||
1 | -- | This module implements a bare-bones TCP or Unix socket server. | ||
2 | {-# LANGUAGE TypeFamilies #-} | ||
3 | {-# LANGUAGE TypeOperators #-} | ||
4 | {-# LANGUAGE OverloadedStrings #-} | ||
5 | {-# LANGUAGE RankNTypes #-} | ||
6 | module Network.StreamServer | ||
7 | ( streamServer | ||
8 | , ServerHandle | ||
9 | , ServerConfig(..) | ||
10 | , withSession | ||
11 | , quitListening | ||
12 | , dummyServerHandle | ||
13 | ) where | ||
14 | |||
15 | import Data.Monoid | ||
16 | import Network.Socket as Socket | ||
17 | import Data.ByteString.Char8 | ||
18 | ( hGetNonBlocking | ||
19 | ) | ||
20 | import qualified Data.ByteString.Char8 as S | ||
21 | ( hPutStrLn | ||
22 | ) | ||
23 | import System.Directory (removeFile) | ||
24 | import System.IO | ||
25 | ( IOMode(..) | ||
26 | , hSetBuffering | ||
27 | , BufferMode(..) | ||
28 | , hWaitForInput | ||
29 | , hClose | ||
30 | , hIsEOF | ||
31 | , hPutStrLn | ||
32 | , stderr | ||
33 | , hFlush | ||
34 | ) | ||
35 | import Control.Monad | ||
36 | import Control.Monad.Fix (fix) | ||
37 | import Control.Concurrent (forkIO, threadDelay, ThreadId, mkWeakThreadId) | ||
38 | import Control.Exception (catch,handle,try,finally) | ||
39 | import System.IO.Error (tryIOError) | ||
40 | import System.Mem.Weak | ||
41 | import System.IO.Error | ||
42 | |||
43 | -- import Data.Conduit | ||
44 | import Control.Monad.IO.Class (MonadIO (liftIO)) | ||
45 | import qualified Data.ByteString as S (ByteString) | ||
46 | import System.IO (Handle) | ||
47 | import Control.Concurrent.MVar (newMVar) | ||
48 | |||
49 | import Network.SocketLike | ||
50 | |||
51 | data ServerHandle = ServerHandle Socket (Weak ThreadId) | ||
52 | |||
53 | |||
54 | -- | Create a useless do-nothing 'ServerHandle'. | ||
55 | dummyServerHandle :: IO ServerHandle | ||
56 | dummyServerHandle = do | ||
57 | mvar <- newMVar Closed | ||
58 | let sock = MkSocket 0 AF_UNSPEC NoSocketType 0 mvar | ||
59 | thread <- mkWeakThreadId <=< forkIO $ return () | ||
60 | return (ServerHandle sock thread) | ||
61 | |||
62 | removeSocketFile :: SockAddr -> IO () | ||
63 | removeSocketFile (SockAddrUnix fname) = removeFile fname | ||
64 | removeSocketFile _ = return () | ||
65 | |||
66 | -- | Terminate the server accept-loop. Call this to shut down the server. | ||
67 | quitListening :: ServerHandle -> IO () | ||
68 | quitListening (ServerHandle socket _) = | ||
69 | finally (Socket.getSocketName socket >>= removeSocketFile) | ||
70 | (Socket.close socket) | ||
71 | |||
72 | |||
73 | -- | It's 'bshow' instead of 'show' to enable swapping in a 'ByteString' | ||
74 | -- variation. (This is not exported.) | ||
75 | bshow :: Show a => a -> String | ||
76 | bshow e = show e | ||
77 | |||
78 | -- | Send a string to stderr. Not exported. Default 'serverWarn' when | ||
79 | -- 'withSession' is used to configure the server. | ||
80 | warnStderr :: String -> IO () | ||
81 | warnStderr str = hPutStrLn stderr str >> hFlush stderr | ||
82 | |||
83 | data ServerConfig = ServerConfig | ||
84 | { serverWarn :: String -> IO () | ||
85 | -- ^ Action to report warnings and errors. | ||
86 | , serverSession :: RestrictedSocket -> Int -> Handle -> IO () | ||
87 | -- ^ Action to handle interaction with a client | ||
88 | } | ||
89 | |||
90 | -- | Initialize a 'ServerConfig' using the provided session handler. | ||
91 | withSession :: (RestrictedSocket -> Int -> Handle -> IO ()) -> ServerConfig | ||
92 | withSession session = ServerConfig warnStderr session | ||
93 | |||
94 | -- | Launch a thread to listen at the given bind address and dispatch | ||
95 | -- to session handler threads on every incomming connection. Supports | ||
96 | -- IPv4 and IPv6, TCP and unix sockets. | ||
97 | -- | ||
98 | -- The returned handle can be used with 'quitListening' to terminate the | ||
99 | -- thread and prevent any new sessions from starting. Currently active | ||
100 | -- session threads will not be terminated or signaled in any way. | ||
101 | streamServer :: ServerConfig -> SockAddr -> IO ServerHandle | ||
102 | streamServer cfg addr = do | ||
103 | let warn = serverWarn cfg | ||
104 | family = case addr of | ||
105 | SockAddrInet {} -> AF_INET | ||
106 | SockAddrInet6 {} -> AF_INET6 | ||
107 | SockAddrUnix {} -> AF_UNIX | ||
108 | sock <- socket family Stream 0 | ||
109 | setSocketOption sock ReuseAddr 1 | ||
110 | fix $ \loop -> | ||
111 | tryIOError (removeSocketFile addr) >> bind sock addr | ||
112 | `catchIOError` \e -> do warn $ "bind-error: " <> bshow addr <> " " <> bshow e | ||
113 | threadDelay 5000000 | ||
114 | loop | ||
115 | listen sock maxListenQueue | ||
116 | thread <- mkWeakThreadId <=< forkIO $ acceptLoop cfg sock 0 | ||
117 | return (ServerHandle sock thread) | ||
118 | |||
119 | -- | Not exported. This, combined with 'acceptException' form a mutually recursive | ||
120 | -- loop that handles incomming connections. To quit the loop, the socket must be | ||
121 | -- closed by 'quitListening'. | ||
122 | acceptLoop :: ServerConfig -> Socket -> Int -> IO () | ||
123 | acceptLoop cfg sock n = handle (acceptException cfg n sock) $ do | ||
124 | con <- accept sock | ||
125 | let conkey = n + 1 | ||
126 | h <- socketToHandle (fst con) ReadWriteMode | ||
127 | forkIO $ serverSession cfg (restrictHandleSocket h (fst con)) conkey h | ||
128 | acceptLoop cfg sock (n + 1) | ||
129 | |||
130 | acceptException :: ServerConfig -> Int -> Socket -> IOError -> IO () | ||
131 | acceptException cfg n sock ioerror = do | ||
132 | Socket.close sock | ||
133 | case show (ioeGetErrorType ioerror) of | ||
134 | "resource exhausted" -> do -- try again | ||
135 | serverWarn cfg $ ("acceptLoop: resource exhasted") | ||
136 | threadDelay 500000 | ||
137 | acceptLoop cfg sock (n + 1) | ||
138 | "invalid argument" -> do -- quit on closed socket | ||
139 | return () | ||
140 | message -> do -- unexpected exception | ||
141 | serverWarn cfg $ ("acceptLoop: "<>bshow message) | ||
142 | return () | ||
143 | |||