diff options
Diffstat (limited to 'Presence/XMPP.hs')
-rw-r--r-- | Presence/XMPP.hs | 170 |
1 files changed, 111 insertions, 59 deletions
diff --git a/Presence/XMPP.hs b/Presence/XMPP.hs index fd29473e..3f81f000 100644 --- a/Presence/XMPP.hs +++ b/Presence/XMPP.hs | |||
@@ -42,8 +42,6 @@ import System.IO | |||
42 | , IOMode(..) | 42 | , IOMode(..) |
43 | , hSetBuffering | 43 | , hSetBuffering |
44 | ) | 44 | ) |
45 | import Control.Exception | ||
46 | ( bracketOnError ) | ||
47 | import Control.Concurrent.STM | 45 | import Control.Concurrent.STM |
48 | import Data.Conduit | 46 | import Data.Conduit |
49 | import qualified Data.Conduit.List as CL | 47 | import qualified Data.Conduit.List as CL |
@@ -58,7 +56,12 @@ import qualified Data.ByteString.Lazy.Char8 as L | |||
58 | ) | 56 | ) |
59 | import Control.Concurrent (forkIO,killThread) | 57 | import Control.Concurrent (forkIO,killThread) |
60 | import Control.Concurrent.Async | 58 | import Control.Concurrent.Async |
61 | import Control.Exception (handle,SomeException(..),finally) | 59 | import Control.Exception |
60 | ( handle | ||
61 | -- , SomeException(..) | ||
62 | , finally | ||
63 | , bracketOnError ) | ||
64 | import GHC.IO.Exception (IOException(..)) | ||
62 | import Control.Monad.IO.Class | 65 | import Control.Monad.IO.Class |
63 | import Control.Monad.Trans.Class | 66 | import Control.Monad.Trans.Class |
64 | import Control.Monad.Trans.Maybe | 67 | import Control.Monad.Trans.Maybe |
@@ -548,7 +551,8 @@ handlePresenceProbe session stanza = do | |||
548 | subs <- getSubscribers (peerSessionFactory session) user | 551 | subs <- getSubscribers (peerSessionFactory session) user |
549 | liftIO $ L.putStrLn $ "subscribers for "<++>bshow user<++>": " <++>bshow subs | 552 | liftIO $ L.putStrLn $ "subscribers for "<++>bshow user<++>": " <++>bshow subs |
550 | forM_ subs $ \jidstr -> do | 553 | forM_ subs $ \jidstr -> do |
551 | handle (\(SomeException _) -> return ()) $ do | 554 | handle (\(IOError _ _ _ _ _ _) -> return ()) $ do |
555 | -- handle (\(SomeException _) -> return ()) $ do | ||
552 | L.putStrLn $ "parsing " <++>jidstr | 556 | L.putStrLn $ "parsing " <++>jidstr |
553 | sub <- parseHostNameJID jidstr | 557 | sub <- parseHostNameJID jidstr |
554 | putStrLn $ "comparing " ++show (peer sub , peerAddress session) | 558 | putStrLn $ "comparing " ++show (peer sub , peerAddress session) |
@@ -615,46 +619,45 @@ data CachedMessages = CachedMessages | |||
615 | 619 | ||
616 | connect_to_server chan peer = (>> return ()) . runMaybeT $ do | 620 | connect_to_server chan peer = (>> return ()) . runMaybeT $ do |
617 | let port = 5269 :: Int | 621 | let port = 5269 :: Int |
618 | |||
619 | connected <- liftIO . async $ connect' (peerAddr peer) port | ||
620 | |||
621 | -- We'll cache Presence notifications until the socket | 622 | -- We'll cache Presence notifications until the socket |
622 | -- is ready. | 623 | -- is ready. |
623 | cached <- liftIO $ newIORef (CachedMessages Map.empty Map.empty) | 624 | cached <- liftIO $ newIORef (CachedMessages Map.empty Map.empty) |
624 | 625 | ||
625 | sock <- MaybeT . fix $ \loop -> do | 626 | let cacheCmd (OutBoundPresence (Presence jid Offline)) cached = do |
626 | e <- atomically $ orElse | 627 | cache <- readIORef cached |
627 | (fmap Right $ waitSTM connected) | 628 | writeIORef cached (cache { presences=Map.delete jid . presences $ cache }) |
628 | (fmap Left $ readTChan chan) | 629 | cacheCmd (OutBoundPresence p@(Presence jid st)) cached = do |
629 | case e of | 630 | cache <- readIORef cached |
630 | Left (OutBoundPresence (Presence jid Offline)) -> do | 631 | writeIORef cached (cache { presences=Map.insert jid st . presences $ cache }) |
631 | cache <- readIORef cached | 632 | cacheCmd (PresenceProbe from to) cached = do |
632 | writeIORef cached (cache { presences=Map.delete jid . presences $ cache }) | 633 | cache <- readIORef cached |
633 | loop | 634 | let probes' = Map.adjust (Set.insert from) to $ probes cache |
634 | Left (OutBoundPresence p@(Presence jid st)) -> do | 635 | writeIORef cached (cache { probes=probes' }) |
635 | cache <- readIORef cached | 636 | |
636 | writeIORef cached (cache { presences=Map.insert jid st . presences $ cache }) | 637 | fix $ \sendmsgs -> do |
637 | loop | 638 | connected <- liftIO . async $ connect' (peerAddr peer) port |
638 | Left (PresenceProbe from to) -> do | 639 | |
639 | cache <- readIORef cached | 640 | sock <- MaybeT . fix $ \loop -> do |
640 | let probes' = Map.adjust (Set.insert from) to $ probes cache | 641 | e <- atomically $ orElse |
641 | writeIORef cached (cache { probes=probes' }) | 642 | (fmap Right $ waitSTM connected) |
642 | loop | 643 | (fmap Left $ readTChan chan) |
643 | {- | 644 | case e of |
644 | Left event -> do | 645 | Left cmd -> cacheCmd cmd cached >> loop |
645 | L.putStrLn $ "REMOTE-OUT DISCARDED: " <++> bshow event | 646 | Right sock -> return sock |
646 | loop | 647 | |
647 | -} | 648 | retry <- do |
648 | Right sock -> return sock | 649 | (cache,snk) <- liftIO $ do |
649 | 650 | h <- socketToHandle sock ReadWriteMode | |
650 | liftIO $ do | 651 | hSetBuffering h NoBuffering |
651 | h <- socketToHandle sock ReadWriteMode | 652 | cache <- readIORef $ cached |
652 | hSetBuffering h NoBuffering | 653 | -- hint garbage collector: we're done with this... |
653 | let snk = packetSink h | 654 | writeIORef cached (CachedMessages Map.empty Map.empty) |
654 | cache <- readIORef $ cached | 655 | return (cache,packetSink h) |
655 | -- hint garbage collector: we're done with this... | 656 | MaybeT $ handleOutgoingToPeer (restrictSocket sock) cache chan snk |
656 | writeIORef cached (CachedMessages Map.empty Map.empty) | 657 | |
657 | handleOutgoingToPeer (restrictSocket sock) cache chan snk | 658 | liftIO $ cacheCmd retry cached |
659 | liftIO $ putStrLn $ "retrying " ++ show retry | ||
660 | sendmsgs | ||
658 | 661 | ||
659 | 662 | ||
660 | greetPeer = | 663 | greetPeer = |
@@ -689,46 +692,94 @@ presenceProbe sock fromjid tojid = do | |||
689 | , EventEndElement "{jabber:server}presence" | 692 | , EventEndElement "{jabber:server}presence" |
690 | ] | 693 | ] |
691 | 694 | ||
692 | toPeer sock cache chan = do | 695 | {- |
696 | toPeerChain | ||
697 | :: SocketLike sock => | ||
698 | sock | ||
699 | -> CachedMessages | ||
700 | -> TChan OutBoundMessage | ||
701 | -> Sink ByteString IO b | ||
702 | -> IO b | ||
703 | toPeerChain sock cache chan snk = toPeer sock cache chan $$ renderChunks =$ snk | ||
704 | -} | ||
705 | |||
706 | toPeer | ||
707 | :: SocketLike sock => | ||
708 | sock | ||
709 | -> CachedMessages | ||
710 | -> TChan OutBoundMessage | ||
711 | -> (Maybe OutBoundMessage -> IO ()) | ||
712 | -> ConduitM i [Event] IO () | ||
713 | toPeer sock cache chan fail = do | ||
693 | let -- log = liftIO . L.putStrLn . ("(>P) " <++>) | 714 | let -- log = liftIO . L.putStrLn . ("(>P) " <++>) |
694 | send xs = yield xs >> prettyPrint ">P: " xs | 715 | send xs = yield xs >> prettyPrint ">P: " xs -- >> return (3::Int) |
716 | checkConnection cmd = do | ||
717 | liftIO $ catch (getPeerName sock >> return ()) | ||
718 | (\_ -> fail . Just $ cmd) | ||
719 | sendPresence presence = do | ||
720 | r <- lift $ xmlifyPresenceForPeer sock presence | ||
721 | {- | ||
722 | liftIO $ do | ||
723 | p' <- catch (fmap (Just . RemotePeer) $ getPeerName sock) | ||
724 | (\_ -> (fail . Just . OutBoundPresence $ presence) >> return Nothing) | ||
725 | L.putStrLn $ "sending Presence to " <++?> fmap showPeer p' | ||
726 | -} | ||
727 | let cmd = OutBoundPresence presence | ||
728 | checkConnection cmd | ||
729 | yieldOr r (fail . Just $ cmd) | ||
730 | prettyPrint ">P: " r | ||
731 | sendProbe from to = do | ||
732 | r <- liftIO $ presenceProbe sock from to | ||
733 | let cmd = PresenceProbe from to | ||
734 | checkConnection cmd | ||
735 | yieldOr r (fail . Just $ cmd) | ||
736 | prettyPrint ">P: " r | ||
737 | |||
695 | send greetPeer | 738 | send greetPeer |
696 | forM_ (Map.assocs . presences $ cache) $ \(jid,st) -> do | 739 | forM_ (Map.assocs . presences $ cache) $ \(jid,st) -> do |
697 | r <- lift $ xmlifyPresenceForPeer sock (Presence jid st) | 740 | sendPresence (Presence jid st) |
698 | send r | ||
699 | forM_ (Map.assocs . probes $ cache) $ \(to,froms) -> do | 741 | forM_ (Map.assocs . probes $ cache) $ \(to,froms) -> do |
700 | forM_ (Set.toList froms) $ \from -> do | 742 | forM_ (Set.toList froms) $ \from -> do |
701 | liftIO $ L.putStrLn "sending cached probe..." | 743 | liftIO $ L.putStrLn "sending cached probe..." |
702 | r <- liftIO $ presenceProbe sock from to | 744 | sendProbe from to |
703 | send r | ||
704 | fix $ \loop -> do | 745 | fix $ \loop -> do |
705 | event <- lift . atomically $ readTChan chan | 746 | event <- lift . atomically $ readTChan chan |
706 | case event of | 747 | case event of |
707 | OutBoundPresence p -> do | 748 | OutBoundPresence p -> sendPresence p |
708 | r <- lift $ xmlifyPresenceForPeer sock p | ||
709 | send r | ||
710 | PresenceProbe from to -> do | 749 | PresenceProbe from to -> do |
711 | liftIO $ L.putStrLn "sending live probe..." | 750 | liftIO $ L.putStrLn "sending live probe..." |
712 | r <- liftIO $ presenceProbe sock from to | 751 | sendProbe from to |
713 | send r | ||
714 | loop | 752 | loop |
715 | send goodbyePeer | 753 | send goodbyePeer |
716 | 754 | ||
755 | handleOutgoingToPeer | ||
756 | :: SocketLike sock => | ||
757 | sock | ||
758 | -> CachedMessages | ||
759 | -> TChan OutBoundMessage | ||
760 | -> Sink ByteString IO () | ||
761 | -> IO (Maybe OutBoundMessage) | ||
717 | handleOutgoingToPeer sock cache chan snk = do | 762 | handleOutgoingToPeer sock cache chan snk = do |
718 | p <- getPeerName sock | 763 | p <- getPeerName sock |
719 | L.putStrLn $ "(>P) connected " <++> showPeer (RemotePeer p) | 764 | L.putStrLn $ "(>P) connected " <++> showPeer (RemotePeer p) |
765 | failed <- newIORef Nothing | ||
766 | let failure cmd = do | ||
767 | writeIORef failed cmd | ||
768 | putStrLn $ "Failed: " ++ show cmd | ||
720 | finally ( | 769 | finally ( |
721 | #ifdef RENDERFLUSH | 770 | #ifdef RENDERFLUSH |
722 | toPeer sock cache chan | 771 | handle (\(IOError _ _ _ _ _ _) -> return ()) $ |
723 | $$ flushList | 772 | toPeer sock cache chan failure |
724 | =$= renderBuilderFlush def | 773 | $$ flushList |
725 | =$= builderToByteStringFlush | 774 | =$= renderBuilderFlush def |
726 | =$= discardFlush | 775 | =$= builderToByteStringFlush |
727 | =$ snk | 776 | =$= discardFlush |
777 | =$ snk | ||
728 | #else | 778 | #else |
729 | toPeer sock cache chan $$ renderChunks =$ snk | 779 | handle (\(IOError _ _ _ _ _ _) -> return ()) $ toPeer sock cache chan failure $$ renderChunks =$ snk |
730 | #endif | 780 | #endif |
731 | ) $ L.putStrLn $ "(>P) disconnected " <++> showPeer (RemotePeer p) | 781 | ) $ L.putStrLn $ "(>P) disconnected " <++> showPeer (RemotePeer p) |
782 | readIORef failed | ||
732 | 783 | ||
733 | connect' :: SockAddr -> Int -> IO (Maybe Socket) | 784 | connect' :: SockAddr -> Int -> IO (Maybe Socket) |
734 | connect' addr port = do | 785 | connect' addr port = do |
@@ -743,7 +794,8 @@ connect' addr port = do | |||
743 | -} | 794 | -} |
744 | let getport (SockAddrInet port _) = port | 795 | let getport (SockAddrInet port _) = port |
745 | getport (SockAddrInet6 port _ _ _) = port | 796 | getport (SockAddrInet6 port _ _ _) = port |
746 | let doException (SomeException e) = do | 797 | let doException e@(IOError _ _ _ _ _ _) = do |
798 | -- let doException (SomeException e) = do | ||
747 | L.putStrLn $ "\nFailed to reach "<++> showPeer (RemotePeer addr) <++> " on port "<++>bshow port<++>": " <++> bshow e | 799 | L.putStrLn $ "\nFailed to reach "<++> showPeer (RemotePeer addr) <++> " on port "<++>bshow port<++>": " <++> bshow e |
748 | return Nothing | 800 | return Nothing |
749 | handle doException | 801 | handle doException |