diff options
author | joe <joe@jerkface.net> | 2014-02-13 17:57:23 -0500 |
---|---|---|
committer | joe <joe@jerkface.net> | 2014-02-13 17:57:23 -0500 |
commit | e8358efbdee377fe98e6d50c518d2a9072a3ce6e (patch) | |
tree | 33d27316700cb672115afe0bae37ad8eb59bc43e /Presence | |
parent | e3ff0df9fbdf28dfa5659a8392d89fa66c38a8df (diff) |
ConnectWithEndlessRetry
Diffstat (limited to 'Presence')
-rw-r--r-- | Presence/Server.hs | 107 |
1 files changed, 100 insertions, 7 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs index 5e54ff27..1141f0b0 100644 --- a/Presence/Server.hs +++ b/Presence/Server.hs | |||
@@ -60,14 +60,16 @@ import Network.BSD | |||
60 | ( getProtocolNumber | 60 | ( getProtocolNumber |
61 | ) | 61 | ) |
62 | import Debug.Trace | 62 | import Debug.Trace |
63 | import Data.Time.Clock (UTCTime,getCurrentTime) | 63 | import Data.Time.Clock (UTCTime,getCurrentTime,diffUTCTime) |
64 | import Data.Time.Format (formatTime) | 64 | import Data.Time.Format (formatTime) |
65 | import System.Locale (defaultTimeLocale) | 65 | import System.Locale (defaultTimeLocale) |
66 | 66 | ||
67 | todo = error "unimplemented" | 67 | todo = error "unimplemented" |
68 | 68 | ||
69 | type TimeOut = Int -- ^ miliseconds | 69 | type Microseconds = Int |
70 | type PingInterval = Int -- ^ miliseconds | 70 | type Miliseconds = Int |
71 | type TimeOut = Miliseconds | ||
72 | type PingInterval = Miliseconds | ||
71 | 73 | ||
72 | -- | This object is passed with the 'Listen' and 'Connect' | 74 | -- | This object is passed with the 'Listen' and 'Connect' |
73 | -- instructions in order to control the behavior of the | 75 | -- instructions in order to control the behavior of the |
@@ -128,6 +130,10 @@ data ServerInstruction conkey | |||
128 | -- ^ listen for incomming connections | 130 | -- ^ listen for incomming connections |
129 | | Connect SockAddr (ConnectionParameters conkey) | 131 | | Connect SockAddr (ConnectionParameters conkey) |
130 | -- ^ connect to addresses | 132 | -- ^ connect to addresses |
133 | | ConnectWithEndlessRetry SockAddr | ||
134 | (ConnectionParameters conkey) | ||
135 | Miliseconds | ||
136 | -- ^ keep retrying the connection | ||
131 | | Ignore PortNumber | 137 | | Ignore PortNumber |
132 | -- ^ stop listening on specified port | 138 | -- ^ stop listening on specified port |
133 | | Send conkey ByteString | 139 | | Send conkey ByteString |
@@ -178,6 +184,7 @@ data Server a | |||
178 | , serverReleaseKey :: ReleaseKey | 184 | , serverReleaseKey :: ReleaseKey |
179 | , conmap :: TVar (Map a (TMVar (STM (IO ())), ConnectionState)) | 185 | , conmap :: TVar (Map a (TMVar (STM (IO ())), ConnectionState)) |
180 | , listenmap :: TVar (Map PortNumber (ThreadId,Socket)) | 186 | , listenmap :: TVar (Map PortNumber (ThreadId,Socket)) |
187 | , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay)) | ||
181 | } | 188 | } |
182 | 189 | ||
183 | -- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT' | 190 | -- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT' |
@@ -210,11 +217,13 @@ server = do | |||
210 | tchan <- newTChan | 217 | tchan <- newTChan |
211 | conmap <- newTVar Map.empty | 218 | conmap <- newTVar Map.empty |
212 | listenmap<- newTVar Map.empty | 219 | listenmap<- newTVar Map.empty |
220 | retrymap <- newTVar Map.empty | ||
213 | return Server { serverCommand = cmds | 221 | return Server { serverCommand = cmds |
214 | , serverEvent = tchan | 222 | , serverEvent = tchan |
215 | , serverReleaseKey = key | 223 | , serverReleaseKey = key |
216 | , conmap = conmap | 224 | , conmap = conmap |
217 | , listenmap = listenmap | 225 | , listenmap = listenmap |
226 | , retrymap = retrymap | ||
218 | } | 227 | } |
219 | liftIO $ do | 228 | liftIO $ do |
220 | forkIO $ fix $ \loop -> do | 229 | forkIO $ fix $ \loop -> do |
@@ -230,6 +239,10 @@ server = do | |||
230 | closeAll server = liftIO $ do | 239 | closeAll server = liftIO $ do |
231 | listening <- atomically . readTVar $ listenmap server | 240 | listening <- atomically . readTVar $ listenmap server |
232 | mapM_ killListener (Map.elems listening) | 241 | mapM_ killListener (Map.elems listening) |
242 | let stopRetry (v,d) = do atomically $ writeTVar v False | ||
243 | interruptDelay d | ||
244 | retriers <- atomically . readTVar $ retrymap server | ||
245 | mapM_ stopRetry (Map.elems retriers) | ||
233 | cons <- atomically . readTVar $ conmap server | 246 | cons <- atomically . readTVar $ conmap server |
234 | atomically $ mapM_ (connClose . snd) (Map.elems cons) | 247 | atomically $ mapM_ (connClose . snd) (Map.elems cons) |
235 | atomically $ mapM_ (connWait . snd) (Map.elems cons) | 248 | atomically $ mapM_ (connWait . snd) (Map.elems cons) |
@@ -278,7 +291,7 @@ server = do | |||
278 | doit server (Connect addr params) = liftIO $ do | 291 | doit server (Connect addr params) = liftIO $ do |
279 | void . forkIO $ do | 292 | void . forkIO $ do |
280 | proto <- getProtocolNumber "tcp" | 293 | proto <- getProtocolNumber "tcp" |
281 | bracketOnError | 294 | sock <- bracketOnError |
282 | (socket (socketFamily addr) Stream proto) | 295 | (socket (socketFamily addr) Stream proto) |
283 | (\sock -> do -- only done if there's an error | 296 | (\sock -> do -- only done if there's an error |
284 | -- Weird hack: puting the would-be peer address | 297 | -- Weird hack: puting the would-be peer address |
@@ -288,12 +301,66 @@ server = do | |||
288 | atomically | 301 | atomically |
289 | $ writeTChan (serverEvent server) | 302 | $ writeTChan (serverEvent server) |
290 | $ (conkey,ConnectFailure addr)) | 303 | $ (conkey,ConnectFailure addr)) |
291 | $ \sock -> do | 304 | $ \sock -> do connect sock addr |
292 | connect sock addr | 305 | return sock |
293 | me <- getSocketName sock | 306 | me <- getSocketName sock |
294 | conkey <- makeConnKey params (sock,me) | 307 | conkey <- makeConnKey params (sock,me) |
295 | h <- socketToHandle sock ReadWriteMode | 308 | h <- socketToHandle sock ReadWriteMode |
296 | newConnection server params conkey h Out | 309 | newConnection server params conkey h Out |
310 | return () | ||
311 | |||
312 | doit server (ConnectWithEndlessRetry addr params interval) = liftIO $ do | ||
313 | proto <- getProtocolNumber "tcp" | ||
314 | void . forkIO $ do | ||
315 | resultVar <- atomically newEmptyTMVar | ||
316 | timer <- interruptableDelay | ||
317 | (retryVar,action) <- atomically $ do | ||
318 | let noop = return () | ||
319 | map <- readTVar (retrymap server) | ||
320 | let mb = Map.lookup addr map | ||
321 | action <- | ||
322 | maybe (return noop) | ||
323 | (\(v,d) -> do writeTVar v False | ||
324 | return $ interruptDelay d) | ||
325 | mb | ||
326 | v <- newTVar True | ||
327 | writeTVar (retrymap server) (Map.insert addr (v,timer) map) | ||
328 | return (v,action) | ||
329 | action | ||
330 | fix $ \retryLoop -> do | ||
331 | utc <- getCurrentTime | ||
332 | forkIO $ do | ||
333 | sock <- bracketOnError | ||
334 | (socket (socketFamily addr) Stream proto) | ||
335 | (\sock -> do -- only done if there's an error | ||
336 | -- Weird hack: puting the would-be peer address | ||
337 | -- instead of local socketName | ||
338 | conkey <- makeConnKey params (sock,addr) -- XXX: ? | ||
339 | sClose sock | ||
340 | atomically $ do | ||
341 | writeTChan (serverEvent server) | ||
342 | $ (conkey,ConnectFailure addr) | ||
343 | retry <- readTVar retryVar | ||
344 | putTMVar resultVar retry) | ||
345 | $ \sock -> do connect sock addr | ||
346 | return sock | ||
347 | me <- getSocketName sock | ||
348 | conkey <- makeConnKey params (sock,me) | ||
349 | h <- socketToHandle sock ReadWriteMode | ||
350 | threads <- newConnection server params conkey h Out | ||
351 | atomically $ do threadsWait threads | ||
352 | retry <- readTVar retryVar | ||
353 | putTMVar resultVar retry | ||
354 | retry <- atomically $ takeTMVar resultVar | ||
355 | fin_utc <- getCurrentTime | ||
356 | when retry $ do | ||
357 | let elapsed = 1000.0 * (fin_utc `diffUTCTime` utc) | ||
358 | expected = fromIntegral interval | ||
359 | when (retry && elapsed < expected) $ do | ||
360 | warn $ "Waiting to retry " <> bshow addr | ||
361 | startDelay timer (round $ 1000 * (expected-elapsed)) | ||
362 | warn $ "retry " <> bshow (retry,addr) | ||
363 | when retry $ retryLoop | ||
297 | 364 | ||
298 | 365 | ||
299 | -- INTERNAL ---------------------------------------------------------- | 366 | -- INTERNAL ---------------------------------------------------------- |
@@ -397,7 +464,7 @@ newConnection server params conkey h inout = do | |||
397 | mb | 464 | mb |
398 | warn $ "quit-gots: " <> bshow (conkey,inout) | 465 | warn $ "quit-gots: " <> bshow (conkey,inout) |
399 | #endif | 466 | #endif |
400 | return () | 467 | return new |
401 | where | 468 | where |
402 | 469 | ||
403 | announce e = writeTChan (serverEvent server) e | 470 | announce e = writeTChan (serverEvent server) e |
@@ -754,3 +821,29 @@ pingWait me = do | |||
754 | return e | 821 | return e |
755 | 822 | ||
756 | 823 | ||
824 | data InterruptableDelay = InterruptableDelay | ||
825 | { delayThread :: TMVar ThreadId | ||
826 | } | ||
827 | |||
828 | interruptableDelay :: IO InterruptableDelay | ||
829 | interruptableDelay = do | ||
830 | fmap InterruptableDelay | ||
831 | $ atomically newEmptyTMVar | ||
832 | startDelay :: InterruptableDelay -> Microseconds -> IO () | ||
833 | startDelay d interval = do | ||
834 | thread <- myThreadId | ||
835 | atomically $ putTMVar (delayThread d) thread | ||
836 | handle (\(ErrorCall _)-> warn $ "delay interrupted" ) $ do | ||
837 | threadDelay interval | ||
838 | void . atomically $ takeTMVar (delayThread d) | ||
839 | |||
840 | interruptDelay :: InterruptableDelay -> IO () | ||
841 | interruptDelay d = do | ||
842 | mthread <- atomically $ do | ||
843 | tryTakeTMVar (delayThread d) | ||
844 | flip (maybe $ return ()) mthread $ \thread -> do | ||
845 | throwTo thread (ErrorCall "Interrupted delay") | ||
846 | atomically $ do | ||
847 | putTMVar (delayThread d) thread | ||
848 | |||
849 | |||