{-# LANGUAGE OverloadedStrings #-}
-- | Experimental XMPP server-to-server endpoint.
--
-- The module listens on the peer port, turns every connection reported by
-- the "Server" module into a pair of XML event streams, answers pings, and
-- logs the stanzas it receives.
module XMPPServer
    ( xmppServer
    ) where

import Control.Monad.Trans.Resource (runResourceT)
import Control.Monad.Trans (lift)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Fix (fix)
import Control.Monad
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
-- import Control.Concurrent.STM.TChan
import qualified Control.Exception as Exception (finally)
import Network.Socket
import XMPPTypes (withPort)
import Text.Printf
import System.Posix.Signals
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as Strict8
-- import qualified Data.ByteString.Lazy.Char8 as Lazy8

import Data.Conduit
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Binary as CB
import Data.Conduit.Blaze (builderToByteStringFlush)

import qualified Text.XML.Stream.Render as XML
import qualified Text.XML.Stream.Parse as XML
import Data.XML.Types as XML
import Data.Maybe (catMaybes,fromJust,isNothing)
import Data.Monoid ( (<>) )
import Data.Text (Text)
import qualified Data.Text as Text (pack)
import Data.Char (toUpper)

import qualified Control.Concurrent.STM.UpdateStream as Slotted
import ControlMaybe
import Nesting
import EventUtil
import Server

-- | Port used for server-to-server (peer) connections.
peerport = 5269

-- | Port reserved for client connections (not listened on yet, see the
-- todo in 'xmppServer').
clientport = 5222

-- | Fixed identifier; currently unused.  The signature pins the type of the
-- overloaded string literal, which would otherwise be ambiguous.
my_uuid :: Text
my_uuid = "154ae29f-98f2-4af4-826d-a40c8a188574"

-- TODO: http://xmpp.org/rfcs/rfc6120.html#rules-remote-error
--   client connection
--   socat script to send stanza fragment
--   copyToChannel can keep a stack of closers to append to finish-off a stanza
--   the TMVar () from forkConnection can be passed and with a stanza to detect interruption

-- | Render a socket address as text without its port.
--
-- For IPv4 everything from the first @':'@ of the shown address is dropped.
-- For IPv6 everything after the first @']'@ is dropped (the bracket itself
-- is kept).  Any other address family is rendered with 'show' unchanged, so
-- the function is total.
addrToText :: SockAddr -> Text
addrToText addr@(SockAddrInet _ _) =
    Text.pack . takeWhile (/= ':') $ show addr
addrToText addr@(SockAddrInet6 _ _ _ _) = Text.pack bracketed
  where
    (pre, rest) = break (== ']') (show addr)
    bracketed
        | null rest = pre
        | otherwise = pre ++ "]"
addrToText addr = Text.pack (show addr)

-- | Write a log line to stdout.
wlog :: String -> IO ()
wlog = putStrLn

-- | Write a 'ByteString' log line to stdout.
wlogb :: ByteString -> IO ()
wlogb = Strict8.putStrLn

-- | Wrap the raw read/write commands of a connection into XML event streams.
--
-- The source parses the bytes returned by the read command until it yields
-- 'Nothing'.  The sink renders events to bytes and hands every rendered
-- chunk to the write command; 'Flush' markers are consumed by the builder
-- stage and not forwarded.  The 'Bool' returned by the write command is
-- ignored.
xmlStream :: ReadCommand
          -> WriteCommand
          -> ( Source IO XML.Event
             , Sink (Flush XML.Event) IO () )
xmlStream conread conwrite = (xsrc, xsnk)
  where
    xsrc = src $= XML.parseBytes XML.def

    xsnk = -- XML.renderBytes XML.def =$ snk
        XML.renderBuilderFlush XML.def
            =$= builderToByteStringFlush
            =$= discardFlush
            =$ snk

    -- Forward the payload of every 'Chunk' and drop 'Flush' markers.
    discardFlush :: Monad m => ConduitM (Flush a) a m ()
    discardFlush = awaitForever $ \x ->
        case x of
            Chunk a -> yield a
            Flush   -> return ()

    -- Keep reading until the connection reports end of input.
    src = do
        mbytes <- lift conread
        case mbytes of
            Nothing    -> return () -- lift . wlog $ "conread: Nothing")
            Just bytes -> yield bytes >> src

    snk = awaitForever $ liftIO . conwrite

-- | STM action telling whether the connection currently needs a ping.
type FlagCommand = STM Bool

-- | Action reading the next chunk of input; 'Nothing' ends the stream.
type ReadCommand = IO (Maybe ByteString)

-- | Action writing a chunk of output.
type WriteCommand = ByteString -> IO Bool

{-
data Stanza
    = UnrecognizedStanza { stanzaChan :: TChan (Maybe XML.Event) }
    | PingStanza { stanzaId :: Maybe Text
                 , stanzaChan :: TChan (Maybe XML.Event) }
    | PongStanza { -- stanzaId :: Maybe Text
                   stanzaChan :: TChan (Maybe XML.Event) }
-}

-- | Classification of a stanza.
data StanzaType = Unrecognized | Ping | Pong
    deriving Show

-- | A stanza whose XML events are delivered incrementally over a channel.
data Stanza = Stanza
    { stanzaType      :: StanzaType
    , stanzaId        :: Maybe Text
    , stanzaTo        :: Maybe Text
    , stanzaFrom      :: Maybe Text
    , stanzaChan      :: TChan XML.Event
      -- ^ the events making up the stanza, in order
    , stanzaClosers   :: TVar (Maybe [XML.Event])
      -- ^ @Just stack@ while the stanza is still being written, where the
      -- stack holds the closing events of the currently open elements;
      -- 'Nothing' once the stanza is complete
    , stanzaInterrupt :: TMVar ()
      -- ^ the variable 'forkConnection' fills when the reader of the
      -- originating connection stops
    }

-- | Pass events through unchanged while copying each one (mapped through
-- @f@) to @chan@ and maintaining the stack of pending closers.
--
-- The closer of every opened element is pushed and popped again when an
-- element ends.  (The stack used to be keyed on document begin/end events,
-- which never occur inside a stanza, so it was never maintained.)
copyToChannel :: MonadIO m
              => (XML.Event -> a)
              -> TChan a
              -> TVar (Maybe [XML.Event])
              -> ConduitM XML.Event XML.Event m ()
copyToChannel f chan closer_stack = awaitForever copy
  where
    copy x = do
        liftIO . atomically $ writeTChan chan (f x)
        case x of
            EventBeginElement {} ->
                liftIO . atomically $
                    modifyTVar' closer_stack (fmap (closerFor x :))
            EventEndElement {} ->
                liftIO . atomically $
                    modifyTVar' closer_stack (fmap (drop 1))
            _ -> return ()
        yield x

-- | Log the pretty-printed rendering of the events, one line at a time,
-- each line preceded by @prefix@.
prettyPrint :: MonadIO m => ByteString -> [XML.Event] -> m ()
prettyPrint prefix xs =
    liftIO $
        CL.sourceList xs
            $= XML.renderBytes (XML.def { XML.rsPretty=True })
            =$= CB.lines
            $$ CL.mapM_ (wlogb . (prefix <>))

-- | Classify an @iq@ stanza of type @get@ by its first child element.
--
-- Returns @Just Ping@ for a @{urn:xmpp:ping}ping@ child and 'Nothing' when
-- there is no child or the child is something else.  The stanza tag itself
-- is not inspected.
grockStanzaIQGet :: Monad m => XML.Event -> NestingXML o m (Maybe StanzaType)
grockStanzaIQGet _stanza = do
    mtag <- nextElement
    case mtag of
        Nothing  -> return Nothing
        Just tag ->
            case tagName tag of
                "{urn:xmpp:ping}ping" -> return $ Just Ping
                _                     -> return Nothing

-- | Write a value to a channel from any 'MonadIO'.
ioWriteChan :: MonadIO m => TChan a -> a -> m ()
ioWriteChan c v = liftIO . atomically $ writeTChan c v

-- | Sink consuming the XML events received from a peer.
--
-- After the opening @stream@ element, each top-level child is treated as a
-- stanza: its events are copied to a fresh channel and a 'Stanza' describing
-- it is announced on @stanzas@.  A ping is not announced; instead a pong
-- reply is queued on @output@.
--
-- The @k@, @pingflag@ and @src@ arguments are currently unused.
xmppInbound :: ConnectionKey
            -> FlagCommand
            -> Source IO XML.Event
            -> TChan Stanza -- ^ channel to announce incoming stanzas on
            -> TChan Stanza -- ^ channel used to send stanzas
            -> TMVar ()     -- ^ stored as 'stanzaInterrupt' of every stanza
            -> Sink XML.Event IO ()
xmppInbound k pingflag src stanzas output donevar = doNestingXML $
    withXML $ \begindoc ->
    when (begindoc == EventBeginDocument) $
    whenJust nextElement $ \xml ->
    withJust (elementAttrs "stream" xml) $ \_stream_attrs ->
    fix $ \loop -> do
        -- liftIO . wlog $ "waiting for stanza."
        (chan, clsrs) <- liftIO . atomically $
            liftM2 (,) newTChan (newTVar (Just []))
        whenJust nextElement $ \stanzaTag -> do
            stanza_lvl <- nesting
            liftIO . atomically $ do
                writeTChan chan stanzaTag
                modifyTVar' clsrs (fmap (closerFor stanzaTag :))
            -- Everything consumed inside this block is also copied to chan.
            copyToChannel id chan clsrs =$= do
                let attrs = tagAttrs stanzaTag
                    mid   = lookupAttrib "id" attrs
                    mfrom = lookupAttrib "from" attrs
                    mto   = lookupAttrib "to" attrs
                dispatch <-
                    if stanzaTag `isServerIQOf` "get"
                        then grockStanzaIQGet stanzaTag
                        else return (Just Unrecognized)
                case dispatch of
                    Nothing -> return ()
                    Just Ping -> do
                        -- The reply travels back to the sender, so the
                        -- addresses of the request are swapped (XEP-0199).
                        let pongTo   = maybe "todo" id mfrom
                            pongFrom = maybe "todo" id mto
                            pong     = peerPong mid pongTo pongFrom
                        -- liftIO $ wlog "got ping, sending pong..."
                        pongChan <- liftIO $ atomically newTChan
                        pongClsrs <- liftIO $ atomically $ newTVar (Just [])
                        ioWriteChan output $ Stanza
                            { stanzaType      = Pong
                            , stanzaId        = mid
                            , stanzaTo        = mfrom
                            , stanzaFrom      = mto
                            , stanzaChan      = pongChan
                            , stanzaClosers   = pongClsrs
                            , stanzaInterrupt = donevar
                            }
                        void . liftIO . forkIO $ do
                            mapM_ (ioWriteChan pongChan) pong
                            -- Nothing marks the pong as complete.
                            liftIO . atomically $ writeTVar pongClsrs Nothing
                            -- liftIO $ wlog "finished pong stanza"
                    Just stype ->
                        ioWriteChan stanzas Stanza
                            { stanzaType      = stype
                            , stanzaId        = mid
                            , stanzaTo        = mto
                            , stanzaFrom      = mfrom
                            , stanzaChan      = chan
                            , stanzaClosers   = clsrs
                            , stanzaInterrupt = donevar
                            }
                awaitCloser stanza_lvl
            -- Nothing marks the stanza as complete.
            liftIO . atomically $ writeTVar clsrs Nothing
            loop

-- | Drain everything currently queued on a channel without blocking.
chanContents :: TChan x -> IO [x]
chanContents ch = do
    next <- atomically (tryReadTChan ch)
    case next of
        Nothing -> return []
        Just x  -> fmap (x :) (chanContents ch)

-- | Read (blocking) from a channel up to, and excluding, the first 'Nothing'.
readUntilNothing :: TChan (Maybe x) -> IO [x]
readUntilNothing ch = do
    next <- atomically $ readTChan ch
    case next of
        Nothing -> return []
        Just x  -> fmap (x :) (readUntilNothing ch)

-- | Events opening our side of a peer stream.
greetPeer :: [XML.Event]
greetPeer =
    [ EventBeginDocument
    , EventBeginElement (streamP "stream")
                        [ attr "xmlns" "jabber:server"
                        , attr "version" "1.0"
                        ]
    ]

-- | Events closing our side of a peer stream.
goodbyePeer :: [XML.Event]
goodbyePeer =
    [ EventEndElement (streamP "stream")
    , EventEndDocument
    ]

-- | Slot keys for the outgoing update stream.
data XMPPState
    = PingSlot
    deriving (Eq,Ord)

-- | Events of a server-to-server ping request (an @iq@ of type @get@ with a
-- @{urn:xmpp:ping}ping@ child).  The @id@ attribute is included only when
-- an id is given.
peerPing :: Maybe Text -> Text -> Text -> [XML.Event]
peerPing mid to from =
    [ EventBeginElement "{jabber:server}iq" (withId attrs)
    , EventBeginElement "{urn:xmpp:ping}ping" []
    , EventEndElement "{urn:xmpp:ping}ping"
    , EventEndElement "{jabber:server}iq"
    ]
  where
    withId = maybe id (\c -> (("id", [ContentText c]) :)) mid
    attrs =
        [ ("type", [ContentText "get"])
        , attr "to" to
        , attr "from" from
        ]

-- | Events of a ping reply (an empty @iq@ of type @result@).  The @id@
-- attribute is included only when an id is given.
peerPong :: Maybe Text -> Text -> Text -> [XML.Event]
peerPong mid to from =
    [ EventBeginElement "{jabber:server}iq" (withId attrs)
    , EventEndElement "{jabber:server}iq"
    ]
  where
    withId = maybe id (\c -> (("id", [ContentText c]) :)) mid
    attrs =
        [ attr "type" "result"
        , attr "to" to
        , attr "from" from
        ]

-- | Start the three threads serving one connection and return the channel
-- on which outgoing stanzas can be queued.
--
-- * The post-queue thread sends the greeting and then whatever is pulled
--   from the slotted update stream, flushing whenever the stream runs empty.
-- * The pre-queue thread moves the events of queued stanzas into the update
--   stream and inserts a ping whenever @pingflag@ is set.
-- * The reader thread runs 'xmppInbound' over the incoming events.
--
-- When the reader stops, whether normally or by an exception, the shared
-- @rdone@ variable is filled, which is what makes the other two threads
-- terminate.
forkConnection :: ConnectionKey
               -> FlagCommand
               -> Source IO XML.Event
               -> Sink (Flush XML.Event) IO ()
               -> TChan Stanza
               -> IO (TChan Stanza)
forkConnection k pingflag src snk stanzas = do
    rdone <- atomically newEmptyTMVar
    slots <- atomically $ Slotted.new isEventBeginElement isEventEndElement
    needsFlush <- atomically $ newTVar False
    let _ = slots :: Slotted.UpdateStream XMPPState XML.Event
    let greet_src = do
            CL.sourceList greetPeer =$= CL.map Chunk
            yield Flush
        slot_src = do
            next <- lift . atomically $ foldr1 orElse
                [ -- an event is available: send it, remember to flush
                  Slotted.pull slots >>= \x -> do
                    writeTVar needsFlush True
                    return $ do
                        yield (Chunk x)
                        slot_src
                , -- nothing left to send but unflushed output exists
                  do
                    Slotted.isEmpty slots >>= check
                    readTVar needsFlush >>= check
                    writeTVar needsFlush False
                    return $ do
                        yield Flush
                        slot_src
                , -- the reader is done: stop
                  readTMVar rdone >> return (return ())
                ]
            next
    _ <- forkIO $ do
        (greet_src >> slot_src) $$ snk
        wlog $ "end post-queue fork: " ++ show k
    output <- atomically newTChan
    _ <- forkIO $ do
        -- mapM_ (atomically . Slotted.push slots Nothing) greetPeer
        fix $ \loop -> do
            what <- atomically $ foldr1 orElse
                [ readTChan output >>= \stanza -> return $ do
                    let xchan = stanzaChan stanza
                        xfin  = stanzaClosers stanza
                    fix $ \inner -> do
                        step <- atomically $ foldr1 orElse
                            [ readTChan xchan >>= \xml -> return $ do
                                atomically $ Slotted.push slots Nothing xml
                                inner
                            , -- stanza complete and fully forwarded: go on
                              -- with the next one
                              do
                                finished <- fmap isNothing (readTVar xfin)
                                drained <- isEmptyTChan xchan
                                check (finished && drained)
                                return loop -- todo: send closers
                            , do
                                isEmptyTChan xchan >>= check
                                readTMVar rdone
                                return (return ())
                            ]
                        step
                , do
                    pingflag >>= check
                    return $ do
                        let to = addrToText (callBackAddress k)
                            from = "todo" -- Look it up from Server object
                                          -- or pass it with Connection event.
                            mid = Just "ping"
                            ping = peerPing mid to from
                        mapM_ (atomically . Slotted.push slots (Just PingSlot))
                              ping
                        wlog ""
                        prettyPrint "P<-PING " ping
                        loop
                , readTMVar rdone >> return (return ())
                ]
            what
        wlog $ "end pre-queue fork: " ++ show k
    _ <- forkIO $ do
        -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show)
        -- Signal completion even when the stream dies with an exception
        -- (e.g. malformed XML); otherwise the two writer threads above
        -- would wait on rdone forever.
        (src $$ xmppInbound k pingflag src stanzas output rdone)
            `Exception.finally` atomically (putTMVar rdone ())
        wlog $ "end reader fork: " ++ show k
    return output

-- | Identifies a connection by the address of its remote or local end.
data ConnectionKey
    = PeerKey { callBackAddress :: SockAddr }
    | ClientKey { localAddress :: SockAddr }
    deriving (Show, Ord, Eq)

{-
data Peer = Peer
    { peerWanted :: TVar Bool -- ^ False when this peer is on a you-call-me basis
    , peerState :: TVar PeerState
    }
data PeerState
    = PeerPendingConnect UTCTime
    | PeerPendingAccept UTCTime
    | PeerConnected (TChan Stanza)
-}

-- | Key for a peer connection: the peer's address with the port replaced
-- by 'peerport'.
peerKey (sock,addr) = do
    connected <- sIsConnected sock
    peer <- if connected
                then getPeerName sock -- addr is normally socketName
                else return addr      -- Weird hack: addr is would-be peer name
    return $ PeerKey (peer `withPort` fromIntegral peerport)

-- | Key for a client connection: the given address, unchanged.
clientKey (sock,addr) = return $ ClientKey addr

-- | Event loop of the server: reacts to connection events and logs every
-- announced stanza.  Never returns.
--
-- A new connection is handed to 'forkConnection'; a read-only half
-- connection triggers an outgoing connect to the peer's callback address.
monitor sv params = do
    let chan = serverEvent sv
    stanzas <- atomically newTChan
    fix $ \loop -> do
        action <- atomically $ foldr1 orElse
            [ readTChan chan >>= \(k,e) -> return $
                case e of
                    Connection pingflag conread conwrite -> do
                        wlog $ tomsg k "Connection"
                        let (xsrc,xsnk) = xmlStream conread conwrite
                        void $ forkConnection k pingflag xsrc xsnk stanzas
                    ConnectFailure addr ->
                        wlog $ tomsg k "ConnectFailure"
                    EOF -> wlog $ tomsg k "EOF"
                    HalfConnection In -> do
                        wlog $ tomsg k "ReadOnly"
                        control sv (Connect (callBackAddress k) params)
                    HalfConnection Out -> wlog $ tomsg k "WriteOnly"
                    RequiresPing -> return () -- wlog $ tomsg k "RequiresPing"
                    _ -> return ()
            , readTChan stanzas >>= \stanza -> return $ do
                -- xs <- readUntilNothing (stanzaChan stanza)
                -- Only what has been queued so far is printed.
                xs <- chanContents (stanzaChan stanza)
                let typ = Strict8.pack $ "P->"++(show (stanzaType stanza))++" "
                wlog ""
                prettyPrint typ xs
            ]
        action
        loop
  where
    -- Right-align the event name in 12 columns, followed by the key.
    tomsg :: Show a => a -> String -> String
    tomsg k str = printf "%12s %s" str (show k)

-- | Create the server, start its 'monitor' thread and listen on the peer
-- port.  Returns the server together with the peer connection parameters.
xmppServer :: (MonadResource m, MonadIO m) => m (Server ConnectionKey,ConnectionParameters ConnectionKey)
xmppServer = do
    sv <- server
    -- NOTE(review): units of pingInterval/timeout are defined by the Server
    -- module (presumably milliseconds) -- confirm there.
    let peer_params = (connectionDefaults peerKey)
                        { pingInterval = 5000
                        , timeout = 1000
                        , duplex = False }
        client_params = connectionDefaults clientKey
    liftIO $ do
        _ <- forkIO $ monitor sv peer_params
        control sv (Listen peerport peer_params)
        -- todo
        -- control sv (Listen clientport client_params)
    return (sv,peer_params)