{-# LANGUAGE OverloadedStrings #-} import Control.Monad.Trans.Resource (runResourceT) import Control.Monad.Trans (lift) import Control.Monad.IO.Class (liftIO) import Control.Monad.Fix (fix) import Control.Monad import Control.Concurrent (forkIO) import Control.Concurrent.STM -- import Control.Concurrent.STM.TChan import Network.Socket import XMPPTypes (withPort) import Text.Printf import System.Posix.Signals import Data.ByteString (ByteString) import qualified Data.ByteString.Char8 as Strict8 -- import qualified Data.ByteString.Lazy.Char8 as Lazy8 import Data.Conduit import qualified Data.Conduit.List as CL import qualified Data.Conduit.Binary as CB import qualified Text.XML.Stream.Render as XML import qualified Text.XML.Stream.Parse as XML import Data.XML.Types as XML import Data.Maybe (catMaybes) import Data.Monoid ( (<>) ) import qualified Control.Concurrent.STM.UpdateStream as Slotted import ControlMaybe import NestingXML import Server wlog s = putStrLn s where _ = s :: String wlogb s = Strict8.putStrLn s control sv = atomically . putTMVar (serverCommand sv) -- Note: This function ignores name space qualification elementAttrs expected (EventBeginElement name attrs) | nameLocalName name==expected = return attrs elementAttrs _ _ = mzero getStreamName (EventBeginElement name _) = name xmlStream :: ReadCommand -> WriteCommand -> ( Source IO XML.Event , Sink XML.Event IO () ) xmlStream conread conwrite = (xsrc,xsnk) where xsrc = src $= XML.parseBytes XML.def xsnk = XML.renderBytes XML.def =$ snk src = do v <- lift conread maybe (return ()) -- lift . wlog $ "conread: Nothing") (\v -> yield v >> src) v snk = awaitForever $ liftIO . conwrite type FlagCommand = STM Bool type ReadCommand = IO (Maybe ByteString) type WriteCommand = ByteString -> IO Bool data Stanza = UnrecognizedStanza { stanzaChan :: TChan (Maybe XML.Event) } prettyPrint prefix xs = liftIO $ CL.sourceList xs $= XML.renderBytes (XML.def { XML.rsPretty=True }) =$= CB.lines $$ CL.mapM_ (wlogb . (prefix <>)) xmppInbound :: ConnectionKey -> FlagCommand -> Source IO XML.Event -> TChan Stanza -> Sink XML.Event IO () xmppInbound k pingflag src stanzas = doNestingXML $ do withXML $ \begindoc -> do when (begindoc==EventBeginDocument) $ do -- liftIO . wlog $ "begin-doc" withXML $ \xml -> do withJust (elementAttrs "stream" xml) $ \stream_attrs -> do -- liftIO . wlog $ "stream: " ++ show (getStreamName xml) -- liftIO . wlog $ "stream atributes: " ++ show stream_attrs fix $ \loop -> do -- liftIO . wlog $ "waiting for stanza." chan <- liftIO $ atomically newTChan whenJust nextElement $ \stanza -> do stanza_lvl <- nesting liftIO . atomically $ writeTChan chan (Just stanza) -- liftIO . wlog $ "stanza: "++show stanza liftIO . atomically $ writeTChan stanzas $ UnrecognizedStanza chan doUntilCloser stanza_lvl $ \xml -> do liftIO . atomically $ writeTChan chan (Just xml) -- liftIO . wlog $ "-stanza: " ++ show xml liftIO . atomically $ writeTChan chan Nothing loop chanContents :: TChan x -> IO [x] chanContents ch = do x <- atomically $ do bempty <- isEmptyTChan ch if bempty then return Nothing else fmap Just $ readTChan ch maybe (return []) (\x -> do xs <- chanContents ch return (x:xs)) x isEventBeginElement (EventBeginElement {}) = True isEventBeginElement _ = False isEventEndElement (EventEndElement {}) = True isEventEndElement _ = False forkConnection :: ConnectionKey -> FlagCommand -> Source IO XML.Event -> Sink XML.Event IO () -> TChan Stanza -> IO (TChan Stanza) forkConnection k pingflag src snk stanzas = do rdone <- atomically newEmptyTMVar forkIO $ do -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show) src $$ xmppInbound k pingflag src stanzas atomically $ putTMVar rdone () wlog $ "end reader fork: " ++ show k slots <- atomically $ Slotted.new isEventBeginElement isEventEndElement let _ = slots :: Slotted.UpdateStream () XML.Event let slot_src = do what <- lift . atomically $ orElse (Slotted.pull slots >>= \x -> return $ do yield x slot_src) (readTMVar rdone >> return (return ())) what forkIO $ do slot_src $$ snk wlog $ "end post-queue fork: " ++ show k output <- atomically newTChan forkIO $ do fix $ \loop -> do what <- atomically $ foldr1 orElse [readTChan output >>= \stanza -> return $ do let xchan = stanzaChan stanza fix $ \inner -> do what <- atomically $ orElse (readTChan xchan >>= \mxml -> return $ do case mxml of Just xml -> do atomically $ Slotted.push slots Nothing xml inner Nothing -> return ()) (readTMVar rdone >> return (return ())) what loop ,do pingflag >>= check return $ do wlog $ "TODO: send ping" loop ,readTMVar rdone >> return (return ()) ] what wlog $ "end pre-queue fork: " ++ show k return output monitor sv params = do chan <- return $ serverEvent sv stanzas <- atomically newTChan fix $ \loop -> do action <- atomically $ foldr1 orElse [ readTChan chan >>= \(k,e) -> return $ do case e of Connection pingflag conread conwrite -> do let (xsrc,xsnk) = xmlStream conread conwrite forkConnection k pingflag xsrc xsnk stanzas wlog $ tomsg k "Connection" EOF -> wlog $ tomsg k "EOF" HalfConnection In -> do wlog $ tomsg k "ReadOnly" control sv (Connect (callBackAddress k) params) HalfConnection Out -> wlog $ tomsg k "WriteOnly" RequiresPing -> wlog $ tomsg k "RequiresPing" _ -> return () , readTChan stanzas >>= \stanza -> return $ do xs <- chanContents (stanzaChan stanza) wlog "" prettyPrint "STANZA: " (catMaybes xs) ] action loop where tomsg k str = printf "%12s %s" str (show k) where _ = str :: String data ConnectionKey = PeerKey { callBackAddress :: SockAddr } | ClientKey { localAddress :: SockAddr } deriving (Show, Ord, Eq) peerKey (sock,addr) = do peer <- getPeerName sock return $ PeerKey (peer `withPort` fromIntegral peerport) clientKey (sock,addr) = return $ ClientKey addr peerport = 5269 clientport = 5222 main = runResourceT $ do sv <- server lift $ do peer_params <- return (connectionDefaults peerKey) { pingInterval = 5000 , timeout = 5000 , duplex = False } client_params <- return $ connectionDefaults clientKey forkIO $ monitor sv peer_params control sv (Listen peerport peer_params) -- control sv (Listen clientport client_params) -- atomically $ newEmptyTMVar >>= readTMVar -- Wait for control-c quitVar <- newEmptyTMVarIO installHandler sigTERM (CatchOnce (atomically $ putTMVar quitVar True)) Nothing installHandler sigINT (CatchOnce (atomically $ putTMVar quitVar True)) Nothing quitMessage <- atomically $ takeTMVar quitVar wlog "goodbye." return ()