diff options
Diffstat (limited to 'Presence/XMPP.hs')
-rw-r--r-- | Presence/XMPP.hs | 198 |
1 files changed, 195 insertions, 3 deletions
diff --git a/Presence/XMPP.hs b/Presence/XMPP.hs index 1a4b0e7b..417b3ce7 100644 --- a/Presence/XMPP.hs +++ b/Presence/XMPP.hs | |||
@@ -16,8 +16,29 @@ import SocketLike | |||
16 | import ByteStringOperators | 16 | import ByteStringOperators |
17 | 17 | ||
18 | import Data.HList | 18 | import Data.HList |
19 | import Network.Socket (Family) | 19 | import Network.Socket |
20 | import Network.BSD (PortNumber) | 20 | ( Family |
21 | , connect | ||
22 | , socketToHandle | ||
23 | , sClose | ||
24 | , Socket(..) | ||
25 | , socket | ||
26 | , SocketType(..) | ||
27 | ) | ||
28 | import Network.BSD | ||
29 | ( PortNumber | ||
30 | , getHostName | ||
31 | , hostName | ||
32 | , hostAliases | ||
33 | , getProtocolNumber | ||
34 | ) | ||
35 | import System.IO | ||
36 | ( BufferMode(..) | ||
37 | , IOMode(..) | ||
38 | , hSetBuffering | ||
39 | ) | ||
40 | import Control.Exception | ||
41 | ( bracketOnError ) | ||
21 | import Control.Concurrent.STM | 42 | import Control.Concurrent.STM |
22 | import Data.Conduit | 43 | import Data.Conduit |
23 | import qualified Data.Conduit.List as CL | 44 | import qualified Data.Conduit.List as CL |
@@ -27,6 +48,8 @@ import qualified Data.ByteString.Char8 as S (pack,putStr,putStrLn,append) | |||
27 | import qualified Data.ByteString.Lazy.Char8 as L | 48 | import qualified Data.ByteString.Lazy.Char8 as L |
28 | ( putStrLn | 49 | ( putStrLn |
29 | , fromChunks | 50 | , fromChunks |
51 | , unlines | ||
52 | , hPutStrLn | ||
30 | ) | 53 | ) |
31 | import Control.Concurrent (forkIO,killThread) | 54 | import Control.Concurrent (forkIO,killThread) |
32 | import Control.Concurrent.Async | 55 | import Control.Concurrent.Async |
@@ -39,7 +62,6 @@ import Control.Monad as Monad | |||
39 | import Text.XML.Stream.Parse (parseBytes,content) | 62 | import Text.XML.Stream.Parse (parseBytes,content) |
40 | import Text.XML.Stream.Render | 63 | import Text.XML.Stream.Render |
41 | import Data.XML.Types as XML | 64 | import Data.XML.Types as XML |
42 | import Network.BSD (getHostName,hostName,hostAliases) | ||
43 | import Data.Text.Encoding as S (decodeUtf8,encodeUtf8) | 65 | import Data.Text.Encoding as S (decodeUtf8,encodeUtf8) |
44 | import Data.Text.Lazy.Encoding as L (decodeUtf8,encodeUtf8) | 66 | import Data.Text.Lazy.Encoding as L (decodeUtf8,encodeUtf8) |
45 | import Data.Text.Lazy (toStrict) | 67 | import Data.Text.Lazy (toStrict) |
@@ -54,6 +76,11 @@ import Data.List (find) | |||
54 | import qualified Text.Show.ByteString as L | 76 | import qualified Text.Show.ByteString as L |
55 | import NestingXML | 77 | import NestingXML |
56 | import qualified Data.Set as Set | 78 | import qualified Data.Set as Set |
79 | import qualified Data.Map as Map | ||
80 | import GHC.Conc | ||
81 | ( threadStatus | ||
82 | , ThreadStatus(..) | ||
83 | ) | ||
57 | 84 | ||
58 | data Commands = Send [XML.Event] | QuitThread | 85 | data Commands = Send [XML.Event] | QuitThread |
59 | deriving Prelude.Show | 86 | deriving Prelude.Show |
@@ -534,9 +561,174 @@ fromPeer session = doNestingXML $ do | |||
534 | 561 | ||
535 | 562 | ||
536 | 563 | ||
564 | {- | ||
537 | seekRemotePeers :: XMPPConfig config => | 565 | seekRemotePeers :: XMPPConfig config => |
538 | config -> TChan Presence -> IO () | 566 | config -> TChan Presence -> IO () |
539 | seekRemotePeers config chan = do | 567 | seekRemotePeers config chan = do |
540 | putStrLn "unimplemented: seekRemotePeers" | 568 | putStrLn "unimplemented: seekRemotePeers" |
541 | -- TODO | 569 | -- TODO |
542 | return () | 570 | return () |
571 | -} | ||
572 | |||
573 | data OutBoundMessage = OutBoundPresence Presence | ||
574 | deriving Prelude.Show | ||
575 | |||
576 | newServerConnections = atomically $ newTVar Map.empty | ||
577 | |||
578 | connect_to_server chan peer = (>> return ()) . runMaybeT $ do | ||
579 | let port = 5269 :: Int | ||
580 | |||
581 | connected <- liftIO . async $ connect' (peerAddr peer) port | ||
582 | |||
583 | -- We'll cache Presence notifications until the socket | ||
584 | -- is ready. | ||
585 | cached <- liftIO $ newIORef Map.empty | ||
586 | |||
587 | sock <- MaybeT . fix $ \loop -> do | ||
588 | e <- atomically $ orElse | ||
589 | (fmap Right $ waitSTM connected) | ||
590 | (fmap Left $ readTChan chan) | ||
591 | case e of | ||
592 | Left (OutBoundPresence (Presence jid Offline)) -> do | ||
593 | cached_map <- readIORef cached | ||
594 | writeIORef cached (Map.delete jid cached_map) | ||
595 | loop | ||
596 | Left (OutBoundPresence p@(Presence jid st)) -> do | ||
597 | cached_map <- readIORef cached | ||
598 | writeIORef cached (Map.insert jid st cached_map) | ||
599 | loop | ||
600 | {- | ||
601 | Left event -> do | ||
602 | L.putStrLn $ "REMOTE-OUT DISCARDED: " <++> bshow event | ||
603 | loop | ||
604 | -} | ||
605 | Right sock -> return sock | ||
606 | |||
607 | liftIO $ do | ||
608 | h <- socketToHandle sock ReadWriteMode | ||
609 | hSetBuffering h NoBuffering | ||
610 | L.hPutStrLn h "<stream>" | ||
611 | L.putStrLn $ "OUT peer: <stream>" | ||
612 | cache <- fmap Map.assocs . readIORef $ cached | ||
613 | writeIORef cached Map.empty -- hint garbage collector: we're done with this | ||
614 | forM_ cache $ \(jid,st) -> do | ||
615 | r <- xmlifyPresenceForPeer sock (Presence jid st) | ||
616 | L.hPutStrLn h r | ||
617 | L.putStrLn $ "OUT peer: (cache)\n" <++> r <++> "\n" | ||
618 | fix $ \loop -> do | ||
619 | event <- atomically $ readTChan chan | ||
620 | case event of | ||
621 | OutBoundPresence p -> do | ||
622 | r <- xmlifyPresenceForPeer sock p | ||
623 | L.hPutStrLn h r | ||
624 | L.putStrLn $ "OUT peer:\n" <++> r <++> "\n" | ||
625 | loop | ||
626 | L.hPutStrLn h "</stream>" | ||
627 | L.putStrLn $ "OUT peer: </stream>" | ||
628 | |||
629 | connect' :: SockAddr -> Int -> IO (Maybe Socket) | ||
630 | connect' addr port = do | ||
631 | proto <- getProtocolNumber "tcp" | ||
632 | {- | ||
633 | -- Given (host :: HostName) ... | ||
634 | let hints = defaultHints { addrFlags = [AI_ADDRCONFIG] | ||
635 | , addrProtocol = proto | ||
636 | , addrSocketType = Stream } | ||
637 | addrs <- getAddrInfo (Just hints) (Just host) (Just serv) | ||
638 | firstSuccessful $ map tryToConnect addrs | ||
639 | -} | ||
640 | let getport (SockAddrInet port _) = port | ||
641 | getport (SockAddrInet6 port _ _ _) = port | ||
642 | let withPort (SockAddrInet _ a) port = SockAddrInet (toEnum port) a | ||
643 | withPort (SockAddrInet6 _ a b c) port = SockAddrInet6 (toEnum port) a b c | ||
644 | let doException (SomeException e) = do | ||
645 | L.putStrLn $ "\nFailed to reach "<++> showPeer (RemotePeer addr) <++> " on port "<++>bshow port<++>": " <++> bshow e | ||
646 | return Nothing | ||
647 | handle doException | ||
648 | $ tryToConnect proto (addr `withPort` port) | ||
649 | where | ||
650 | tryToConnect proto addr = | ||
651 | bracketOnError | ||
652 | (socket (socketFamily addr) Stream proto) | ||
653 | (sClose ) -- only done if there's an error | ||
654 | (\sock -> do | ||
655 | connect sock addr | ||
656 | return (Just sock) -- socketToHandle sock ReadWriteMode | ||
657 | ) | ||
658 | |||
659 | |||
660 | |||
661 | sendMessage cons msg peer = do | ||
662 | found <- atomically $ do | ||
663 | consmap <- readTVar cons | ||
664 | return (Map.lookup peer consmap) | ||
665 | let newEntry = do | ||
666 | chan <- atomically newTChan | ||
667 | t <- forkIO $ connect_to_server chan peer | ||
668 | -- L.putStrLn $ "remote-map new: " <++> showPeer peer | ||
669 | return (True,(chan,t)) | ||
670 | (is_new,entry) <- maybe newEntry | ||
671 | ( \(chan,t) -> do | ||
672 | st <- threadStatus t | ||
673 | let running = do | ||
674 | -- L.putStrLn $ "remote-map, thread running: " <++> showPeer peer | ||
675 | return (False,(chan,t)) | ||
676 | died = do | ||
677 | -- L.putStrLn $ "remote-map, thread died("<++>bshow st<++>"): " <++> showPeer peer | ||
678 | newEntry | ||
679 | case st of | ||
680 | ThreadRunning -> running | ||
681 | ThreadBlocked _ -> running | ||
682 | ThreadDied -> died | ||
683 | ThreadFinished -> died | ||
684 | ) | ||
685 | found | ||
686 | -- L.putStrLn $ "sendMessage ->"<++>showPeer peer<++>": "<++>bshow msg | ||
687 | atomically $ writeTChan (fst entry) msg | ||
688 | when is_new . atomically $ | ||
689 | readTVar cons >>= writeTVar cons . Map.insert peer entry | ||
690 | |||
691 | |||
692 | |||
693 | seekRemotePeers :: XMPPConfig config => | ||
694 | config -> TChan Presence -> IO b0 | ||
695 | seekRemotePeers config chan = do | ||
696 | server_connections <- newServerConnections | ||
697 | fix $ \loop -> do | ||
698 | event <- atomically $ readTChan chan | ||
699 | case event of | ||
700 | p@(Presence jid stat) | not (is_remote (peer jid)) -> do | ||
701 | -- L.putStrLn $ "seekRemotePeers: " <++> L.show jid <++> " " <++> bshow stat | ||
702 | runMaybeT $ do | ||
703 | u <- MaybeT . return $ name jid | ||
704 | subscribers <- liftIO $ do | ||
705 | subs <- getSubscribers config u | ||
706 | mapM parseHostNameJID subs | ||
707 | -- liftIO . L.putStrLn $ "subscribers: " <++> bshow subscribers | ||
708 | let peers = Set.map peer (Set.fromList subscribers) | ||
709 | forM_ (Set.toList peers) $ \peer -> do | ||
710 | when (is_remote peer) $ | ||
711 | liftIO $ sendMessage server_connections (OutBoundPresence p) peer | ||
712 | -- TODO: send presence probes for buddies | ||
713 | -- TODO: cache remote presences for clients | ||
714 | _ -> return (Just ()) | ||
715 | loop | ||
716 | |||
717 | xmlifyPresenceForPeer sock (Presence jid stat) = do | ||
718 | -- TODO: accept socket argument and determine local ip address | ||
719 | -- connected to this peer. | ||
720 | addr <- getSocketName sock | ||
721 | let n = name jid | ||
722 | rsc = resource jid | ||
723 | jid_str = n <$++> "@" <?++> showPeer (RemotePeer addr) <++?> "/" <++$> rsc | ||
724 | return . L.unlines $ | ||
725 | [ "<presence from='" <++> jid_str <++> "' " <++> typ stat <++> ">" | ||
726 | , "<show>" <++> shw stat <++> "</show>" | ||
727 | , "</presence>" | ||
728 | ] | ||
729 | where | ||
730 | typ Offline = " type='unavailable'" | ||
731 | typ _ = "" | ||
732 | shw Available = "chat" | ||
733 | shw Away = "away" | ||
734 | shw Offline = "away" -- Is this right? | ||