diff options
author | joe <joe@jerkface.net> | 2014-02-10 00:08:23 -0500 |
---|---|---|
committer | joe <joe@jerkface.net> | 2014-02-10 00:08:23 -0500 |
commit | 0e3a39ff7d4c6a2ba5190ddfcc38b4bb8d22c367 (patch) | |
tree | 44cbcd17543b310742c250acfb14ec866537a6c5 /Presence/Server.hs | |
parent | e710ed2434496f7891363def81c1c8756eadd129 (diff) |
moved ping logic into ConnectionState
Diffstat (limited to 'Presence/Server.hs')
-rw-r--r-- | Presence/Server.hs | 62 |
1 files changed, 42 insertions, 20 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs index c7162265..530b6669 100644 --- a/Presence/Server.hs +++ b/Presence/Server.hs | |||
@@ -311,7 +311,17 @@ killListener (thread,sock) = do sClose sock | |||
311 | 311 | ||
312 | newConnection server params conkey h inout = do | 312 | newConnection server params conkey h inout = do |
313 | hSetBuffering h NoBuffering | 313 | hSetBuffering h NoBuffering |
314 | new <- connectionThreads h | 314 | let (forward,idle_ms,timeout_ms) = |
315 | case (inout,duplex params) of | ||
316 | (Out,True) -> ( const $ return () | ||
317 | , 0 | ||
318 | , 0 ) | ||
319 | _ -> ( announce . (conkey,) . Got | ||
320 | , pingInterval params | ||
321 | , timeout params ) | ||
322 | |||
323 | new <- do pinglogic <- pingMachine idle_ms timeout_ms | ||
324 | connectionThreads h pinglogic | ||
315 | started <- atomically $ newEmptyTMVar | 325 | started <- atomically $ newEmptyTMVar |
316 | kontvar <- atomically newEmptyTMVar | 326 | kontvar <- atomically newEmptyTMVar |
317 | forkIO $ do | 327 | forkIO $ do |
@@ -319,7 +329,6 @@ newConnection server params conkey h inout = do | |||
319 | kont <- atomically getkont | 329 | kont <- atomically getkont |
320 | kont | 330 | kont |
321 | 331 | ||
322 | pinglogic <- pingMachine (pingInterval params) (timeout params) | ||
323 | atomically $ do | 332 | atomically $ do |
324 | current <- fmap (Map.lookup conkey) $ readTVar (conmap server) | 333 | current <- fmap (Map.lookup conkey) $ readTVar (conmap server) |
325 | case current of | 334 | case current of |
@@ -335,25 +344,21 @@ newConnection server params conkey h inout = do | |||
335 | announce e | 344 | announce e |
336 | putTMVar kontvar $ return $ do | 345 | putTMVar kontvar $ return $ do |
337 | atomically $ putTMVar started () | 346 | atomically $ putTMVar started () |
338 | handleEOF conkey kontvar newCon pinglogic | 347 | handleEOF conkey kontvar newCon |
339 | Just what@(mvar,_) -> do | 348 | Just what@(mvar,_) -> do |
340 | putTMVar kontvar $ return $ return () | 349 | putTMVar kontvar $ return $ return () |
341 | putTMVar mvar $ do | 350 | putTMVar mvar $ do |
342 | kont <- updateConMap conkey new what pinglogic | 351 | kont <- updateConMap conkey new what |
343 | putTMVar started () | 352 | putTMVar started () |
344 | return kont | 353 | return kont |
345 | forkIO $ do -- inout==In || duplex params then forkIO $ do | 354 | forkIO $ do -- inout==In || duplex params then forkIO $ do |
346 | -- warn $ "waiting read thread: " <> bshow (conkey,inout) | 355 | -- warn $ "waiting read thread: " <> bshow (conkey,inout) |
347 | let forward = | ||
348 | case (inout,duplex params) of | ||
349 | (Out,True) -> const $ return () | ||
350 | _ -> announce . (conkey,) . Got | ||
351 | atomically $ takeTMVar started | 356 | atomically $ takeTMVar started |
352 | pingBump pinglogic -- start the ping timer | 357 | -- pingBump pinglogic -- start the ping timer |
353 | fix $ \loop -> do | 358 | fix $ \loop -> do |
354 | -- warn $ "read thread: " <> bshow (conkey,inout) | 359 | -- warn $ "read thread: " <> bshow (conkey,inout) |
355 | mb <- threadsRead new | 360 | mb <- threadsRead new |
356 | pingBump pinglogic | 361 | -- pingBump pinglogic |
357 | -- warn $ "got: " <> bshow (mb,(conkey,inout)) | 362 | -- warn $ "got: " <> bshow (mb,(conkey,inout)) |
358 | maybe (return ()) | 363 | maybe (return ()) |
359 | (atomically . forward >=> const loop) | 364 | (atomically . forward >=> const loop) |
@@ -363,17 +368,18 @@ newConnection server params conkey h inout = do | |||
363 | 368 | ||
364 | announce e = writeTChan (serverEvent server) e | 369 | announce e = writeTChan (serverEvent server) e |
365 | 370 | ||
366 | handleEOF conkey mvar newCon pingTimer = do | 371 | handleEOF conkey mvar newCon = do |
367 | action <- atomically . foldr1 orElse $ | 372 | action <- atomically . foldr1 orElse $ |
368 | [ takeTMVar mvar >>= id -- passed continuation | 373 | [ takeTMVar mvar >>= id -- passed continuation |
369 | , connWait newCon >> return eof | 374 | , connWait newCon >> return eof |
370 | , pingWait pingTimer >>= return . sendPing | 375 | , connWaitPing newCon >>= return . sendPing |
376 | -- , pingWait pingTimer >>= return . sendPing | ||
371 | ] | 377 | ] |
372 | action :: IO () | 378 | action :: IO () |
373 | where | 379 | where |
374 | eof = do | 380 | eof = do |
375 | -- warn $ "EOF " <>bshow conkey | 381 | -- warn $ "EOF " <>bshow conkey |
376 | pingCancel pingTimer -- force cleanup of ping thread | 382 | connCancelPing newCon |
377 | atomically $ do connFlush newCon | 383 | atomically $ do connFlush newCon |
378 | announce (conkey,EOF) | 384 | announce (conkey,EOF) |
379 | modifyTVar' (conmap server) | 385 | modifyTVar' (conmap server) |
@@ -384,10 +390,10 @@ newConnection server params conkey h inout = do | |||
384 | eof | 390 | eof |
385 | sendPing PingIdle = do | 391 | sendPing PingIdle = do |
386 | atomically . announce $ (conkey,RequiresPing) | 392 | atomically . announce $ (conkey,RequiresPing) |
387 | handleEOF conkey mvar newCon pingTimer | 393 | handleEOF conkey mvar newCon |
388 | 394 | ||
389 | 395 | ||
390 | updateConMap conkey new (mvar,replaced) pingTimer = do | 396 | updateConMap conkey new (mvar,replaced) = do |
391 | new' <- | 397 | new' <- |
392 | if duplex params then do | 398 | if duplex params then do |
393 | announce (conkey,EOF) | 399 | announce (conkey,EOF) |
@@ -410,7 +416,7 @@ newConnection server params conkey h inout = do | |||
410 | In -> ReadOnlyConnection new | 416 | In -> ReadOnlyConnection new |
411 | Out -> WriteOnlyConnection new | 417 | Out -> WriteOnlyConnection new |
412 | modifyTVar' (conmap server) $ Map.insert conkey (mvar,new') | 418 | modifyTVar' (conmap server) $ Map.insert conkey (mvar,new') |
413 | return $ handleEOF conkey mvar new' pingTimer | 419 | return $ handleEOF conkey mvar new' |
414 | 420 | ||
415 | acceptLoop server params sock = handle (acceptException server params sock) $ do | 421 | acceptLoop server params sock = handle (acceptException server params sock) $ do |
416 | con <- accept sock | 422 | con <- accept sock |
@@ -445,12 +451,13 @@ data ConnectionThreads = ConnectionThreads | |||
445 | { threadsWriter :: TMVar (Maybe ByteString) | 451 | { threadsWriter :: TMVar (Maybe ByteString) |
446 | , threadsChannel :: TChan ByteString | 452 | , threadsChannel :: TChan ByteString |
447 | , threadsWait :: STM () -- ^ waits for a 'ConnectionThreads' object to close | 453 | , threadsWait :: STM () -- ^ waits for a 'ConnectionThreads' object to close |
454 | , threadsPing :: PingMachine | ||
448 | } | 455 | } |
449 | 456 | ||
450 | -- | This spawns the reader and writer threads and returns a newly | 457 | -- | This spawns the reader and writer threads and returns a newly |
451 | -- constructed 'ConnectionThreads' object. | 458 | -- constructed 'ConnectionThreads' object. |
452 | connectionThreads :: Handle -> IO ConnectionThreads | 459 | connectionThreads :: Handle -> PingMachine -> IO ConnectionThreads |
453 | connectionThreads h = do | 460 | connectionThreads h pinglogic = do |
454 | 461 | ||
455 | (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar | 462 | (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar |
456 | writerThread <- forkIO . fix $ \loop -> do | 463 | writerThread <- forkIO . fix $ \loop -> do |
@@ -471,9 +478,12 @@ connectionThreads h = do | |||
471 | let _ = fmap ioeGetErrorType e -- type hint | 478 | let _ = fmap ioeGetErrorType e -- type hint |
472 | atomically $ do putTMVar outs Nothing -- quit writer | 479 | atomically $ do putTMVar outs Nothing -- quit writer |
473 | putTMVar doner () | 480 | putTMVar doner () |
474 | handle (finished . Just) . fix $ \loop -> do | 481 | handle (finished . Just) $ do |
482 | pingBump pinglogic -- start the ping timer | ||
483 | fix $ \loop -> do | ||
475 | packet <- getPacket h | 484 | packet <- getPacket h |
476 | atomically $ writeTChan incomming packet | 485 | atomically $ writeTChan incomming packet |
486 | pingBump pinglogic | ||
477 | isEof <- liftIO $ hIsEOF h | 487 | isEof <- liftIO $ hIsEOF h |
478 | if isEof then finished Nothing else loop | 488 | if isEof then finished Nothing else loop |
479 | 489 | ||
@@ -482,7 +492,8 @@ connectionThreads h = do | |||
482 | return () | 492 | return () |
483 | return ConnectionThreads { threadsWriter = outs | 493 | return ConnectionThreads { threadsWriter = outs |
484 | , threadsChannel = incomming | 494 | , threadsChannel = incomming |
485 | , threadsWait = wait } | 495 | , threadsWait = wait |
496 | , threadsPing = pinglogic } | ||
486 | 497 | ||
487 | 498 | ||
488 | -- | 'threadsWrite' writes the given 'ByteString' to the | 499 | -- | 'threadsWrite' writes the given 'ByteString' to the |
@@ -555,6 +566,17 @@ connClose c = mapConn True threadsClose c | |||
555 | connWait :: ConnectionState -> STM () | 566 | connWait :: ConnectionState -> STM () |
556 | connWait c = mapConn False threadsWait c | 567 | connWait c = mapConn False threadsWait c |
557 | 568 | ||
569 | connPingTimer c = | ||
570 | case c of | ||
571 | SaneConnection rw -> threadsPing rw | ||
572 | ReadOnlyConnection r -> threadsPing r | ||
573 | WriteOnlyConnection w -> threadsPing w -- should be disabled. | ||
574 | ConnectionPair r w -> threadsPing r | ||
575 | |||
576 | connCancelPing c = pingCancel (connPingTimer c) | ||
577 | connWaitPing c = pingWait (connPingTimer c) | ||
578 | |||
579 | |||
558 | connFlush c = | 580 | connFlush c = |
559 | case c of | 581 | case c of |
560 | SaneConnection rw -> waitChan rw | 582 | SaneConnection rw -> waitChan rw |