diff options
author | joe <joe@jerkface.net> | 2017-11-13 10:14:57 -0500 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-11-13 10:14:57 -0500 |
commit | 66b7039f00faa00055353f99b8dfeee77694ae1c (patch) | |
tree | 89d99e2fb7cc7f1a769baedba655c234e5bce1c8 | |
parent | 5e8f82e436c03e1c59e69d5c9eb0e5a14284dd87 (diff) |
Consolidated redundant implementations of InterruptibleDelay.
-rw-r--r-- | Presence/Server.hs | 123 |
1 files changed, 56 insertions, 67 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs index a621134d..f82a93c5 100644 --- a/Presence/Server.hs +++ b/Presence/Server.hs | |||
@@ -1,10 +1,11 @@ | |||
1 | {-# OPTIONS_HADDOCK prune #-} | 1 | {-# OPTIONS_HADDOCK prune #-} |
2 | {-# LANGUAGE CPP #-} | 2 | {-# LANGUAGE CPP #-} |
3 | {-# LANGUAGE StandaloneDeriving #-} | 3 | {-# LANGUAGE DoAndIfThenElse #-} |
4 | {-# LANGUAGE FlexibleInstances #-} | ||
4 | {-# LANGUAGE OverloadedStrings #-} | 5 | {-# LANGUAGE OverloadedStrings #-} |
6 | {-# LANGUAGE RankNTypes #-} | ||
7 | {-# LANGUAGE StandaloneDeriving #-} | ||
5 | {-# LANGUAGE TupleSections #-} | 8 | {-# LANGUAGE TupleSections #-} |
6 | {-# LANGUAGE FlexibleInstances #-} | ||
7 | {-# LANGUAGE RankNTypes #-} | ||
8 | ----------------------------------------------------------------------------- | 9 | ----------------------------------------------------------------------------- |
9 | -- | | 10 | -- | |
10 | -- Module : Server | 11 | -- Module : Server |
@@ -18,7 +19,6 @@ | |||
18 | -- | 19 | -- |
19 | -- * interface tweaks | 20 | -- * interface tweaks |
20 | -- | 21 | -- |
21 | {-# LANGUAGE DoAndIfThenElse #-} | ||
22 | module Server where | 22 | module Server where |
23 | 23 | ||
24 | import Data.ByteString (ByteString,hGetNonBlocking) | 24 | import Data.ByteString (ByteString,hGetNonBlocking) |
@@ -36,14 +36,14 @@ import Control.Concurrent.STM | |||
36 | -- import Control.Concurrent.STM.TMVar | 36 | -- import Control.Concurrent.STM.TMVar |
37 | -- import Control.Concurrent.STM.TChan | 37 | -- import Control.Concurrent.STM.TChan |
38 | -- import Control.Concurrent.STM.Delay | 38 | -- import Control.Concurrent.STM.Delay |
39 | import Control.Exception ({-evaluate,-}handle,SomeException(..),bracketOnError,ErrorCall(..)) | 39 | import Control.Exception ({-evaluate,-}handle,SomeException(..),ErrorCall(..)) |
40 | import Control.Monad | 40 | import Control.Monad |
41 | import Control.Monad.Fix | 41 | import Control.Monad.Fix |
42 | -- import Control.Monad.STM | 42 | -- import Control.Monad.STM |
43 | -- import Control.Monad.Trans.Resource | 43 | -- import Control.Monad.Trans.Resource |
44 | import Control.Monad.IO.Class (MonadIO (liftIO)) | 44 | import Control.Monad.IO.Class (MonadIO (liftIO)) |
45 | import System.IO.Error (ioeGetErrorType,isDoesNotExistError) | 45 | import System.IO.Error (isDoesNotExistError) |
46 | import System.IO | 46 | import System.IO |
47 | ( IOMode(..) | 47 | ( IOMode(..) |
48 | , hSetBuffering | 48 | , hSetBuffering |
49 | , BufferMode(..) | 49 | , BufferMode(..) |
@@ -51,7 +51,6 @@ import System.IO | |||
51 | , hClose | 51 | , hClose |
52 | , hIsEOF | 52 | , hIsEOF |
53 | , stderr | 53 | , stderr |
54 | , stdout | ||
55 | , Handle | 54 | , Handle |
56 | , hFlush | 55 | , hFlush |
57 | , hPutStrLn | 56 | , hPutStrLn |
@@ -61,11 +60,11 @@ import Network.BSD | |||
61 | ( getProtocolNumber | 60 | ( getProtocolNumber |
62 | ) | 61 | ) |
63 | import Debug.Trace | 62 | import Debug.Trace |
64 | import Data.Time.Clock (UTCTime,getCurrentTime,diffUTCTime) | 63 | import Data.Time.Clock (getCurrentTime,diffUTCTime) |
65 | import Data.Time.Format (formatTime) | ||
66 | -- import SockAddr () | 64 | -- import SockAddr () |
67 | -- import System.Locale (defaultTimeLocale) | 65 | -- import System.Locale (defaultTimeLocale) |
68 | 66 | ||
67 | import InterruptibleDelay | ||
69 | import Network.StreamServer | 68 | import Network.StreamServer |
70 | import Network.SocketLike hiding (sClose) | 69 | import Network.SocketLike hiding (sClose) |
71 | 70 | ||
@@ -150,7 +149,7 @@ deriving instance Show conkey => Show (ConnectionParameters conkey u) | |||
150 | 149 | ||
151 | -- | This type specifies which which half of a half-duplex | 150 | -- | This type specifies which which half of a half-duplex |
152 | -- connection is of interest. | 151 | -- connection is of interest. |
153 | data InOrOut = In | Out | 152 | data InOrOut = In | Out |
154 | deriving (Enum,Eq,Ord,Show,Read) | 153 | deriving (Enum,Eq,Ord,Show,Read) |
155 | 154 | ||
156 | -- | These events may be read from 'serverEvent' TChannel. | 155 | -- | These events may be read from 'serverEvent' TChannel. |
@@ -181,22 +180,23 @@ deriving instance Eq b => Eq (ConnectionEvent b) | |||
181 | 180 | ||
182 | -- | This is the per-connection state. | 181 | -- | This is the per-connection state. |
183 | data ConnectionRecord u | 182 | data ConnectionRecord u |
184 | = ConnectionRecord { ckont :: TMVar (STM (IO ())) -- ^ used to pass a continuation to update the eof-handler | 183 | = ConnectionRecord { ckont :: TMVar (STM (IO ())) -- ^ used to pass a continuation to update the eof-handler |
185 | , cstate :: ConnectionState -- ^ used to send/receive data to the connection | 184 | , cstate :: ConnectionState -- ^ used to send/receive data to the connection |
186 | , cdata :: u -- ^ user data, stored in the connection map for convenience | 185 | , cdata :: u -- ^ user data, stored in the connection map for convenience |
187 | } | 186 | } |
188 | 187 | ||
189 | -- | This object accepts commands and signals events and maintains | 188 | -- | This object accepts commands and signals events and maintains |
190 | -- the list of currently listening ports and established connections. | 189 | -- the list of currently listening ports and established connections. |
191 | data Server a u releaseKey | 190 | data Server a u releaseKey |
192 | = Server { serverCommand :: TMVar (ServerInstruction a u) | 191 | = Server { serverCommand :: TMVar (ServerInstruction a u) |
193 | , serverEvent :: TChan ((a,u), ConnectionEvent ByteString) | 192 | , serverEvent :: TChan ((a,u), ConnectionEvent ByteString) |
194 | , serverReleaseKey :: releaseKey | 193 | , serverReleaseKey :: releaseKey |
195 | , conmap :: TVar (Map a (ConnectionRecord u)) | 194 | , conmap :: TVar (Map a (ConnectionRecord u)) |
196 | , listenmap :: TVar (Map PortNumber ServerHandle) | 195 | , listenmap :: TVar (Map PortNumber ServerHandle) |
197 | , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay)) | 196 | , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptibleDelay)) |
198 | } | 197 | } |
199 | 198 | ||
199 | control :: Server a u releaseKey -> ServerInstruction a u -> IO () | ||
200 | control sv = atomically . putTMVar (serverCommand sv) | 200 | control sv = atomically . putTMVar (serverCommand sv) |
201 | 201 | ||
202 | type Allocate releaseKey m = forall b. IO b -> (b -> IO ()) -> m (releaseKey, b) | 202 | type Allocate releaseKey m = forall b. IO b -> (b -> IO ()) -> m (releaseKey, b) |
@@ -213,7 +213,7 @@ noCleanUp io _ = ( (,) () ) `liftM` liftIO io | |||
213 | -- > import Control.Monad.STM (atomically) | 213 | -- > import Control.Monad.STM (atomically) |
214 | -- > import Control.Concurrent.STM.TMVar (putTMVar) | 214 | -- > import Control.Concurrent.STM.TMVar (putTMVar) |
215 | -- > import Control.Concurrent.STM.TChan (readTChan) | 215 | -- > import Control.Concurrent.STM.TChan (readTChan) |
216 | -- > | 216 | -- > |
217 | -- > main = runResourceT $ do | 217 | -- > main = runResourceT $ do |
218 | -- > sv <- server allocate | 218 | -- > sv <- server allocate |
219 | -- > let params = connectionDefaults (return . snd) | 219 | -- > let params = connectionDefaults (return . snd) |
@@ -299,7 +299,7 @@ server allocate = do | |||
299 | atomically $ listenmap server `modifyTVar'` Map.insert port sserv | 299 | atomically $ listenmap server `modifyTVar'` Map.insert port sserv |
300 | 300 | ||
301 | doit server (Ignore port) = do | 301 | doit server (Ignore port) = do |
302 | mb <- atomically $ do | 302 | mb <- atomically $ do |
303 | map <- readTVar $ listenmap server | 303 | map <- readTVar $ listenmap server |
304 | modifyTVar' (listenmap server) $ Map.delete port | 304 | modifyTVar' (listenmap server) $ Map.delete port |
305 | return $ Map.lookup port map | 305 | return $ Map.lookup port map |
@@ -322,7 +322,7 @@ server allocate = do | |||
322 | interruptDelay d | 322 | interruptDelay d |
323 | when (not b) forkit) | 323 | when (not b) forkit) |
324 | mb | 324 | mb |
325 | where | 325 | where |
326 | forkit = void . forkIO $ do | 326 | forkit = void . forkIO $ do |
327 | proto <- getProtocolNumber "tcp" | 327 | proto <- getProtocolNumber "tcp" |
328 | sock <- socket (socketFamily addr) Stream proto | 328 | sock <- socket (socketFamily addr) Stream proto |
@@ -331,8 +331,8 @@ server allocate = do | |||
331 | -- warn $ "connect-error: " <> bshow e | 331 | -- warn $ "connect-error: " <> bshow e |
332 | (conkey,u) <- makeConnKey params (restrictSocket sock) -- XXX: ? | 332 | (conkey,u) <- makeConnKey params (restrictSocket sock) -- XXX: ? |
333 | Socket.close sock | 333 | Socket.close sock |
334 | atomically | 334 | atomically |
335 | $ writeTChan (serverEvent server) | 335 | $ writeTChan (serverEvent server) |
336 | $ ((conkey,u),ConnectFailure addr)) | 336 | $ ((conkey,u),ConnectFailure addr)) |
337 | $ do | 337 | $ do |
338 | connect sock addr | 338 | connect sock addr |
@@ -345,7 +345,7 @@ server allocate = do | |||
345 | proto <- getProtocolNumber "tcp" | 345 | proto <- getProtocolNumber "tcp" |
346 | void . forkIO $ do | 346 | void . forkIO $ do |
347 | resultVar <- atomically newEmptyTMVar | 347 | resultVar <- atomically newEmptyTMVar |
348 | timer <- interruptableDelay | 348 | timer <- interruptibleDelay |
349 | (retryVar,action) <- atomically $ do | 349 | (retryVar,action) <- atomically $ do |
350 | let noop = return () | 350 | let noop = return () |
351 | map <- readTVar (retrymap server) | 351 | map <- readTVar (retrymap server) |
@@ -371,7 +371,7 @@ server allocate = do | |||
371 | (conkey,u) <- makeConnKey params (restrictSocket sock) -- XXX: ? | 371 | (conkey,u) <- makeConnKey params (restrictSocket sock) -- XXX: ? |
372 | Socket.close sock | 372 | Socket.close sock |
373 | atomically $ do | 373 | atomically $ do |
374 | writeTChan (serverEvent server) | 374 | writeTChan (serverEvent server) |
375 | $ ((conkey,u),ConnectFailure addr) | 375 | $ ((conkey,u),ConnectFailure addr) |
376 | retry <- readTVar retryVar | 376 | retry <- readTVar retryVar |
377 | putTMVar resultVar retry) | 377 | putTMVar resultVar retry) |
@@ -396,7 +396,7 @@ server allocate = do | |||
396 | 396 | ||
397 | 397 | ||
398 | -- INTERNAL ---------------------------------------------------------- | 398 | -- INTERNAL ---------------------------------------------------------- |
399 | 399 | ||
400 | {- | 400 | {- |
401 | hWriteUntilNothing h outs = | 401 | hWriteUntilNothing h outs = |
402 | fix $ \loop -> do | 402 | fix $ \loop -> do |
@@ -423,18 +423,27 @@ connRead conn = do | |||
423 | -- discardContents (threadsChannel w) | 423 | -- discardContents (threadsChannel w) |
424 | return c | 424 | return c |
425 | 425 | ||
426 | socketFamily :: SockAddr -> Family | ||
426 | socketFamily (SockAddrInet _ _) = AF_INET | 427 | socketFamily (SockAddrInet _ _) = AF_INET |
427 | socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6 | 428 | socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6 |
428 | socketFamily (SockAddrUnix _) = AF_UNIX | 429 | socketFamily (SockAddrUnix _) = AF_UNIX |
429 | 430 | ||
430 | 431 | ||
431 | 432 | conevent :: ConnectionState -> ConnectionEvent b | |
432 | conevent con = Connection pingflag read write | 433 | conevent con = Connection pingflag read write |
433 | where | 434 | where |
434 | pingflag = swapTVar (pingFlag (connPingTimer con)) False | 435 | pingflag = swapTVar (pingFlag (connPingTimer con)) False |
435 | read = connRead con | 436 | read = connRead con |
436 | write = connWrite con | 437 | write = connWrite con |
437 | 438 | ||
439 | newConnection :: Ord a | ||
440 | => Server a u1 releaseKey | ||
441 | -> ConnectionParameters conkey u | ||
442 | -> a | ||
443 | -> u1 | ||
444 | -> Handle | ||
445 | -> InOrOut | ||
446 | -> IO ConnectionThreads | ||
438 | newConnection server params conkey u h inout = do | 447 | newConnection server params conkey u h inout = do |
439 | hSetBuffering h NoBuffering | 448 | hSetBuffering h NoBuffering |
440 | let (forward,idle_ms,timeout_ms) = | 449 | let (forward,idle_ms,timeout_ms) = |
@@ -571,7 +580,7 @@ newConnection server params conkey u h inout = do | |||
571 | return $ handleEOF conkey u mvar new' | 580 | return $ handleEOF conkey u mvar new' |
572 | 581 | ||
573 | 582 | ||
574 | 583 | getPacket :: Handle -> IO ByteString | |
575 | getPacket h = do hWaitForInput h (-1) | 584 | getPacket h = do hWaitForInput h (-1) |
576 | hGetNonBlocking h 1024 | 585 | hGetNonBlocking h 1024 |
577 | 586 | ||
@@ -718,6 +727,7 @@ connWait c = doit -- mapConn False threadsWait c | |||
718 | (const r `fmap` action w) | 727 | (const r `fmap` action w) |
719 | threadsClose rem | 728 | threadsClose rem |
720 | 729 | ||
730 | connPingTimer :: ConnectionState -> PingMachine | ||
721 | connPingTimer c = | 731 | connPingTimer c = |
722 | case c of | 732 | case c of |
723 | SaneConnection rw -> threadsPing rw | 733 | SaneConnection rw -> threadsPing rw |
@@ -725,10 +735,13 @@ connPingTimer c = | |||
725 | WriteOnlyConnection w -> threadsPing w -- should be disabled. | 735 | WriteOnlyConnection w -> threadsPing w -- should be disabled. |
726 | ConnectionPair r w -> threadsPing r | 736 | ConnectionPair r w -> threadsPing r |
727 | 737 | ||
738 | connCancelPing :: ConnectionState -> IO () | ||
728 | connCancelPing c = pingCancel (connPingTimer c) | 739 | connCancelPing c = pingCancel (connPingTimer c) |
729 | connWaitPing c = pingWait (connPingTimer c) | ||
730 | 740 | ||
741 | connWaitPing :: ConnectionState -> STM PingEvent | ||
742 | connWaitPing c = pingWait (connPingTimer c) | ||
731 | 743 | ||
744 | connFlush :: ConnectionState -> STM () | ||
732 | connFlush c = | 745 | connFlush c = |
733 | case c of | 746 | case c of |
734 | SaneConnection rw -> waitChan rw | 747 | SaneConnection rw -> waitChan rw |
@@ -740,23 +753,28 @@ connFlush c = | |||
740 | b <- isEmptyTChan (threadsChannel t) | 753 | b <- isEmptyTChan (threadsChannel t) |
741 | when (not b) retry | 754 | when (not b) retry |
742 | 755 | ||
756 | bshow :: Show a => a -> ByteString | ||
743 | bshow e = S.pack . show $ e | 757 | bshow e = S.pack . show $ e |
758 | |||
759 | warn :: ByteString -> IO () | ||
744 | warn str = S.hPutStrLn stderr str >> hFlush stderr | 760 | warn str = S.hPutStrLn stderr str >> hFlush stderr |
761 | |||
762 | debugNoise :: Monad m => t -> m () | ||
745 | debugNoise str = return () | 763 | debugNoise str = return () |
746 | 764 | ||
747 | 765 | ||
748 | data PingEvent = PingIdle | PingTimeOut | 766 | data PingEvent = PingIdle | PingTimeOut |
749 | 767 | ||
750 | data PingMachine = PingMachine | 768 | data PingMachine = PingMachine |
751 | { pingFlag :: TVar Bool | 769 | { pingFlag :: TVar Bool |
752 | , pingInterruptable :: InterruptableDelay | 770 | , pingInterruptible :: InterruptibleDelay |
753 | , pingEvent :: TMVar PingEvent | 771 | , pingEvent :: TMVar PingEvent |
754 | , pingStarted :: TMVar Bool | 772 | , pingStarted :: TMVar Bool |
755 | } | 773 | } |
756 | 774 | ||
757 | pingMachine :: PingInterval -> TimeOut -> IO PingMachine | 775 | pingMachine :: PingInterval -> TimeOut -> IO PingMachine |
758 | pingMachine idle timeout = do | 776 | pingMachine idle timeout = do |
759 | d <- interruptableDelay | 777 | d <- interruptibleDelay |
760 | flag <- atomically $ newTVar False | 778 | flag <- atomically $ newTVar False |
761 | canceled <- atomically $ newTVar False | 779 | canceled <- atomically $ newTVar False |
762 | event <- atomically newEmptyTMVar | 780 | event <- atomically newEmptyTMVar |
@@ -786,7 +804,7 @@ pingMachine idle timeout = do | |||
786 | putTMVar event PingTimeOut | 804 | putTMVar event PingTimeOut |
787 | return PingMachine | 805 | return PingMachine |
788 | { pingFlag = flag | 806 | { pingFlag = flag |
789 | , pingInterruptable = d | 807 | , pingInterruptible = d |
790 | , pingEvent = event | 808 | , pingEvent = event |
791 | , pingStarted = started | 809 | , pingStarted = started |
792 | } | 810 | } |
@@ -795,7 +813,7 @@ pingCancel :: PingMachine -> IO () | |||
795 | pingCancel me = do | 813 | pingCancel me = do |
796 | atomically $ do tryTakeTMVar (pingStarted me) | 814 | atomically $ do tryTakeTMVar (pingStarted me) |
797 | putTMVar (pingStarted me) False | 815 | putTMVar (pingStarted me) False |
798 | interruptDelay (pingInterruptable me) | 816 | interruptDelay (pingInterruptible me) |
799 | 817 | ||
800 | pingBump :: PingMachine -> IO () | 818 | pingBump :: PingMachine -> IO () |
801 | pingBump me = do | 819 | pingBump me = do |
@@ -804,37 +822,8 @@ pingBump me = do | |||
804 | when (b/=Just False) $ do | 822 | when (b/=Just False) $ do |
805 | tryTakeTMVar (pingStarted me) | 823 | tryTakeTMVar (pingStarted me) |
806 | putTMVar (pingStarted me) True | 824 | putTMVar (pingStarted me) True |
807 | interruptDelay (pingInterruptable me) | 825 | interruptDelay (pingInterruptible me) |
808 | 826 | ||
809 | pingWait :: PingMachine -> STM PingEvent | 827 | pingWait :: PingMachine -> STM PingEvent |
810 | pingWait me = takeTMVar (pingEvent me) | 828 | pingWait me = takeTMVar (pingEvent me) |
811 | 829 | ||
812 | |||
813 | data InterruptableDelay = InterruptableDelay | ||
814 | { delayThread :: TMVar ThreadId | ||
815 | } | ||
816 | |||
817 | interruptableDelay :: IO InterruptableDelay | ||
818 | interruptableDelay = do | ||
819 | fmap InterruptableDelay | ||
820 | $ atomically newEmptyTMVar | ||
821 | |||
822 | startDelay :: InterruptableDelay -> Microseconds -> IO Bool | ||
823 | startDelay d interval = do | ||
824 | thread <- myThreadId | ||
825 | handle (\(ErrorCall _)-> do | ||
826 | debugNoise $ "delay interrupted" | ||
827 | return False) $ do | ||
828 | atomically $ putTMVar (delayThread d) thread | ||
829 | threadDelay interval | ||
830 | void . atomically $ takeTMVar (delayThread d) | ||
831 | return True | ||
832 | |||
833 | interruptDelay :: InterruptableDelay -> IO () | ||
834 | interruptDelay d = do | ||
835 | mthread <- atomically $ do | ||
836 | tryTakeTMVar (delayThread d) | ||
837 | flip (maybe $ return ()) mthread $ \thread -> do | ||
838 | throwTo thread (ErrorCall "Interrupted delay") | ||
839 | |||
840 | |||