{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE CPP #-} {-# LANGUAGE ViewPatterns #-} module XMPP ( module XMPPTypes , module SocketLike , listenForXmppClients , listenForRemotePeers , newServerConnections , seekRemotePeers , quitListening , OutBoundMessage(..) , sendMessage ) where import ServerC import XMPPTypes import SocketLike import ByteStringOperators import ControlMaybe import Data.HList import Network.Socket ( Family , connect , socketToHandle , sClose , Socket(..) , socket , SocketType(..) ) import Network.BSD ( PortNumber , getHostName , hostName , hostAliases , getProtocolNumber ) import System.IO ( BufferMode(..) , IOMode(..) , hSetBuffering ) import Control.Concurrent.STM import Data.Conduit import qualified Data.Conduit.List as CL import qualified Data.Conduit.Binary as CB import Data.ByteString (ByteString) import qualified Data.ByteString.Char8 as S (pack,putStr,putStrLn,append) import qualified Data.ByteString.Lazy.Char8 as L ( putStrLn , fromChunks , unlines , hPutStrLn ) import Control.Concurrent (forkIO,killThread) import Control.Concurrent.Async import Control.Exception ( handle -- , SomeException(..) , finally , bracketOnError ) import GHC.IO.Exception (IOException(..)) import Control.Monad.IO.Class import Control.Monad.Trans.Class import Control.Monad.Trans.Maybe import Todo import Control.Monad as Monad import Text.XML.Stream.Parse (parseBytes,content) import Text.XML.Stream.Render import Data.XML.Types as XML import Data.Text.Encoding as S (decodeUtf8,encodeUtf8) import Data.Text.Lazy.Encoding as L (decodeUtf8,encodeUtf8) import Data.Text.Lazy (toStrict) import GetHostByAddr import Data.Monoid import qualified Data.Sequence as Seq import Data.Foldable (toList) #ifdef RENDERFLUSH import Data.Conduit.Blaze #endif import Data.List (find) import qualified Text.Show.ByteString as L import NestingXML import Data.Set as Set (Set) import qualified Data.Set as Set import qualified Data.Map as Map import Data.Map as Map (Map) import GHC.Conc ( threadStatus , ThreadStatus(..) , ThreadId ) data Commands = Send [XML.Event] | QuitThread deriving Prelude.Show getNamesForPeer :: Peer -> IO [ByteString] getNamesForPeer LocalHost = fmap ((:[]) . S.pack) getHostName getNamesForPeer peer@(RemotePeer addr) = do ent <- getHostByAddr addr -- AF_UNSPEC addr let names = hostName ent : hostAliases ent return . map S.pack $ names xmlifyPresenceForClient :: Presence -> IO [XML.Event] xmlifyPresenceForClient (Presence jid stat) = do let n = name jid rsc = resource jid names <- getNamesForPeer (peer jid) let tostr p = L.decodeUtf8 $ n <$++> "@" L.fromChunks [p] <++?> "/" <++$> rsc jidstrs = fmap (toStrict . tostr) names return (concatMap presenceEvents jidstrs) where presenceEvents jidstr = [ EventBeginElement "{jabber:client}presence" (("from",[ContentText jidstr]):typ stat) , EventBeginElement "{jabber:client}show" [] , EventContent (ContentText . shw $ stat) , EventEndElement "{jabber:client}show" , EventEndElement "{jabber:client}presence" ] typ Offline = [("type",[ContentText "unavailable"])] typ _ = [] shw Available = "chat" shw Away = "away" shw Offline = "away" -- Is this right? prefix ## name = Name name Nothing (Just prefix) streamP name = Name name (Just "http://etherx.jabber.org/streams") (Just "stream") greet host = [ EventBeginDocument , EventBeginElement (streamP "stream") [("from",[ContentText host]) ,("id",[ContentText "someid"]) ,("xmlns",[ContentText "jabber:client"]) ,("xmlns:stream",[ContentText "http://etherx.jabber.org/streams"]) ,("version",[ContentText "1.0"]) ] , EventBeginElement (streamP "features") [] , EventBeginElement "{urn:ietf:params:xml:ns:xmpp-bind}bind" [] , EventEndElement "{urn:ietf:params:xml:ns:xmpp-bind}bind" {- -- , " " , " " -- , " DIGEST-MD5" , " PLAIN" , " " -} , EventEndElement (streamP "features") ] -- type Consumer i m r = forall o. ConduitM i o m r mawait :: Monad m => MaybeT (ConduitM i o m) i mawait = MaybeT await -- Note: This function ignores name space qualification elementAttrs expected (EventBeginElement name attrs) | nameLocalName name==expected = return attrs elementAttrs _ _ = mzero eventIsBeginElement (EventBeginElement _ _) = True eventIsBeginElement _ = False eventIsEndElement (EventEndElement _) = True eventIsEndElement _ = False filterMapElement:: (Monad m, MonadPlus mp) => (Event -> mp a) -> Event -> mp a -> MaybeT (ConduitM Event o m) (mp a) filterMapElement ret opentag empty = loop (empty `mplus` ret opentag) 1 where loop ts 0 = return ts loop ts cnt = do tag <- mawait let ts' = mplus ts (ret tag) case () of _ | eventIsEndElement tag -> loop ts' (cnt-1) _ | eventIsBeginElement tag -> loop ts' (cnt+1) _ -> loop ts' cnt gatherElement :: (Monad m, MonadPlus mp) => Event -> mp Event -> NestingXML o m (mp Event) gatherElement opentag empty = loop (empty `mplus` return opentag) 1 where loop ts 0 = return ts loop ts cnt = do maybeXML (return ts) $ \tag -> do let ts' = mplus ts (return tag) case () of _ | eventIsEndElement tag -> loop ts' (cnt-1) _ | eventIsBeginElement tag -> loop ts' (cnt+1) _ -> loop ts' cnt {- sourceStanza :: Monad m => Event -> ConduitM Event Event m () sourceStanza opentag = yield opentag >> loop 1 where loop 0 = return () loop cnt = do e <- await let go tag cnt = yield tag >> loop cnt case e of Just tag | eventIsEndElement tag -> go tag (cnt-1) Just tag | eventIsBeginElement tag -> go tag (cnt+1) Just tag -> go tag cnt Nothing -> return () -} voidMaybeT body = (>> return ()) . runMaybeT $ body fixMaybeT f = (>> return ()) . runMaybeT . fix $ f iq_bind_reply id jid = [ EventBeginElement "{jabber:client}iq" [("type",[ContentText "result"]),("id",[ContentText id])] , EventBeginElement "{urn:ietf:params:xml:ns:xmpp-bind}bind" [("xmlns",[ContentText "urn:ietf:params:xml:ns:xmpp-bind"])] , EventBeginElement "{urn:ietf:params:xml:ns:xmpp-bind}jid" [] , EventContent (ContentText jid) , EventEndElement "{urn:ietf:params:xml:ns:xmpp-bind}jid" , EventEndElement "{urn:ietf:params:xml:ns:xmpp-bind}bind" , EventEndElement "{jabber:client}iq" ] uncontent cs = head $ map getText cs where getText (ContentText x) = x getText (ContentEntity x ) = x tagAttrs (EventBeginElement _ xs) = xs tagAttrs _ = [] tagName (EventBeginElement n _) = n tagName _ = "" handleIQSetBind session cmdChan stanza_id = do mchild <- nextElement case mchild of Just child -> do let unhandledBind = liftIO $ putStrLn $ "unhandled-bind: "++show child case tagName child of "{urn:ietf:params:xml:ns:xmpp-bind}resource" -> do rsc <- lift content liftIO $ do putStrLn $ "iq-set-bind-resource " ++ show rsc setResource session (L.fromChunks [S.encodeUtf8 rsc]) jid <- getJID session atomically $ writeTChan cmdChan (Send $ iq_bind_reply stanza_id (toStrict $ L.decodeUtf8 $ L.show jid) ) forCachedPresence session $ \presence -> do xs <- xmlifyPresenceForClient presence atomically . writeTChan cmdChan . Send $ xs _ -> unhandledBind Nothing -> do liftIO $ putStrLn $ "empty bind request!" iq_session_reply host stanza_id = [ EventBeginElement "{jabber:client}iq" [("id",[ContentText stanza_id]) ,("from",[ContentText host]) ,("type",[ContentText "result"]) ] , EventEndElement "{jabber:client}iq" ] handleIQSetSession session cmdChan stanza_id = do host <- liftIO $ do jid <- getJID session names <- getNamesForPeer (peer jid) return (S.decodeUtf8 . head $ names) liftIO . atomically . writeTChan cmdChan . Send $ iq_session_reply host stanza_id handleIQSet session cmdChan tag = do withJust (lookupAttrib "id" (tagAttrs tag)) $ \stanza_id -> do whenJust nextElement $ \child -> do let unhandledSet = liftIO $ putStrLn ("iq-set: "++show (stanza_id,child)) case tagName child of "{urn:ietf:params:xml:ns:xmpp-bind}bind" -> handleIQSetBind session cmdChan stanza_id "{urn:ietf:params:xml:ns:xmpp-session}session" -> handleIQSetSession session cmdChan stanza_id _ -> unhandledSet matchAttrib name value attrs = case find ( (==name) . fst) attrs of Just (_,[ContentText x]) | x==value -> True Just (_,[ContentEntity x]) | x==value -> True _ -> False lookupAttrib name attrs = case find ( (==name) . fst) attrs of Just (_,[ContentText x]) -> Just x Just (_,[ContentEntity x]) -> Just x _ -> Nothing iqTypeSet = "set" iqTypeGet = "get" iqTypeResult = "result" iqTypeError = "error" isIQOf (EventBeginElement name attrs) testType | name=="{jabber:client}iq" && matchAttrib "type" testType attrs = True isIQOf _ _ = False iq_service_unavailable host iq_id mjid req = [ EventBeginElement "{jabber:client}iq" [("type",[ContentText "error"]) ,("id",[ContentText iq_id]) -- , TODO: set "from" if isJust mjid ] , EventBeginElement req [] , EventEndElement req , EventBeginElement "{jabber:client}error" [("type",[ContentText "cancel"])] , EventBeginElement "{urn:ietf:params:xml:ns:xmpp-stanzas}service-unavailable" [] , EventEndElement "{urn:ietf:params:xml:ns:xmpp-stanzas}service-unavailable" , EventEndElement "{jabber:client}error" , EventEndElement "{jabber:client}iq" ] handleIQGet session cmdChan tag = do withJust (lookupAttrib "id" (tagAttrs tag)) $ \stanza_id -> do whenJust nextElement $ \child -> do host <- liftIO $ do jid <- getJID session names <- getNamesForPeer (peer jid) return (S.decodeUtf8 . head $ names) let unhandledGet req = do liftIO $ putStrLn ("iq-get: "++show (stanza_id,child)) liftIO . atomically . writeTChan cmdChan . Send $ iq_service_unavailable host stanza_id Nothing req case tagName child of -- "{http://jabber.org/protocol/disco#items}query" -> liftIO $ putStrLn "iq-get-query-items" "{urn:xmpp:ping}ping" -> liftIO $ do let mjid = lookupAttrib "from" (tagAttrs tag) let pong = [ EventBeginElement "{jabber:client}iq" $ (case mjid of Just jid -> (("to",[ContentText jid]):) _ -> id) [("type",[ContentText "result"]) ,("id",[ContentText stanza_id]) ,("from",[ContentText host]) ] , EventEndElement "{jabber:client}iq" ] atomically . writeTChan cmdChan . Send $ pong req -> unhandledGet req fromClient :: (MonadThrow m,MonadIO m, JabberClientSession session) => session -> TChan Commands -> Sink XML.Event m () fromClient session cmdChan = doNestingXML $ do let log = liftIO . L.putStrLn . ("(C) " <++>) send = liftIO . atomically . writeTChan cmdChan . Send withXML $ \begindoc -> do when (begindoc==EventBeginDocument) $ do log "begin-doc" withXML $ \xml -> do withJust (elementAttrs "stream" xml) $ \stream_attrs -> do log $ "stream atributes: " <++> bshow stream_attrs host <- liftIO $ do jid <- getJID session names <- getNamesForPeer (peer jid) return (S.decodeUtf8 . head $ names) send $ greet host fix $ \loop -> do log "waiting for stanza." whenJust nextElement $ \stanza -> do stanza_lvl <- nesting let unhandledStanza = do xs <- gatherElement stanza Seq.empty prettyPrint "unhandled-C: " (toList xs) case () of _ | stanza `isIQOf` iqTypeSet -> handleIQSet session cmdChan stanza _ | stanza `isIQOf` iqTypeGet -> handleIQGet session cmdChan stanza _ | otherwise -> unhandledStanza awaitCloser stanza_lvl loop log $ "end of stream" withXML $ \xml -> do log $ "end-of-document: " <++> bshow xml prettyPrint prefix xs = liftIO $ do CL.sourceList xs $= renderBytes (def { rsPretty=True }) =$= CB.lines $$ CL.mapM_ (S.putStrLn . (prefix `S.append`)) toClient :: MonadIO m => TChan Presence -> TChan Commands -> Source m [XML.Event] toClient pchan cmdChan = fix $ \loop -> do let send xs = yield xs >> prettyPrint ">C: " xs event <- liftIO . atomically $ orElse (fmap Left $ readTChan pchan) (fmap Right $ readTChan cmdChan) case event of Right QuitThread -> return () Right (Send xs) -> send xs >> loop Left presence -> do xs <- liftIO $ xmlifyPresenceForClient presence send xs loop handleClient :: (SocketLike sock, HHead l (XMPPClass session), JabberClientSession session) => HCons sock (HCons t l) -> Source IO ByteString -> Sink ByteString IO () -> IO () handleClient st src snk = do let HCons sock (HCons _ st') = st session_factory = hHead st' pname <- getPeerName sock session <- newSession session_factory sock Prelude.putStrLn $ "PEER NAME: "++Prelude.show pname pchan <- subscribe session Nothing cmdChan <- atomically newTChan #ifdef RENDERFLUSH writer <- async ( toClient pchan cmdChan $$ flushList =$= renderBuilderFlush def =$= builderToByteStringFlush =$= discardFlush =$ snk ) #else writer <- async ( toClient pchan cmdChan $$ renderChunks =$ snk ) #endif finally ( src $= parseBytes def $$ fromClient session cmdChan ) $ do atomically $ writeTChan cmdChan QuitThread wait writer closeSession session listenForXmppClients :: (HList l, HHead l (XMPPClass session), HExtend e1 l2 l1, HExtend e l1 (HCons PortNumber l), JabberClientSession session) => Family -> e1 -> e -> l2 -> IO ServerHandle listenForXmppClients addr_family session_factory port st = do doServer (addr_family .*. port .*. session_factory .*. st) handleClient #ifdef RENDERFLUSH flushList :: Monad m => ConduitM [a] (Flush a) m () flushList = fixMaybeT $ \loop -> do xs <- mawait lift ( CL.sourceList xs $$ CL.mapM_ (yield . Chunk) ) lift ( yield Flush ) loop discardFlush :: Monad m => ConduitM (Flush a) a m () discardFlush = fixMaybeT $ \loop -> do x <- mawait let unchunk (Chunk a) = a ischunk (Chunk _) = True ischunk _ = False lift . when (ischunk x) $ yield (unchunk x) loop #else renderChunks :: (MonadUnsafeIO m, MonadIO m) => ConduitM [Event] ByteString m () renderChunks = fixMaybeT $ \loop -> do xs <- mawait lift . when (not . null $ xs) $ ( CL.sourceList xs $= renderBytes def $$ CL.mapM_ yield ) loop #endif listenForRemotePeers :: (HList l, HHead l (XMPPPeerClass session), HExtend e l1 (HCons PortNumber l), HExtend e1 l2 l1, JabberPeerSession session) => Family -> e1 -> e -> l2 -> IO ServerHandle listenForRemotePeers addrfamily session_factory port st = do doServer (addrfamily .*. port .*. session_factory .*. st) handlePeer handlePeer :: (SocketLike sock, HHead l (XMPPPeerClass session), JabberPeerSession session) => HCons sock (HCons t1 l) -> Source IO ByteString -> t -> IO () handlePeer st src snk = do let HCons sock (HCons _ st') = st session_factory = hHead st' name <- fmap bshow $ getPeerName sock L.putStrLn $ "(P) connected " <++> name session <- newPeerSession session_factory sock finally ( src $= parseBytes def $$ fromPeer session ) $ do L.putStrLn $ "(P) disconnected " <++> name closePeerSession session handlePeerPresence session stanza False = do -- Offline withJust (lookupAttrib "from" (tagAttrs stanza)) $ \jid -> do peer_jid <- liftIO $ parseAddressJID (L.fromChunks [S.encodeUtf8 jid]) liftIO $ announcePresence session (Presence peer_jid Offline) handlePeerPresence session stanza True = do -- online (Available or Away) let log = liftIO . L.putStrLn . ("(P) " <++>) withJust (lookupAttrib "from" (tagAttrs stanza)) $ \jid -> do pjid <- liftIO $ parseAddressJID (L.fromChunks [S.encodeUtf8 jid]) -- stat <- show element content let parseChildren stat = do child <- nextElement case child of Just tag | tagName tag=="{jabber:server}show" -> fmap toStat (lift content) Just tag | otherwise -> parseChildren stat Nothing -> return stat toStat "away" = Away toStat "xa" = Away -- TODO: xa toStat "dnd" = Away -- TODO: dnd toStat "chat" = Available stat' <- parseChildren Available liftIO $ announcePresence session (Presence pjid stat') log $ bshow (Presence pjid stat') matchAttribMaybe name (Just value) attrs = case find ( (==name) . fst) attrs of Just (_,[ContentText x]) | x==value -> True Just (_,[ContentEntity x]) | x==value -> True _ -> False matchAttribMaybe name Nothing attrs | find ( (==name) . fst) attrs==Nothing = True matchAttribMaybe name Nothing attrs | otherwise = False presenceTypeOffline = Just "unavailable" presenceTypeOnline = Nothing presenceTypeProbe = Just "probe" isPresenceOf (EventBeginElement name attrs) testType | name=="{jabber:server}presence" && matchAttribMaybe "type" testType attrs = True isPresenceOf _ _ = False handlePresenceProbe session stanza = do withJust (lookupAttrib "to" (tagAttrs stanza)) $ \to -> do -- withJust (lookupAttrib "from" (tagAttrs stanza)) $ \from -> do jid <- liftIO $ parseAddressJID $ L.fromChunks [S.encodeUtf8 to] withJust (name jid) $ \user -> do liftIO $ L.putStrLn $ "RECEIVED PROBE "<++>bshow (peerAddress session,to) liftIO $ do subs <- getSubscribers (peerSessionFactory session) user liftIO $ L.putStrLn $ "subscribers for "<++>bshow user<++>": " <++>bshow subs forM_ subs $ \jidstr -> do handle (\(IOError _ _ _ _ _ _) -> return ()) $ do -- handle (\(SomeException _) -> return ()) $ do L.putStrLn $ "parsing " <++>jidstr sub <- parseHostNameJID jidstr putStrLn $ "comparing " ++show (peer sub , peerAddress session) when (peer sub == peerAddress session) $ do ps <- userStatus session user mapM_ (announcePresence session) ps return () fromPeer :: (MonadThrow m,MonadIO m, JabberPeerSession session) => session -> Sink XML.Event m () fromPeer session = doNestingXML $ do let log = liftIO . L.putStrLn . ("(P) " <++>) withXML $ \begindoc -> do when (begindoc==EventBeginDocument) $ do log "begin-doc" withXML $ \xml -> do withJust (elementAttrs "stream" xml) $ \stream_attrs -> do log $ "stream atributes: " <++> bshow stream_attrs fix $ \loop -> do log "waiting for stanza." whenJust nextElement $ \stanza -> do stanza_lvl <- nesting let unhandledStanza = do xs <- gatherElement stanza Seq.empty prettyPrint "P: " (toList xs) case () of _ | stanza `isPresenceOf` presenceTypeOnline -> handlePeerPresence session stanza True _ | stanza `isPresenceOf` presenceTypeOffline -> handlePeerPresence session stanza False _ | stanza `isPresenceOf` presenceTypeProbe -> handlePresenceProbe session stanza _ -> unhandledStanza awaitCloser stanza_lvl loop log $ "end of stream" withXML $ \xml -> do log $ "end-of-document: " <++> bshow xml data OutBoundMessage = OutBoundPresence Presence | PresenceProbe JID JID deriving Prelude.Show newServerConnections = newTVar Map.empty data CachedMessages = CachedMessages { presences :: Map JID JabberShow , probes :: Map JID (Set JID) } connect_to_server chan peer = (>> return ()) . runMaybeT $ do let port = 5269 :: Int -- We'll cache Presence notifications until the socket -- is ready. cached <- liftIO $ newIORef (CachedMessages Map.empty Map.empty) let cacheCmd (OutBoundPresence (Presence jid Offline)) cached = do cache <- readIORef cached writeIORef cached (cache { presences=Map.delete jid . presences $ cache }) cacheCmd (OutBoundPresence p@(Presence jid st)) cached = do cache <- readIORef cached writeIORef cached (cache { presences=Map.insert jid st . presences $ cache }) cacheCmd (PresenceProbe from to) cached = do cache <- readIORef cached let probes' = Map.adjust (Set.insert from) to $ probes cache writeIORef cached (cache { probes=probes' }) fix $ \sendmsgs -> do connected <- liftIO . async $ connect' (peerAddr peer) port sock <- MaybeT . fix $ \loop -> do e <- atomically $ orElse (fmap Right $ waitSTM connected) (fmap Left $ readTChan chan) case e of Left cmd -> cacheCmd cmd cached >> loop Right sock -> return sock retry <- do (cache,snk) <- liftIO $ do h <- socketToHandle sock ReadWriteMode hSetBuffering h NoBuffering cache <- readIORef $ cached -- hint garbage collector: we're done with this... writeIORef cached (CachedMessages Map.empty Map.empty) return (cache,packetSink h) MaybeT $ handleOutgoingToPeer (restrictSocket sock) cache chan snk liftIO $ cacheCmd retry cached liftIO $ putStrLn $ "retrying " ++ show retry sendmsgs greetPeer = [ EventBeginDocument , EventBeginElement (streamP "stream") [("xmlns",[ContentText "jabber:server"]) ,("version",[ContentText "1.0"]) ] ] goodbyePeer = [ EventEndElement (streamP "stream") , EventEndDocument ] presenceProbe sock fromjid tojid = do addr <- getSocketName sock let jidstr jid = toStrict . L.decodeUtf8 $ name jid <$++> "@" showPeer (RemotePeer addr) <++?> "/" <++$> resource jid from = jidstr fromjid to = toStrict . L.decodeUtf8 $ name tojid <$++> "@" showPeer (peer tojid) return [ EventBeginElement "{jabber:server}presence" [("from",[ContentText from]) ,("to",[ContentText to]) ,("type",[ContentText "probe"]) ] , EventEndElement "{jabber:server}presence" ] {- toPeerChain :: SocketLike sock => sock -> CachedMessages -> TChan OutBoundMessage -> Sink ByteString IO b -> IO b toPeerChain sock cache chan snk = toPeer sock cache chan $$ renderChunks =$ snk -} toPeer :: SocketLike sock => sock -> CachedMessages -> TChan OutBoundMessage -> (Maybe OutBoundMessage -> IO ()) -> ConduitM i [Event] IO () toPeer sock cache chan fail = do let -- log = liftIO . L.putStrLn . ("(>P) " <++>) send xs = yield xs >> prettyPrint ">P: " xs -- >> return (3::Int) checkConnection cmd = do liftIO $ catch (getPeerName sock >> return ()) (\_ -> fail . Just $ cmd) sendPresence presence = do r <- lift $ xmlifyPresenceForPeer sock presence {- liftIO $ do p' <- catch (fmap (Just . RemotePeer) $ getPeerName sock) (\_ -> (fail . Just . OutBoundPresence $ presence) >> return Nothing) L.putStrLn $ "sending Presence to " <++?> fmap showPeer p' -} let cmd = OutBoundPresence presence checkConnection cmd yieldOr r (fail . Just $ cmd) prettyPrint ">P: " r sendProbe from to = do r <- liftIO $ presenceProbe sock from to let cmd = PresenceProbe from to checkConnection cmd yieldOr r (fail . Just $ cmd) prettyPrint ">P: " r send greetPeer forM_ (Map.assocs . presences $ cache) $ \(jid,st) -> do sendPresence (Presence jid st) forM_ (Map.assocs . probes $ cache) $ \(to,froms) -> do forM_ (Set.toList froms) $ \from -> do liftIO $ L.putStrLn "sending cached probe..." sendProbe from to fix $ \loop -> do event <- lift . atomically $ readTChan chan case event of OutBoundPresence p -> sendPresence p PresenceProbe from to -> do liftIO $ L.putStrLn "sending live probe..." sendProbe from to loop send goodbyePeer handleOutgoingToPeer :: SocketLike sock => sock -> CachedMessages -> TChan OutBoundMessage -> Sink ByteString IO () -> IO (Maybe OutBoundMessage) handleOutgoingToPeer sock cache chan snk = do p <- getPeerName sock L.putStrLn $ "(>P) connected " <++> showPeer (RemotePeer p) failed <- newIORef Nothing let failure cmd = do writeIORef failed cmd putStrLn $ "Failed: " ++ show cmd finally ( #ifdef RENDERFLUSH handle (\(IOError _ _ _ _ _ _) -> return ()) $ toPeer sock cache chan failure $$ flushList =$= renderBuilderFlush def =$= builderToByteStringFlush =$= discardFlush =$ snk #else handle (\(IOError _ _ _ _ _ _) -> return ()) $ toPeer sock cache chan failure $$ renderChunks =$ snk #endif ) $ L.putStrLn $ "(>P) disconnected " <++> showPeer (RemotePeer p) readIORef failed connect' :: SockAddr -> Int -> IO (Maybe Socket) connect' addr port = do proto <- getProtocolNumber "tcp" {- -- Given (host :: HostName) ... let hints = defaultHints { addrFlags = [AI_ADDRCONFIG] , addrProtocol = proto , addrSocketType = Stream } addrs <- getAddrInfo (Just hints) (Just host) (Just serv) firstSuccessful $ map tryToConnect addrs -} let getport (SockAddrInet port _) = port getport (SockAddrInet6 port _ _ _) = port let doException e@(IOError _ _ _ _ _ _) = do -- let doException (SomeException e) = do L.putStrLn $ "\nFailed to reach "<++> showPeer (RemotePeer addr) <++> " on port "<++>bshow port<++>": " <++> bshow e return Nothing handle doException $ tryToConnect proto (addr `withPort` port) where tryToConnect proto addr = bracketOnError (socket (socketFamily addr) Stream proto) (sClose ) -- only done if there's an error (\sock -> do connect sock addr return (Just sock) -- socketToHandle sock ReadWriteMode ) sendMessage :: TVar (Map Peer (TChan OutBoundMessage, ThreadId)) -> OutBoundMessage -> Peer -> IO () sendMessage cons msg peer = do found <- atomically $ do consmap <- readTVar cons return (Map.lookup peer consmap) let newEntry = do chan <- atomically newTChan t <- forkIO $ connect_to_server chan peer -- L.putStrLn $ "remote-map new: " <++> showPeer peer return (True,(chan,t)) (is_new,entry) <- maybe newEntry ( \(chan,t) -> do st <- threadStatus t let running = do -- L.putStrLn $ "remote-map, thread running: " <++> showPeer peer return (False,(chan,t)) died = do -- L.putStrLn $ "remote-map, thread died("<++>bshow st<++>"): " <++> showPeer peer newEntry case st of ThreadRunning -> running ThreadBlocked _ -> running ThreadDied -> died ThreadFinished -> died ) found -- L.putStrLn $ "sendMessage ->"<++>showPeer peer<++>": "<++>bshow msg atomically $ writeTChan (fst entry) msg when is_new . atomically $ readTVar cons >>= writeTVar cons . Map.insert peer entry seekRemotePeers :: JabberPeerSession config => XMPPPeerClass config -> TChan Presence -> TVar (Map Peer (TChan OutBoundMessage, ThreadId)) -> IO b0 seekRemotePeers config chan server_connections = do fix $ \loop -> do event <- atomically $ readTChan chan case event of p@(Presence jid stat) | not (is_remote (peer jid)) -> do -- L.putStrLn $ "seekRemotePeers: " <++> L.show jid <++> " " <++> bshow stat runMaybeT $ do u <- MaybeT . return $ name jid subscribers <- liftIO $ do subs <- getSubscribers config u mapM parseHostNameJID subs -- liftIO . L.putStrLn $ "subscribers: " <++> bshow subscribers let peers = Set.map peer (Set.fromList subscribers) forM_ (Set.toList peers) $ \peer -> do when (is_remote peer) $ liftIO $ sendMessage server_connections (OutBoundPresence p) peer -- TODO: send presence probes for buddies -- TODO: cache remote presences for clients _ -> return (Just ()) loop xmlifyPresenceForPeer sock (Presence jid stat) = do -- TODO: accept socket argument and determine local ip address -- connected to this peer. addr <- getSocketName sock let n = name jid rsc = resource jid jidstr = toStrict . L.decodeUtf8 $ n <$++> "@" showPeer (RemotePeer addr) <++?> "/" <++$> rsc return [ EventBeginElement "{jabber:server}presence" (("from",[ContentText jidstr]):typ stat) , EventBeginElement "{jabber:server}show" [] , EventContent (ContentText . shw $ stat) , EventEndElement "{jabber:server}show" , EventEndElement "{jabber:server}presence" ] where typ Offline = [("type",[ContentText "unavailable"])] typ _ = [] shw Available = "chat" shw Away = "away" shw Offline = "away" -- Is this right?