summaryrefslogtreecommitdiff
path: root/dht/PingMachine.hs
blob: ccf5b1d34383301e59226a751266dbd3cc5b09e2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
{-# LANGUAGE CPP #-}
{-# LANGUAGE TupleSections #-}
module PingMachine where

import Control.Monad
import Data.Function
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
import GHC.Conc                  (labelThread)
#endif
import Control.Concurrent.STM

import InterruptibleDelay

type Miliseconds = Int
type TimeOut = Miliseconds
type PingInterval = Miliseconds

-- | Events that occur as a result of the 'PingMachine' watchdog.
--
-- Use 'pingWait' to wait for one of these to occur.
data PingEvent
    = PingIdle    -- ^ You should send a ping if you observe this event.
    | PingTimeOut -- ^ You should give up on the connection in case of this event.

data PingMachine = PingMachine
    { pingFlag          :: TVar Bool
    , pingInterruptible :: InterruptibleDelay
    , pingEvent         :: TMVar PingEvent
    , pingStarted       :: TMVar Bool
    }

-- | Fork a thread to monitor a connection for a ping timeout.
--
-- If 'pingBump' is not invoked after a idle is signaled, a timeout event will
-- occur.  When that happens, even if the caller chooses to ignore this event,
-- the watchdog thread will be terminated and no more ping events will be
-- signaled.
--
-- An idle connection will be signaled by:
--
--   (1) 'pingFlag' is set 'True'
--
--   (2) 'pingWait' returns 'PingIdle'
--
-- Either may be tested to determine whether a ping should be sent but
-- 'pingFlag' is difficult to use properly because it is up to the caller to
-- remember that the ping is already in progress.
forkPingMachine
    :: String
    -> PingInterval -- ^ Milliseconds of idle before a ping is considered necessary.
    -> TimeOut      -- ^ Milliseconds after 'PingIdle' before we signal 'PingTimeOut'.
    -> IO PingMachine
forkPingMachine label idle timeout = do
    d <- interruptibleDelay
    flag <- atomically $ newTVar False
    canceled <- atomically $ newTVar False
    event <- atomically newEmptyTMVar
    started <- atomically $ newEmptyTMVar
    when (idle/=0) $ void . forkIO $ do
        myThreadId >>= flip labelThread ("Ping." ++ label) -- ("ping.watchdog")
        (>>=) (atomically (readTMVar started)) $ flip when $ do
        fix $ \loop -> do
            atomically $ writeTVar flag False
            fin <- startDelay d (1000*idle)
            (>>=) (atomically (readTMVar started)) $ flip when $ do
            if (not fin) then loop
            else do
            -- Idle event
            atomically $ do
                tryTakeTMVar event
                putTMVar event PingIdle
                writeTVar flag True
            fin <- startDelay d (1000*timeout)
            (>>=) (atomically (readTMVar started)) $ flip when $ do
            me <- myThreadId
            if (not fin) then loop
            else do
            -- Timeout event
            atomically $ do
                tryTakeTMVar event
                writeTVar flag False
                putTMVar event PingTimeOut
    return PingMachine
        { pingFlag          = flag
        , pingInterruptible = d
        , pingEvent         = event
        , pingStarted       = started
        }

-- | like 'forkPingMachine'  but the timeout and idle parameters can be changed dynamically
--  Unlike 'forkPingMachine', 'forkPingMachineDynamic' always launches a thread
--  regardless of idle value.
forkPingMachineDynamic
    :: String
    -> TVar PingInterval -- ^ Milliseconds of idle before a ping is considered necessary.
    -> TVar TimeOut      -- ^ Milliseconds after 'PingIdle' before we signal 'PingTimeOut'.
    -> IO PingMachine
forkPingMachineDynamic label idleV timeoutV = do
    d <- interruptibleDelay
    flag <- atomically $ newTVar False
    canceled <- atomically $ newTVar False
    event <- atomically newEmptyTMVar
    started <- atomically $ newEmptyTMVar
    void . forkIO $ do
        myThreadId >>= flip labelThread ("Ping." ++ label) -- ("ping.watchdog")
        (>>=) (atomically (readTMVar started)) $ flip when $ do
        fix $ \loop -> do
            atomically $ writeTVar flag False
            (idle,timeout) <- atomically $ (,) <$> readTVar idleV <*> readTVar timeoutV
            fin <- startDelay d (1000*idle)
            (>>=) (atomically (readTMVar started)) $ flip when $ do
            if (not fin) then loop
            else do
            -- Idle event
            atomically $ do
                tryTakeTMVar event
                putTMVar event PingIdle
                writeTVar flag True
            fin <- startDelay d (1000*timeout)
            (>>=) (atomically (readTMVar started)) $ flip when $ do
            me <- myThreadId
            if (not fin) then loop
            else do
            -- Timeout event
            atomically $ do
                tryTakeTMVar event
                writeTVar flag False
                putTMVar event PingTimeOut
    return PingMachine
        { pingFlag          = flag
        , pingInterruptible = d
        , pingEvent         = event
        , pingStarted       = started
        }

-- | Terminate the watchdog thread.  Call this upon connection close.
--
-- You should ensure no threads are waiting on 'pingWait' because there is no
-- 'PingEvent' signaling termination.
pingCancel :: PingMachine -> IO ()
pingCancel me = do
    atomically $ do tryTakeTMVar (pingStarted me)
                    putTMVar (pingStarted me) False
    interruptDelay (pingInterruptible me)

-- | Reset the ping timer.  Call this regularly to prevent 'PingTimeOut'.
pingBump :: PingMachine -> IO ()
pingBump me = do
    atomically $ do
            b <- tryReadTMVar (pingStarted me)
            when (b/=Just False) $ do
                tryTakeTMVar (pingStarted me)
                putTMVar (pingStarted me) True
    interruptDelay (pingInterruptible me)

-- | Retries until a 'PingEvent' occurs.
pingWait :: PingMachine -> STM PingEvent
pingWait me = takeTMVar (pingEvent me)