summaryrefslogtreecommitdiff
path: root/dht/PingMachine.hs
diff options
context:
space:
mode:
authorJames Crayne <jim.crayne@gmail.com>2019-09-28 13:43:29 -0400
committerJoe Crayne <joe@jerkface.net>2020-01-01 19:27:53 -0500
commit11987749fc6e6d3e53ea737d46d5ab13a16faeb8 (patch)
tree5716463275c2d3e902889db619908ded2a73971c /dht/PingMachine.hs
parentadd2c76bced51fde5e9917e7449ef52be70faf87 (diff)
Factor out some new libraries
word64-map: Data.Word64Map network-addr: Network.Address tox-crypto: Crypto.Tox lifted-concurrent: Control.Concurrent.Lifted.Instrument Control.Concurrent.Async.Lifted.Instrument psq-wrap: Data.Wrapper.PSQInt Data.Wrapper.PSQ minmax-psq: Data.MinMaxPSQ tasks: Control.Concurrent.Tasks kad: Network.Kademlia Network.Kademlia.Bootstrap Network.Kademlia.Routing Network.Kademlia.CommonAPI Network.Kademlia.Persistence Network.Kademlia.Search
Diffstat (limited to 'dht/PingMachine.hs')
-rw-r--r--dht/PingMachine.hs161
1 files changed, 161 insertions, 0 deletions
diff --git a/dht/PingMachine.hs b/dht/PingMachine.hs
new file mode 100644
index 00000000..ccf5b1d3
--- /dev/null
+++ b/dht/PingMachine.hs
@@ -0,0 +1,161 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE TupleSections #-}
3module PingMachine where
4
5import Control.Monad
6import Data.Function
7#ifdef THREAD_DEBUG
8import Control.Concurrent.Lifted.Instrument
9#else
10import Control.Concurrent.Lifted
11import GHC.Conc (labelThread)
12#endif
13import Control.Concurrent.STM
14
15import InterruptibleDelay
16
17type Miliseconds = Int
18type TimeOut = Miliseconds
19type PingInterval = Miliseconds
20
21-- | Events that occur as a result of the 'PingMachine' watchdog.
22--
23-- Use 'pingWait' to wait for one of these to occur.
24data PingEvent
25 = PingIdle -- ^ You should send a ping if you observe this event.
26 | PingTimeOut -- ^ You should give up on the connection in case of this event.
27
28data PingMachine = PingMachine
29 { pingFlag :: TVar Bool
30 , pingInterruptible :: InterruptibleDelay
31 , pingEvent :: TMVar PingEvent
32 , pingStarted :: TMVar Bool
33 }
34
35-- | Fork a thread to monitor a connection for a ping timeout.
36--
37-- If 'pingBump' is not invoked after a idle is signaled, a timeout event will
38-- occur. When that happens, even if the caller chooses to ignore this event,
39-- the watchdog thread will be terminated and no more ping events will be
40-- signaled.
41--
42-- An idle connection will be signaled by:
43--
44-- (1) 'pingFlag' is set 'True'
45--
46-- (2) 'pingWait' returns 'PingIdle'
47--
48-- Either may be tested to determine whether a ping should be sent but
49-- 'pingFlag' is difficult to use properly because it is up to the caller to
50-- remember that the ping is already in progress.
51forkPingMachine
52 :: String
53 -> PingInterval -- ^ Milliseconds of idle before a ping is considered necessary.
54 -> TimeOut -- ^ Milliseconds after 'PingIdle' before we signal 'PingTimeOut'.
55 -> IO PingMachine
56forkPingMachine label idle timeout = do
57 d <- interruptibleDelay
58 flag <- atomically $ newTVar False
59 canceled <- atomically $ newTVar False
60 event <- atomically newEmptyTMVar
61 started <- atomically $ newEmptyTMVar
62 when (idle/=0) $ void . forkIO $ do
63 myThreadId >>= flip labelThread ("Ping." ++ label) -- ("ping.watchdog")
64 (>>=) (atomically (readTMVar started)) $ flip when $ do
65 fix $ \loop -> do
66 atomically $ writeTVar flag False
67 fin <- startDelay d (1000*idle)
68 (>>=) (atomically (readTMVar started)) $ flip when $ do
69 if (not fin) then loop
70 else do
71 -- Idle event
72 atomically $ do
73 tryTakeTMVar event
74 putTMVar event PingIdle
75 writeTVar flag True
76 fin <- startDelay d (1000*timeout)
77 (>>=) (atomically (readTMVar started)) $ flip when $ do
78 me <- myThreadId
79 if (not fin) then loop
80 else do
81 -- Timeout event
82 atomically $ do
83 tryTakeTMVar event
84 writeTVar flag False
85 putTMVar event PingTimeOut
86 return PingMachine
87 { pingFlag = flag
88 , pingInterruptible = d
89 , pingEvent = event
90 , pingStarted = started
91 }
92
93-- | like 'forkPingMachine' but the timeout and idle parameters can be changed dynamically
94-- Unlike 'forkPingMachine', 'forkPingMachineDynamic' always launches a thread
95-- regardless of idle value.
96forkPingMachineDynamic
97 :: String
98 -> TVar PingInterval -- ^ Milliseconds of idle before a ping is considered necessary.
99 -> TVar TimeOut -- ^ Milliseconds after 'PingIdle' before we signal 'PingTimeOut'.
100 -> IO PingMachine
101forkPingMachineDynamic label idleV timeoutV = do
102 d <- interruptibleDelay
103 flag <- atomically $ newTVar False
104 canceled <- atomically $ newTVar False
105 event <- atomically newEmptyTMVar
106 started <- atomically $ newEmptyTMVar
107 void . forkIO $ do
108 myThreadId >>= flip labelThread ("Ping." ++ label) -- ("ping.watchdog")
109 (>>=) (atomically (readTMVar started)) $ flip when $ do
110 fix $ \loop -> do
111 atomically $ writeTVar flag False
112 (idle,timeout) <- atomically $ (,) <$> readTVar idleV <*> readTVar timeoutV
113 fin <- startDelay d (1000*idle)
114 (>>=) (atomically (readTMVar started)) $ flip when $ do
115 if (not fin) then loop
116 else do
117 -- Idle event
118 atomically $ do
119 tryTakeTMVar event
120 putTMVar event PingIdle
121 writeTVar flag True
122 fin <- startDelay d (1000*timeout)
123 (>>=) (atomically (readTMVar started)) $ flip when $ do
124 me <- myThreadId
125 if (not fin) then loop
126 else do
127 -- Timeout event
128 atomically $ do
129 tryTakeTMVar event
130 writeTVar flag False
131 putTMVar event PingTimeOut
132 return PingMachine
133 { pingFlag = flag
134 , pingInterruptible = d
135 , pingEvent = event
136 , pingStarted = started
137 }
138
139-- | Terminate the watchdog thread. Call this upon connection close.
140--
141-- You should ensure no threads are waiting on 'pingWait' because there is no
142-- 'PingEvent' signaling termination.
143pingCancel :: PingMachine -> IO ()
144pingCancel me = do
145 atomically $ do tryTakeTMVar (pingStarted me)
146 putTMVar (pingStarted me) False
147 interruptDelay (pingInterruptible me)
148
149-- | Reset the ping timer. Call this regularly to prevent 'PingTimeOut'.
150pingBump :: PingMachine -> IO ()
151pingBump me = do
152 atomically $ do
153 b <- tryReadTMVar (pingStarted me)
154 when (b/=Just False) $ do
155 tryTakeTMVar (pingStarted me)
156 putTMVar (pingStarted me) True
157 interruptDelay (pingInterruptible me)
158
159-- | Retries until a 'PingEvent' occurs.
160pingWait :: PingMachine -> STM PingEvent
161pingWait me = takeTMVar (pingEvent me)