summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-11-13 10:14:57 -0500
committerjoe <joe@jerkface.net>2017-11-13 10:14:57 -0500
commit66b7039f00faa00055353f99b8dfeee77694ae1c (patch)
tree89d99e2fb7cc7f1a769baedba655c234e5bce1c8
parent5e8f82e436c03e1c59e69d5c9eb0e5a14284dd87 (diff)
Consolidated redundant implementations of InterruptibleDelay.
-rw-r--r--Presence/Server.hs123
1 files changed, 56 insertions, 67 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs
index a621134d..f82a93c5 100644
--- a/Presence/Server.hs
+++ b/Presence/Server.hs
@@ -1,10 +1,11 @@
1{-# OPTIONS_HADDOCK prune #-} 1{-# OPTIONS_HADDOCK prune #-}
2{-# LANGUAGE CPP #-} 2{-# LANGUAGE CPP #-}
3{-# LANGUAGE StandaloneDeriving #-} 3{-# LANGUAGE DoAndIfThenElse #-}
4{-# LANGUAGE FlexibleInstances #-}
4{-# LANGUAGE OverloadedStrings #-} 5{-# LANGUAGE OverloadedStrings #-}
6{-# LANGUAGE RankNTypes #-}
7{-# LANGUAGE StandaloneDeriving #-}
5{-# LANGUAGE TupleSections #-} 8{-# LANGUAGE TupleSections #-}
6{-# LANGUAGE FlexibleInstances #-}
7{-# LANGUAGE RankNTypes #-}
8----------------------------------------------------------------------------- 9-----------------------------------------------------------------------------
9-- | 10-- |
10-- Module : Server 11-- Module : Server
@@ -18,7 +19,6 @@
18-- 19--
19-- * interface tweaks 20-- * interface tweaks
20-- 21--
21{-# LANGUAGE DoAndIfThenElse #-}
22module Server where 22module Server where
23 23
24import Data.ByteString (ByteString,hGetNonBlocking) 24import Data.ByteString (ByteString,hGetNonBlocking)
@@ -36,14 +36,14 @@ import Control.Concurrent.STM
36-- import Control.Concurrent.STM.TMVar 36-- import Control.Concurrent.STM.TMVar
37-- import Control.Concurrent.STM.TChan 37-- import Control.Concurrent.STM.TChan
38-- import Control.Concurrent.STM.Delay 38-- import Control.Concurrent.STM.Delay
39import Control.Exception ({-evaluate,-}handle,SomeException(..),bracketOnError,ErrorCall(..)) 39import Control.Exception ({-evaluate,-}handle,SomeException(..),ErrorCall(..))
40import Control.Monad 40import Control.Monad
41import Control.Monad.Fix 41import Control.Monad.Fix
42-- import Control.Monad.STM 42-- import Control.Monad.STM
43-- import Control.Monad.Trans.Resource 43-- import Control.Monad.Trans.Resource
44import Control.Monad.IO.Class (MonadIO (liftIO)) 44import Control.Monad.IO.Class (MonadIO (liftIO))
45import System.IO.Error (ioeGetErrorType,isDoesNotExistError) 45import System.IO.Error (isDoesNotExistError)
46import System.IO 46import System.IO
47 ( IOMode(..) 47 ( IOMode(..)
48 , hSetBuffering 48 , hSetBuffering
49 , BufferMode(..) 49 , BufferMode(..)
@@ -51,7 +51,6 @@ import System.IO
51 , hClose 51 , hClose
52 , hIsEOF 52 , hIsEOF
53 , stderr 53 , stderr
54 , stdout
55 , Handle 54 , Handle
56 , hFlush 55 , hFlush
57 , hPutStrLn 56 , hPutStrLn
@@ -61,11 +60,11 @@ import Network.BSD
61 ( getProtocolNumber 60 ( getProtocolNumber
62 ) 61 )
63import Debug.Trace 62import Debug.Trace
64import Data.Time.Clock (UTCTime,getCurrentTime,diffUTCTime) 63import Data.Time.Clock (getCurrentTime,diffUTCTime)
65import Data.Time.Format (formatTime)
66-- import SockAddr () 64-- import SockAddr ()
67-- import System.Locale (defaultTimeLocale) 65-- import System.Locale (defaultTimeLocale)
68 66
67import InterruptibleDelay
69import Network.StreamServer 68import Network.StreamServer
70import Network.SocketLike hiding (sClose) 69import Network.SocketLike hiding (sClose)
71 70
@@ -150,7 +149,7 @@ deriving instance Show conkey => Show (ConnectionParameters conkey u)
150 149
151-- | This type specifies which which half of a half-duplex 150-- | This type specifies which which half of a half-duplex
152-- connection is of interest. 151-- connection is of interest.
153data InOrOut = In | Out 152data InOrOut = In | Out
154 deriving (Enum,Eq,Ord,Show,Read) 153 deriving (Enum,Eq,Ord,Show,Read)
155 154
156-- | These events may be read from 'serverEvent' TChannel. 155-- | These events may be read from 'serverEvent' TChannel.
@@ -181,22 +180,23 @@ deriving instance Eq b => Eq (ConnectionEvent b)
181 180
182-- | This is the per-connection state. 181-- | This is the per-connection state.
183data ConnectionRecord u 182data ConnectionRecord u
184 = ConnectionRecord { ckont :: TMVar (STM (IO ())) -- ^ used to pass a continuation to update the eof-handler 183 = ConnectionRecord { ckont :: TMVar (STM (IO ())) -- ^ used to pass a continuation to update the eof-handler
185 , cstate :: ConnectionState -- ^ used to send/receive data to the connection 184 , cstate :: ConnectionState -- ^ used to send/receive data to the connection
186 , cdata :: u -- ^ user data, stored in the connection map for convenience 185 , cdata :: u -- ^ user data, stored in the connection map for convenience
187 } 186 }
188 187
189-- | This object accepts commands and signals events and maintains 188-- | This object accepts commands and signals events and maintains
190-- the list of currently listening ports and established connections. 189-- the list of currently listening ports and established connections.
191data Server a u releaseKey 190data Server a u releaseKey
192 = Server { serverCommand :: TMVar (ServerInstruction a u) 191 = Server { serverCommand :: TMVar (ServerInstruction a u)
193 , serverEvent :: TChan ((a,u), ConnectionEvent ByteString) 192 , serverEvent :: TChan ((a,u), ConnectionEvent ByteString)
194 , serverReleaseKey :: releaseKey 193 , serverReleaseKey :: releaseKey
195 , conmap :: TVar (Map a (ConnectionRecord u)) 194 , conmap :: TVar (Map a (ConnectionRecord u))
196 , listenmap :: TVar (Map PortNumber ServerHandle) 195 , listenmap :: TVar (Map PortNumber ServerHandle)
197 , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay)) 196 , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptibleDelay))
198 } 197 }
199 198
199control :: Server a u releaseKey -> ServerInstruction a u -> IO ()
200control sv = atomically . putTMVar (serverCommand sv) 200control sv = atomically . putTMVar (serverCommand sv)
201 201
202type Allocate releaseKey m = forall b. IO b -> (b -> IO ()) -> m (releaseKey, b) 202type Allocate releaseKey m = forall b. IO b -> (b -> IO ()) -> m (releaseKey, b)
@@ -213,7 +213,7 @@ noCleanUp io _ = ( (,) () ) `liftM` liftIO io
213-- > import Control.Monad.STM (atomically) 213-- > import Control.Monad.STM (atomically)
214-- > import Control.Concurrent.STM.TMVar (putTMVar) 214-- > import Control.Concurrent.STM.TMVar (putTMVar)
215-- > import Control.Concurrent.STM.TChan (readTChan) 215-- > import Control.Concurrent.STM.TChan (readTChan)
216-- > 216-- >
217-- > main = runResourceT $ do 217-- > main = runResourceT $ do
218-- > sv <- server allocate 218-- > sv <- server allocate
219-- > let params = connectionDefaults (return . snd) 219-- > let params = connectionDefaults (return . snd)
@@ -299,7 +299,7 @@ server allocate = do
299 atomically $ listenmap server `modifyTVar'` Map.insert port sserv 299 atomically $ listenmap server `modifyTVar'` Map.insert port sserv
300 300
301 doit server (Ignore port) = do 301 doit server (Ignore port) = do
302 mb <- atomically $ do 302 mb <- atomically $ do
303 map <- readTVar $ listenmap server 303 map <- readTVar $ listenmap server
304 modifyTVar' (listenmap server) $ Map.delete port 304 modifyTVar' (listenmap server) $ Map.delete port
305 return $ Map.lookup port map 305 return $ Map.lookup port map
@@ -322,7 +322,7 @@ server allocate = do
322 interruptDelay d 322 interruptDelay d
323 when (not b) forkit) 323 when (not b) forkit)
324 mb 324 mb
325 where 325 where
326 forkit = void . forkIO $ do 326 forkit = void . forkIO $ do
327 proto <- getProtocolNumber "tcp" 327 proto <- getProtocolNumber "tcp"
328 sock <- socket (socketFamily addr) Stream proto 328 sock <- socket (socketFamily addr) Stream proto
@@ -331,8 +331,8 @@ server allocate = do
331 -- warn $ "connect-error: " <> bshow e 331 -- warn $ "connect-error: " <> bshow e
332 (conkey,u) <- makeConnKey params (restrictSocket sock) -- XXX: ? 332 (conkey,u) <- makeConnKey params (restrictSocket sock) -- XXX: ?
333 Socket.close sock 333 Socket.close sock
334 atomically 334 atomically
335 $ writeTChan (serverEvent server) 335 $ writeTChan (serverEvent server)
336 $ ((conkey,u),ConnectFailure addr)) 336 $ ((conkey,u),ConnectFailure addr))
337 $ do 337 $ do
338 connect sock addr 338 connect sock addr
@@ -345,7 +345,7 @@ server allocate = do
345 proto <- getProtocolNumber "tcp" 345 proto <- getProtocolNumber "tcp"
346 void . forkIO $ do 346 void . forkIO $ do
347 resultVar <- atomically newEmptyTMVar 347 resultVar <- atomically newEmptyTMVar
348 timer <- interruptableDelay 348 timer <- interruptibleDelay
349 (retryVar,action) <- atomically $ do 349 (retryVar,action) <- atomically $ do
350 let noop = return () 350 let noop = return ()
351 map <- readTVar (retrymap server) 351 map <- readTVar (retrymap server)
@@ -371,7 +371,7 @@ server allocate = do
371 (conkey,u) <- makeConnKey params (restrictSocket sock) -- XXX: ? 371 (conkey,u) <- makeConnKey params (restrictSocket sock) -- XXX: ?
372 Socket.close sock 372 Socket.close sock
373 atomically $ do 373 atomically $ do
374 writeTChan (serverEvent server) 374 writeTChan (serverEvent server)
375 $ ((conkey,u),ConnectFailure addr) 375 $ ((conkey,u),ConnectFailure addr)
376 retry <- readTVar retryVar 376 retry <- readTVar retryVar
377 putTMVar resultVar retry) 377 putTMVar resultVar retry)
@@ -396,7 +396,7 @@ server allocate = do
396 396
397 397
398-- INTERNAL ---------------------------------------------------------- 398-- INTERNAL ----------------------------------------------------------
399 399
400{- 400{-
401hWriteUntilNothing h outs = 401hWriteUntilNothing h outs =
402 fix $ \loop -> do 402 fix $ \loop -> do
@@ -423,18 +423,27 @@ connRead conn = do
423 -- discardContents (threadsChannel w) 423 -- discardContents (threadsChannel w)
424 return c 424 return c
425 425
426socketFamily :: SockAddr -> Family
426socketFamily (SockAddrInet _ _) = AF_INET 427socketFamily (SockAddrInet _ _) = AF_INET
427socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6 428socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6
428socketFamily (SockAddrUnix _) = AF_UNIX 429socketFamily (SockAddrUnix _) = AF_UNIX
429 430
430 431
431 432conevent :: ConnectionState -> ConnectionEvent b
432conevent con = Connection pingflag read write 433conevent con = Connection pingflag read write
433 where 434 where
434 pingflag = swapTVar (pingFlag (connPingTimer con)) False 435 pingflag = swapTVar (pingFlag (connPingTimer con)) False
435 read = connRead con 436 read = connRead con
436 write = connWrite con 437 write = connWrite con
437 438
439newConnection :: Ord a
440 => Server a u1 releaseKey
441 -> ConnectionParameters conkey u
442 -> a
443 -> u1
444 -> Handle
445 -> InOrOut
446 -> IO ConnectionThreads
438newConnection server params conkey u h inout = do 447newConnection server params conkey u h inout = do
439 hSetBuffering h NoBuffering 448 hSetBuffering h NoBuffering
440 let (forward,idle_ms,timeout_ms) = 449 let (forward,idle_ms,timeout_ms) =
@@ -571,7 +580,7 @@ newConnection server params conkey u h inout = do
571 return $ handleEOF conkey u mvar new' 580 return $ handleEOF conkey u mvar new'
572 581
573 582
574 583getPacket :: Handle -> IO ByteString
575getPacket h = do hWaitForInput h (-1) 584getPacket h = do hWaitForInput h (-1)
576 hGetNonBlocking h 1024 585 hGetNonBlocking h 1024
577 586
@@ -718,6 +727,7 @@ connWait c = doit -- mapConn False threadsWait c
718 (const r `fmap` action w) 727 (const r `fmap` action w)
719 threadsClose rem 728 threadsClose rem
720 729
730connPingTimer :: ConnectionState -> PingMachine
721connPingTimer c = 731connPingTimer c =
722 case c of 732 case c of
723 SaneConnection rw -> threadsPing rw 733 SaneConnection rw -> threadsPing rw
@@ -725,10 +735,13 @@ connPingTimer c =
725 WriteOnlyConnection w -> threadsPing w -- should be disabled. 735 WriteOnlyConnection w -> threadsPing w -- should be disabled.
726 ConnectionPair r w -> threadsPing r 736 ConnectionPair r w -> threadsPing r
727 737
738connCancelPing :: ConnectionState -> IO ()
728connCancelPing c = pingCancel (connPingTimer c) 739connCancelPing c = pingCancel (connPingTimer c)
729connWaitPing c = pingWait (connPingTimer c)
730 740
741connWaitPing :: ConnectionState -> STM PingEvent
742connWaitPing c = pingWait (connPingTimer c)
731 743
744connFlush :: ConnectionState -> STM ()
732connFlush c = 745connFlush c =
733 case c of 746 case c of
734 SaneConnection rw -> waitChan rw 747 SaneConnection rw -> waitChan rw
@@ -740,23 +753,28 @@ connFlush c =
740 b <- isEmptyTChan (threadsChannel t) 753 b <- isEmptyTChan (threadsChannel t)
741 when (not b) retry 754 when (not b) retry
742 755
756bshow :: Show a => a -> ByteString
743bshow e = S.pack . show $ e 757bshow e = S.pack . show $ e
758
759warn :: ByteString -> IO ()
744warn str = S.hPutStrLn stderr str >> hFlush stderr 760warn str = S.hPutStrLn stderr str >> hFlush stderr
761
762debugNoise :: Monad m => t -> m ()
745debugNoise str = return () 763debugNoise str = return ()
746 764
747 765
748data PingEvent = PingIdle | PingTimeOut 766data PingEvent = PingIdle | PingTimeOut
749 767
750data PingMachine = PingMachine 768data PingMachine = PingMachine
751 { pingFlag :: TVar Bool 769 { pingFlag :: TVar Bool
752 , pingInterruptable :: InterruptableDelay 770 , pingInterruptible :: InterruptibleDelay
753 , pingEvent :: TMVar PingEvent 771 , pingEvent :: TMVar PingEvent
754 , pingStarted :: TMVar Bool 772 , pingStarted :: TMVar Bool
755 } 773 }
756 774
757pingMachine :: PingInterval -> TimeOut -> IO PingMachine 775pingMachine :: PingInterval -> TimeOut -> IO PingMachine
758pingMachine idle timeout = do 776pingMachine idle timeout = do
759 d <- interruptableDelay 777 d <- interruptibleDelay
760 flag <- atomically $ newTVar False 778 flag <- atomically $ newTVar False
761 canceled <- atomically $ newTVar False 779 canceled <- atomically $ newTVar False
762 event <- atomically newEmptyTMVar 780 event <- atomically newEmptyTMVar
@@ -786,7 +804,7 @@ pingMachine idle timeout = do
786 putTMVar event PingTimeOut 804 putTMVar event PingTimeOut
787 return PingMachine 805 return PingMachine
788 { pingFlag = flag 806 { pingFlag = flag
789 , pingInterruptable = d 807 , pingInterruptible = d
790 , pingEvent = event 808 , pingEvent = event
791 , pingStarted = started 809 , pingStarted = started
792 } 810 }
@@ -795,7 +813,7 @@ pingCancel :: PingMachine -> IO ()
795pingCancel me = do 813pingCancel me = do
796 atomically $ do tryTakeTMVar (pingStarted me) 814 atomically $ do tryTakeTMVar (pingStarted me)
797 putTMVar (pingStarted me) False 815 putTMVar (pingStarted me) False
798 interruptDelay (pingInterruptable me) 816 interruptDelay (pingInterruptible me)
799 817
800pingBump :: PingMachine -> IO () 818pingBump :: PingMachine -> IO ()
801pingBump me = do 819pingBump me = do
@@ -804,37 +822,8 @@ pingBump me = do
804 when (b/=Just False) $ do 822 when (b/=Just False) $ do
805 tryTakeTMVar (pingStarted me) 823 tryTakeTMVar (pingStarted me)
806 putTMVar (pingStarted me) True 824 putTMVar (pingStarted me) True
807 interruptDelay (pingInterruptable me) 825 interruptDelay (pingInterruptible me)
808 826
809pingWait :: PingMachine -> STM PingEvent 827pingWait :: PingMachine -> STM PingEvent
810pingWait me = takeTMVar (pingEvent me) 828pingWait me = takeTMVar (pingEvent me)
811 829
812
813data InterruptableDelay = InterruptableDelay
814 { delayThread :: TMVar ThreadId
815 }
816
817interruptableDelay :: IO InterruptableDelay
818interruptableDelay = do
819 fmap InterruptableDelay
820 $ atomically newEmptyTMVar
821
822startDelay :: InterruptableDelay -> Microseconds -> IO Bool
823startDelay d interval = do
824 thread <- myThreadId
825 handle (\(ErrorCall _)-> do
826 debugNoise $ "delay interrupted"
827 return False) $ do
828 atomically $ putTMVar (delayThread d) thread
829 threadDelay interval
830 void . atomically $ takeTMVar (delayThread d)
831 return True
832
833interruptDelay :: InterruptableDelay -> IO ()
834interruptDelay d = do
835 mthread <- atomically $ do
836 tryTakeTMVar (delayThread d)
837 flip (maybe $ return ()) mthread $ \thread -> do
838 throwTo thread (ErrorCall "Interrupted delay")
839
840