summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Presence/XMPP.hs170
1 files changed, 111 insertions, 59 deletions
diff --git a/Presence/XMPP.hs b/Presence/XMPP.hs
index fd29473e..3f81f000 100644
--- a/Presence/XMPP.hs
+++ b/Presence/XMPP.hs
@@ -42,8 +42,6 @@ import System.IO
42 , IOMode(..) 42 , IOMode(..)
43 , hSetBuffering 43 , hSetBuffering
44 ) 44 )
45import Control.Exception
46 ( bracketOnError )
47import Control.Concurrent.STM 45import Control.Concurrent.STM
48import Data.Conduit 46import Data.Conduit
49import qualified Data.Conduit.List as CL 47import qualified Data.Conduit.List as CL
@@ -58,7 +56,12 @@ import qualified Data.ByteString.Lazy.Char8 as L
58 ) 56 )
59import Control.Concurrent (forkIO,killThread) 57import Control.Concurrent (forkIO,killThread)
60import Control.Concurrent.Async 58import Control.Concurrent.Async
61import Control.Exception (handle,SomeException(..),finally) 59import Control.Exception
60 ( handle
61 -- , SomeException(..)
62 , finally
63 , bracketOnError )
64import GHC.IO.Exception (IOException(..))
62import Control.Monad.IO.Class 65import Control.Monad.IO.Class
63import Control.Monad.Trans.Class 66import Control.Monad.Trans.Class
64import Control.Monad.Trans.Maybe 67import Control.Monad.Trans.Maybe
@@ -548,7 +551,8 @@ handlePresenceProbe session stanza = do
548 subs <- getSubscribers (peerSessionFactory session) user 551 subs <- getSubscribers (peerSessionFactory session) user
549 liftIO $ L.putStrLn $ "subscribers for "<++>bshow user<++>": " <++>bshow subs 552 liftIO $ L.putStrLn $ "subscribers for "<++>bshow user<++>": " <++>bshow subs
550 forM_ subs $ \jidstr -> do 553 forM_ subs $ \jidstr -> do
551 handle (\(SomeException _) -> return ()) $ do 554 handle (\(IOError _ _ _ _ _ _) -> return ()) $ do
555 -- handle (\(SomeException _) -> return ()) $ do
552 L.putStrLn $ "parsing " <++>jidstr 556 L.putStrLn $ "parsing " <++>jidstr
553 sub <- parseHostNameJID jidstr 557 sub <- parseHostNameJID jidstr
554 putStrLn $ "comparing " ++show (peer sub , peerAddress session) 558 putStrLn $ "comparing " ++show (peer sub , peerAddress session)
@@ -615,46 +619,45 @@ data CachedMessages = CachedMessages
615 619
616connect_to_server chan peer = (>> return ()) . runMaybeT $ do 620connect_to_server chan peer = (>> return ()) . runMaybeT $ do
617 let port = 5269 :: Int 621 let port = 5269 :: Int
618
619 connected <- liftIO . async $ connect' (peerAddr peer) port
620
621 -- We'll cache Presence notifications until the socket 622 -- We'll cache Presence notifications until the socket
622 -- is ready. 623 -- is ready.
623 cached <- liftIO $ newIORef (CachedMessages Map.empty Map.empty) 624 cached <- liftIO $ newIORef (CachedMessages Map.empty Map.empty)
624 625
625 sock <- MaybeT . fix $ \loop -> do 626 let cacheCmd (OutBoundPresence (Presence jid Offline)) cached = do
626 e <- atomically $ orElse 627 cache <- readIORef cached
627 (fmap Right $ waitSTM connected) 628 writeIORef cached (cache { presences=Map.delete jid . presences $ cache })
628 (fmap Left $ readTChan chan) 629 cacheCmd (OutBoundPresence p@(Presence jid st)) cached = do
629 case e of 630 cache <- readIORef cached
630 Left (OutBoundPresence (Presence jid Offline)) -> do 631 writeIORef cached (cache { presences=Map.insert jid st . presences $ cache })
631 cache <- readIORef cached 632 cacheCmd (PresenceProbe from to) cached = do
632 writeIORef cached (cache { presences=Map.delete jid . presences $ cache }) 633 cache <- readIORef cached
633 loop 634 let probes' = Map.adjust (Set.insert from) to $ probes cache
634 Left (OutBoundPresence p@(Presence jid st)) -> do 635 writeIORef cached (cache { probes=probes' })
635 cache <- readIORef cached 636
636 writeIORef cached (cache { presences=Map.insert jid st . presences $ cache }) 637 fix $ \sendmsgs -> do
637 loop 638 connected <- liftIO . async $ connect' (peerAddr peer) port
638 Left (PresenceProbe from to) -> do 639
639 cache <- readIORef cached 640 sock <- MaybeT . fix $ \loop -> do
640 let probes' = Map.adjust (Set.insert from) to $ probes cache 641 e <- atomically $ orElse
641 writeIORef cached (cache { probes=probes' }) 642 (fmap Right $ waitSTM connected)
642 loop 643 (fmap Left $ readTChan chan)
643 {- 644 case e of
644 Left event -> do 645 Left cmd -> cacheCmd cmd cached >> loop
645 L.putStrLn $ "REMOTE-OUT DISCARDED: " <++> bshow event 646 Right sock -> return sock
646 loop 647
647 -} 648 retry <- do
648 Right sock -> return sock 649 (cache,snk) <- liftIO $ do
649 650 h <- socketToHandle sock ReadWriteMode
650 liftIO $ do 651 hSetBuffering h NoBuffering
651 h <- socketToHandle sock ReadWriteMode 652 cache <- readIORef $ cached
652 hSetBuffering h NoBuffering 653 -- hint garbage collector: we're done with this...
653 let snk = packetSink h 654 writeIORef cached (CachedMessages Map.empty Map.empty)
654 cache <- readIORef $ cached 655 return (cache,packetSink h)
655 -- hint garbage collector: we're done with this... 656 MaybeT $ handleOutgoingToPeer (restrictSocket sock) cache chan snk
656 writeIORef cached (CachedMessages Map.empty Map.empty) 657
657 handleOutgoingToPeer (restrictSocket sock) cache chan snk 658 liftIO $ cacheCmd retry cached
659 liftIO $ putStrLn $ "retrying " ++ show retry
660 sendmsgs
658 661
659 662
660greetPeer = 663greetPeer =
@@ -689,46 +692,94 @@ presenceProbe sock fromjid tojid = do
689 , EventEndElement "{jabber:server}presence" 692 , EventEndElement "{jabber:server}presence"
690 ] 693 ]
691 694
692toPeer sock cache chan = do 695{-
696toPeerChain
697 :: SocketLike sock =>
698 sock
699 -> CachedMessages
700 -> TChan OutBoundMessage
701 -> Sink ByteString IO b
702 -> IO b
703toPeerChain sock cache chan snk = toPeer sock cache chan $$ renderChunks =$ snk
704-}
705
706toPeer
707 :: SocketLike sock =>
708 sock
709 -> CachedMessages
710 -> TChan OutBoundMessage
711 -> (Maybe OutBoundMessage -> IO ())
712 -> ConduitM i [Event] IO ()
713toPeer sock cache chan fail = do
693 let -- log = liftIO . L.putStrLn . ("(>P) " <++>) 714 let -- log = liftIO . L.putStrLn . ("(>P) " <++>)
694 send xs = yield xs >> prettyPrint ">P: " xs 715 send xs = yield xs >> prettyPrint ">P: " xs -- >> return (3::Int)
716 checkConnection cmd = do
717 liftIO $ catch (getPeerName sock >> return ())
718 (\_ -> fail . Just $ cmd)
719 sendPresence presence = do
720 r <- lift $ xmlifyPresenceForPeer sock presence
721 {-
722 liftIO $ do
723 p' <- catch (fmap (Just . RemotePeer) $ getPeerName sock)
724 (\_ -> (fail . Just . OutBoundPresence $ presence) >> return Nothing)
725 L.putStrLn $ "sending Presence to " <++?> fmap showPeer p'
726 -}
727 let cmd = OutBoundPresence presence
728 checkConnection cmd
729 yieldOr r (fail . Just $ cmd)
730 prettyPrint ">P: " r
731 sendProbe from to = do
732 r <- liftIO $ presenceProbe sock from to
733 let cmd = PresenceProbe from to
734 checkConnection cmd
735 yieldOr r (fail . Just $ cmd)
736 prettyPrint ">P: " r
737
695 send greetPeer 738 send greetPeer
696 forM_ (Map.assocs . presences $ cache) $ \(jid,st) -> do 739 forM_ (Map.assocs . presences $ cache) $ \(jid,st) -> do
697 r <- lift $ xmlifyPresenceForPeer sock (Presence jid st) 740 sendPresence (Presence jid st)
698 send r
699 forM_ (Map.assocs . probes $ cache) $ \(to,froms) -> do 741 forM_ (Map.assocs . probes $ cache) $ \(to,froms) -> do
700 forM_ (Set.toList froms) $ \from -> do 742 forM_ (Set.toList froms) $ \from -> do
701 liftIO $ L.putStrLn "sending cached probe..." 743 liftIO $ L.putStrLn "sending cached probe..."
702 r <- liftIO $ presenceProbe sock from to 744 sendProbe from to
703 send r
704 fix $ \loop -> do 745 fix $ \loop -> do
705 event <- lift . atomically $ readTChan chan 746 event <- lift . atomically $ readTChan chan
706 case event of 747 case event of
707 OutBoundPresence p -> do 748 OutBoundPresence p -> sendPresence p
708 r <- lift $ xmlifyPresenceForPeer sock p
709 send r
710 PresenceProbe from to -> do 749 PresenceProbe from to -> do
711 liftIO $ L.putStrLn "sending live probe..." 750 liftIO $ L.putStrLn "sending live probe..."
712 r <- liftIO $ presenceProbe sock from to 751 sendProbe from to
713 send r
714 loop 752 loop
715 send goodbyePeer 753 send goodbyePeer
716 754
755handleOutgoingToPeer
756 :: SocketLike sock =>
757 sock
758 -> CachedMessages
759 -> TChan OutBoundMessage
760 -> Sink ByteString IO ()
761 -> IO (Maybe OutBoundMessage)
717handleOutgoingToPeer sock cache chan snk = do 762handleOutgoingToPeer sock cache chan snk = do
718 p <- getPeerName sock 763 p <- getPeerName sock
719 L.putStrLn $ "(>P) connected " <++> showPeer (RemotePeer p) 764 L.putStrLn $ "(>P) connected " <++> showPeer (RemotePeer p)
765 failed <- newIORef Nothing
766 let failure cmd = do
767 writeIORef failed cmd
768 putStrLn $ "Failed: " ++ show cmd
720 finally ( 769 finally (
721#ifdef RENDERFLUSH 770#ifdef RENDERFLUSH
722 toPeer sock cache chan 771 handle (\(IOError _ _ _ _ _ _) -> return ()) $
723 $$ flushList 772 toPeer sock cache chan failure
724 =$= renderBuilderFlush def 773 $$ flushList
725 =$= builderToByteStringFlush 774 =$= renderBuilderFlush def
726 =$= discardFlush 775 =$= builderToByteStringFlush
727 =$ snk 776 =$= discardFlush
777 =$ snk
728#else 778#else
729 toPeer sock cache chan $$ renderChunks =$ snk 779 handle (\(IOError _ _ _ _ _ _) -> return ()) $ toPeer sock cache chan failure $$ renderChunks =$ snk
730#endif 780#endif
731 ) $ L.putStrLn $ "(>P) disconnected " <++> showPeer (RemotePeer p) 781 ) $ L.putStrLn $ "(>P) disconnected " <++> showPeer (RemotePeer p)
782 readIORef failed
732 783
733connect' :: SockAddr -> Int -> IO (Maybe Socket) 784connect' :: SockAddr -> Int -> IO (Maybe Socket)
734connect' addr port = do 785connect' addr port = do
@@ -743,7 +794,8 @@ connect' addr port = do
743 -} 794 -}
744 let getport (SockAddrInet port _) = port 795 let getport (SockAddrInet port _) = port
745 getport (SockAddrInet6 port _ _ _) = port 796 getport (SockAddrInet6 port _ _ _) = port
746 let doException (SomeException e) = do 797 let doException e@(IOError _ _ _ _ _ _) = do
798 -- let doException (SomeException e) = do
747 L.putStrLn $ "\nFailed to reach "<++> showPeer (RemotePeer addr) <++> " on port "<++>bshow port<++>": " <++> bshow e 799 L.putStrLn $ "\nFailed to reach "<++> showPeer (RemotePeer addr) <++> " on port "<++>bshow port<++>": " <++> bshow e
748 return Nothing 800 return Nothing
749 handle doException 801 handle doException