diff options
Diffstat (limited to 'server/src/Control/Concurrent/PingMachine.hs')
-rw-r--r-- | server/src/Control/Concurrent/PingMachine.hs | 163 |
1 files changed, 163 insertions, 0 deletions
diff --git a/server/src/Control/Concurrent/PingMachine.hs b/server/src/Control/Concurrent/PingMachine.hs new file mode 100644 index 00000000..5de0e2e5 --- /dev/null +++ b/server/src/Control/Concurrent/PingMachine.hs | |||
@@ -0,0 +1,163 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE NondecreasingIndentation #-} | ||
3 | {-# LANGUAGE TupleSections #-} | ||
4 | module Control.Concurrent.PingMachine where | ||
5 | |||
6 | import Control.Monad | ||
7 | import Data.Function | ||
8 | #ifdef THREAD_DEBUG | ||
9 | import Control.Concurrent.Lifted.Instrument | ||
10 | #else | ||
11 | import Control.Concurrent (forkIO) | ||
12 | import Control.Concurrent.Lifted | ||
13 | import GHC.Conc (labelThread) | ||
14 | #endif | ||
15 | import Control.Concurrent.STM | ||
16 | |||
17 | import Control.Concurrent.Delay | ||
18 | |||
19 | type Miliseconds = Int | ||
20 | type TimeOut = Miliseconds | ||
21 | type PingInterval = Miliseconds | ||
22 | |||
23 | -- | Events that occur as a result of the 'PingMachine' watchdog. | ||
24 | -- | ||
25 | -- Use 'pingWait' to wait for one of these to occur. | ||
26 | data PingEvent | ||
27 | = PingIdle -- ^ You should send a ping if you observe this event. | ||
28 | | PingTimeOut -- ^ You should give up on the connection in case of this event. | ||
29 | |||
30 | data PingMachine = PingMachine | ||
31 | { pingFlag :: TVar Bool | ||
32 | , pingInterruptible :: InterruptibleDelay | ||
33 | , pingEvent :: TMVar PingEvent | ||
34 | , pingStarted :: TMVar Bool | ||
35 | } | ||
36 | |||
37 | -- | Fork a thread to monitor a connection for a ping timeout. | ||
38 | -- | ||
39 | -- If 'pingBump' is not invoked after a idle is signaled, a timeout event will | ||
40 | -- occur. When that happens, even if the caller chooses to ignore this event, | ||
41 | -- the watchdog thread will be terminated and no more ping events will be | ||
42 | -- signaled. | ||
43 | -- | ||
44 | -- An idle connection will be signaled by: | ||
45 | -- | ||
46 | -- (1) 'pingFlag' is set 'True' | ||
47 | -- | ||
48 | -- (2) 'pingWait' returns 'PingIdle' | ||
49 | -- | ||
50 | -- Either may be tested to determine whether a ping should be sent but | ||
51 | -- 'pingFlag' is difficult to use properly because it is up to the caller to | ||
52 | -- remember that the ping is already in progress. | ||
53 | forkPingMachine | ||
54 | :: String | ||
55 | -> PingInterval -- ^ Milliseconds of idle before a ping is considered necessary. | ||
56 | -> TimeOut -- ^ Milliseconds after 'PingIdle' before we signal 'PingTimeOut'. | ||
57 | -> IO PingMachine | ||
58 | forkPingMachine label idle timeout = do | ||
59 | d <- interruptibleDelay | ||
60 | flag <- atomically $ newTVar False | ||
61 | canceled <- atomically $ newTVar False | ||
62 | event <- atomically newEmptyTMVar | ||
63 | started <- atomically $ newEmptyTMVar | ||
64 | when (idle/=0) $ void . forkIO $ do | ||
65 | myThreadId >>= flip labelThread ("Ping." ++ label) -- ("ping.watchdog") | ||
66 | (>>=) (atomically (readTMVar started)) $ flip when $ do | ||
67 | fix $ \loop -> do | ||
68 | atomically $ writeTVar flag False | ||
69 | fin <- startDelay d (1000*idle) | ||
70 | (>>=) (atomically (readTMVar started)) $ flip when $ do | ||
71 | if (not fin) then loop | ||
72 | else do | ||
73 | -- Idle event | ||
74 | atomically $ do | ||
75 | tryTakeTMVar event | ||
76 | putTMVar event PingIdle | ||
77 | writeTVar flag True | ||
78 | fin <- startDelay d (1000*timeout) | ||
79 | (>>=) (atomically (readTMVar started)) $ flip when $ do | ||
80 | me <- myThreadId | ||
81 | if (not fin) then loop | ||
82 | else do | ||
83 | -- Timeout event | ||
84 | atomically $ do | ||
85 | tryTakeTMVar event | ||
86 | writeTVar flag False | ||
87 | putTMVar event PingTimeOut | ||
88 | return PingMachine | ||
89 | { pingFlag = flag | ||
90 | , pingInterruptible = d | ||
91 | , pingEvent = event | ||
92 | , pingStarted = started | ||
93 | } | ||
94 | |||
95 | -- | like 'forkPingMachine' but the timeout and idle parameters can be changed dynamically | ||
96 | -- Unlike 'forkPingMachine', 'forkPingMachineDynamic' always launches a thread | ||
97 | -- regardless of idle value. | ||
98 | forkPingMachineDynamic | ||
99 | :: String | ||
100 | -> TVar PingInterval -- ^ Milliseconds of idle before a ping is considered necessary. | ||
101 | -> TVar TimeOut -- ^ Milliseconds after 'PingIdle' before we signal 'PingTimeOut'. | ||
102 | -> IO PingMachine | ||
103 | forkPingMachineDynamic label idleV timeoutV = do | ||
104 | d <- interruptibleDelay | ||
105 | flag <- atomically $ newTVar False | ||
106 | canceled <- atomically $ newTVar False | ||
107 | event <- atomically newEmptyTMVar | ||
108 | started <- atomically $ newEmptyTMVar | ||
109 | void . forkIO $ do | ||
110 | myThreadId >>= flip labelThread ("Ping." ++ label) -- ("ping.watchdog") | ||
111 | (>>=) (atomically (readTMVar started)) $ flip when $ do | ||
112 | fix $ \loop -> do | ||
113 | atomically $ writeTVar flag False | ||
114 | (idle,timeout) <- atomically $ (,) <$> readTVar idleV <*> readTVar timeoutV | ||
115 | fin <- startDelay d (1000*idle) | ||
116 | (>>=) (atomically (readTMVar started)) $ flip when $ do | ||
117 | if (not fin) then loop | ||
118 | else do | ||
119 | -- Idle event | ||
120 | atomically $ do | ||
121 | tryTakeTMVar event | ||
122 | putTMVar event PingIdle | ||
123 | writeTVar flag True | ||
124 | fin <- startDelay d (1000*timeout) | ||
125 | (>>=) (atomically (readTMVar started)) $ flip when $ do | ||
126 | me <- myThreadId | ||
127 | if (not fin) then loop | ||
128 | else do | ||
129 | -- Timeout event | ||
130 | atomically $ do | ||
131 | tryTakeTMVar event | ||
132 | writeTVar flag False | ||
133 | putTMVar event PingTimeOut | ||
134 | return PingMachine | ||
135 | { pingFlag = flag | ||
136 | , pingInterruptible = d | ||
137 | , pingEvent = event | ||
138 | , pingStarted = started | ||
139 | } | ||
140 | |||
141 | -- | Terminate the watchdog thread. Call this upon connection close. | ||
142 | -- | ||
143 | -- You should ensure no threads are waiting on 'pingWait' because there is no | ||
144 | -- 'PingEvent' signaling termination. | ||
145 | pingCancel :: PingMachine -> IO () | ||
146 | pingCancel me = do | ||
147 | atomically $ do tryTakeTMVar (pingStarted me) | ||
148 | putTMVar (pingStarted me) False | ||
149 | interruptDelay (pingInterruptible me) | ||
150 | |||
151 | -- | Reset the ping timer. Call this regularly to prevent 'PingTimeOut'. | ||
152 | pingBump :: PingMachine -> IO () | ||
153 | pingBump me = do | ||
154 | atomically $ do | ||
155 | b <- tryReadTMVar (pingStarted me) | ||
156 | when (b/=Just False) $ do | ||
157 | tryTakeTMVar (pingStarted me) | ||
158 | putTMVar (pingStarted me) True | ||
159 | interruptDelay (pingInterruptible me) | ||
160 | |||
161 | -- | Retries until a 'PingEvent' occurs. | ||
162 | pingWait :: PingMachine -> STM PingEvent | ||
163 | pingWait me = takeTMVar (pingEvent me) | ||