diff options
author | joe <joe@jerkface.net> | 2014-02-10 02:52:23 -0500 |
---|---|---|
committer | joe <joe@jerkface.net> | 2014-02-10 02:52:23 -0500 |
commit | afff51ac877ce8807801334745f1679dbf6440d0 (patch) | |
tree | c52a620f7584c712b27ba8a631a7b38744e0ffdd /Presence/Server.hs | |
parent | 0e3a39ff7d4c6a2ba5190ddfcc38b4bb8d22c367 (diff) |
Fixed ping bug, forked connection thread in xmppServer
Diffstat (limited to 'Presence/Server.hs')
-rw-r--r-- | Presence/Server.hs | 113 |
1 files changed, 77 insertions, 36 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs index 530b6669..4ee55821 100644 --- a/Presence/Server.hs +++ b/Presence/Server.hs | |||
@@ -48,6 +48,7 @@ import System.IO | |||
48 | , hClose | 48 | , hClose |
49 | , hIsEOF | 49 | , hIsEOF |
50 | , stderr | 50 | , stderr |
51 | , stdout | ||
51 | , Handle | 52 | , Handle |
52 | , hFlush | 53 | , hFlush |
53 | ) | 54 | ) |
@@ -142,7 +143,7 @@ data InOrOut = In | Out | |||
142 | data ConnectionEvent b | 143 | data ConnectionEvent b |
143 | = Got b | 144 | = Got b |
144 | -- ^ Arrival of data from a socket | 145 | -- ^ Arrival of data from a socket |
145 | | Connection | 146 | | Connection (IO Bool) (IO (Maybe ByteString)) (ByteString -> IO Bool) |
146 | -- ^ A new connection was established | 147 | -- ^ A new connection was established |
147 | | HalfConnection InOrOut | 148 | | HalfConnection InOrOut |
148 | -- ^ Half of a half-duplex connection is avaliable. | 149 | -- ^ Half of a half-duplex connection is avaliable. |
@@ -151,8 +152,10 @@ data ConnectionEvent b | |||
151 | | RequiresPing | 152 | | RequiresPing |
152 | -- ^ 'pingInterval' miliseconds of idle was experienced | 153 | -- ^ 'pingInterval' miliseconds of idle was experienced |
153 | 154 | ||
155 | {- | ||
154 | deriving instance Show b => Show (ConnectionEvent b) | 156 | deriving instance Show b => Show (ConnectionEvent b) |
155 | deriving instance Eq b => Eq (ConnectionEvent b) | 157 | deriving instance Eq b => Eq (ConnectionEvent b) |
158 | -} | ||
156 | 159 | ||
157 | -- | This object accepts commands and signals events and maintains | 160 | -- | This object accepts commands and signals events and maintains |
158 | -- the list of currently listening ports and established connections. | 161 | -- the list of currently listening ports and established connections. |
@@ -285,9 +288,10 @@ hWriteUntilNothing h outs = | |||
285 | Nothing -> do warn $ "wrote Nothing" | 288 | Nothing -> do warn $ "wrote Nothing" |
286 | hClose h | 289 | hClose h |
287 | 290 | ||
291 | -} | ||
288 | connRead :: ConnectionState -> IO (Maybe ByteString) | 292 | connRead :: ConnectionState -> IO (Maybe ByteString) |
289 | connRead (WriteOnlyConnection w) = do | 293 | connRead (WriteOnlyConnection w) = do |
290 | atomically $ discardContents (threadsChannel w) | 294 | -- atomically $ discardContents (threadsChannel w) |
291 | return Nothing | 295 | return Nothing |
292 | connRead conn = do | 296 | connRead conn = do |
293 | c <- atomically $ getThreads | 297 | c <- atomically $ getThreads |
@@ -297,9 +301,8 @@ connRead conn = do | |||
297 | case conn of SaneConnection c -> return c | 301 | case conn of SaneConnection c -> return c |
298 | ReadOnlyConnection c -> return c | 302 | ReadOnlyConnection c -> return c |
299 | ConnectionPair c w -> do | 303 | ConnectionPair c w -> do |
300 | discardContents (threadsChannel w) | 304 | -- discardContents (threadsChannel w) |
301 | return c | 305 | return c |
302 | -} | ||
303 | 306 | ||
304 | socketFamily (SockAddrInet _ _) = AF_INET | 307 | socketFamily (SockAddrInet _ _) = AF_INET |
305 | socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6 | 308 | socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6 |
@@ -309,6 +312,12 @@ killListener (thread,sock) = do sClose sock | |||
309 | -- killThread thread | 312 | -- killThread thread |
310 | 313 | ||
311 | 314 | ||
315 | conevent con = Connection pingflag read write | ||
316 | where | ||
317 | pingflag = atomically $ swapTVar (pingFlag (connPingTimer con)) False | ||
318 | read = connRead con | ||
319 | write = connWrite con | ||
320 | |||
312 | newConnection server params conkey h inout = do | 321 | newConnection server params conkey h inout = do |
313 | hSetBuffering h NoBuffering | 322 | hSetBuffering h NoBuffering |
314 | let (forward,idle_ms,timeout_ms) = | 323 | let (forward,idle_ms,timeout_ms) = |
@@ -335,7 +344,8 @@ newConnection server params conkey h inout = do | |||
335 | Nothing -> do | 344 | Nothing -> do |
336 | (newCon,e) <- return $ | 345 | (newCon,e) <- return $ |
337 | if duplex params | 346 | if duplex params |
338 | then ( SaneConnection new, (conkey, Connection) ) | 347 | then let newcon = SaneConnection new |
348 | in ( newcon, (conkey, conevent newcon) ) | ||
339 | else ( case inout of | 349 | else ( case inout of |
340 | In -> ReadOnlyConnection new | 350 | In -> ReadOnlyConnection new |
341 | Out -> WriteOnlyConnection new | 351 | Out -> WriteOnlyConnection new |
@@ -351,6 +361,8 @@ newConnection server params conkey h inout = do | |||
351 | kont <- updateConMap conkey new what | 361 | kont <- updateConMap conkey new what |
352 | putTMVar started () | 362 | putTMVar started () |
353 | return kont | 363 | return kont |
364 | {- | ||
365 | -- enable this for 'Got' events | ||
354 | forkIO $ do -- inout==In || duplex params then forkIO $ do | 366 | forkIO $ do -- inout==In || duplex params then forkIO $ do |
355 | -- warn $ "waiting read thread: " <> bshow (conkey,inout) | 367 | -- warn $ "waiting read thread: " <> bshow (conkey,inout) |
356 | atomically $ takeTMVar started | 368 | atomically $ takeTMVar started |
@@ -363,6 +375,7 @@ newConnection server params conkey h inout = do | |||
363 | maybe (return ()) | 375 | maybe (return ()) |
364 | (atomically . forward >=> const loop) | 376 | (atomically . forward >=> const loop) |
365 | mb | 377 | mb |
378 | -} | ||
366 | return () | 379 | return () |
367 | where | 380 | where |
368 | 381 | ||
@@ -389,7 +402,7 @@ newConnection server params conkey h inout = do | |||
389 | sendPing PingTimeOut = do atomically (connClose newCon) | 402 | sendPing PingTimeOut = do atomically (connClose newCon) |
390 | eof | 403 | eof |
391 | sendPing PingIdle = do | 404 | sendPing PingIdle = do |
392 | atomically . announce $ (conkey,RequiresPing) | 405 | atomically $ announce (conkey,RequiresPing) |
393 | handleEOF conkey mvar newCon | 406 | handleEOF conkey mvar newCon |
394 | 407 | ||
395 | 408 | ||
@@ -398,16 +411,19 @@ newConnection server params conkey h inout = do | |||
398 | if duplex params then do | 411 | if duplex params then do |
399 | announce (conkey,EOF) | 412 | announce (conkey,EOF) |
400 | connClose replaced | 413 | connClose replaced |
401 | announce $ (conkey,Connection) | 414 | let newcon = SaneConnection new |
402 | return $ SaneConnection new | 415 | announce $ (conkey,conevent newcon) |
416 | return $ newcon | ||
403 | else | 417 | else |
404 | case replaced of | 418 | case replaced of |
405 | WriteOnlyConnection w | inout==In -> | 419 | WriteOnlyConnection w | inout==In -> |
406 | do announce (conkey,Connection) | 420 | do let newcon = ConnectionPair new w |
407 | return $ ConnectionPair new w | 421 | announce (conkey,conevent newcon) |
422 | return newcon | ||
408 | ReadOnlyConnection r | inout==Out -> | 423 | ReadOnlyConnection r | inout==Out -> |
409 | do announce (conkey,Connection) | 424 | do let newcon = ConnectionPair r new |
410 | return $ ConnectionPair r new | 425 | announce (conkey,conevent newcon) |
426 | return newcon | ||
411 | _ -> do -- connFlush todo | 427 | _ -> do -- connFlush todo |
412 | announce (conkey, EOF) | 428 | announce (conkey, EOF) |
413 | connClose replaced | 429 | connClose replaced |
@@ -460,23 +476,16 @@ connectionThreads :: Handle -> PingMachine -> IO ConnectionThreads | |||
460 | connectionThreads h pinglogic = do | 476 | connectionThreads h pinglogic = do |
461 | 477 | ||
462 | (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar | 478 | (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar |
463 | writerThread <- forkIO . fix $ \loop -> do | ||
464 | let finished = do -- warn $ "finished write" | ||
465 | hClose h -- quit reader | ||
466 | atomically $ putTMVar donew () | ||
467 | mb <- atomically $ readTMVar outs | ||
468 | case mb of Just bs -> handle (\(SomeException e)->finished) | ||
469 | (do S.hPutStr h bs | ||
470 | atomically $ takeTMVar outs | ||
471 | loop) | ||
472 | Nothing -> finished | ||
473 | 479 | ||
474 | (doner,incomming) <- atomically $ liftM2 (,) newEmptyTMVar newTChan | 480 | (doner,incomming) <- atomically $ liftM2 (,) newEmptyTMVar newTChan |
475 | readerThread <- forkIO $ do | 481 | readerThread <- forkIO $ do |
476 | let finished e = do | 482 | let finished e = do |
483 | hClose h | ||
477 | -- warn $ "finished read: " <> bshow (fmap ioeGetErrorType e) | 484 | -- warn $ "finished read: " <> bshow (fmap ioeGetErrorType e) |
478 | let _ = fmap ioeGetErrorType e -- type hint | 485 | let _ = fmap ioeGetErrorType e -- type hint |
479 | atomically $ do putTMVar outs Nothing -- quit writer | 486 | -- let _ = fmap what e where what (SomeException _) = undefined |
487 | atomically $ do tryTakeTMVar outs | ||
488 | putTMVar outs Nothing -- quit writer | ||
480 | putTMVar doner () | 489 | putTMVar doner () |
481 | handle (finished . Just) $ do | 490 | handle (finished . Just) $ do |
482 | pingBump pinglogic -- start the ping timer | 491 | pingBump pinglogic -- start the ping timer |
@@ -487,6 +496,18 @@ connectionThreads h pinglogic = do | |||
487 | isEof <- liftIO $ hIsEOF h | 496 | isEof <- liftIO $ hIsEOF h |
488 | if isEof then finished Nothing else loop | 497 | if isEof then finished Nothing else loop |
489 | 498 | ||
499 | writerThread <- forkIO . fix $ \loop -> do | ||
500 | let finished = do -- warn $ "finished write" | ||
501 | hClose h -- quit reader | ||
502 | -- throwTo readerThread (ErrorCall "EOF") | ||
503 | atomically $ putTMVar donew () | ||
504 | mb <- atomically $ readTMVar outs | ||
505 | case mb of Just bs -> handle (\(SomeException e)->finished) | ||
506 | (do S.hPutStr h bs | ||
507 | atomically $ takeTMVar outs | ||
508 | loop) | ||
509 | Nothing -> finished | ||
510 | |||
490 | let wait = do readTMVar donew | 511 | let wait = do readTMVar donew |
491 | readTMVar doner | 512 | readTMVar doner |
492 | return () | 513 | return () |
@@ -564,7 +585,18 @@ connClose :: ConnectionState -> STM () | |||
564 | connClose c = mapConn True threadsClose c | 585 | connClose c = mapConn True threadsClose c |
565 | 586 | ||
566 | connWait :: ConnectionState -> STM () | 587 | connWait :: ConnectionState -> STM () |
567 | connWait c = mapConn False threadsWait c | 588 | connWait c = doit -- mapConn False threadsWait c |
589 | where | ||
590 | action = threadsWait | ||
591 | doit = | ||
592 | case c of | ||
593 | SaneConnection rw -> action rw | ||
594 | ReadOnlyConnection r -> action r | ||
595 | WriteOnlyConnection w -> action w | ||
596 | ConnectionPair r w -> do | ||
597 | rem <- orElse (const w `fmap` action r) | ||
598 | (const r `fmap` action w) | ||
599 | threadsClose rem | ||
568 | 600 | ||
569 | connPingTimer c = | 601 | connPingTimer c = |
570 | case c of | 602 | case c of |
@@ -599,34 +631,40 @@ data PingMachine = PingMachine | |||
599 | , pingTimeOut :: TimeOut | 631 | , pingTimeOut :: TimeOut |
600 | , pingDelay :: TMVar (Int,PingEvent) | 632 | , pingDelay :: TMVar (Int,PingEvent) |
601 | , pingEvent :: TMVar PingEvent | 633 | , pingEvent :: TMVar PingEvent |
602 | , pingStarted :: TVar Bool -- True when a threadDelay is running | 634 | , pingStarted :: TMVar Bool -- True when a threadDelay is running |
603 | , pingThread :: ThreadId | 635 | , pingThread :: ThreadId |
636 | , pingFlag :: TVar Bool | ||
604 | } | 637 | } |
605 | 638 | ||
606 | pingMachine :: PingInterval -> TimeOut -> IO PingMachine | 639 | pingMachine :: PingInterval -> TimeOut -> IO PingMachine |
607 | pingMachine idle timeout = do | 640 | pingMachine idle timeout = do |
608 | me <- do | 641 | me <- do |
609 | (delayVar,eventVar,startedVar) <- atomically $ do | 642 | (delayVar,eventVar,startedVar,flag) <- atomically $ do |
610 | d <- newEmptyTMVar | 643 | d <- newEmptyTMVar |
611 | e <- newEmptyTMVar | 644 | e <- newEmptyTMVar |
612 | s <- newTVar False | 645 | s <- newTMVar False |
613 | return (d,e,s) | 646 | f <- newTVar False |
647 | return (d,e,s,f) | ||
614 | return PingMachine { pingIdle = idle | 648 | return PingMachine { pingIdle = idle |
615 | , pingTimeOut = timeout | 649 | , pingTimeOut = timeout |
616 | , pingDelay = delayVar | 650 | , pingDelay = delayVar |
617 | , pingEvent = eventVar | 651 | , pingEvent = eventVar |
618 | , pingStarted = startedVar | 652 | , pingStarted = startedVar |
619 | , pingThread = undefined } | 653 | , pingThread = undefined |
654 | , pingFlag = flag } | ||
620 | thread <- forkIO . when (pingIdle me /=0) . fix $ | 655 | thread <- forkIO . when (pingIdle me /=0) . fix $ |
621 | \loop -> do | 656 | \loop -> do |
622 | (delay,event) <- atomically $ takeTMVar (pingDelay me) | 657 | (delay,event) <- atomically $ takeTMVar (pingDelay me) |
623 | when (delay /= 0) $ do | 658 | when (delay /= 0) $ do |
624 | handle (\(ErrorCall _)-> do | 659 | handle (\(ErrorCall _)-> do |
625 | atomically $ writeTVar (pingStarted me) False | 660 | atomically $ do takeTMVar (pingStarted me) |
661 | putTMVar (pingStarted me) False | ||
626 | loop) | 662 | loop) |
627 | (do atomically $ writeTVar (pingStarted me) True | 663 | (do atomically $ do takeTMVar (pingStarted me) |
664 | putTMVar (pingStarted me) True | ||
628 | threadDelay delay | 665 | threadDelay delay |
629 | atomically $ writeTVar (pingStarted me) False | 666 | atomically $ do takeTMVar (pingStarted me) |
667 | putTMVar (pingStarted me) False | ||
630 | atomically $ putTMVar (pingEvent me) event | 668 | atomically $ putTMVar (pingEvent me) event |
631 | case event of PingTimeOut -> return () | 669 | case event of PingTimeOut -> return () |
632 | PingIdle -> loop) | 670 | PingIdle -> loop) |
@@ -638,8 +676,9 @@ pingCancel me = do | |||
638 | b <- atomically $ do | 676 | b <- atomically $ do |
639 | tryTakeTMVar (pingDelay me) -- no hang | 677 | tryTakeTMVar (pingDelay me) -- no hang |
640 | putTMVar (pingDelay me) (0,PingTimeOut) | 678 | putTMVar (pingDelay me) (0,PingTimeOut) |
641 | readTVar (pingStarted me) | 679 | takeTMVar (pingStarted me) |
642 | when b $ throwTo (pingThread me) $ ErrorCall "" | 680 | when b $ throwTo (pingThread me) $ ErrorCall "" |
681 | atomically $ putTMVar (pingStarted me) b | ||
643 | 682 | ||
644 | pingBump :: PingMachine -> IO () | 683 | pingBump :: PingMachine -> IO () |
645 | pingBump me = do | 684 | pingBump me = do |
@@ -651,15 +690,17 @@ pingBump me = do | |||
651 | Just _ -> retry | 690 | Just _ -> retry |
652 | Nothing -> putTMVar (pingDelay me) | 691 | Nothing -> putTMVar (pingDelay me) |
653 | (1000*pingIdle me,PingIdle) | 692 | (1000*pingIdle me,PingIdle) |
654 | readTVar (pingStarted me) | 693 | takeTMVar (pingStarted me) |
655 | when b $ throwTo (pingThread me) $ ErrorCall "" | 694 | when b $ throwTo (pingThread me) $ ErrorCall "" |
695 | atomically $ putTMVar (pingStarted me) b | ||
656 | 696 | ||
657 | pingWait :: PingMachine -> STM PingEvent | 697 | pingWait :: PingMachine -> STM PingEvent |
658 | pingWait me = do | 698 | pingWait me = do |
659 | e <- takeTMVar (pingEvent me) | 699 | e <- takeTMVar (pingEvent me) |
660 | case e of | 700 | case e of |
661 | PingIdle -> putTMVar (pingDelay me) | 701 | PingIdle -> do writeTVar (pingFlag me) True |
662 | (1000*pingTimeOut me,PingTimeOut) | 702 | putTMVar (pingDelay me) |
703 | (1000*pingTimeOut me,PingTimeOut) | ||
663 | PingTimeOut -> putTMVar (pingDelay me) | 704 | PingTimeOut -> putTMVar (pingDelay me) |
664 | (0,PingTimeOut) | 705 | (0,PingTimeOut) |
665 | return e | 706 | return e |