{-# LANGUAGE OverloadedStrings #-} module XMPPServer ( xmppServer , ConnectionKey(..) , XMPPServerParameters(..) , Stanza(..) , StanzaType(..) , StanzaOrigin(..) , dupStanza ) where import Control.Monad.Trans.Resource (runResourceT) import Control.Monad.Trans (lift) import Control.Monad.IO.Class (MonadIO, liftIO) import Control.Monad.Fix (fix) import Control.Monad import Control.Concurrent (forkIO) import Control.Concurrent.STM -- import Control.Concurrent.STM.TChan import Network.Socket import XMPPTypes (withPort) import Text.Printf import System.Posix.Signals import Data.ByteString (ByteString) import qualified Data.ByteString.Char8 as Strict8 -- import qualified Data.ByteString.Lazy.Char8 as Lazy8 import Data.Int (Int8) import Data.Conduit import qualified Data.Conduit.List as CL import qualified Data.Conduit.Binary as CB import Data.Conduit.Blaze (builderToByteStringFlush) import qualified Text.XML.Stream.Render as XML import qualified Text.XML.Stream.Parse as XML import Data.XML.Types as XML import Data.Maybe (catMaybes,fromJust,isNothing) import Data.Monoid ( (<>) ) import Data.Text (Text) import qualified Data.Text as Text (pack,unpack) import Data.Char (toUpper) import qualified Control.Concurrent.STM.UpdateStream as Slotted import ControlMaybe import Nesting import EventUtil import Server peerport = 5269 clientport = 5222 my_uuid = "154ae29f-98f2-4af4-826d-a40c8a188574" data ConnectionKey = PeerKey { callBackAddress :: SockAddr } | ClientKey { localAddress :: SockAddr } deriving (Show, Ord, Eq) data JabberShow = Offline | ExtendedAway | Away | DoNotDisturb | Available | Chatty deriving (Show,Enum,Ord,Eq,Read) data MessageThread = MessageThread { msgThreadParent :: Maybe Text, msgThreadContent :: Text } deriving (Show,Eq) data LangSpecificMessage = LangSpecificMessage { msgBody :: Maybe Text , msgSubject :: Maybe Text } deriving (Show,Eq) data StanzaType = Unrecognized | Ping | Pong | RequestResource (Maybe Text) | SetResource | SessionRequest | UnrecognizedQuery Name | Error | PresenceStatus { presenceShow :: JabberShow , presencePriority :: Maybe Int8 , presenceStatus :: [(Lang,Text)] } | PresenceInformError | PresenceInformSubscription Bool | PresenceRequestStatus | PresenceRequestSubscription Bool | Message { msgThread :: Maybe MessageThread , msgLangMap :: [(Lang,LangSpecificMessage)] } deriving Show data StanzaOrigin = LocalPeer | NetworkOrigin ConnectionKey (TChan Stanza) data Stanza = Stanza { stanzaType :: StanzaType , stanzaId :: Maybe Text , stanzaTo :: Maybe Text , stanzaFrom :: Maybe Text , stanzaChan :: TChan XML.Event , stanzaClosers :: TVar (Maybe [XML.Event]) , stanzaInterrupt :: TMVar () , stanzaOrigin :: StanzaOrigin } data XMPPServerParameters = XMPPServerParameters { xmppChooseResourceName :: ConnectionKey -> Socket -> Maybe Text -> IO Text , xmppNewConnection :: ConnectionKey -> TChan Stanza -> IO () , xmppEOF :: ConnectionKey -> IO () } -- TODO: http://xmpp.org/rfcs/rfc6120.html#rules-remote-error -- client connection -- socat script to send stanza fragment -- copyToChannel can keep a stack of closers to append to finish-off a stanza -- the TMVar () from forkConnection can be passed and with a stanza to detect interruption addrToText :: SockAddr -> Text addrToText (addr@(SockAddrInet _ _)) = Text.pack $ stripColon (show addr) where stripColon s = pre where (pre,port) = break (==':') s addrToText (addr@(SockAddrInet6 _ _ _ _)) = Text.pack $ stripColon (show addr) where stripColon s = if null bracket then pre else pre ++ "]" where (pre,bracket) = break (==']') s wlog s = putStrLn s where _ = s :: String wlogb s = Strict8.putStrLn s xmlStream :: ReadCommand -> WriteCommand -> ( Source IO XML.Event , Sink (Flush XML.Event) IO () ) xmlStream conread conwrite = (xsrc,xsnk) where xsrc = src $= XML.parseBytes XML.def xsnk = -- XML.renderBytes XML.def =$ snk XML.renderBuilderFlush XML.def =$= builderToByteStringFlush =$= discardFlush =$ snk where discardFlush :: Monad m => ConduitM (Flush a) a m () discardFlush = awaitForever $ \x -> do let unchunk (Chunk a) = a ischunk (Chunk _) = True ischunk _ = False when (ischunk x) $ yield (unchunk x) src = do v <- lift conread maybe (return ()) -- lift . wlog $ "conread: Nothing") (\v -> yield v >> src) v snk = awaitForever $ liftIO . conwrite type FlagCommand = STM Bool type ReadCommand = IO (Maybe ByteString) type WriteCommand = ByteString -> IO Bool dupStanza stanza = do dupped <- dupTChan (stanzaChan stanza) return stanza { stanzaChan = dupped } copyToChannel f chan closer_stack = awaitForever copy where copy x = do liftIO . atomically $ writeTChan chan (f x) case x of EventBeginDocument {} -> do let clsr = closerFor x liftIO . atomically $ modifyTVar' closer_stack (fmap (clsr:)) EventEndDocument {} -> do liftIO . atomically $ modifyTVar' closer_stack (fmap (drop 1)) _ -> return () yield x prettyPrint prefix = XML.renderBytes (XML.def { XML.rsPretty=True }) =$= CB.lines =$ CL.mapM_ (wlogb . (prefix <>)) sendReply donevar stype reply replychan = do if null reply then return () else do let stanzaTag = head reply mid = lookupAttrib "id" (tagAttrs stanzaTag) mfrom = lookupAttrib "from" (tagAttrs stanzaTag) mto = lookupAttrib "to" (tagAttrs stanzaTag) replyStanza <- liftIO . atomically $ do replyChan <- newTChan replyClsrs <- newTVar (Just []) return Stanza { stanzaType = stype , stanzaId = mid , stanzaTo = mto -- todo: should this be reversed? , stanzaFrom = mfrom -- todo: should this be reversed? , stanzaChan = replyChan , stanzaClosers = replyClsrs , stanzaInterrupt = donevar , stanzaOrigin = LocalPeer } ioWriteChan replychan replyStanza void . liftIO . forkIO $ do mapM_ (ioWriteChan $ stanzaChan replyStanza) reply liftIO . atomically $ writeTVar (stanzaClosers replyStanza) Nothing -- liftIO $ wlog "finished reply stanza" grokStanzaIQGet :: Monad m => XML.Event -> NestingXML o m (Maybe StanzaType) grokStanzaIQGet stanza = do mtag <- nextElement flip (maybe $ return Nothing) mtag $ \tag -> do case tagName tag of "{urn:xmpp:ping}ping" -> return $ Just Ping name -> return . Just $ UnrecognizedQuery name grokStanzaIQResult :: Monad m => XML.Event -> NestingXML o m (Maybe StanzaType) grokStanzaIQResult stanza = do mtag <- nextElement flip (maybe $ return (Just Pong)) mtag $ \tag -> do case tagName tag of _ -> return Nothing grokStanzaIQSet :: XML.Event -> NestingXML o IO (Maybe StanzaType) grokStanzaIQSet stanza = do mtag <- nextElement flip (maybe $ return Nothing) mtag $ \tag -> do case tagName tag of "{urn:ietf:params:xml:ns:xmpp-bind}bind" -> do mchild <- nextElement case fmap tagName mchild of Just "{urn:ietf:params:xml:ns:xmpp-bind}resource" -> do rsc <- XML.content -- TODO: MonadThrow??? return . Just $ RequestResource (Just rsc) Just _ -> return Nothing Nothing -> return . Just $ RequestResource Nothing "{urn:ietf:params:xml:ns:xmpp-session}session" -> do return $ Just SessionRequest _ -> return Nothing {- C->Unrecognized Unrecognized type="set" C->Unrecognized id="purpleae62d88f" C->Unrecognized xmlns="jabber:client"> C->Unrecognized C->Unrecognized -} ioWriteChan c v = liftIO . atomically $ writeTChan c v parsePresenceStatus ns = do let toStat "away" = Away toStat "xa" = ExtendedAway toStat "dnd" = DoNotDisturb toStat "chat" = Chatty showv <- liftIO . atomically $ newTVar Available priov <- liftIO . atomically $ newTVar Nothing statusv <- liftIO . atomically $ newTChan fix $ \loop -> do mtag <- nextElement flip (maybe $ return ()) mtag $ \tag -> do when (nameNamespace (tagName tag) == Just ns) $ do case nameLocalName (tagName tag) of "show" -> do t <- XML.content liftIO . atomically $ writeTVar showv (toStat t) "priority" -> do t <- XML.content liftIO . handleIO_ (return ()) $ do prio <- readIO (Text.unpack t) atomically $ writeTVar priov (Just prio) "status" -> do t <- XML.content lang <- xmlLang ioWriteChan statusv (maybe "" id lang,t) _ -> return () loop show <- liftIO . atomically $ readTVar showv prio <- liftIO . atomically $ readTVar priov status <- liftIO $ chanContents statusv -- Could use unsafeInterleaveIO to -- avoid multiple passes, but whatever. return . Just $ PresenceStatus { presenceShow = show , presencePriority = prio , presenceStatus = status } grokPresence ns stanzaTag = do let typ = lookupAttrib "type" (tagAttrs stanzaTag) case typ of Nothing -> parsePresenceStatus ns Just "unavailable" -> fmap (fmap (\p -> p {presenceShow=Offline})) $ parsePresenceStatus ns Just "error" -> return . Just $ PresenceInformError Just "unsubscribed" -> return . Just $ PresenceInformSubscription False Just "subscribed" -> return . Just $ PresenceInformSubscription True Just "probe" -> return . Just $ PresenceRequestStatus Just "unsubscribe" -> return . Just $ PresenceRequestSubscription False Just "subscribe" -> return . Just $ PresenceRequestSubscription True _ -> return Nothing -- todo grokStanza "jabber:server" stanzaTag = case () of _ | stanzaTag `isServerIQOf` "get" -> grokStanzaIQGet stanzaTag _ | stanzaTag `isServerIQOf` "result" -> grokStanzaIQResult stanzaTag _ | tagName stanzaTag == "{jabber:server}presence" -> grokPresence "jabber:server" stanzaTag _ -> return $ Just Unrecognized grokStanza "jabber:client" stanzaTag = case () of _ | stanzaTag `isClientIQOf` "get" -> grokStanzaIQGet stanzaTag _ | stanzaTag `isClientIQOf` "set" -> grokStanzaIQSet stanzaTag _ | stanzaTag `isClientIQOf` "result" -> grokStanzaIQResult stanzaTag _ | tagName stanzaTag == "{jabber:client}presence" -> grokPresence "jabber:client" stanzaTag _ -> return $ Just Unrecognized xmppInbound :: Server ConnectionKey -> ConnectionKey -> FlagCommand -> Source IO XML.Event -> TChan Stanza -> TChan Stanza -> TMVar () -> Sink XML.Event IO () xmppInbound sv k pingflag src stanzas output donevar = doNestingXML $ do let namespace = case k of ClientKey {} -> "jabber:client" PeerKey {} -> "jabber:server" withXML $ \begindoc -> do when (begindoc==EventBeginDocument) $ do whenJust nextElement $ \xml -> do withJust (elementAttrs "stream" xml) $ \stream_attrs -> do fix $ \loop -> do -- liftIO . wlog $ "waiting for stanza." (chan,clsrs) <- liftIO . atomically $ liftM2 (,) newTChan (newTVar (Just [])) whenJust nextElement $ \stanzaTag -> do stanza_lvl <- nesting liftIO . atomically $ do writeTChan chan stanzaTag modifyTVar' clsrs (fmap (closerFor stanzaTag:)) copyToChannel id chan clsrs =$= do let mid = lookupAttrib "id" (tagAttrs stanzaTag) mfrom = lookupAttrib "from" (tagAttrs stanzaTag) mto = lookupAttrib "to" (tagAttrs stanzaTag) dispatch <- grokStanza namespace stanzaTag let unrecog = do let stype = Unrecognized s <- liftIO . atomically $ do return Stanza { stanzaType = stype , stanzaId = mid , stanzaTo = mto , stanzaFrom = mfrom , stanzaChan = chan , stanzaClosers = clsrs , stanzaInterrupt = donevar , stanzaOrigin = NetworkOrigin k output } ioWriteChan stanzas s flip (maybe $ unrecog) dispatch $ \dispatch -> case dispatch of Ping -> do -- TODO: check that the to-address matches this server. -- Otherwise it could be a client-to-client ping or a -- client-to-server for some other server. -- For now, assuming its for the immediate connection. let pongto = maybe "todo" id mfrom pongfrom = maybe "todo" id mto pong = makePong namespace mid pongto pongfrom sendReply donevar Pong pong output -- TODO: Remove this, it is only to generate a debug print ioWriteChan stanzas Stanza { stanzaType = Ping , stanzaId = mid , stanzaTo = mto , stanzaFrom = mfrom , stanzaChan = chan , stanzaClosers = clsrs , stanzaInterrupt = donevar , stanzaOrigin = NetworkOrigin k output } stype -> ioWriteChan stanzas Stanza { stanzaType = stype , stanzaId = mid , stanzaTo = mto , stanzaFrom = mfrom , stanzaChan = chan , stanzaClosers = clsrs , stanzaInterrupt = donevar , stanzaOrigin = NetworkOrigin k output } awaitCloser stanza_lvl liftIO . atomically $ writeTVar clsrs Nothing loop chanContents :: TChan x -> IO [x] chanContents ch = do x <- atomically $ do bempty <- isEmptyTChan ch if bempty then return Nothing else fmap Just $ readTChan ch maybe (return []) (\x -> do xs <- chanContents ch return (x:xs)) x readUntilNothing :: TChan (Maybe x) -> IO [x] readUntilNothing ch = do x <- atomically $ readTChan ch maybe (return []) (\x -> do xs <- readUntilNothing ch return (x:xs)) x streamFeatures "jabber:client" = [ EventBeginElement (streamP "features") [] , EventBeginElement "{urn:ietf:params:xml:ns:xmpp-bind}bind" [] , EventEndElement "{urn:ietf:params:xml:ns:xmpp-bind}bind" {- -- , " " , " " -- , " DIGEST-MD5" , " PLAIN" , " " -} , EventEndElement (streamP "features") ] streamFeatures "jabber:server" = [] greet' namespace host = [ EventBeginDocument , EventBeginElement (streamP "stream") [("from",[ContentText host]) ,("id",[ContentText "someid"]) ,("xmlns",[ContentText namespace]) ,("xmlns:stream",[ContentText "http://etherx.jabber.org/streams"]) ,("version",[ContentText "1.0"]) ] ] ++ streamFeatures namespace consid Nothing = id consid (Just sid) = (("id",[ContentText sid]):) data XMPPState = PingSlot deriving (Eq,Ord) mkname namespace name = (Name name (Just namespace) Nothing) makePing :: Text -> Maybe Text -> Text -> Text -> [XML.Event] makePing namespace mid to from = [ EventBeginElement (mkname namespace "iq") $ (case mid of Just c -> (("id",[ContentText c]):) _ -> id ) [ ("type",[ContentText "get"]) , attr "to" to , attr "from" from ] , EventBeginElement "{urn:xmpp:ping}ping" [] , EventEndElement "{urn:xmpp:ping}ping" , EventEndElement $ mkname namespace "iq"] makePong namespace mid to from = -- Note: similar to session reply [ EventBeginElement (mkname namespace "iq") $(case mid of Just c -> (("id",[ContentText c]):) _ -> id) [ attr "type" "result" , attr "to" to , attr "from" from ] , EventEndElement (mkname namespace "iq") ] iq_bind_reply :: Maybe Text -> Text -> [XML.Event] iq_bind_reply mid jid = [ EventBeginElement "{jabber:client}iq" (consid mid [("type",[ContentText "result"])]) , EventBeginElement "{urn:ietf:params:xml:ns:xmpp-bind}bind" [("xmlns",[ContentText "urn:ietf:params:xml:ns:xmpp-bind"])] , EventBeginElement "{urn:ietf:params:xml:ns:xmpp-bind}jid" [] , EventContent (ContentText jid) , EventEndElement "{urn:ietf:params:xml:ns:xmpp-bind}jid" , EventEndElement "{urn:ietf:params:xml:ns:xmpp-bind}bind" , EventEndElement "{jabber:client}iq" ] iq_session_reply mid host = -- Note: similar to Pong [ EventBeginElement "{jabber:client}iq" (consid mid [("from",[ContentText host]) ,("type",[ContentText "result"]) ]) , EventEndElement "{jabber:client}iq" ] iq_service_unavailable mid host {- mjid -} req = [ EventBeginElement "{jabber:client}iq" (consid mid [("type",[ContentText "error"]) -- , TODO: set "from" if isJust mjid ]) , EventBeginElement req [] , EventEndElement req , EventBeginElement "{jabber:client}error" [("type",[ContentText "cancel"])] , EventBeginElement "{urn:ietf:params:xml:ns:xmpp-stanzas}service-unavailable" [] , EventEndElement "{urn:ietf:params:xml:ns:xmpp-stanzas}service-unavailable" , EventEndElement "{jabber:client}error" , EventEndElement "{jabber:client}iq" ] {- greet namespace = [ EventBeginDocument , EventBeginElement (streamP "stream") [ attr "xmlns" namespace , attr "version" "1.0" ] ] -} goodbye = [ EventEndElement (streamP "stream") , EventEndDocument ] forkConnection :: Server ConnectionKey -> ConnectionKey -> FlagCommand -> Source IO XML.Event -> Sink (Flush XML.Event) IO () -> TChan Stanza -> IO (TChan Stanza) forkConnection sv k pingflag src snk stanzas = do let namespace = case k of ClientKey {} -> "jabber:client" PeerKey {} -> "jabber:server" rdone <- atomically newEmptyTMVar slots <- atomically $ Slotted.new isEventBeginElement isEventEndElement needsFlush <- atomically $ newTVar False let _ = slots :: Slotted.UpdateStream XMPPState XML.Event let greet_src = do CL.sourceList (greet' namespace "localhost") =$= CL.map Chunk yield Flush slot_src = do what <- lift . atomically $ foldr1 orElse [Slotted.pull slots >>= \x -> do writeTVar needsFlush True return $ do -- liftIO $ wlog $ "yielding Chunk: " ++ show x yield (Chunk x) slot_src ,do Slotted.isEmpty slots >>= check readTVar needsFlush >>= check writeTVar needsFlush False return $ do -- liftIO $ wlog "yielding Flush" yield Flush slot_src ,readTMVar rdone >> return (return ()) ] what forkIO $ do (greet_src >> slot_src) $$ snk wlog $ "end post-queue fork: " ++ show k output <- atomically newTChan forkIO $ do -- mapM_ (atomically . Slotted.push slots Nothing) greetPeer fix $ \loop -> do what <- atomically $ foldr1 orElse [readTChan output >>= \stanza -> return $ do dup <- atomically $ dupStanza stanza stanzaToConduit dup $$ prettyPrint $ case k of ClientKey {} -> "C<-" <> bshow (stanzaType dup) <> " " PeerKey {} -> "P<-" <> bshow (stanzaType dup) <> " " stanzaToConduit stanza $$ awaitForever $ liftIO . atomically . Slotted.push slots Nothing loop ,do pingflag >>= check return $ do let to = addrToText (callBackAddress k) from = "todo" -- Look it up from Server object -- or pass it with Connection event. mid = Just "ping" ping = makePing namespace mid to from mapM_ (atomically . Slotted.push slots (Just $ PingSlot)) ping wlog "" CL.sourceList ping $$ prettyPrint $ case k of ClientKey {} -> "C<-Ping" PeerKey {} -> "P<-Ping " loop ,readTMVar rdone >> return (return ()) ] what wlog $ "end pre-queue fork: " ++ show k forkIO $ do -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show) src $$ xmppInbound sv k pingflag src stanzas output rdone atomically $ putTMVar rdone () wlog $ "end reader fork: " ++ show k return output {- data Peer = Peer { peerWanted :: TVar Bool -- ^ False when this peer is on a you-call-me basis , peerState :: TVar PeerState } data PeerState = PeerPendingConnect UTCTime | PeerPendingAccept UTCTime | PeerConnected (TChan Stanza) -} peerKey (sock,addr) = do peer <- sIsConnected sock >>= \c -> if c then getPeerName sock -- addr is normally socketName else return addr -- Weird hack: addr is would-be peer name return $ PeerKey (peer `withPort` fromIntegral peerport) clientKey (sock,addr) = return $ ClientKey addr stanzaToConduit :: MonadIO m => Stanza -> ConduitM i Event m () stanzaToConduit stanza = do let xchan = stanzaChan stanza xfin = stanzaClosers stanza rdone = stanzaInterrupt stanza loop = return () fix $ \inner -> do what <- liftIO . atomically $ foldr1 orElse [readTChan xchan >>= \xml -> return $ do yield xml -- atomically $ Slotted.push slots Nothing xml inner ,do mb <- readTVar xfin cempty <- isEmptyTChan xchan if isNothing mb then if cempty then return loop else retry else retry -- todo: send closers ,do isEmptyTChan xchan >>= check readTMVar rdone return (return ())] what socketFromKey :: Server k -> k -> IO Socket socketFromKey sv k = do return todo monitor sv params xmpp = do chan <- return $ serverEvent sv stanzas <- atomically newTChan quitVar <- atomically newEmptyTMVar fix $ \loop -> do action <- atomically $ foldr1 orElse [ readTChan chan >>= \(k,e) -> return $ do case e of Connection pingflag conread conwrite -> do wlog $ tomsg k "Connection" let (xsrc,xsnk) = xmlStream conread conwrite outs <- forkConnection sv k pingflag xsrc xsnk stanzas xmppNewConnection xmpp k outs return () ConnectFailure addr -> return () -- wlog $ tomsg k "ConnectFailure" EOF -> do wlog $ tomsg k "EOF" xmppEOF xmpp k HalfConnection In -> do wlog $ tomsg k "ReadOnly" control sv (Connect (callBackAddress k) params) HalfConnection Out -> wlog $ tomsg k "WriteOnly" RequiresPing -> return () -- wlog $ tomsg k "RequiresPing" _ -> return () , readTChan stanzas >>= \stanza -> return $ do forkIO $ do case stanzaOrigin stanza of NetworkOrigin k@(ClientKey {}) replyto -> case stanzaType stanza of RequestResource wanted -> do sock <- socketFromKey sv k rsc <- xmppChooseResourceName xmpp k sock wanted let reply = iq_bind_reply (stanzaId stanza) rsc sendReply quitVar SetResource reply replyto SessionRequest -> do let reply = iq_session_reply (stanzaId stanza) "localhost" sendReply quitVar Pong reply replyto UnrecognizedQuery query -> do let reply = iq_service_unavailable (stanzaId stanza) "localhost" query sendReply quitVar Error reply replyto _ -> return () _ -> return () let typ = Strict8.pack $ c ++ "->"++(concat . take 1 . words $ show (stanzaType stanza))++" " c = case stanzaOrigin stanza of LocalPeer -> "*" NetworkOrigin (ClientKey {}) _ -> "C" NetworkOrigin (PeerKey {}) _ -> "P" wlog "" stanzaToConduit stanza $$ prettyPrint typ ] action loop where tomsg k str = printf "%12s %s" str (show k) where _ = str :: String xmppServer :: ( MonadResource m , MonadIO m ) => XMPPServerParameters -> m (Server ConnectionKey,ConnectionParameters ConnectionKey) xmppServer xmpp = do sv <- server let peer_params = (connectionDefaults peerKey) { pingInterval = 15000 , timeout = 2000 , duplex = False } client_params = (connectionDefaults clientKey) { pingInterval = 0 , timeout = 0 } liftIO $ do forkIO $ monitor sv peer_params xmpp control sv (Listen peerport peer_params) control sv (Listen clientport client_params) return (sv,peer_params)