summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2014-02-10 02:52:23 -0500
committerjoe <joe@jerkface.net>2014-02-10 02:52:23 -0500
commitafff51ac877ce8807801334745f1679dbf6440d0 (patch)
treec52a620f7584c712b27ba8a631a7b38744e0ffdd
parent0e3a39ff7d4c6a2ba5190ddfcc38b4bb8d22c367 (diff)
Fixed ping bug, forked connection thread in xmppServer
-rw-r--r--Presence/Server.hs113
-rw-r--r--xmppServer.hs39
2 files changed, 113 insertions, 39 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs
index 530b6669..4ee55821 100644
--- a/Presence/Server.hs
+++ b/Presence/Server.hs
@@ -48,6 +48,7 @@ import System.IO
48 , hClose 48 , hClose
49 , hIsEOF 49 , hIsEOF
50 , stderr 50 , stderr
51 , stdout
51 , Handle 52 , Handle
52 , hFlush 53 , hFlush
53 ) 54 )
@@ -142,7 +143,7 @@ data InOrOut = In | Out
142data ConnectionEvent b 143data ConnectionEvent b
143 = Got b 144 = Got b
144 -- ^ Arrival of data from a socket 145 -- ^ Arrival of data from a socket
145 | Connection 146 | Connection (IO Bool) (IO (Maybe ByteString)) (ByteString -> IO Bool)
146 -- ^ A new connection was established 147 -- ^ A new connection was established
147 | HalfConnection InOrOut 148 | HalfConnection InOrOut
148 -- ^ Half of a half-duplex connection is avaliable. 149 -- ^ Half of a half-duplex connection is avaliable.
@@ -151,8 +152,10 @@ data ConnectionEvent b
151 | RequiresPing 152 | RequiresPing
152 -- ^ 'pingInterval' miliseconds of idle was experienced 153 -- ^ 'pingInterval' miliseconds of idle was experienced
153 154
155{-
154deriving instance Show b => Show (ConnectionEvent b) 156deriving instance Show b => Show (ConnectionEvent b)
155deriving instance Eq b => Eq (ConnectionEvent b) 157deriving instance Eq b => Eq (ConnectionEvent b)
158-}
156 159
157-- | This object accepts commands and signals events and maintains 160-- | This object accepts commands and signals events and maintains
158-- the list of currently listening ports and established connections. 161-- the list of currently listening ports and established connections.
@@ -285,9 +288,10 @@ hWriteUntilNothing h outs =
285 Nothing -> do warn $ "wrote Nothing" 288 Nothing -> do warn $ "wrote Nothing"
286 hClose h 289 hClose h
287 290
291-}
288connRead :: ConnectionState -> IO (Maybe ByteString) 292connRead :: ConnectionState -> IO (Maybe ByteString)
289connRead (WriteOnlyConnection w) = do 293connRead (WriteOnlyConnection w) = do
290 atomically $ discardContents (threadsChannel w) 294 -- atomically $ discardContents (threadsChannel w)
291 return Nothing 295 return Nothing
292connRead conn = do 296connRead conn = do
293 c <- atomically $ getThreads 297 c <- atomically $ getThreads
@@ -297,9 +301,8 @@ connRead conn = do
297 case conn of SaneConnection c -> return c 301 case conn of SaneConnection c -> return c
298 ReadOnlyConnection c -> return c 302 ReadOnlyConnection c -> return c
299 ConnectionPair c w -> do 303 ConnectionPair c w -> do
300 discardContents (threadsChannel w) 304 -- discardContents (threadsChannel w)
301 return c 305 return c
302-}
303 306
304socketFamily (SockAddrInet _ _) = AF_INET 307socketFamily (SockAddrInet _ _) = AF_INET
305socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6 308socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6
@@ -309,6 +312,12 @@ killListener (thread,sock) = do sClose sock
309 -- killThread thread 312 -- killThread thread
310 313
311 314
315conevent con = Connection pingflag read write
316 where
317 pingflag = atomically $ swapTVar (pingFlag (connPingTimer con)) False
318 read = connRead con
319 write = connWrite con
320
312newConnection server params conkey h inout = do 321newConnection server params conkey h inout = do
313 hSetBuffering h NoBuffering 322 hSetBuffering h NoBuffering
314 let (forward,idle_ms,timeout_ms) = 323 let (forward,idle_ms,timeout_ms) =
@@ -335,7 +344,8 @@ newConnection server params conkey h inout = do
335 Nothing -> do 344 Nothing -> do
336 (newCon,e) <- return $ 345 (newCon,e) <- return $
337 if duplex params 346 if duplex params
338 then ( SaneConnection new, (conkey, Connection) ) 347 then let newcon = SaneConnection new
348 in ( newcon, (conkey, conevent newcon) )
339 else ( case inout of 349 else ( case inout of
340 In -> ReadOnlyConnection new 350 In -> ReadOnlyConnection new
341 Out -> WriteOnlyConnection new 351 Out -> WriteOnlyConnection new
@@ -351,6 +361,8 @@ newConnection server params conkey h inout = do
351 kont <- updateConMap conkey new what 361 kont <- updateConMap conkey new what
352 putTMVar started () 362 putTMVar started ()
353 return kont 363 return kont
364 {-
365 -- enable this for 'Got' events
354 forkIO $ do -- inout==In || duplex params then forkIO $ do 366 forkIO $ do -- inout==In || duplex params then forkIO $ do
355 -- warn $ "waiting read thread: " <> bshow (conkey,inout) 367 -- warn $ "waiting read thread: " <> bshow (conkey,inout)
356 atomically $ takeTMVar started 368 atomically $ takeTMVar started
@@ -363,6 +375,7 @@ newConnection server params conkey h inout = do
363 maybe (return ()) 375 maybe (return ())
364 (atomically . forward >=> const loop) 376 (atomically . forward >=> const loop)
365 mb 377 mb
378 -}
366 return () 379 return ()
367 where 380 where
368 381
@@ -389,7 +402,7 @@ newConnection server params conkey h inout = do
389 sendPing PingTimeOut = do atomically (connClose newCon) 402 sendPing PingTimeOut = do atomically (connClose newCon)
390 eof 403 eof
391 sendPing PingIdle = do 404 sendPing PingIdle = do
392 atomically . announce $ (conkey,RequiresPing) 405 atomically $ announce (conkey,RequiresPing)
393 handleEOF conkey mvar newCon 406 handleEOF conkey mvar newCon
394 407
395 408
@@ -398,16 +411,19 @@ newConnection server params conkey h inout = do
398 if duplex params then do 411 if duplex params then do
399 announce (conkey,EOF) 412 announce (conkey,EOF)
400 connClose replaced 413 connClose replaced
401 announce $ (conkey,Connection) 414 let newcon = SaneConnection new
402 return $ SaneConnection new 415 announce $ (conkey,conevent newcon)
416 return $ newcon
403 else 417 else
404 case replaced of 418 case replaced of
405 WriteOnlyConnection w | inout==In -> 419 WriteOnlyConnection w | inout==In ->
406 do announce (conkey,Connection) 420 do let newcon = ConnectionPair new w
407 return $ ConnectionPair new w 421 announce (conkey,conevent newcon)
422 return newcon
408 ReadOnlyConnection r | inout==Out -> 423 ReadOnlyConnection r | inout==Out ->
409 do announce (conkey,Connection) 424 do let newcon = ConnectionPair r new
410 return $ ConnectionPair r new 425 announce (conkey,conevent newcon)
426 return newcon
411 _ -> do -- connFlush todo 427 _ -> do -- connFlush todo
412 announce (conkey, EOF) 428 announce (conkey, EOF)
413 connClose replaced 429 connClose replaced
@@ -460,23 +476,16 @@ connectionThreads :: Handle -> PingMachine -> IO ConnectionThreads
460connectionThreads h pinglogic = do 476connectionThreads h pinglogic = do
461 477
462 (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar 478 (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar
463 writerThread <- forkIO . fix $ \loop -> do
464 let finished = do -- warn $ "finished write"
465 hClose h -- quit reader
466 atomically $ putTMVar donew ()
467 mb <- atomically $ readTMVar outs
468 case mb of Just bs -> handle (\(SomeException e)->finished)
469 (do S.hPutStr h bs
470 atomically $ takeTMVar outs
471 loop)
472 Nothing -> finished
473 479
474 (doner,incomming) <- atomically $ liftM2 (,) newEmptyTMVar newTChan 480 (doner,incomming) <- atomically $ liftM2 (,) newEmptyTMVar newTChan
475 readerThread <- forkIO $ do 481 readerThread <- forkIO $ do
476 let finished e = do 482 let finished e = do
483 hClose h
477 -- warn $ "finished read: " <> bshow (fmap ioeGetErrorType e) 484 -- warn $ "finished read: " <> bshow (fmap ioeGetErrorType e)
478 let _ = fmap ioeGetErrorType e -- type hint 485 let _ = fmap ioeGetErrorType e -- type hint
479 atomically $ do putTMVar outs Nothing -- quit writer 486 -- let _ = fmap what e where what (SomeException _) = undefined
487 atomically $ do tryTakeTMVar outs
488 putTMVar outs Nothing -- quit writer
480 putTMVar doner () 489 putTMVar doner ()
481 handle (finished . Just) $ do 490 handle (finished . Just) $ do
482 pingBump pinglogic -- start the ping timer 491 pingBump pinglogic -- start the ping timer
@@ -487,6 +496,18 @@ connectionThreads h pinglogic = do
487 isEof <- liftIO $ hIsEOF h 496 isEof <- liftIO $ hIsEOF h
488 if isEof then finished Nothing else loop 497 if isEof then finished Nothing else loop
489 498
499 writerThread <- forkIO . fix $ \loop -> do
500 let finished = do -- warn $ "finished write"
501 hClose h -- quit reader
502 -- throwTo readerThread (ErrorCall "EOF")
503 atomically $ putTMVar donew ()
504 mb <- atomically $ readTMVar outs
505 case mb of Just bs -> handle (\(SomeException e)->finished)
506 (do S.hPutStr h bs
507 atomically $ takeTMVar outs
508 loop)
509 Nothing -> finished
510
490 let wait = do readTMVar donew 511 let wait = do readTMVar donew
491 readTMVar doner 512 readTMVar doner
492 return () 513 return ()
@@ -564,7 +585,18 @@ connClose :: ConnectionState -> STM ()
564connClose c = mapConn True threadsClose c 585connClose c = mapConn True threadsClose c
565 586
566connWait :: ConnectionState -> STM () 587connWait :: ConnectionState -> STM ()
567connWait c = mapConn False threadsWait c 588connWait c = doit -- mapConn False threadsWait c
589 where
590 action = threadsWait
591 doit =
592 case c of
593 SaneConnection rw -> action rw
594 ReadOnlyConnection r -> action r
595 WriteOnlyConnection w -> action w
596 ConnectionPair r w -> do
597 rem <- orElse (const w `fmap` action r)
598 (const r `fmap` action w)
599 threadsClose rem
568 600
569connPingTimer c = 601connPingTimer c =
570 case c of 602 case c of
@@ -599,34 +631,40 @@ data PingMachine = PingMachine
599 , pingTimeOut :: TimeOut 631 , pingTimeOut :: TimeOut
600 , pingDelay :: TMVar (Int,PingEvent) 632 , pingDelay :: TMVar (Int,PingEvent)
601 , pingEvent :: TMVar PingEvent 633 , pingEvent :: TMVar PingEvent
602 , pingStarted :: TVar Bool -- True when a threadDelay is running 634 , pingStarted :: TMVar Bool -- True when a threadDelay is running
603 , pingThread :: ThreadId 635 , pingThread :: ThreadId
636 , pingFlag :: TVar Bool
604 } 637 }
605 638
606pingMachine :: PingInterval -> TimeOut -> IO PingMachine 639pingMachine :: PingInterval -> TimeOut -> IO PingMachine
607pingMachine idle timeout = do 640pingMachine idle timeout = do
608 me <- do 641 me <- do
609 (delayVar,eventVar,startedVar) <- atomically $ do 642 (delayVar,eventVar,startedVar,flag) <- atomically $ do
610 d <- newEmptyTMVar 643 d <- newEmptyTMVar
611 e <- newEmptyTMVar 644 e <- newEmptyTMVar
612 s <- newTVar False 645 s <- newTMVar False
613 return (d,e,s) 646 f <- newTVar False
647 return (d,e,s,f)
614 return PingMachine { pingIdle = idle 648 return PingMachine { pingIdle = idle
615 , pingTimeOut = timeout 649 , pingTimeOut = timeout
616 , pingDelay = delayVar 650 , pingDelay = delayVar
617 , pingEvent = eventVar 651 , pingEvent = eventVar
618 , pingStarted = startedVar 652 , pingStarted = startedVar
619 , pingThread = undefined } 653 , pingThread = undefined
654 , pingFlag = flag }
620 thread <- forkIO . when (pingIdle me /=0) . fix $ 655 thread <- forkIO . when (pingIdle me /=0) . fix $
621 \loop -> do 656 \loop -> do
622 (delay,event) <- atomically $ takeTMVar (pingDelay me) 657 (delay,event) <- atomically $ takeTMVar (pingDelay me)
623 when (delay /= 0) $ do 658 when (delay /= 0) $ do
624 handle (\(ErrorCall _)-> do 659 handle (\(ErrorCall _)-> do
625 atomically $ writeTVar (pingStarted me) False 660 atomically $ do takeTMVar (pingStarted me)
661 putTMVar (pingStarted me) False
626 loop) 662 loop)
627 (do atomically $ writeTVar (pingStarted me) True 663 (do atomically $ do takeTMVar (pingStarted me)
664 putTMVar (pingStarted me) True
628 threadDelay delay 665 threadDelay delay
629 atomically $ writeTVar (pingStarted me) False 666 atomically $ do takeTMVar (pingStarted me)
667 putTMVar (pingStarted me) False
630 atomically $ putTMVar (pingEvent me) event 668 atomically $ putTMVar (pingEvent me) event
631 case event of PingTimeOut -> return () 669 case event of PingTimeOut -> return ()
632 PingIdle -> loop) 670 PingIdle -> loop)
@@ -638,8 +676,9 @@ pingCancel me = do
638 b <- atomically $ do 676 b <- atomically $ do
639 tryTakeTMVar (pingDelay me) -- no hang 677 tryTakeTMVar (pingDelay me) -- no hang
640 putTMVar (pingDelay me) (0,PingTimeOut) 678 putTMVar (pingDelay me) (0,PingTimeOut)
641 readTVar (pingStarted me) 679 takeTMVar (pingStarted me)
642 when b $ throwTo (pingThread me) $ ErrorCall "" 680 when b $ throwTo (pingThread me) $ ErrorCall ""
681 atomically $ putTMVar (pingStarted me) b
643 682
644pingBump :: PingMachine -> IO () 683pingBump :: PingMachine -> IO ()
645pingBump me = do 684pingBump me = do
@@ -651,15 +690,17 @@ pingBump me = do
651 Just _ -> retry 690 Just _ -> retry
652 Nothing -> putTMVar (pingDelay me) 691 Nothing -> putTMVar (pingDelay me)
653 (1000*pingIdle me,PingIdle) 692 (1000*pingIdle me,PingIdle)
654 readTVar (pingStarted me) 693 takeTMVar (pingStarted me)
655 when b $ throwTo (pingThread me) $ ErrorCall "" 694 when b $ throwTo (pingThread me) $ ErrorCall ""
695 atomically $ putTMVar (pingStarted me) b
656 696
657pingWait :: PingMachine -> STM PingEvent 697pingWait :: PingMachine -> STM PingEvent
658pingWait me = do 698pingWait me = do
659 e <- takeTMVar (pingEvent me) 699 e <- takeTMVar (pingEvent me)
660 case e of 700 case e of
661 PingIdle -> putTMVar (pingDelay me) 701 PingIdle -> do writeTVar (pingFlag me) True
662 (1000*pingTimeOut me,PingTimeOut) 702 putTMVar (pingDelay me)
703 (1000*pingTimeOut me,PingTimeOut)
663 PingTimeOut -> putTMVar (pingDelay me) 704 PingTimeOut -> putTMVar (pingDelay me)
664 (0,PingTimeOut) 705 (0,PingTimeOut)
665 return e 706 return e
diff --git a/xmppServer.hs b/xmppServer.hs
index 50818dd6..c720ea5f 100644
--- a/xmppServer.hs
+++ b/xmppServer.hs
@@ -7,6 +7,12 @@ import Control.Concurrent.STM
7import Network.Socket 7import Network.Socket
8import XMPPTypes (withPort) 8import XMPPTypes (withPort)
9import Text.Printf 9import Text.Printf
10import System.Posix.Signals
11
12import Data.Conduit
13
14import qualified Text.XML.Stream.Render as XML
15import qualified Text.XML.Stream.Parse as XML
10 16
11import Server 17import Server
12 18
@@ -14,12 +20,34 @@ wlog s = putStrLn s
14 20
15control sv = atomically . putTMVar (serverCommand sv) 21control sv = atomically . putTMVar (serverCommand sv)
16 22
23xmlStream conread conwrite = (xsrc,xsnk)
24 where
25 xsrc = src $= XML.parseBytes XML.def
26 xsnk = XML.renderBytes XML.def =$ snk
27
28 src = do
29 v <- lift conread
30 maybe (return ()) -- lift . wlog $ "conread: Nothing")
31 (\v -> yield v >> src)
32 v
33 snk = awaitForever $ lift . conwrite
34
35
36forkConnection k pingflag src snk = do
37 forkIO $ do
38 src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show)
39 wlog $ "end fork: " ++ show k
40 return ()
41
17monitor sv params = do 42monitor sv params = do
18 chan <- return $ serverEvent sv 43 chan <- return $ serverEvent sv
19 fix $ \loop -> do 44 fix $ \loop -> do
20 (k,e) <- atomically $ readTChan chan 45 (k,e) <- atomically $ readTChan chan
21 case e of 46 case e of
22 Connection -> wlog $ tomsg k "Connection" 47 Connection pingflag conread conwrite -> do
48 let (xsrc,xsnk) = xmlStream conread conwrite
49 forkConnection k pingflag xsrc xsnk
50 wlog $ tomsg k "Connection"
23 EOF -> wlog $ tomsg k "EOF" 51 EOF -> wlog $ tomsg k "EOF"
24 HalfConnection In -> do 52 HalfConnection In -> do
25 wlog $ tomsg k "ReadOnly" 53 wlog $ tomsg k "ReadOnly"
@@ -49,12 +77,17 @@ main = runResourceT $ do
49 sv <- server 77 sv <- server
50 lift $ do 78 lift $ do
51 peer_params <- return (connectionDefaults peerKey) 79 peer_params <- return (connectionDefaults peerKey)
52 { duplex = False } 80 { pingInterval = 2000, duplex = False }
53 client_params <- return $ connectionDefaults clientKey 81 client_params <- return $ connectionDefaults clientKey
54 forkIO $ monitor sv peer_params 82 forkIO $ monitor sv peer_params
55 control sv (Listen peerport peer_params) 83 control sv (Listen peerport peer_params)
56 control sv (Listen clientport client_params) 84 control sv (Listen clientport client_params)
57 85
58 atomically $ newEmptyTMVar >>= readTMVar -- Wait for control-c 86 -- atomically $ newEmptyTMVar >>= readTMVar -- Wait for control-c
87 quitVar <- newEmptyTMVarIO
88 installHandler sigTERM (CatchOnce (atomically $ putTMVar quitVar True)) Nothing
89 installHandler sigINT (CatchOnce (atomically $ putTMVar quitVar True)) Nothing
90 quitMessage <- atomically $ takeTMVar quitVar
59 91
92 wlog "goodbye."
60 return () 93 return ()