diff options
author | joe <joe@jerkface.net> | 2013-06-30 14:20:43 -0400 |
---|---|---|
committer | joe <joe@jerkface.net> | 2013-06-30 14:20:43 -0400 |
commit | 24bd9dfb9e8e908056ce2bb601b6fe16bfa84c7a (patch) | |
tree | ce2dbcaf0a816a6c3fda7f48eb8afe009dec9f9b | |
parent | ad04502413624cad42b36437ef0d30fd8105817f (diff) |
outgoing connections to peers added to XMPP.hs.
It still uses Handle for now, TODO: change to a ByteString sink.
-rw-r--r-- | Presence/XMPP.hs | 198 | ||||
-rw-r--r-- | Presence/XMPPServer.hs | 63 | ||||
-rw-r--r-- | Presence/XMPPTypes.hs | 86 |
3 files changed, 279 insertions, 68 deletions
diff --git a/Presence/XMPP.hs b/Presence/XMPP.hs index 1a4b0e7b..417b3ce7 100644 --- a/Presence/XMPP.hs +++ b/Presence/XMPP.hs | |||
@@ -16,8 +16,29 @@ import SocketLike | |||
16 | import ByteStringOperators | 16 | import ByteStringOperators |
17 | 17 | ||
18 | import Data.HList | 18 | import Data.HList |
19 | import Network.Socket (Family) | 19 | import Network.Socket |
20 | import Network.BSD (PortNumber) | 20 | ( Family |
21 | , connect | ||
22 | , socketToHandle | ||
23 | , sClose | ||
24 | , Socket(..) | ||
25 | , socket | ||
26 | , SocketType(..) | ||
27 | ) | ||
28 | import Network.BSD | ||
29 | ( PortNumber | ||
30 | , getHostName | ||
31 | , hostName | ||
32 | , hostAliases | ||
33 | , getProtocolNumber | ||
34 | ) | ||
35 | import System.IO | ||
36 | ( BufferMode(..) | ||
37 | , IOMode(..) | ||
38 | , hSetBuffering | ||
39 | ) | ||
40 | import Control.Exception | ||
41 | ( bracketOnError ) | ||
21 | import Control.Concurrent.STM | 42 | import Control.Concurrent.STM |
22 | import Data.Conduit | 43 | import Data.Conduit |
23 | import qualified Data.Conduit.List as CL | 44 | import qualified Data.Conduit.List as CL |
@@ -27,6 +48,8 @@ import qualified Data.ByteString.Char8 as S (pack,putStr,putStrLn,append) | |||
27 | import qualified Data.ByteString.Lazy.Char8 as L | 48 | import qualified Data.ByteString.Lazy.Char8 as L |
28 | ( putStrLn | 49 | ( putStrLn |
29 | , fromChunks | 50 | , fromChunks |
51 | , unlines | ||
52 | , hPutStrLn | ||
30 | ) | 53 | ) |
31 | import Control.Concurrent (forkIO,killThread) | 54 | import Control.Concurrent (forkIO,killThread) |
32 | import Control.Concurrent.Async | 55 | import Control.Concurrent.Async |
@@ -39,7 +62,6 @@ import Control.Monad as Monad | |||
39 | import Text.XML.Stream.Parse (parseBytes,content) | 62 | import Text.XML.Stream.Parse (parseBytes,content) |
40 | import Text.XML.Stream.Render | 63 | import Text.XML.Stream.Render |
41 | import Data.XML.Types as XML | 64 | import Data.XML.Types as XML |
42 | import Network.BSD (getHostName,hostName,hostAliases) | ||
43 | import Data.Text.Encoding as S (decodeUtf8,encodeUtf8) | 65 | import Data.Text.Encoding as S (decodeUtf8,encodeUtf8) |
44 | import Data.Text.Lazy.Encoding as L (decodeUtf8,encodeUtf8) | 66 | import Data.Text.Lazy.Encoding as L (decodeUtf8,encodeUtf8) |
45 | import Data.Text.Lazy (toStrict) | 67 | import Data.Text.Lazy (toStrict) |
@@ -54,6 +76,11 @@ import Data.List (find) | |||
54 | import qualified Text.Show.ByteString as L | 76 | import qualified Text.Show.ByteString as L |
55 | import NestingXML | 77 | import NestingXML |
56 | import qualified Data.Set as Set | 78 | import qualified Data.Set as Set |
79 | import qualified Data.Map as Map | ||
80 | import GHC.Conc | ||
81 | ( threadStatus | ||
82 | , ThreadStatus(..) | ||
83 | ) | ||
57 | 84 | ||
58 | data Commands = Send [XML.Event] | QuitThread | 85 | data Commands = Send [XML.Event] | QuitThread |
59 | deriving Prelude.Show | 86 | deriving Prelude.Show |
@@ -534,9 +561,174 @@ fromPeer session = doNestingXML $ do | |||
534 | 561 | ||
535 | 562 | ||
536 | 563 | ||
564 | {- | ||
537 | seekRemotePeers :: XMPPConfig config => | 565 | seekRemotePeers :: XMPPConfig config => |
538 | config -> TChan Presence -> IO () | 566 | config -> TChan Presence -> IO () |
539 | seekRemotePeers config chan = do | 567 | seekRemotePeers config chan = do |
540 | putStrLn "unimplemented: seekRemotePeers" | 568 | putStrLn "unimplemented: seekRemotePeers" |
541 | -- TODO | 569 | -- TODO |
542 | return () | 570 | return () |
571 | -} | ||
572 | |||
573 | data OutBoundMessage = OutBoundPresence Presence | ||
574 | deriving Prelude.Show | ||
575 | |||
576 | newServerConnections = atomically $ newTVar Map.empty | ||
577 | |||
578 | connect_to_server chan peer = (>> return ()) . runMaybeT $ do | ||
579 | let port = 5269 :: Int | ||
580 | |||
581 | connected <- liftIO . async $ connect' (peerAddr peer) port | ||
582 | |||
583 | -- We'll cache Presence notifications until the socket | ||
584 | -- is ready. | ||
585 | cached <- liftIO $ newIORef Map.empty | ||
586 | |||
587 | sock <- MaybeT . fix $ \loop -> do | ||
588 | e <- atomically $ orElse | ||
589 | (fmap Right $ waitSTM connected) | ||
590 | (fmap Left $ readTChan chan) | ||
591 | case e of | ||
592 | Left (OutBoundPresence (Presence jid Offline)) -> do | ||
593 | cached_map <- readIORef cached | ||
594 | writeIORef cached (Map.delete jid cached_map) | ||
595 | loop | ||
596 | Left (OutBoundPresence p@(Presence jid st)) -> do | ||
597 | cached_map <- readIORef cached | ||
598 | writeIORef cached (Map.insert jid st cached_map) | ||
599 | loop | ||
600 | {- | ||
601 | Left event -> do | ||
602 | L.putStrLn $ "REMOTE-OUT DISCARDED: " <++> bshow event | ||
603 | loop | ||
604 | -} | ||
605 | Right sock -> return sock | ||
606 | |||
607 | liftIO $ do | ||
608 | h <- socketToHandle sock ReadWriteMode | ||
609 | hSetBuffering h NoBuffering | ||
610 | L.hPutStrLn h "<stream>" | ||
611 | L.putStrLn $ "OUT peer: <stream>" | ||
612 | cache <- fmap Map.assocs . readIORef $ cached | ||
613 | writeIORef cached Map.empty -- hint garbage collector: we're done with this | ||
614 | forM_ cache $ \(jid,st) -> do | ||
615 | r <- xmlifyPresenceForPeer sock (Presence jid st) | ||
616 | L.hPutStrLn h r | ||
617 | L.putStrLn $ "OUT peer: (cache)\n" <++> r <++> "\n" | ||
618 | fix $ \loop -> do | ||
619 | event <- atomically $ readTChan chan | ||
620 | case event of | ||
621 | OutBoundPresence p -> do | ||
622 | r <- xmlifyPresenceForPeer sock p | ||
623 | L.hPutStrLn h r | ||
624 | L.putStrLn $ "OUT peer:\n" <++> r <++> "\n" | ||
625 | loop | ||
626 | L.hPutStrLn h "</stream>" | ||
627 | L.putStrLn $ "OUT peer: </stream>" | ||
628 | |||
629 | connect' :: SockAddr -> Int -> IO (Maybe Socket) | ||
630 | connect' addr port = do | ||
631 | proto <- getProtocolNumber "tcp" | ||
632 | {- | ||
633 | -- Given (host :: HostName) ... | ||
634 | let hints = defaultHints { addrFlags = [AI_ADDRCONFIG] | ||
635 | , addrProtocol = proto | ||
636 | , addrSocketType = Stream } | ||
637 | addrs <- getAddrInfo (Just hints) (Just host) (Just serv) | ||
638 | firstSuccessful $ map tryToConnect addrs | ||
639 | -} | ||
640 | let getport (SockAddrInet port _) = port | ||
641 | getport (SockAddrInet6 port _ _ _) = port | ||
642 | let withPort (SockAddrInet _ a) port = SockAddrInet (toEnum port) a | ||
643 | withPort (SockAddrInet6 _ a b c) port = SockAddrInet6 (toEnum port) a b c | ||
644 | let doException (SomeException e) = do | ||
645 | L.putStrLn $ "\nFailed to reach "<++> showPeer (RemotePeer addr) <++> " on port "<++>bshow port<++>": " <++> bshow e | ||
646 | return Nothing | ||
647 | handle doException | ||
648 | $ tryToConnect proto (addr `withPort` port) | ||
649 | where | ||
650 | tryToConnect proto addr = | ||
651 | bracketOnError | ||
652 | (socket (socketFamily addr) Stream proto) | ||
653 | (sClose ) -- only done if there's an error | ||
654 | (\sock -> do | ||
655 | connect sock addr | ||
656 | return (Just sock) -- socketToHandle sock ReadWriteMode | ||
657 | ) | ||
658 | |||
659 | |||
660 | |||
661 | sendMessage cons msg peer = do | ||
662 | found <- atomically $ do | ||
663 | consmap <- readTVar cons | ||
664 | return (Map.lookup peer consmap) | ||
665 | let newEntry = do | ||
666 | chan <- atomically newTChan | ||
667 | t <- forkIO $ connect_to_server chan peer | ||
668 | -- L.putStrLn $ "remote-map new: " <++> showPeer peer | ||
669 | return (True,(chan,t)) | ||
670 | (is_new,entry) <- maybe newEntry | ||
671 | ( \(chan,t) -> do | ||
672 | st <- threadStatus t | ||
673 | let running = do | ||
674 | -- L.putStrLn $ "remote-map, thread running: " <++> showPeer peer | ||
675 | return (False,(chan,t)) | ||
676 | died = do | ||
677 | -- L.putStrLn $ "remote-map, thread died("<++>bshow st<++>"): " <++> showPeer peer | ||
678 | newEntry | ||
679 | case st of | ||
680 | ThreadRunning -> running | ||
681 | ThreadBlocked _ -> running | ||
682 | ThreadDied -> died | ||
683 | ThreadFinished -> died | ||
684 | ) | ||
685 | found | ||
686 | -- L.putStrLn $ "sendMessage ->"<++>showPeer peer<++>": "<++>bshow msg | ||
687 | atomically $ writeTChan (fst entry) msg | ||
688 | when is_new . atomically $ | ||
689 | readTVar cons >>= writeTVar cons . Map.insert peer entry | ||
690 | |||
691 | |||
692 | |||
693 | seekRemotePeers :: XMPPConfig config => | ||
694 | config -> TChan Presence -> IO b0 | ||
695 | seekRemotePeers config chan = do | ||
696 | server_connections <- newServerConnections | ||
697 | fix $ \loop -> do | ||
698 | event <- atomically $ readTChan chan | ||
699 | case event of | ||
700 | p@(Presence jid stat) | not (is_remote (peer jid)) -> do | ||
701 | -- L.putStrLn $ "seekRemotePeers: " <++> L.show jid <++> " " <++> bshow stat | ||
702 | runMaybeT $ do | ||
703 | u <- MaybeT . return $ name jid | ||
704 | subscribers <- liftIO $ do | ||
705 | subs <- getSubscribers config u | ||
706 | mapM parseHostNameJID subs | ||
707 | -- liftIO . L.putStrLn $ "subscribers: " <++> bshow subscribers | ||
708 | let peers = Set.map peer (Set.fromList subscribers) | ||
709 | forM_ (Set.toList peers) $ \peer -> do | ||
710 | when (is_remote peer) $ | ||
711 | liftIO $ sendMessage server_connections (OutBoundPresence p) peer | ||
712 | -- TODO: send presence probes for buddies | ||
713 | -- TODO: cache remote presences for clients | ||
714 | _ -> return (Just ()) | ||
715 | loop | ||
716 | |||
717 | xmlifyPresenceForPeer sock (Presence jid stat) = do | ||
718 | -- TODO: accept socket argument and determine local ip address | ||
719 | -- connected to this peer. | ||
720 | addr <- getSocketName sock | ||
721 | let n = name jid | ||
722 | rsc = resource jid | ||
723 | jid_str = n <$++> "@" <?++> showPeer (RemotePeer addr) <++?> "/" <++$> rsc | ||
724 | return . L.unlines $ | ||
725 | [ "<presence from='" <++> jid_str <++> "' " <++> typ stat <++> ">" | ||
726 | , "<show>" <++> shw stat <++> "</show>" | ||
727 | , "</presence>" | ||
728 | ] | ||
729 | where | ||
730 | typ Offline = " type='unavailable'" | ||
731 | typ _ = "" | ||
732 | shw Available = "chat" | ||
733 | shw Away = "away" | ||
734 | shw Offline = "away" -- Is this right? | ||
diff --git a/Presence/XMPPServer.hs b/Presence/XMPPServer.hs index f607989d..ff50ab1c 100644 --- a/Presence/XMPPServer.hs +++ b/Presence/XMPPServer.hs | |||
@@ -67,9 +67,6 @@ import qualified Data.Set as Set | |||
67 | import GetHostByAddr | 67 | import GetHostByAddr |
68 | import XMPPTypes | 68 | import XMPPTypes |
69 | 69 | ||
70 | is_remote (RemotePeer _) = True | ||
71 | is_remote _ = False | ||
72 | |||
73 | getNamesForPeer :: Peer -> IO [ByteString] | 70 | getNamesForPeer :: Peer -> IO [ByteString] |
74 | getNamesForPeer LocalHost = fmap ((:[]) . pack) getHostName | 71 | getNamesForPeer LocalHost = fmap ((:[]) . pack) getHostName |
75 | getNamesForPeer peer@(RemotePeer addr) = do | 72 | getNamesForPeer peer@(RemotePeer addr) = do |
@@ -85,10 +82,6 @@ getNamesForPeer peer@(RemotePeer addr) = do | |||
85 | return . map pack $ names | 82 | return . map pack $ names |
86 | 83 | ||
87 | 84 | ||
88 | peerAddr :: Peer -> SockAddr | ||
89 | peerAddr (RemotePeer addr) = addr | ||
90 | -- peerAddr LocalHost = throw exception | ||
91 | |||
92 | 85 | ||
93 | xmlifyPresenceForPeer sock (Presence jid stat) = do | 86 | xmlifyPresenceForPeer sock (Presence jid stat) = do |
94 | -- TODO: accept socket argument and determine local ip address | 87 | -- TODO: accept socket argument and determine local ip address |
@@ -587,62 +580,6 @@ connect_to_server chan peer = (>> return ()) . runMaybeT $ do | |||
587 | L.putStrLn $ "OUT peer: </stream>" | 580 | L.putStrLn $ "OUT peer: </stream>" |
588 | 581 | ||
589 | 582 | ||
590 | splitJID :: ByteString -> (Maybe ByteString,ByteString,Maybe ByteString) | ||
591 | splitJID bjid = | ||
592 | let xs = L.splitWith (=='@') bjid | ||
593 | ys = L.splitWith (=='/') (last xs) | ||
594 | server = head ys | ||
595 | name | ||
596 | = case xs of | ||
597 | (n:s:_) -> Just n | ||
598 | (s:_) -> Nothing | ||
599 | rsrc = case ys of | ||
600 | (s:_:_) -> Just $ last ys | ||
601 | _ -> Nothing | ||
602 | in (name,server,rsrc) | ||
603 | |||
604 | strip_brackets s = | ||
605 | case L.uncons s of | ||
606 | Just ('[',t) -> L.takeWhile (/=']') t | ||
607 | _ -> s | ||
608 | |||
609 | parseAddressJID :: ByteString -> IO JID | ||
610 | parseAddressJID jid = do | ||
611 | let (name,peer_string,rsc) = splitJID jid | ||
612 | hints = Just $ defaultHints { addrFlags = [ {- AI_NUMERICHOST, -} AI_CANONNAME ] } | ||
613 | peer_string' = unpack . strip_brackets $ peer_string | ||
614 | peer <- do | ||
615 | -- putStrLn $ "getAddrInfo 2 " ++ Prelude.show ( Just (unpack peer_string)) | ||
616 | info <- getAddrInfo hints (Just peer_string') Nothing -- (Just "xmpp-server") | ||
617 | let info0 = head info | ||
618 | return . RemotePeer . addrAddress $ info0 | ||
619 | return $ JID name peer rsc | ||
620 | |||
621 | parseHostNameJID :: ByteString -> IO JID | ||
622 | parseHostNameJID jid = do | ||
623 | let (name,peer_string,rsc) = splitJID jid | ||
624 | hints = Just $ defaultHints { addrFlags = [ AI_CANONNAME ] } | ||
625 | peer <- do | ||
626 | if peer_string=="localhost" | ||
627 | then return LocalHost | ||
628 | else do | ||
629 | -- putStrLn $ "getAddrInfo 3 " ++ Prelude.show ( Just (unpack peer_string)) | ||
630 | info <- getAddrInfo hints (Just (unpack peer_string)) Nothing -- (Just "xmpp-server") | ||
631 | let info0 = head info | ||
632 | cname = addrCanonName info0 | ||
633 | if cname==Just "localhost" | ||
634 | then return LocalHost | ||
635 | else do | ||
636 | self <- getHostName | ||
637 | return $ if Just self==cname | ||
638 | then LocalHost | ||
639 | else RemotePeer (addrAddress info0) | ||
640 | return $ JID name peer rsc | ||
641 | |||
642 | socketFamily (SockAddrInet _ _) = AF_INET | ||
643 | socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6 | ||
644 | socketFamily (SockAddrUnix _) = AF_UNIX | ||
645 | |||
646 | connect' :: SockAddr -> Int -> IO (Maybe Socket) | 583 | connect' :: SockAddr -> Int -> IO (Maybe Socket) |
647 | connect' addr port = do | 584 | connect' addr port = do |
648 | proto <- getProtocolNumber "tcp" | 585 | proto <- getProtocolNumber "tcp" |
diff --git a/Presence/XMPPTypes.hs b/Presence/XMPPTypes.hs index e3bbfd16..8af1018c 100644 --- a/Presence/XMPPTypes.hs +++ b/Presence/XMPPTypes.hs | |||
@@ -2,10 +2,28 @@ | |||
2 | {-# LANGUAGE TypeFamilies #-} | 2 | {-# LANGUAGE TypeFamilies #-} |
3 | module XMPPTypes where | 3 | module XMPPTypes where |
4 | 4 | ||
5 | import Network.Socket (Socket,SockAddr(..)) | 5 | import Network.Socket |
6 | ( Socket | ||
7 | , Family(..) | ||
8 | , SockAddr(..) | ||
9 | , getAddrInfo | ||
10 | , addrCanonName | ||
11 | , addrAddress | ||
12 | , defaultHints | ||
13 | , AddrInfo(..) | ||
14 | , AddrInfoFlag(..) | ||
15 | ) | ||
16 | import Network.BSD (getHostName) | ||
6 | import System.IO (Handle) | 17 | import System.IO (Handle) |
7 | import Control.Concurrent.STM (TChan) | 18 | import Control.Concurrent.STM (TChan) |
8 | import Data.ByteString.Lazy.Char8 as L (ByteString,unpack,pack) | 19 | import Data.ByteString.Lazy.Char8 as L |
20 | ( ByteString | ||
21 | , unpack | ||
22 | , pack | ||
23 | , splitWith | ||
24 | , uncons | ||
25 | , takeWhile | ||
26 | ) | ||
9 | import Text.Show.ByteString as L | 27 | import Text.Show.ByteString as L |
10 | import Data.Binary.Builder as B | 28 | import Data.Binary.Builder as B |
11 | import Data.Binary.Put | 29 | import Data.Binary.Put |
@@ -82,4 +100,68 @@ showPeer (RemotePeer addr@(SockAddrInet6 _ _ _ _)) = pack $ stripColon (Prelude. | |||
82 | where | 100 | where |
83 | (pre,bracket) = break (==']') s | 101 | (pre,bracket) = break (==']') s |
84 | 102 | ||
103 | is_remote (RemotePeer _) = True | ||
104 | is_remote _ = False | ||
105 | |||
106 | parseHostNameJID :: ByteString -> IO JID | ||
107 | parseHostNameJID jid = do | ||
108 | let (name,peer_string,rsc) = splitJID jid | ||
109 | hints = Just $ defaultHints { addrFlags = [ AI_CANONNAME ] } | ||
110 | peer <- do | ||
111 | if peer_string=="localhost" | ||
112 | then return LocalHost | ||
113 | else do | ||
114 | -- putStrLn $ "getAddrInfo 3 " ++ Prelude.show ( Just (unpack peer_string)) | ||
115 | info <- getAddrInfo hints (Just (unpack peer_string)) Nothing -- (Just "xmpp-server") | ||
116 | let info0 = head info | ||
117 | cname = addrCanonName info0 | ||
118 | if cname==Just "localhost" | ||
119 | then return LocalHost | ||
120 | else do | ||
121 | self <- getHostName | ||
122 | return $ if Just self==cname | ||
123 | then LocalHost | ||
124 | else RemotePeer (addrAddress info0) | ||
125 | return $ JID name peer rsc | ||
126 | |||
127 | splitJID :: ByteString -> (Maybe ByteString,ByteString,Maybe ByteString) | ||
128 | splitJID bjid = | ||
129 | let xs = L.splitWith (=='@') bjid | ||
130 | ys = L.splitWith (=='/') (last xs) | ||
131 | server = head ys | ||
132 | name | ||
133 | = case xs of | ||
134 | (n:s:_) -> Just n | ||
135 | (s:_) -> Nothing | ||
136 | rsrc = case ys of | ||
137 | (s:_:_) -> Just $ last ys | ||
138 | _ -> Nothing | ||
139 | in (name,server,rsrc) | ||
140 | |||
141 | strip_brackets s = | ||
142 | case L.uncons s of | ||
143 | Just ('[',t) -> L.takeWhile (/=']') t | ||
144 | _ -> s | ||
145 | |||
146 | |||
147 | parseAddressJID :: ByteString -> IO JID | ||
148 | parseAddressJID jid = do | ||
149 | let (name,peer_string,rsc) = splitJID jid | ||
150 | hints = Just $ defaultHints { addrFlags = [ {- AI_NUMERICHOST, -} AI_CANONNAME ] } | ||
151 | peer_string' = unpack . strip_brackets $ peer_string | ||
152 | peer <- do | ||
153 | -- putStrLn $ "getAddrInfo 2 " ++ Prelude.show ( Just (unpack peer_string)) | ||
154 | info <- getAddrInfo hints (Just peer_string') Nothing -- (Just "xmpp-server") | ||
155 | let info0 = head info | ||
156 | return . RemotePeer . addrAddress $ info0 | ||
157 | return $ JID name peer rsc | ||
158 | |||
159 | peerAddr :: Peer -> SockAddr | ||
160 | peerAddr (RemotePeer addr) = addr | ||
161 | -- peerAddr LocalHost = throw exception | ||
162 | |||
163 | socketFamily (SockAddrInet _ _) = AF_INET | ||
164 | socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6 | ||
165 | socketFamily (SockAddrUnix _) = AF_UNIX | ||
166 | |||
85 | 167 | ||