{-# LANGUAGE OverloadedStrings #-}
import Control.Monad.Trans.Resource (runResourceT)
import Control.Monad.Trans (lift)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Fix (fix)
import Control.Monad
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
-- import Control.Concurrent.STM.TChan
import Network.Socket
import XMPPTypes (withPort)
import Text.Printf
import System.Posix.Signals
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as Strict8
-- import qualified Data.ByteString.Lazy.Char8 as Lazy8

import Data.Conduit
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Binary as CB
import Data.Conduit.Blaze (builderToByteStringFlush)

import qualified Text.XML.Stream.Render as XML
import qualified Text.XML.Stream.Parse as XML
import Data.XML.Types as XML
import Data.Maybe (catMaybes)
import Data.Monoid ( (<>) )
import Data.Text (Text)
import qualified Data.Text as Text (pack)

import qualified Control.Concurrent.STM.UpdateStream as Slotted
import ControlMaybe
import NestingXML
import EventUtil
import Server

-- | Render a socket address as text with the port removed.
--
-- * 'SockAddrInet': everything before the first @':'@ of @show addr@.
-- * 'SockAddrInet6': everything up to and including the first @']'@ of
--   @show addr@ (or the whole string when it contains no @']'@).
-- * Any other address family: @show addr@ unchanged, so the function is
--   total instead of failing with a pattern-match error.
addrToText :: SockAddr -> Text
addrToText addr@(SockAddrInet _ _) =
    Text.pack . takeWhile (/= ':') $ show addr
addrToText addr@(SockAddrInet6 _ _ _ _) =
    Text.pack $ if null rest then host else host ++ "]"
  where
    (host, rest) = break (== ']') (show addr)
addrToText addr = Text.pack (show addr)

-- | Write a 'String' log line to stdout.
wlog :: String -> IO ()
wlog = putStrLn

-- | Write a strict 'ByteString' log line to stdout.
wlogb :: ByteString -> IO ()
wlogb = Strict8.putStrLn

-- | Put a command into the server's 'serverCommand' 'TMVar'; blocks while
-- the variable is still full.
-- NOTE(review): the command type is declared in "Server" and is not visible
-- here, so no signature is given.
control sv = atomically . putTMVar (serverCommand sv)

-- | Wrap a connection's raw read/write commands as XML event conduits.
--
-- The source yields chunks obtained from @conread@ (stopping at the first
-- 'Nothing') and parses them into XML events.  The sink renders events to
-- bytes and passes every rendered chunk to @conwrite@; 'Flush' markers are
-- dropped after rendering.
-- NOTE(review): the 'Bool' returned by @conwrite@ is ignored.
xmlStream :: ReadCommand -> WriteCommand -> ( Source IO XML.Event
                                            , Sink (Flush XML.Event) IO () )
xmlStream conread conwrite = (xsrc, xsnk)
  where
    xsrc = src $= XML.parseBytes XML.def

    -- Previously: XML.renderBytes XML.def =$ snk
    xsnk = XML.renderBuilderFlush XML.def
           =$= builderToByteStringFlush
           =$= discardFlush
           =$ snk

    -- Keep only the payload of 'Chunk's; 'Flush' markers are discarded.
    discardFlush :: Monad m => ConduitM (Flush a) a m ()
    discardFlush = awaitForever $ \x -> case x of
        Chunk a -> yield a
        Flush   -> return ()

    -- Pull from the connection until it reports 'Nothing'.
    src = lift conread >>= maybe (return ()) (\bytes -> yield bytes >> src)

    snk = awaitForever $ liftIO . conwrite

-- | STM action that reports whether a ping is currently wanted.
type FlagCommand  = STM Bool

-- | Read the next chunk from a connection; 'Nothing' ends the stream.
type ReadCommand  = IO (Maybe ByteString)

-- | Write a chunk to a connection.
type WriteCommand = ByteString -> IO Bool

-- | A stanza delivered as a channel of its XML events.  The producer in
-- 'xmppInbound' writes each event as 'Just' and then a single 'Nothing' to
-- mark the end of the stanza.
data Stanza = UnrecognizedStanza { stanzaChan :: TChan (Maybe XML.Event) }

-- | Pretty-print a list of XML events, logging each rendered line with the
-- given prefix via 'wlogb'.
prettyPrint prefix xs =
    liftIO $
        CL.sourceList xs
          $= XML.renderBytes (XML.def { XML.rsPretty=True })
          =$= CB.lines
          $$ CL.mapM_ (wlogb . (prefix <>))

-- | Sink that consumes an inbound XML stream and splits it into stanzas.
--
-- After the document start and an opening element accepted by
-- @elementAttrs "stream"@, every following element is published on
-- @stanzas@ as an 'UnrecognizedStanza' whose channel receives the element's
-- events followed by 'Nothing'.  The loop ends when 'nextElement' yields
-- nothing.
--
-- The @k@, @pingflag@ and @src@ arguments are currently unused.
-- NOTE(review): 'doNestingXML', 'withXML', 'nextElement', 'nesting' and
-- 'doUntilCloser' come from project modules not shown here; the description
-- above assumes their names reflect their behaviour.
xmppInbound :: ConnectionKey
               -> FlagCommand
               -> Source IO XML.Event
               -> TChan Stanza
               -> Sink XML.Event IO ()
xmppInbound k pingflag src stanzas = doNestingXML $ do
    withXML $ \begindoc -> do
        when (begindoc == EventBeginDocument) $ do
            whenJust nextElement $ \xml -> do
                withJust (elementAttrs "stream" xml) $ \_streamAttrs -> do
                    fix $ \loop -> do
                        -- liftIO . wlog $ "waiting for stanza."
                        chan <- liftIO $ atomically newTChan
                        whenJust nextElement $ \stanza -> do
                            stanza_lvl <- nesting
                            liftIO . atomically $ writeTChan chan (Just stanza)
                            liftIO . atomically $ writeTChan stanzas $ UnrecognizedStanza chan
                            doUntilCloser stanza_lvl $ \ev -> do
                                liftIO . atomically $ writeTChan chan (Just ev)
                            -- End-of-stanza marker for readers of the channel.
                            liftIO . atomically $ writeTChan chan Nothing
                            loop

-- | Drain everything currently queued in the channel without blocking.
chanContents :: TChan x -> IO [x]
chanContents ch = do
    mx <- atomically $ tryReadTChan ch
    case mx of
        Nothing -> return []
        Just x  -> fmap (x :) (chanContents ch)

-- | Read items from the channel, blocking as needed, until a 'Nothing' is
-- received; the 'Nothing' itself is consumed and not returned.
readUntilNothing :: TChan (Maybe x) -> IO [x]
readUntilNothing ch = do
    mx <- atomically $ readTChan ch
    case mx of
        Nothing -> return []
        Just x  -> fmap (x :) (readUntilNothing ch)

-- | Events that open the outgoing stream to a peer.
greetPeer :: [XML.Event]
greetPeer =
    [ EventBeginDocument
    , EventBeginElement (streamP "stream")
                        [ attr "xmlns" "jabber:server"
                        , attr "version" "1.0"
                        ]
    ]

-- | Events that close the outgoing stream to a peer.
goodbyePeer :: [XML.Event]
goodbyePeer =
    [ EventEndElement (streamP "stream")
    , EventEndDocument
    ]

-- | Slot keys used with the slotted update stream in 'forkConnection'.
data XMPPState
    = PingSlot
    deriving (Eq,Ord)

-- | Build the events of a ping @iq@ stanza of type @get@.  When an id is
-- supplied it is emitted as the first attribute, ahead of @type@, @to@ and
-- @from@.
peerPing :: Maybe Text -> Text -> Text -> [XML.Event]
peerPing mid to from =
    [ EventBeginElement "{jabber:server}iq" (withId baseAttrs)
    , EventBeginElement "{urn:xmpp:ping}ping" []
    , EventEndElement "{urn:xmpp:ping}ping"
    , EventEndElement "{jabber:server}iq"
    ]
  where
    baseAttrs =
        [ ("type", [ContentText "get"])
        , attr "to" to
        , attr "from" from
        ]
    withId attrs = case mid of
        Just c  -> ("id", [ContentText c]) : attrs
        Nothing -> attrs

-- | Start the three worker threads for one connection and return the
-- channel on which outgoing stanzas can be queued.
--
-- * reader: runs 'xmppInbound' over @src@, then fills @rdone@.
-- * post-queue: sends 'greetPeer', then forwards events pulled from the
--   slotted stream to @snk@, emitting a 'Flush' once the stream is empty
--   after something was sent.
-- * pre-queue: copies events of queued stanzas into the slotted stream and
--   pushes a ping (in the 'PingSlot' slot) whenever @pingflag@ is true.
--
-- Both queue threads stop once @rdone@ is filled and nothing else is ready.
-- NOTE(review): the ping branch applies 'callBackAddress' to @k@, which is
-- only defined for 'PeerKey'.
forkConnection :: ConnectionKey
                  -> FlagCommand
                  -> Source IO XML.Event
                  -> Sink (Flush XML.Event) IO ()
                  -> TChan Stanza
                  -> IO (TChan Stanza)
forkConnection k pingflag src snk stanzas = do
    rdone <- atomically newEmptyTMVar
    _ <- forkIO $ do
        -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show)
        src $$ xmppInbound k pingflag src stanzas
        atomically $ putTMVar rdone ()
        wlog $ "end reader fork: " ++ show k
    slots <- atomically $ Slotted.new isEventBeginElement isEventEndElement
    needsFlush <- atomically $ newTVar False
    -- Pins the slot-key and payload types of the update stream.
    let _ = slots :: Slotted.UpdateStream XMPPState XML.Event
    let greet_src = do
            CL.sourceList greetPeer =$= CL.map Chunk
            yield Flush
        slot_src = do
            -- Pick the first alternative that does not retry; each one
            -- returns the conduit action to run next.
            next <- lift . atomically $ foldr1 orElse
                [ Slotted.pull slots >>= \x -> do
                    writeTVar needsFlush True
                    return $ do
                        yield (Chunk x)
                        slot_src
                , do
                    Slotted.isEmpty slots >>= check
                    readTVar needsFlush >>= check
                    writeTVar needsFlush False
                    return $ do
                        yield Flush
                        slot_src
                , readTMVar rdone >> return (return ())
                ]
            next
    _ <- forkIO $ do
        (greet_src >> slot_src) $$ snk
        wlog $ "end post-queue fork: " ++ show k
    output <- atomically newTChan
    _ <- forkIO $ do
        -- mapM_ (atomically . Slotted.push slots Nothing) greetPeer
        fix $ \loop -> do
            step <- atomically $ foldr1 orElse
                [ readTChan output >>= \stanza -> return $ do
                    let xchan = stanzaChan stanza
                    fix $ \inner -> do
                        relay <- atomically $ orElse
                            (readTChan xchan >>= \mxml -> return $
                                case mxml of
                                    Just xml -> do
                                        atomically $ Slotted.push slots Nothing xml
                                        inner
                                    Nothing -> loop)
                            (readTMVar rdone >> return (return ()))
                        relay
                , do
                    pingflag >>= check
                    return $ do
                        let to = addrToText (callBackAddress k)
                            from = "todo" -- Look it up from Server object
                                          -- or pass it with Connection event.
                            mid = Just "ping"
                        mapM_ (atomically . Slotted.push slots (Just $ PingSlot))
                              (peerPing mid to from)
                        loop
                , readTMVar rdone >> return (return ())
                ]
            step
        -- Logged once, after the loop has finished.
        wlog $ "end pre-queue fork: " ++ show k
    return output

-- | Event loop: reacts to server connection events and pretty-prints every
-- stanza received on any connection.  Never returns.
--
-- A 'Connection' event forks the per-connection workers; a read-only half
-- connection triggers a 'Connect' back to the key's 'callBackAddress' using
-- @params@.  Other listed events are only logged.
-- NOTE(review): printing a stanza blocks this loop until the stanza's
-- terminating 'Nothing' arrives on its channel.
monitor sv params = do
    let chan = serverEvent sv
    stanzas <- atomically newTChan
    forever $ do
        action <- atomically $ foldr1 orElse
            [ readTChan chan >>= \(k,e) -> return $ do
                case e of
                    Connection pingflag conread conwrite -> do
                        let (xsrc,xsnk) = xmlStream conread conwrite
                        -- The returned output channel is not used yet.
                        _ <- forkConnection k pingflag xsrc xsnk stanzas
                        wlog $ tomsg k "Connection"
                    EOF -> wlog $ tomsg k "EOF"
                    HalfConnection In -> do
                        wlog $ tomsg k "ReadOnly"
                        control sv (Connect (callBackAddress k) params)
                    HalfConnection Out -> wlog $ tomsg k "WriteOnly"
                    RequiresPing -> wlog $ tomsg k "RequiresPing"
                    _ -> return ()
            , readTChan stanzas >>= \stanza -> return $ do
                xs <- readUntilNothing (stanzaChan stanza)
                wlog ""
                prettyPrint "STANZA: " xs
            ]
        action
  where
    -- Right-align the event name in a 12-character column before the key.
    tomsg k str = printf "%12s %s" (str :: String) (show k)

-- | Identifies a connection by the address associated with it.
-- NOTE(review): both field selectors are partial; 'callBackAddress' fails
-- on a 'ClientKey' and 'localAddress' on a 'PeerKey'.
data ConnectionKey
    = PeerKey { callBackAddress :: SockAddr }
    | ClientKey { localAddress :: SockAddr }
    deriving (Show, Ord, Eq)

-- | Key for a peer connection: the socket's peer address with its port
-- replaced by 'peerport'.  The accompanying address argument is ignored.
peerKey (sock, _) = do
    peer <- getPeerName sock
    return $ PeerKey (peer `withPort` fromIntegral peerport)

-- | Key for a client connection: the supplied address, unchanged.
clientKey (_, addr) = return $ ClientKey addr

-- | Port used for peer (server-to-server) connections.
peerport = 5269

-- | Port intended for client connections (not listened on at present).
clientport = 5222

-- | Start the server, listen for peers on 'peerport', and run until SIGTERM
-- or SIGINT is received.
main :: IO ()
main = runResourceT $ do
    sv <- server
    lift $ do
        -- NOTE(review): units of pingInterval/timeout are defined in
        -- "Server" and are not visible here.
        let peer_params = (connectionDefaults peerKey)
                { pingInterval = 10000
                , timeout = 10000
                , duplex = False
                }
            -- Only needed once the client listener below is enabled.
            client_params = connectionDefaults clientKey
        _ <- forkIO $ monitor sv peer_params
        control sv (Listen peerport peer_params)
        -- control sv (Listen clientport client_params)

        -- atomically $ newEmptyTMVar >>= readTMVar -- Wait for control-c
        quitVar <- newEmptyTMVarIO
        _ <- installHandler sigTERM (CatchOnce (atomically $ putTMVar quitVar True)) Nothing
        _ <- installHandler sigINT (CatchOnce (atomically $ putTMVar quitVar True)) Nothing
        -- Block here until one of the handlers fires.
        _ <- atomically $ takeTMVar quitVar

        wlog "goodbye."
        return ()