summaryrefslogtreecommitdiff
path: root/Presence/Server.hs
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-11-12 18:33:51 -0500
committerjoe <joe@jerkface.net>2017-11-12 18:33:51 -0500
commitd4288f5a9f87e3889a50a347ebad0a812f52938c (patch)
tree4457ab4fde9ea5f38a3772f5eefc4982f8a6397a /Presence/Server.hs
parentd646a84764f46ee4f772d7c1f2edeecd9a18ec54 (diff)
Updated Server module that layers on StreamServer.
Diffstat (limited to 'Presence/Server.hs')
-rw-r--r--Presence/Server.hs141
1 files changed, 65 insertions, 76 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs
index f7f99907..a621134d 100644
--- a/Presence/Server.hs
+++ b/Presence/Server.hs
@@ -4,16 +4,17 @@
4{-# LANGUAGE OverloadedStrings #-} 4{-# LANGUAGE OverloadedStrings #-}
5{-# LANGUAGE TupleSections #-} 5{-# LANGUAGE TupleSections #-}
6{-# LANGUAGE FlexibleInstances #-} 6{-# LANGUAGE FlexibleInstances #-}
7{-# LANGUAGE RankNTypes #-}
7----------------------------------------------------------------------------- 8-----------------------------------------------------------------------------
8-- | 9-- |
9-- Module : Server 10-- Module : Server
10-- 11--
11-- Maintainer : joe@jerkface.net 12-- Maintainer : joe@jerkface.net
12-- Stability : experimental 13-- Stability : experimental
13-- 14--
14-- A TCP client/server library. 15-- A TCP client/server library.
15-- 16--
16-- TODO: XXX: A newer version of this code is in the server.git repo. XXX 17-- TODO:
17-- 18--
18-- * interface tweaks 19-- * interface tweaks
19-- 20--
@@ -39,7 +40,7 @@ import Control.Exception ({-evaluate,-}handle,SomeException(..),bracketOnError,E
39import Control.Monad 40import Control.Monad
40import Control.Monad.Fix 41import Control.Monad.Fix
41-- import Control.Monad.STM 42-- import Control.Monad.STM
42import Control.Monad.Trans.Resource 43-- import Control.Monad.Trans.Resource
43import Control.Monad.IO.Class (MonadIO (liftIO)) 44import Control.Monad.IO.Class (MonadIO (liftIO))
44import System.IO.Error (ioeGetErrorType,isDoesNotExistError) 45import System.IO.Error (ioeGetErrorType,isDoesNotExistError)
45import System.IO 46import System.IO
@@ -53,18 +54,20 @@ import System.IO
53 , stdout 54 , stdout
54 , Handle 55 , Handle
55 , hFlush 56 , hFlush
57 , hPutStrLn
56 ) 58 )
57import Network.Socket 59import Network.Socket as Socket
58import Network.BSD 60import Network.BSD
59 ( getProtocolNumber 61 ( getProtocolNumber
60 ) 62 )
61import Debug.Trace 63import Debug.Trace
62import Data.Time.Clock (UTCTime,getCurrentTime,diffUTCTime) 64import Data.Time.Clock (UTCTime,getCurrentTime,diffUTCTime)
63import Data.Time.Format (formatTime) 65import Data.Time.Format (formatTime)
64import SockAddr () 66-- import SockAddr ()
65-- import System.Locale (defaultTimeLocale) 67-- import System.Locale (defaultTimeLocale)
66 68
67todo = error "unimplemented" 69import Network.StreamServer
70import Network.SocketLike hiding (sClose)
68 71
69type Microseconds = Int 72type Microseconds = Int
70type Miliseconds = Int 73type Miliseconds = Int
@@ -72,12 +75,12 @@ type TimeOut = Miliseconds
72type PingInterval = Miliseconds 75type PingInterval = Miliseconds
73 76
74-- | This object is passed with the 'Listen' and 'Connect' 77-- | This object is passed with the 'Listen' and 'Connect'
75-- instructions in order to control the behavior of the 78-- instructions in order to control the behavior of the
76-- connections that are established. It is parameterized 79-- connections that are established. It is parameterized
77-- by a user-suplied type @conkey@ that is used as a lookup 80-- by a user-suplied type @conkey@ that is used as a lookup
78-- key for connections. 81-- key for connections.
79data ConnectionParameters conkey u = 82data ConnectionParameters conkey u =
80 ConnectionParameters 83 ConnectionParameters
81 { pingInterval :: PingInterval 84 { pingInterval :: PingInterval
82 -- ^ The miliseconds of idle to allow before a 'RequiresPing' 85 -- ^ The miliseconds of idle to allow before a 'RequiresPing'
83 -- event is signaled. 86 -- event is signaled.
@@ -85,7 +88,7 @@ data ConnectionParameters conkey u =
85 -- ^ The miliseconds of idle after 'RequiresPing' is signaled 88 -- ^ The miliseconds of idle after 'RequiresPing' is signaled
86 -- that are necessary for the connection to be considered 89 -- that are necessary for the connection to be considered
87 -- lost and signalling 'EOF'. 90 -- lost and signalling 'EOF'.
88 , makeConnKey :: (Socket,SockAddr) -> IO (conkey,u) 91 , makeConnKey :: RestrictedSocket -> IO (conkey,u)
89 -- ^ This action creates a lookup key for a new connection. If 'duplex' 92 -- ^ This action creates a lookup key for a new connection. If 'duplex'
90 -- is 'True' and the result is already assocatied with an established 93 -- is 'True' and the result is already assocatied with an established
91 -- connection, then an 'EOF' will be forced before the the new 94 -- connection, then an 'EOF' will be forced before the the new
@@ -111,7 +114,7 @@ data ConnectionParameters conkey u =
111-- * 'duplex' = True 114-- * 'duplex' = True
112-- 115--
113connectionDefaults 116connectionDefaults
114 :: ((Socket, SockAddr) -> IO (conkey,u)) -> ConnectionParameters conkey u 117 :: (RestrictedSocket -> IO (conkey,u)) -> ConnectionParameters conkey u
115connectionDefaults f = ConnectionParameters 118connectionDefaults f = ConnectionParameters
116 { pingInterval = 28000 119 { pingInterval = 28000
117 , timeout = 2000 120 , timeout = 2000
@@ -140,9 +143,9 @@ data ServerInstruction conkey u
140 -- ^ send bytes to an established connection 143 -- ^ send bytes to an established connection
141 144
142#ifdef TEST 145#ifdef TEST
143deriving instance Show conkey => Show (ServerInstruction conkey) 146deriving instance Show conkey => Show (ServerInstruction conkey u)
144instance Show (a -> b) where show _ = "<function>" 147instance Show (a -> b) where show _ = "<function>"
145deriving instance Show conkey => Show (ConnectionParameters conkey) 148deriving instance Show conkey => Show (ConnectionParameters conkey u)
146#endif 149#endif
147 150
148-- | This type specifies which which half of a half-duplex 151-- | This type specifies which which half of a half-duplex
@@ -185,17 +188,22 @@ data ConnectionRecord u
185 188
186-- | This object accepts commands and signals events and maintains 189-- | This object accepts commands and signals events and maintains
187-- the list of currently listening ports and established connections. 190-- the list of currently listening ports and established connections.
188data Server a u 191data Server a u releaseKey
189 = Server { serverCommand :: TMVar (ServerInstruction a u) 192 = Server { serverCommand :: TMVar (ServerInstruction a u)
190 , serverEvent :: TChan ((a,u), ConnectionEvent ByteString) 193 , serverEvent :: TChan ((a,u), ConnectionEvent ByteString)
191 , serverReleaseKey :: ReleaseKey 194 , serverReleaseKey :: releaseKey
192 , conmap :: TVar (Map a (ConnectionRecord u)) 195 , conmap :: TVar (Map a (ConnectionRecord u))
193 , listenmap :: TVar (Map PortNumber (ThreadId,Socket)) 196 , listenmap :: TVar (Map PortNumber ServerHandle)
194 , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay)) 197 , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay))
195 } 198 }
196 199
197control sv = atomically . putTMVar (serverCommand sv) 200control sv = atomically . putTMVar (serverCommand sv)
198 201
202type Allocate releaseKey m = forall b. IO b -> (b -> IO ()) -> m (releaseKey, b)
203
204noCleanUp :: MonadIO m => Allocate () m
205noCleanUp io _ = ( (,) () ) `liftM` liftIO io
206
199-- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT' 207-- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT'
200-- to ensure proper cleanup. For example, 208-- to ensure proper cleanup. For example,
201-- 209--
@@ -207,7 +215,7 @@ control sv = atomically . putTMVar (serverCommand sv)
207-- > import Control.Concurrent.STM.TChan (readTChan) 215-- > import Control.Concurrent.STM.TChan (readTChan)
208-- > 216-- >
209-- > main = runResourceT $ do 217-- > main = runResourceT $ do
210-- > sv <- server 218-- > sv <- server allocate
211-- > let params = connectionDefaults (return . snd) 219-- > let params = connectionDefaults (return . snd)
212-- > liftIO . atomically $ putTMVar (serverCommand sv) (Listen 2942 params) 220-- > liftIO . atomically $ putTMVar (serverCommand sv) (Listen 2942 params)
213-- > let loop = do 221-- > let loop = do
@@ -218,8 +226,15 @@ control sv = atomically . putTMVar (serverCommand sv)
218-- > case event of EOF -> return () 226-- > case event of EOF -> return ()
219-- > _ -> loop 227-- > _ -> loop
220-- > liftIO loop 228-- > liftIO loop
221server :: (Show a,Ord a, MonadIO m, MonadResource m) => m (Server a u) 229--
222server = do 230-- Using 'Control.Monad.Trans.Resource.ResourceT' is optional. Pass 'noCleanUp'
231-- to do without automatic cleanup and be sure to remember to write 'Quit' to
232-- the 'serverCommand' variable.
233server ::
234 -- forall (m :: * -> *) a u conkey releaseKey.
235 (Show conkey, MonadIO m, Ord conkey) =>
236 Allocate releaseKey m -> m (Server conkey u releaseKey)
237server allocate = do
223 (key,cmds) <- allocate (atomically newEmptyTMVar) 238 (key,cmds) <- allocate (atomically newEmptyTMVar)
224 (atomically . flip putTMVar Quit) 239 (atomically . flip putTMVar Quit)
225 server <- liftIO . atomically $ do 240 server <- liftIO . atomically $ do
@@ -245,9 +260,9 @@ server = do
245 _ -> again 260 _ -> again
246 return server 261 return server
247 where 262 where
248 closeAll server = liftIO $ do 263 closeAll server = do
249 listening <- atomically . readTVar $ listenmap server 264 listening <- atomically . readTVar $ listenmap server
250 mapM_ killListener (Map.elems listening) 265 mapM_ quitListening (Map.elems listening)
251 let stopRetry (v,d) = do atomically $ writeTVar v False 266 let stopRetry (v,d) = do atomically $ writeTVar v False
252 interruptDelay d 267 interruptDelay d
253 retriers <- atomically $ do 268 retriers <- atomically $ do
@@ -261,38 +276,36 @@ server = do
261 atomically $ writeTVar (conmap server) Map.empty 276 atomically $ writeTVar (conmap server) Map.empty
262 277
263 278
264 doit server (Listen port params) = liftIO $ do 279 doit server (Listen port params) = do
265 280
266 listening <- Map.member port 281 listening <- Map.member port
267 `fmap` atomically (readTVar $ listenmap server) 282 `fmap` atomically (readTVar $ listenmap server)
268 when (not listening) $ do 283 when (not listening) $ do
269 284
270 let family = AF_INET6 285 let family = AF_INET6
271
272 sock <- socket family Stream 0
273 setSocketOption sock ReuseAddr 1
274 let address = 286 let address =
275 case family of 287 case family of
276 AF_INET -> SockAddrInet port iNADDR_ANY 288 AF_INET -> SockAddrInet port iNADDR_ANY
277 AF_INET6 -> SockAddrInet6 port 0 iN6ADDR_ANY 0 289 AF_INET6 -> SockAddrInet6 port 0 iN6ADDR_ANY 0
278 fix $ \loop -> do 290
279 handle (\(SomeException e)-> do 291 sserv <- flip streamServer address ServerConfig
280 warn $ "BIND-ERROR:"<>bshow address <> " " <> bshow e 292 { serverWarn = hPutStrLn stderr
281 threadDelay 5000000 293 , serverSession = \sock _ h -> do
282 loop) 294 (conkey,u) <- makeConnKey params sock
283 $ bindSocket sock address 295 _ <- newConnection server params conkey u h In
284 listen sock 2 296 return ()
285 thread <- forkIO $ acceptLoop server params sock 297 }
286 atomically $ listenmap server `modifyTVar'` Map.insert port (thread,sock) 298
287 299 atomically $ listenmap server `modifyTVar'` Map.insert port sserv
288 doit server (Ignore port) = liftIO $ do 300
301 doit server (Ignore port) = do
289 mb <- atomically $ do 302 mb <- atomically $ do
290 map <- readTVar $ listenmap server 303 map <- readTVar $ listenmap server
291 modifyTVar' (listenmap server) $ Map.delete port 304 modifyTVar' (listenmap server) $ Map.delete port
292 return $ Map.lookup port map 305 return $ Map.lookup port map
293 maybe (return ()) killListener $ mb 306 maybe (return ()) quitListening $ mb
294 307
295 doit server (Send con bs) = liftIO $ do -- . void . forkIO $ do 308 doit server (Send con bs) = do -- . void . forkIO $ do
296 map <- atomically $ readTVar (conmap server) 309 map <- atomically $ readTVar (conmap server)
297 let post False = (trace ("cant send: "++show bs) $ return ()) 310 let post False = (trace ("cant send: "++show bs) $ return ())
298 post True = return () 311 post True = return ()
@@ -300,7 +313,7 @@ server = do
300 (post <=< flip connWrite bs . cstate) 313 (post <=< flip connWrite bs . cstate)
301 $ Map.lookup con map 314 $ Map.lookup con map
302 315
303 doit server (Connect addr params) = liftIO $ do 316 doit server (Connect addr params) = do
304 mb <- atomically $ do 317 mb <- atomically $ do
305 rmap <- readTVar (retrymap server) 318 rmap <- readTVar (retrymap server)
306 return $ Map.lookup addr rmap 319 return $ Map.lookup addr rmap
@@ -316,20 +329,19 @@ server = do
316 handle (\e -> do -- let t = ioeGetErrorType e 329 handle (\e -> do -- let t = ioeGetErrorType e
317 when (isDoesNotExistError e) $ return () -- warn "GOTCHA" 330 when (isDoesNotExistError e) $ return () -- warn "GOTCHA"
318 -- warn $ "connect-error: " <> bshow e 331 -- warn $ "connect-error: " <> bshow e
319 (conkey,u) <- makeConnKey params (sock,addr) -- XXX: ? 332 (conkey,u) <- makeConnKey params (restrictSocket sock) -- XXX: ?
320 sClose sock 333 Socket.close sock
321 atomically 334 atomically
322 $ writeTChan (serverEvent server) 335 $ writeTChan (serverEvent server)
323 $ ((conkey,u),ConnectFailure addr)) 336 $ ((conkey,u),ConnectFailure addr))
324 $ do 337 $ do
325 connect sock addr 338 connect sock addr
326 me <- getSocketName sock 339 (conkey,u) <- makeConnKey params (restrictSocket sock)
327 (conkey,u) <- makeConnKey params (sock,me)
328 h <- socketToHandle sock ReadWriteMode 340 h <- socketToHandle sock ReadWriteMode
329 newConnection server params conkey u h Out 341 newConnection server params conkey u h Out
330 return () 342 return ()
331 343
332 doit server (ConnectWithEndlessRetry addr params interval) = liftIO $ do 344 doit server (ConnectWithEndlessRetry addr params interval) = do
333 proto <- getProtocolNumber "tcp" 345 proto <- getProtocolNumber "tcp"
334 void . forkIO $ do 346 void . forkIO $ do
335 resultVar <- atomically newEmptyTMVar 347 resultVar <- atomically newEmptyTMVar
@@ -356,8 +368,8 @@ server = do
356 -- warn $ "connect-error: " <> bshow e 368 -- warn $ "connect-error: " <> bshow e
357 -- Weird hack: puting the would-be peer address 369 -- Weird hack: puting the would-be peer address
358 -- instead of local socketName 370 -- instead of local socketName
359 (conkey,u) <- makeConnKey params (sock,addr) -- XXX: ? 371 (conkey,u) <- makeConnKey params (restrictSocket sock) -- XXX: ?
360 sClose sock 372 Socket.close sock
361 atomically $ do 373 atomically $ do
362 writeTChan (serverEvent server) 374 writeTChan (serverEvent server)
363 $ ((conkey,u),ConnectFailure addr) 375 $ ((conkey,u),ConnectFailure addr)
@@ -365,8 +377,7 @@ server = do
365 putTMVar resultVar retry) 377 putTMVar resultVar retry)
366 $ do 378 $ do
367 connect sock addr 379 connect sock addr
368 me <- getSocketName sock 380 (conkey,u) <- makeConnKey params (restrictSocket sock)
369 (conkey,u) <- makeConnKey params (sock,me)
370 h <- socketToHandle sock ReadWriteMode 381 h <- socketToHandle sock ReadWriteMode
371 threads <- newConnection server params conkey u h Out 382 threads <- newConnection server params conkey u h Out
372 atomically $ do threadsWait threads 383 atomically $ do threadsWait threads
@@ -416,8 +427,6 @@ socketFamily (SockAddrInet _ _) = AF_INET
416socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6 427socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6
417socketFamily (SockAddrUnix _) = AF_UNIX 428socketFamily (SockAddrUnix _) = AF_UNIX
418 429
419killListener (thread,sock) = do sClose sock
420 -- killThread thread
421 430
422 431
423conevent con = Connection pingflag read write 432conevent con = Connection pingflag read write
@@ -561,26 +570,6 @@ newConnection server params conkey u h inout = do
561 , cdata = u } 570 , cdata = u }
562 return $ handleEOF conkey u mvar new' 571 return $ handleEOF conkey u mvar new'
563 572
564acceptLoop server params sock = handle (acceptException server params sock) $ do
565 con <- accept sock
566 (conkey,u) <- makeConnKey params con
567 h <- socketToHandle (fst con) ReadWriteMode
568 newConnection server params conkey u h In
569 acceptLoop server params sock
570
571acceptException server params sock ioerror = do
572 sClose sock
573 case show (ioeGetErrorType ioerror) of
574 "resource exhausted" -> do -- try again
575 warn ("acceptLoop: resource exhasted")
576 threadDelay 500000
577 acceptLoop server params sock
578 "invalid argument" -> do -- quit on closed socket
579 return ()
580 message -> do -- unexpected exception
581 warn ("acceptLoop: "<>bshow message)
582 return ()
583
584 573
585 574
586getPacket h = do hWaitForInput h (-1) 575getPacket h = do hWaitForInput h (-1)
@@ -622,7 +611,7 @@ connectionThreads h pinglogic = do
622 atomically $ writeTChan incomming packet 611 atomically $ writeTChan incomming packet
623 pingBump pinglogic 612 pingBump pinglogic
624 -- warn $ "bumped: " <> S.take 60 packet 613 -- warn $ "bumped: " <> S.take 60 packet
625 isEof <- liftIO $ hIsEOF h 614 isEof <- hIsEOF h
626 if isEof then finished Nothing else loop 615 if isEof then finished Nothing else loop
627 616
628 writerThread <- forkIO . fix $ \loop -> do 617 writerThread <- forkIO . fix $ \loop -> do
@@ -710,7 +699,7 @@ mapConn both action c =
710 ConnectionPair r w -> do 699 ConnectionPair r w -> do
711 rem <- orElse (const w `fmap` action r) 700 rem <- orElse (const w `fmap` action r)
712 (const r `fmap` action w) 701 (const r `fmap` action w)
713 when both $ action rem 702 when both $ action rem
714 703
715connClose :: ConnectionState -> STM () 704connClose :: ConnectionState -> STM ()
716connClose c = mapConn True threadsClose c 705connClose c = mapConn True threadsClose c
@@ -719,7 +708,7 @@ connWait :: ConnectionState -> STM ()
719connWait c = doit -- mapConn False threadsWait c 708connWait c = doit -- mapConn False threadsWait c
720 where 709 where
721 action = threadsWait 710 action = threadsWait
722 doit = 711 doit =
723 case c of 712 case c of
724 SaneConnection rw -> action rw 713 SaneConnection rw -> action rw
725 ReadOnlyConnection r -> action r 714 ReadOnlyConnection r -> action r
@@ -821,7 +810,7 @@ pingWait :: PingMachine -> STM PingEvent
821pingWait me = takeTMVar (pingEvent me) 810pingWait me = takeTMVar (pingEvent me)
822 811
823 812
824data InterruptableDelay = InterruptableDelay 813data InterruptableDelay = InterruptableDelay
825 { delayThread :: TMVar ThreadId 814 { delayThread :: TMVar ThreadId
826 } 815 }
827 816
@@ -835,7 +824,7 @@ startDelay d interval = do
835 thread <- myThreadId 824 thread <- myThreadId
836 handle (\(ErrorCall _)-> do 825 handle (\(ErrorCall _)-> do
837 debugNoise $ "delay interrupted" 826 debugNoise $ "delay interrupted"
838 return False) $ do 827 return False) $ do
839 atomically $ putTMVar (delayThread d) thread 828 atomically $ putTMVar (delayThread d) thread
840 threadDelay interval 829 threadDelay interval
841 void . atomically $ takeTMVar (delayThread d) 830 void . atomically $ takeTMVar (delayThread d)