summaryrefslogtreecommitdiff
path: root/Presence/Server.hs
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2014-02-10 02:52:23 -0500
committerjoe <joe@jerkface.net>2014-02-10 02:52:23 -0500
commitafff51ac877ce8807801334745f1679dbf6440d0 (patch)
treec52a620f7584c712b27ba8a631a7b38744e0ffdd /Presence/Server.hs
parent0e3a39ff7d4c6a2ba5190ddfcc38b4bb8d22c367 (diff)
Fixed ping bug, forked connection thread in xmppServer
Diffstat (limited to 'Presence/Server.hs')
-rw-r--r--Presence/Server.hs113
1 files changed, 77 insertions, 36 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs
index 530b6669..4ee55821 100644
--- a/Presence/Server.hs
+++ b/Presence/Server.hs
@@ -48,6 +48,7 @@ import System.IO
48 , hClose 48 , hClose
49 , hIsEOF 49 , hIsEOF
50 , stderr 50 , stderr
51 , stdout
51 , Handle 52 , Handle
52 , hFlush 53 , hFlush
53 ) 54 )
@@ -142,7 +143,7 @@ data InOrOut = In | Out
142data ConnectionEvent b 143data ConnectionEvent b
143 = Got b 144 = Got b
144 -- ^ Arrival of data from a socket 145 -- ^ Arrival of data from a socket
145 | Connection 146 | Connection (IO Bool) (IO (Maybe ByteString)) (ByteString -> IO Bool)
146 -- ^ A new connection was established 147 -- ^ A new connection was established
147 | HalfConnection InOrOut 148 | HalfConnection InOrOut
148 -- ^ Half of a half-duplex connection is avaliable. 149 -- ^ Half of a half-duplex connection is avaliable.
@@ -151,8 +152,10 @@ data ConnectionEvent b
151 | RequiresPing 152 | RequiresPing
152 -- ^ 'pingInterval' miliseconds of idle was experienced 153 -- ^ 'pingInterval' miliseconds of idle was experienced
153 154
155{-
154deriving instance Show b => Show (ConnectionEvent b) 156deriving instance Show b => Show (ConnectionEvent b)
155deriving instance Eq b => Eq (ConnectionEvent b) 157deriving instance Eq b => Eq (ConnectionEvent b)
158-}
156 159
157-- | This object accepts commands and signals events and maintains 160-- | This object accepts commands and signals events and maintains
158-- the list of currently listening ports and established connections. 161-- the list of currently listening ports and established connections.
@@ -285,9 +288,10 @@ hWriteUntilNothing h outs =
285 Nothing -> do warn $ "wrote Nothing" 288 Nothing -> do warn $ "wrote Nothing"
286 hClose h 289 hClose h
287 290
291-}
288connRead :: ConnectionState -> IO (Maybe ByteString) 292connRead :: ConnectionState -> IO (Maybe ByteString)
289connRead (WriteOnlyConnection w) = do 293connRead (WriteOnlyConnection w) = do
290 atomically $ discardContents (threadsChannel w) 294 -- atomically $ discardContents (threadsChannel w)
291 return Nothing 295 return Nothing
292connRead conn = do 296connRead conn = do
293 c <- atomically $ getThreads 297 c <- atomically $ getThreads
@@ -297,9 +301,8 @@ connRead conn = do
297 case conn of SaneConnection c -> return c 301 case conn of SaneConnection c -> return c
298 ReadOnlyConnection c -> return c 302 ReadOnlyConnection c -> return c
299 ConnectionPair c w -> do 303 ConnectionPair c w -> do
300 discardContents (threadsChannel w) 304 -- discardContents (threadsChannel w)
301 return c 305 return c
302-}
303 306
304socketFamily (SockAddrInet _ _) = AF_INET 307socketFamily (SockAddrInet _ _) = AF_INET
305socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6 308socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6
@@ -309,6 +312,12 @@ killListener (thread,sock) = do sClose sock
309 -- killThread thread 312 -- killThread thread
310 313
311 314
315conevent con = Connection pingflag read write
316 where
317 pingflag = atomically $ swapTVar (pingFlag (connPingTimer con)) False
318 read = connRead con
319 write = connWrite con
320
312newConnection server params conkey h inout = do 321newConnection server params conkey h inout = do
313 hSetBuffering h NoBuffering 322 hSetBuffering h NoBuffering
314 let (forward,idle_ms,timeout_ms) = 323 let (forward,idle_ms,timeout_ms) =
@@ -335,7 +344,8 @@ newConnection server params conkey h inout = do
335 Nothing -> do 344 Nothing -> do
336 (newCon,e) <- return $ 345 (newCon,e) <- return $
337 if duplex params 346 if duplex params
338 then ( SaneConnection new, (conkey, Connection) ) 347 then let newcon = SaneConnection new
348 in ( newcon, (conkey, conevent newcon) )
339 else ( case inout of 349 else ( case inout of
340 In -> ReadOnlyConnection new 350 In -> ReadOnlyConnection new
341 Out -> WriteOnlyConnection new 351 Out -> WriteOnlyConnection new
@@ -351,6 +361,8 @@ newConnection server params conkey h inout = do
351 kont <- updateConMap conkey new what 361 kont <- updateConMap conkey new what
352 putTMVar started () 362 putTMVar started ()
353 return kont 363 return kont
364 {-
365 -- enable this for 'Got' events
354 forkIO $ do -- inout==In || duplex params then forkIO $ do 366 forkIO $ do -- inout==In || duplex params then forkIO $ do
355 -- warn $ "waiting read thread: " <> bshow (conkey,inout) 367 -- warn $ "waiting read thread: " <> bshow (conkey,inout)
356 atomically $ takeTMVar started 368 atomically $ takeTMVar started
@@ -363,6 +375,7 @@ newConnection server params conkey h inout = do
363 maybe (return ()) 375 maybe (return ())
364 (atomically . forward >=> const loop) 376 (atomically . forward >=> const loop)
365 mb 377 mb
378 -}
366 return () 379 return ()
367 where 380 where
368 381
@@ -389,7 +402,7 @@ newConnection server params conkey h inout = do
389 sendPing PingTimeOut = do atomically (connClose newCon) 402 sendPing PingTimeOut = do atomically (connClose newCon)
390 eof 403 eof
391 sendPing PingIdle = do 404 sendPing PingIdle = do
392 atomically . announce $ (conkey,RequiresPing) 405 atomically $ announce (conkey,RequiresPing)
393 handleEOF conkey mvar newCon 406 handleEOF conkey mvar newCon
394 407
395 408
@@ -398,16 +411,19 @@ newConnection server params conkey h inout = do
398 if duplex params then do 411 if duplex params then do
399 announce (conkey,EOF) 412 announce (conkey,EOF)
400 connClose replaced 413 connClose replaced
401 announce $ (conkey,Connection) 414 let newcon = SaneConnection new
402 return $ SaneConnection new 415 announce $ (conkey,conevent newcon)
416 return $ newcon
403 else 417 else
404 case replaced of 418 case replaced of
405 WriteOnlyConnection w | inout==In -> 419 WriteOnlyConnection w | inout==In ->
406 do announce (conkey,Connection) 420 do let newcon = ConnectionPair new w
407 return $ ConnectionPair new w 421 announce (conkey,conevent newcon)
422 return newcon
408 ReadOnlyConnection r | inout==Out -> 423 ReadOnlyConnection r | inout==Out ->
409 do announce (conkey,Connection) 424 do let newcon = ConnectionPair r new
410 return $ ConnectionPair r new 425 announce (conkey,conevent newcon)
426 return newcon
411 _ -> do -- connFlush todo 427 _ -> do -- connFlush todo
412 announce (conkey, EOF) 428 announce (conkey, EOF)
413 connClose replaced 429 connClose replaced
@@ -460,23 +476,16 @@ connectionThreads :: Handle -> PingMachine -> IO ConnectionThreads
460connectionThreads h pinglogic = do 476connectionThreads h pinglogic = do
461 477
462 (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar 478 (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar
463 writerThread <- forkIO . fix $ \loop -> do
464 let finished = do -- warn $ "finished write"
465 hClose h -- quit reader
466 atomically $ putTMVar donew ()
467 mb <- atomically $ readTMVar outs
468 case mb of Just bs -> handle (\(SomeException e)->finished)
469 (do S.hPutStr h bs
470 atomically $ takeTMVar outs
471 loop)
472 Nothing -> finished
473 479
474 (doner,incomming) <- atomically $ liftM2 (,) newEmptyTMVar newTChan 480 (doner,incomming) <- atomically $ liftM2 (,) newEmptyTMVar newTChan
475 readerThread <- forkIO $ do 481 readerThread <- forkIO $ do
476 let finished e = do 482 let finished e = do
483 hClose h
477 -- warn $ "finished read: " <> bshow (fmap ioeGetErrorType e) 484 -- warn $ "finished read: " <> bshow (fmap ioeGetErrorType e)
478 let _ = fmap ioeGetErrorType e -- type hint 485 let _ = fmap ioeGetErrorType e -- type hint
479 atomically $ do putTMVar outs Nothing -- quit writer 486 -- let _ = fmap what e where what (SomeException _) = undefined
487 atomically $ do tryTakeTMVar outs
488 putTMVar outs Nothing -- quit writer
480 putTMVar doner () 489 putTMVar doner ()
481 handle (finished . Just) $ do 490 handle (finished . Just) $ do
482 pingBump pinglogic -- start the ping timer 491 pingBump pinglogic -- start the ping timer
@@ -487,6 +496,18 @@ connectionThreads h pinglogic = do
487 isEof <- liftIO $ hIsEOF h 496 isEof <- liftIO $ hIsEOF h
488 if isEof then finished Nothing else loop 497 if isEof then finished Nothing else loop
489 498
499 writerThread <- forkIO . fix $ \loop -> do
500 let finished = do -- warn $ "finished write"
501 hClose h -- quit reader
502 -- throwTo readerThread (ErrorCall "EOF")
503 atomically $ putTMVar donew ()
504 mb <- atomically $ readTMVar outs
505 case mb of Just bs -> handle (\(SomeException e)->finished)
506 (do S.hPutStr h bs
507 atomically $ takeTMVar outs
508 loop)
509 Nothing -> finished
510
490 let wait = do readTMVar donew 511 let wait = do readTMVar donew
491 readTMVar doner 512 readTMVar doner
492 return () 513 return ()
@@ -564,7 +585,18 @@ connClose :: ConnectionState -> STM ()
564connClose c = mapConn True threadsClose c 585connClose c = mapConn True threadsClose c
565 586
566connWait :: ConnectionState -> STM () 587connWait :: ConnectionState -> STM ()
567connWait c = mapConn False threadsWait c 588connWait c = doit -- mapConn False threadsWait c
589 where
590 action = threadsWait
591 doit =
592 case c of
593 SaneConnection rw -> action rw
594 ReadOnlyConnection r -> action r
595 WriteOnlyConnection w -> action w
596 ConnectionPair r w -> do
597 rem <- orElse (const w `fmap` action r)
598 (const r `fmap` action w)
599 threadsClose rem
568 600
569connPingTimer c = 601connPingTimer c =
570 case c of 602 case c of
@@ -599,34 +631,40 @@ data PingMachine = PingMachine
599 , pingTimeOut :: TimeOut 631 , pingTimeOut :: TimeOut
600 , pingDelay :: TMVar (Int,PingEvent) 632 , pingDelay :: TMVar (Int,PingEvent)
601 , pingEvent :: TMVar PingEvent 633 , pingEvent :: TMVar PingEvent
602 , pingStarted :: TVar Bool -- True when a threadDelay is running 634 , pingStarted :: TMVar Bool -- True when a threadDelay is running
603 , pingThread :: ThreadId 635 , pingThread :: ThreadId
636 , pingFlag :: TVar Bool
604 } 637 }
605 638
606pingMachine :: PingInterval -> TimeOut -> IO PingMachine 639pingMachine :: PingInterval -> TimeOut -> IO PingMachine
607pingMachine idle timeout = do 640pingMachine idle timeout = do
608 me <- do 641 me <- do
609 (delayVar,eventVar,startedVar) <- atomically $ do 642 (delayVar,eventVar,startedVar,flag) <- atomically $ do
610 d <- newEmptyTMVar 643 d <- newEmptyTMVar
611 e <- newEmptyTMVar 644 e <- newEmptyTMVar
612 s <- newTVar False 645 s <- newTMVar False
613 return (d,e,s) 646 f <- newTVar False
647 return (d,e,s,f)
614 return PingMachine { pingIdle = idle 648 return PingMachine { pingIdle = idle
615 , pingTimeOut = timeout 649 , pingTimeOut = timeout
616 , pingDelay = delayVar 650 , pingDelay = delayVar
617 , pingEvent = eventVar 651 , pingEvent = eventVar
618 , pingStarted = startedVar 652 , pingStarted = startedVar
619 , pingThread = undefined } 653 , pingThread = undefined
654 , pingFlag = flag }
620 thread <- forkIO . when (pingIdle me /=0) . fix $ 655 thread <- forkIO . when (pingIdle me /=0) . fix $
621 \loop -> do 656 \loop -> do
622 (delay,event) <- atomically $ takeTMVar (pingDelay me) 657 (delay,event) <- atomically $ takeTMVar (pingDelay me)
623 when (delay /= 0) $ do 658 when (delay /= 0) $ do
624 handle (\(ErrorCall _)-> do 659 handle (\(ErrorCall _)-> do
625 atomically $ writeTVar (pingStarted me) False 660 atomically $ do takeTMVar (pingStarted me)
661 putTMVar (pingStarted me) False
626 loop) 662 loop)
627 (do atomically $ writeTVar (pingStarted me) True 663 (do atomically $ do takeTMVar (pingStarted me)
664 putTMVar (pingStarted me) True
628 threadDelay delay 665 threadDelay delay
629 atomically $ writeTVar (pingStarted me) False 666 atomically $ do takeTMVar (pingStarted me)
667 putTMVar (pingStarted me) False
630 atomically $ putTMVar (pingEvent me) event 668 atomically $ putTMVar (pingEvent me) event
631 case event of PingTimeOut -> return () 669 case event of PingTimeOut -> return ()
632 PingIdle -> loop) 670 PingIdle -> loop)
@@ -638,8 +676,9 @@ pingCancel me = do
638 b <- atomically $ do 676 b <- atomically $ do
639 tryTakeTMVar (pingDelay me) -- no hang 677 tryTakeTMVar (pingDelay me) -- no hang
640 putTMVar (pingDelay me) (0,PingTimeOut) 678 putTMVar (pingDelay me) (0,PingTimeOut)
641 readTVar (pingStarted me) 679 takeTMVar (pingStarted me)
642 when b $ throwTo (pingThread me) $ ErrorCall "" 680 when b $ throwTo (pingThread me) $ ErrorCall ""
681 atomically $ putTMVar (pingStarted me) b
643 682
644pingBump :: PingMachine -> IO () 683pingBump :: PingMachine -> IO ()
645pingBump me = do 684pingBump me = do
@@ -651,15 +690,17 @@ pingBump me = do
651 Just _ -> retry 690 Just _ -> retry
652 Nothing -> putTMVar (pingDelay me) 691 Nothing -> putTMVar (pingDelay me)
653 (1000*pingIdle me,PingIdle) 692 (1000*pingIdle me,PingIdle)
654 readTVar (pingStarted me) 693 takeTMVar (pingStarted me)
655 when b $ throwTo (pingThread me) $ ErrorCall "" 694 when b $ throwTo (pingThread me) $ ErrorCall ""
695 atomically $ putTMVar (pingStarted me) b
656 696
657pingWait :: PingMachine -> STM PingEvent 697pingWait :: PingMachine -> STM PingEvent
658pingWait me = do 698pingWait me = do
659 e <- takeTMVar (pingEvent me) 699 e <- takeTMVar (pingEvent me)
660 case e of 700 case e of
661 PingIdle -> putTMVar (pingDelay me) 701 PingIdle -> do writeTVar (pingFlag me) True
662 (1000*pingTimeOut me,PingTimeOut) 702 putTMVar (pingDelay me)
703 (1000*pingTimeOut me,PingTimeOut)
663 PingTimeOut -> putTMVar (pingDelay me) 704 PingTimeOut -> putTMVar (pingDelay me)
664 (0,PingTimeOut) 705 (0,PingTimeOut)
665 return e 706 return e