diff options
author | James Crayne <jim.crayne@gmail.com> | 2019-09-28 13:43:29 -0400 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-01 19:27:53 -0500 |
commit | 11987749fc6e6d3e53ea737d46d5ab13a16faeb8 (patch) | |
tree | 5716463275c2d3e902889db619908ded2a73971c /dht/PingMachine.hs | |
parent | add2c76bced51fde5e9917e7449ef52be70faf87 (diff) |
Factor out some new libraries
word64-map:
Data.Word64Map
network-addr:
Network.Address
tox-crypto:
Crypto.Tox
lifted-concurrent:
Control.Concurrent.Lifted.Instrument
Control.Concurrent.Async.Lifted.Instrument
psq-wrap:
Data.Wrapper.PSQInt
Data.Wrapper.PSQ
minmax-psq:
Data.MinMaxPSQ
tasks:
Control.Concurrent.Tasks
kad:
Network.Kademlia
Network.Kademlia.Bootstrap
Network.Kademlia.Routing
Network.Kademlia.CommonAPI
Network.Kademlia.Persistence
Network.Kademlia.Search
Diffstat (limited to 'dht/PingMachine.hs')
-rw-r--r-- | dht/PingMachine.hs | 161 |
1 files changed, 161 insertions, 0 deletions
diff --git a/dht/PingMachine.hs b/dht/PingMachine.hs new file mode 100644 index 00000000..ccf5b1d3 --- /dev/null +++ b/dht/PingMachine.hs | |||
@@ -0,0 +1,161 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE TupleSections #-} | ||
3 | module PingMachine where | ||
4 | |||
5 | import Control.Monad | ||
6 | import Data.Function | ||
7 | #ifdef THREAD_DEBUG | ||
8 | import Control.Concurrent.Lifted.Instrument | ||
9 | #else | ||
10 | import Control.Concurrent.Lifted | ||
11 | import GHC.Conc (labelThread) | ||
12 | #endif | ||
13 | import Control.Concurrent.STM | ||
14 | |||
15 | import InterruptibleDelay | ||
16 | |||
17 | type Miliseconds = Int | ||
18 | type TimeOut = Miliseconds | ||
19 | type PingInterval = Miliseconds | ||
20 | |||
21 | -- | Events that occur as a result of the 'PingMachine' watchdog. | ||
22 | -- | ||
23 | -- Use 'pingWait' to wait for one of these to occur. | ||
24 | data PingEvent | ||
25 | = PingIdle -- ^ You should send a ping if you observe this event. | ||
26 | | PingTimeOut -- ^ You should give up on the connection in case of this event. | ||
27 | |||
28 | data PingMachine = PingMachine | ||
29 | { pingFlag :: TVar Bool | ||
30 | , pingInterruptible :: InterruptibleDelay | ||
31 | , pingEvent :: TMVar PingEvent | ||
32 | , pingStarted :: TMVar Bool | ||
33 | } | ||
34 | |||
35 | -- | Fork a thread to monitor a connection for a ping timeout. | ||
36 | -- | ||
37 | -- If 'pingBump' is not invoked after a idle is signaled, a timeout event will | ||
38 | -- occur. When that happens, even if the caller chooses to ignore this event, | ||
39 | -- the watchdog thread will be terminated and no more ping events will be | ||
40 | -- signaled. | ||
41 | -- | ||
42 | -- An idle connection will be signaled by: | ||
43 | -- | ||
44 | -- (1) 'pingFlag' is set 'True' | ||
45 | -- | ||
46 | -- (2) 'pingWait' returns 'PingIdle' | ||
47 | -- | ||
48 | -- Either may be tested to determine whether a ping should be sent but | ||
49 | -- 'pingFlag' is difficult to use properly because it is up to the caller to | ||
50 | -- remember that the ping is already in progress. | ||
51 | forkPingMachine | ||
52 | :: String | ||
53 | -> PingInterval -- ^ Milliseconds of idle before a ping is considered necessary. | ||
54 | -> TimeOut -- ^ Milliseconds after 'PingIdle' before we signal 'PingTimeOut'. | ||
55 | -> IO PingMachine | ||
56 | forkPingMachine label idle timeout = do | ||
57 | d <- interruptibleDelay | ||
58 | flag <- atomically $ newTVar False | ||
59 | canceled <- atomically $ newTVar False | ||
60 | event <- atomically newEmptyTMVar | ||
61 | started <- atomically $ newEmptyTMVar | ||
62 | when (idle/=0) $ void . forkIO $ do | ||
63 | myThreadId >>= flip labelThread ("Ping." ++ label) -- ("ping.watchdog") | ||
64 | (>>=) (atomically (readTMVar started)) $ flip when $ do | ||
65 | fix $ \loop -> do | ||
66 | atomically $ writeTVar flag False | ||
67 | fin <- startDelay d (1000*idle) | ||
68 | (>>=) (atomically (readTMVar started)) $ flip when $ do | ||
69 | if (not fin) then loop | ||
70 | else do | ||
71 | -- Idle event | ||
72 | atomically $ do | ||
73 | tryTakeTMVar event | ||
74 | putTMVar event PingIdle | ||
75 | writeTVar flag True | ||
76 | fin <- startDelay d (1000*timeout) | ||
77 | (>>=) (atomically (readTMVar started)) $ flip when $ do | ||
78 | me <- myThreadId | ||
79 | if (not fin) then loop | ||
80 | else do | ||
81 | -- Timeout event | ||
82 | atomically $ do | ||
83 | tryTakeTMVar event | ||
84 | writeTVar flag False | ||
85 | putTMVar event PingTimeOut | ||
86 | return PingMachine | ||
87 | { pingFlag = flag | ||
88 | , pingInterruptible = d | ||
89 | , pingEvent = event | ||
90 | , pingStarted = started | ||
91 | } | ||
92 | |||
93 | -- | like 'forkPingMachine' but the timeout and idle parameters can be changed dynamically | ||
94 | -- Unlike 'forkPingMachine', 'forkPingMachineDynamic' always launches a thread | ||
95 | -- regardless of idle value. | ||
96 | forkPingMachineDynamic | ||
97 | :: String | ||
98 | -> TVar PingInterval -- ^ Milliseconds of idle before a ping is considered necessary. | ||
99 | -> TVar TimeOut -- ^ Milliseconds after 'PingIdle' before we signal 'PingTimeOut'. | ||
100 | -> IO PingMachine | ||
101 | forkPingMachineDynamic label idleV timeoutV = do | ||
102 | d <- interruptibleDelay | ||
103 | flag <- atomically $ newTVar False | ||
104 | canceled <- atomically $ newTVar False | ||
105 | event <- atomically newEmptyTMVar | ||
106 | started <- atomically $ newEmptyTMVar | ||
107 | void . forkIO $ do | ||
108 | myThreadId >>= flip labelThread ("Ping." ++ label) -- ("ping.watchdog") | ||
109 | (>>=) (atomically (readTMVar started)) $ flip when $ do | ||
110 | fix $ \loop -> do | ||
111 | atomically $ writeTVar flag False | ||
112 | (idle,timeout) <- atomically $ (,) <$> readTVar idleV <*> readTVar timeoutV | ||
113 | fin <- startDelay d (1000*idle) | ||
114 | (>>=) (atomically (readTMVar started)) $ flip when $ do | ||
115 | if (not fin) then loop | ||
116 | else do | ||
117 | -- Idle event | ||
118 | atomically $ do | ||
119 | tryTakeTMVar event | ||
120 | putTMVar event PingIdle | ||
121 | writeTVar flag True | ||
122 | fin <- startDelay d (1000*timeout) | ||
123 | (>>=) (atomically (readTMVar started)) $ flip when $ do | ||
124 | me <- myThreadId | ||
125 | if (not fin) then loop | ||
126 | else do | ||
127 | -- Timeout event | ||
128 | atomically $ do | ||
129 | tryTakeTMVar event | ||
130 | writeTVar flag False | ||
131 | putTMVar event PingTimeOut | ||
132 | return PingMachine | ||
133 | { pingFlag = flag | ||
134 | , pingInterruptible = d | ||
135 | , pingEvent = event | ||
136 | , pingStarted = started | ||
137 | } | ||
138 | |||
139 | -- | Terminate the watchdog thread. Call this upon connection close. | ||
140 | -- | ||
141 | -- You should ensure no threads are waiting on 'pingWait' because there is no | ||
142 | -- 'PingEvent' signaling termination. | ||
143 | pingCancel :: PingMachine -> IO () | ||
144 | pingCancel me = do | ||
145 | atomically $ do tryTakeTMVar (pingStarted me) | ||
146 | putTMVar (pingStarted me) False | ||
147 | interruptDelay (pingInterruptible me) | ||
148 | |||
149 | -- | Reset the ping timer. Call this regularly to prevent 'PingTimeOut'. | ||
150 | pingBump :: PingMachine -> IO () | ||
151 | pingBump me = do | ||
152 | atomically $ do | ||
153 | b <- tryReadTMVar (pingStarted me) | ||
154 | when (b/=Just False) $ do | ||
155 | tryTakeTMVar (pingStarted me) | ||
156 | putTMVar (pingStarted me) True | ||
157 | interruptDelay (pingInterruptible me) | ||
158 | |||
159 | -- | Retries until a 'PingEvent' occurs. | ||
160 | pingWait :: PingMachine -> STM PingEvent | ||
161 | pingWait me = takeTMVar (pingEvent me) | ||