summaryrefslogtreecommitdiff
path: root/Presence
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2014-02-13 17:57:23 -0500
committerjoe <joe@jerkface.net>2014-02-13 17:57:23 -0500
commite8358efbdee377fe98e6d50c518d2a9072a3ce6e (patch)
tree33d27316700cb672115afe0bae37ad8eb59bc43e /Presence
parente3ff0df9fbdf28dfa5659a8392d89fa66c38a8df (diff)
ConnectWithEndlessRetry
Diffstat (limited to 'Presence')
-rw-r--r--Presence/Server.hs107
1 files changed, 100 insertions, 7 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs
index 5e54ff27..1141f0b0 100644
--- a/Presence/Server.hs
+++ b/Presence/Server.hs
@@ -60,14 +60,16 @@ import Network.BSD
60 ( getProtocolNumber 60 ( getProtocolNumber
61 ) 61 )
62import Debug.Trace 62import Debug.Trace
63import Data.Time.Clock (UTCTime,getCurrentTime) 63import Data.Time.Clock (UTCTime,getCurrentTime,diffUTCTime)
64import Data.Time.Format (formatTime) 64import Data.Time.Format (formatTime)
65import System.Locale (defaultTimeLocale) 65import System.Locale (defaultTimeLocale)
66 66
67todo = error "unimplemented" 67todo = error "unimplemented"
68 68
69type TimeOut = Int -- ^ miliseconds 69type Microseconds = Int
70type PingInterval = Int -- ^ miliseconds 70type Miliseconds = Int
71type TimeOut = Miliseconds
72type PingInterval = Miliseconds
71 73
72-- | This object is passed with the 'Listen' and 'Connect' 74-- | This object is passed with the 'Listen' and 'Connect'
73-- instructions in order to control the behavior of the 75-- instructions in order to control the behavior of the
@@ -128,6 +130,10 @@ data ServerInstruction conkey
128 -- ^ listen for incomming connections 130 -- ^ listen for incomming connections
129 | Connect SockAddr (ConnectionParameters conkey) 131 | Connect SockAddr (ConnectionParameters conkey)
130 -- ^ connect to addresses 132 -- ^ connect to addresses
133 | ConnectWithEndlessRetry SockAddr
134 (ConnectionParameters conkey)
135 Miliseconds
136 -- ^ keep retrying the connection
131 | Ignore PortNumber 137 | Ignore PortNumber
132 -- ^ stop listening on specified port 138 -- ^ stop listening on specified port
133 | Send conkey ByteString 139 | Send conkey ByteString
@@ -178,6 +184,7 @@ data Server a
178 , serverReleaseKey :: ReleaseKey 184 , serverReleaseKey :: ReleaseKey
179 , conmap :: TVar (Map a (TMVar (STM (IO ())), ConnectionState)) 185 , conmap :: TVar (Map a (TMVar (STM (IO ())), ConnectionState))
180 , listenmap :: TVar (Map PortNumber (ThreadId,Socket)) 186 , listenmap :: TVar (Map PortNumber (ThreadId,Socket))
187 , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay))
181 } 188 }
182 189
183-- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT' 190-- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT'
@@ -210,11 +217,13 @@ server = do
210 tchan <- newTChan 217 tchan <- newTChan
211 conmap <- newTVar Map.empty 218 conmap <- newTVar Map.empty
212 listenmap<- newTVar Map.empty 219 listenmap<- newTVar Map.empty
220 retrymap <- newTVar Map.empty
213 return Server { serverCommand = cmds 221 return Server { serverCommand = cmds
214 , serverEvent = tchan 222 , serverEvent = tchan
215 , serverReleaseKey = key 223 , serverReleaseKey = key
216 , conmap = conmap 224 , conmap = conmap
217 , listenmap = listenmap 225 , listenmap = listenmap
226 , retrymap = retrymap
218 } 227 }
219 liftIO $ do 228 liftIO $ do
220 forkIO $ fix $ \loop -> do 229 forkIO $ fix $ \loop -> do
@@ -230,6 +239,10 @@ server = do
230 closeAll server = liftIO $ do 239 closeAll server = liftIO $ do
231 listening <- atomically . readTVar $ listenmap server 240 listening <- atomically . readTVar $ listenmap server
232 mapM_ killListener (Map.elems listening) 241 mapM_ killListener (Map.elems listening)
242 let stopRetry (v,d) = do atomically $ writeTVar v False
243 interruptDelay d
244 retriers <- atomically . readTVar $ retrymap server
245 mapM_ stopRetry (Map.elems retriers)
233 cons <- atomically . readTVar $ conmap server 246 cons <- atomically . readTVar $ conmap server
234 atomically $ mapM_ (connClose . snd) (Map.elems cons) 247 atomically $ mapM_ (connClose . snd) (Map.elems cons)
235 atomically $ mapM_ (connWait . snd) (Map.elems cons) 248 atomically $ mapM_ (connWait . snd) (Map.elems cons)
@@ -278,7 +291,7 @@ server = do
278 doit server (Connect addr params) = liftIO $ do 291 doit server (Connect addr params) = liftIO $ do
279 void . forkIO $ do 292 void . forkIO $ do
280 proto <- getProtocolNumber "tcp" 293 proto <- getProtocolNumber "tcp"
281 bracketOnError 294 sock <- bracketOnError
282 (socket (socketFamily addr) Stream proto) 295 (socket (socketFamily addr) Stream proto)
283 (\sock -> do -- only done if there's an error 296 (\sock -> do -- only done if there's an error
284 -- Weird hack: puting the would-be peer address 297 -- Weird hack: puting the would-be peer address
@@ -288,12 +301,66 @@ server = do
288 atomically 301 atomically
289 $ writeTChan (serverEvent server) 302 $ writeTChan (serverEvent server)
290 $ (conkey,ConnectFailure addr)) 303 $ (conkey,ConnectFailure addr))
291 $ \sock -> do 304 $ \sock -> do connect sock addr
292 connect sock addr 305 return sock
293 me <- getSocketName sock 306 me <- getSocketName sock
294 conkey <- makeConnKey params (sock,me) 307 conkey <- makeConnKey params (sock,me)
295 h <- socketToHandle sock ReadWriteMode 308 h <- socketToHandle sock ReadWriteMode
296 newConnection server params conkey h Out 309 newConnection server params conkey h Out
310 return ()
311
312 doit server (ConnectWithEndlessRetry addr params interval) = liftIO $ do
313 proto <- getProtocolNumber "tcp"
314 void . forkIO $ do
315 resultVar <- atomically newEmptyTMVar
316 timer <- interruptableDelay
317 (retryVar,action) <- atomically $ do
318 let noop = return ()
319 map <- readTVar (retrymap server)
320 let mb = Map.lookup addr map
321 action <-
322 maybe (return noop)
323 (\(v,d) -> do writeTVar v False
324 return $ interruptDelay d)
325 mb
326 v <- newTVar True
327 writeTVar (retrymap server) (Map.insert addr (v,timer) map)
328 return (v,action)
329 action
330 fix $ \retryLoop -> do
331 utc <- getCurrentTime
332 forkIO $ do
333 sock <- bracketOnError
334 (socket (socketFamily addr) Stream proto)
335 (\sock -> do -- only done if there's an error
336 -- Weird hack: puting the would-be peer address
337 -- instead of local socketName
338 conkey <- makeConnKey params (sock,addr) -- XXX: ?
339 sClose sock
340 atomically $ do
341 writeTChan (serverEvent server)
342 $ (conkey,ConnectFailure addr)
343 retry <- readTVar retryVar
344 putTMVar resultVar retry)
345 $ \sock -> do connect sock addr
346 return sock
347 me <- getSocketName sock
348 conkey <- makeConnKey params (sock,me)
349 h <- socketToHandle sock ReadWriteMode
350 threads <- newConnection server params conkey h Out
351 atomically $ do threadsWait threads
352 retry <- readTVar retryVar
353 putTMVar resultVar retry
354 retry <- atomically $ takeTMVar resultVar
355 fin_utc <- getCurrentTime
356 when retry $ do
357 let elapsed = 1000.0 * (fin_utc `diffUTCTime` utc)
358 expected = fromIntegral interval
359 when (retry && elapsed < expected) $ do
360 warn $ "Waiting to retry " <> bshow addr
361 startDelay timer (round $ 1000 * (expected-elapsed))
362 warn $ "retry " <> bshow (retry,addr)
363 when retry $ retryLoop
297 364
298 365
299-- INTERNAL ---------------------------------------------------------- 366-- INTERNAL ----------------------------------------------------------
@@ -397,7 +464,7 @@ newConnection server params conkey h inout = do
397 mb 464 mb
398 warn $ "quit-gots: " <> bshow (conkey,inout) 465 warn $ "quit-gots: " <> bshow (conkey,inout)
399#endif 466#endif
400 return () 467 return new
401 where 468 where
402 469
403 announce e = writeTChan (serverEvent server) e 470 announce e = writeTChan (serverEvent server) e
@@ -754,3 +821,29 @@ pingWait me = do
754 return e 821 return e
755 822
756 823
824data InterruptableDelay = InterruptableDelay
825 { delayThread :: TMVar ThreadId
826 }
827
828interruptableDelay :: IO InterruptableDelay
829interruptableDelay = do
830 fmap InterruptableDelay
831 $ atomically newEmptyTMVar
832startDelay :: InterruptableDelay -> Microseconds -> IO ()
833startDelay d interval = do
834 thread <- myThreadId
835 atomically $ putTMVar (delayThread d) thread
836 handle (\(ErrorCall _)-> warn $ "delay interrupted" ) $ do
837 threadDelay interval
838 void . atomically $ takeTMVar (delayThread d)
839
840interruptDelay :: InterruptableDelay -> IO ()
841interruptDelay d = do
842 mthread <- atomically $ do
843 tryTakeTMVar (delayThread d)
844 flip (maybe $ return ()) mthread $ \thread -> do
845 throwTo thread (ErrorCall "Interrupted delay")
846 atomically $ do
847 putTMVar (delayThread d) thread
848
849