{-# LANGUAGE OverloadedStrings #-}
-- Experimental XMPP peer (server-to-server) connection driver.
--
-- The program connects to a hard-coded test peer, listens on the peer port,
-- and for every connection forks three threads (see 'forkConnection') that
-- parse inbound XML into 'Stanza's, queue outbound XML, and write it to the
-- socket.  Received stanzas are pretty-printed by 'monitor'.

import Control.Monad.Trans.Resource (runResourceT)
import Control.Monad.Trans (lift)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Fix (fix)
import Control.Monad
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
-- import Control.Concurrent.STM.TChan
import qualified Control.Exception as Exception (finally)
import Network.Socket
import XMPPTypes (withPort)
import Text.Printf
import System.Posix.Signals
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as Strict8
-- import qualified Data.ByteString.Lazy.Char8 as Lazy8

import Data.Conduit
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Binary as CB
import Data.Conduit.Blaze (builderToByteStringFlush)

import qualified Text.XML.Stream.Render as XML
import qualified Text.XML.Stream.Parse as XML
import Data.XML.Types as XML
import Data.Maybe (catMaybes,fromJust)
import Data.Monoid ( (<>) )
import Data.Text (Text)
import qualified Data.Text as Text (pack)

import qualified Control.Concurrent.STM.UpdateStream as Slotted
import ControlMaybe
import Nesting
import EventUtil
import Server

-- | Render the host part of a socket address as 'Text', dropping the port.
--
-- The result is derived from the 'Show' instance of 'SockAddr':
--
-- * for 'SockAddrInet' everything before the first @':'@ is kept;
-- * for 'SockAddrInet6' everything up to and including the first @']'@ is
--   kept (if there is no @']'@ the shown text is returned unchanged);
-- * any other address family falls back to the plain 'show' output instead
--   of failing with a pattern-match error.
addrToText :: SockAddr -> Text
addrToText addr = Text.pack $ case addr of
    SockAddrInet {}  -> takeWhile (/= ':') shown
    SockAddrInet6 {} -> case break (== ']') shown of
        (pre, []) -> pre
        (pre, _)  -> pre ++ "]"
    _                -> shown
  where
    shown = show addr

-- | Write a 'String' log line to stdout.
wlog :: String -> IO ()
wlog s = putStrLn s

-- | Write a strict 'ByteString' log line to stdout.
wlogb :: ByteString -> IO ()
wlogb s = Strict8.putStrLn s

-- | Submit a command to the server by putting it into its 'serverCommand'
-- 'TMVar'.  Blocks (in STM) while the variable is still full.
control sv = atomically . putTMVar (serverCommand sv)

-- | Wrap a connection's raw read/write commands as an XML event source and
-- an XML event sink.
--
-- The source pulls chunks with @conread@ until it returns 'Nothing' and
-- parses them with 'XML.parseBytes'.  The sink renders events to bytes and
-- hands every rendered chunk to @conwrite@; the 'Bool' returned by
-- @conwrite@ is ignored.
xmlStream :: ReadCommand -> WriteCommand -> ( Source IO XML.Event
                                            , Sink (Flush XML.Event) IO () )
xmlStream conread conwrite = (xsrc, xsnk)
  where
    xsrc = src $= XML.parseBytes XML.def

    xsnk = -- XML.renderBytes XML.def =$ snk
              XML.renderBuilderFlush XML.def
          =$= builderToByteStringFlush
          =$= discardFlush
          =$  snk

    -- Forward the payload of every 'Chunk' and drop everything else
    -- (i.e. the 'Flush' markers).
    discardFlush :: Monad m => ConduitM (Flush a) a m ()
    discardFlush = awaitForever $ \x -> case x of
        Chunk a -> yield a
        _       -> return ()

    -- Keep reading until the connection reports end of input.
    src = lift conread >>= maybe (return ()) -- lift . wlog $ "conread: Nothing")
                                 (\bytes -> yield bytes >> src)

    snk = awaitForever $ liftIO . conwrite

-- | STM action telling whether a ping should be sent now.
type FlagCommand = STM Bool

-- | Read the next chunk from a connection; 'Nothing' ends the stream.
type ReadCommand = IO (Maybe ByteString)

-- | Write a chunk to a connection.
type WriteCommand = ByteString -> IO Bool

-- | A stanza together with the channel its XML events are delivered on.
-- Producers in this file terminate the event sequence with 'Nothing'.
data Stanza
    = UnrecognizedStanza { stanzaChan :: TChan (Maybe XML.Event) }
    | PingStanza { stanzaId   :: Maybe Text
                 , stanzaChan :: TChan (Maybe XML.Event) }
    | PongStanza { -- stanzaId :: Maybe Text
                   stanzaChan :: TChan (Maybe XML.Event) }

-- | Conduit that passes every element through unchanged after writing
-- @f element@ to @chan@.
copyToChannel f chan = awaitForever copy
  where
    copy x = do
        liftIO . atomically $ writeTChan chan (f x)
        yield x

-- | Pretty-print a list of XML events, logging each rendered line with
-- 'wlogb' and the given prefix.
prettyPrint prefix xs =
    liftIO $
        CL.sourceList xs
          $= XML.renderBytes (XML.def { XML.rsPretty=True })
          =$= CB.lines
          $$ CL.mapM_ (wlogb . (prefix <>))

-- | Classify an @iq@ stanza of type @get@ by its first child element.
--
-- Returns a constructor still awaiting the stanza's event channel:
-- 'PingStanza' (carrying the stanza's @id@ attribute) when the child is
-- @{urn:xmpp:ping}ping@, and 'Nothing' for any other child or when there
-- is no child element.
grockStanzaIQGet :: Monad m => XML.Event -> NestingXML o m (Maybe (TChan (Maybe Event) -> Stanza))
grockStanzaIQGet stanza = do
    let mid = lookupAttrib "id" (tagAttrs stanza)
        -- mfrom = lookupAttrib "from" (tagAttrs stanza)
    mtag <- nextElement
    case mtag of
        Nothing  -> return Nothing
        Just tag -> case tagName tag of
            "{urn:xmpp:ping}ping" -> return $ Just (PingStanza mid)
            _                     -> return Nothing

-- | 'writeTChan' lifted into any monad with 'liftIO'.
ioWriteChan c v = liftIO . atomically $ writeTChan c v

-- | Sink that consumes an inbound XML stream and splits it into stanzas.
--
-- After the document start and the opening @stream@ element, every child
-- element is treated as one stanza: a fresh channel is created, all of the
-- stanza's events are copied into it followed by 'Nothing', and the stanza
-- is dispatched.  Pings are answered by queueing a 'PongStanza' on
-- @output@; everything else that is recognised (including
-- 'UnrecognizedStanza') is written to @stanzas@.
--
-- The @k@, @pingflag@ and @src@ arguments are currently unused.
xmppInbound :: ConnectionKey
               -> FlagCommand
               -> Source IO XML.Event
               -> TChan Stanza
               -> TChan Stanza
               -> Sink XML.Event IO ()
xmppInbound k pingflag src stanzas output = doNestingXML $ do
    withXML $ \begindoc -> do
        when (begindoc==EventBeginDocument) $ do
            whenJust nextElement $ \xml -> do
                withJust (elementAttrs "stream" xml) $ \stream_attrs -> do
                    fix $ \loop -> do
                        -- liftIO . wlog $ "waiting for stanza."
                        chan <- liftIO $ atomically newTChan
                        whenJust nextElement $ \stanza -> do
                            stanza_lvl <- nesting
                            ioWriteChan chan (Just stanza)
                            -- Everything read inside this fused block is
                            -- also copied into the stanza's channel.
                            copyToChannel Just chan =$= do
                                dispatch <-
                                    if stanza `isServerIQOf` "get"
                                        then grockStanzaIQGet stanza
                                        else return $ Just UnrecognizedStanza
                                case dispatch of
                                    Nothing       -> return ()
                                    Just mkStanza -> case mkStanza chan of
                                        d@(PingStanza {}) -> do
                                            let to   = "todo"
                                                from = "todo"
                                            let pong = peerPong (stanzaId d) to from
                                            pongChan <- liftIO $ atomically newTChan
                                            ioWriteChan output (PongStanza pongChan)
                                            mapM_ (ioWriteChan pongChan . Just) pong
                                            ioWriteChan pongChan Nothing
                                        disp -> ioWriteChan stanzas disp
                                -- Drain the rest of the stanza so that the
                                -- channel receives every one of its events.
                                awaitCloser stanza_lvl
                                ioWriteChan chan Nothing
                            -- NOTE(review): the recursion is kept outside the
                            -- fuse so later stanzas are not copied into this
                            -- stanza's (already terminated) channel; confirm
                            -- against the intended nesting.
                            loop

-- | Non-blocking drain: return everything currently queued in the channel.
chanContents :: TChan x -> IO [x]
chanContents ch = do
    mx <- atomically $ do
        bempty <- isEmptyTChan ch
        if bempty
            then return Nothing
            else fmap Just $ readTChan ch
    case mx of
        Nothing -> return []
        Just x  -> fmap (x :) (chanContents ch)

-- | Blocking read of channel items up to (not including) the first
-- 'Nothing'.
readUntilNothing :: TChan (Maybe x) -> IO [x]
readUntilNothing ch = do
    mx <- atomically $ readTChan ch
    case mx of
        Nothing -> return []
        Just x  -> fmap (x :) (readUntilNothing ch)

-- | Events that open an outgoing stream to a peer.
greetPeer :: [XML.Event]
greetPeer =
    [ EventBeginDocument
    , EventBeginElement (streamP "stream")
        [ attr "xmlns" "jabber:server"
        , attr "version" "1.0"
        ]
    ]

-- | Events that close an outgoing stream to a peer.
goodbyePeer :: [XML.Event]
goodbyePeer =
    [ EventEndElement (streamP "stream")
    , EventEndDocument
    ]

-- | Slot keys for the outgoing 'Slotted.UpdateStream'.
data XMPPState
    = PingSlot
    deriving (Eq,Ord)

-- | Events of an XMPP ping request (@iq@ of type @get@ with a
-- @{urn:xmpp:ping}ping@ child).  The @id@ attribute is only emitted when an
-- id is supplied.
peerPing :: Maybe Text -> Text -> Text -> [XML.Event]
peerPing mid to from =
    [ EventBeginElement "{jabber:server}iq"
        $ withId
            [ ("type",[ContentText "get"])
            , attr "to" to
            , attr "from" from
            ]
    , EventBeginElement "{urn:xmpp:ping}ping" []
    , EventEndElement "{urn:xmpp:ping}ping"
    , EventEndElement "{jabber:server}iq"
    ]
  where
    withId = case mid of
        Just c -> (("id",[ContentText c]):)
        _      -> id

-- | Events of an empty @iq@ of type @result@, used as the reply to a ping.
-- The @id@ attribute is only emitted when an id is supplied.
peerPong :: Maybe Text -> Text -> Text -> [XML.Event]
peerPong mid to from =
    [ EventBeginElement "{jabber:server}iq"
        $ withId
            [ attr "type" "result"
            , attr "to" to
            , attr "from" from
            ]
    , EventEndElement "{jabber:server}iq"
    ]
  where
    withId = case mid of
        Just c -> (("id",[ContentText c]):)
        _      -> id

-- | Start the three worker threads for one connection and return the
-- channel on which outgoing stanzas can be queued.
--
-- * post-queue thread: sends the greeting, then writes whatever is pulled
--   from the slotted update stream to @snk@, emitting a 'Flush' once the
--   stream is empty after something was written;
-- * pre-queue thread: moves the events of queued stanzas into the update
--   stream and pushes a ping into the 'PingSlot' whenever @pingflag@ is
--   set;
-- * reader thread: runs 'xmppInbound' over @src@.
--
-- All threads stop once the reader has finished, which is signalled through
-- a 'TMVar'.
forkConnection :: ConnectionKey
                  -> FlagCommand
                  -> Source IO XML.Event
                  -> Sink (Flush XML.Event) IO ()
                  -> TChan Stanza
                  -> IO (TChan Stanza)
forkConnection k pingflag src snk stanzas = do
    rdone <- atomically newEmptyTMVar
    slots <- atomically $ Slotted.new isEventBeginElement isEventEndElement
    needsFlush <- atomically $ newTVar False
    let _ = slots :: Slotted.UpdateStream XMPPState XML.Event
    let greet_src = do
            CL.sourceList greetPeer =$= CL.map Chunk
            yield Flush
        -- Each alternative returns the continuation to run outside STM.
        slot_src = do
            next <- lift . atomically $ foldr1 orElse
                [ Slotted.pull slots >>= \x -> do
                    writeTVar needsFlush True
                    return $ do
                        yield (Chunk x)
                        slot_src
                , do
                    Slotted.isEmpty slots >>= check
                    readTVar needsFlush >>= check
                    writeTVar needsFlush False
                    return $ do
                        yield Flush
                        slot_src
                , readTMVar rdone >> return (return ())
                ]
            next
    forkIO $ do
        (greet_src >> slot_src) $$ snk
        wlog $ "end post-queue fork: " ++ show k
    output <- atomically newTChan
    forkIO $ do
        -- mapM_ (atomically . Slotted.push slots Nothing) greetPeer
        -- NOTE(review): the final log line is placed after the loop so it is
        -- printed once; confirm against the intended nesting.
        fix $ \loop -> do
            next <- atomically $ foldr1 orElse
                [ readTChan output >>= \stanza -> return $ do
                    let xchan = stanzaChan stanza
                    fix $ \inner -> do
                        step <- atomically $ orElse
                            (readTChan xchan >>= \mxml -> return $ do
                                case mxml of
                                    Just xml -> do
                                        atomically $ Slotted.push slots Nothing xml
                                        inner
                                    Nothing -> loop)
                            (readTMVar rdone >> return (return ()))
                        step
                , do
                    pingflag >>= check
                    return $ do
                        let to   = addrToText (callBackAddress k)
                            from = "todo" -- Look it up from Server object
                                          -- or pass it with Connection event.
                            mid  = Just "ping"
                        mapM_ (atomically . Slotted.push slots (Just $ PingSlot))
                              (peerPing mid to from)
                        loop
                , readTMVar rdone >> return (return ())
                ]
            next
        wlog $ "end pre-queue fork: " ++ show k
    forkIO $ do
        -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show)
        -- Signal completion even when the pipeline throws; otherwise the
        -- two writer threads above would wait on 'rdone' forever.
        (src $$ xmppInbound k pingflag src stanzas output)
            `Exception.finally` atomically (putTMVar rdone ())
        wlog $ "end reader fork: " ++ show k
    return output

-- | Identifies a connection by the address of its remote peer or of its
-- local client endpoint.
--
-- NOTE(review): 'callBackAddress' is a partial field; 'forkConnection' and
-- 'monitor' apply it to every key, which fails for a 'ClientKey'.
data ConnectionKey
    = PeerKey { callBackAddress :: SockAddr }
    | ClientKey { localAddress :: SockAddr }
    deriving (Show, Ord, Eq)

{-
data Peer = Peer
    { peerWanted :: TVar Bool -- ^ False when this peer is on a you-call-me basis
    , peerState :: TVar PeerState
    }
data PeerState
    = PeerPendingConnect UTCTime
    | PeerPendingAccept UTCTime
    | PeerConnected (TChan Stanza)
-}

-- | Build the key for a peer connection: the remote address with its port
-- replaced by 'peerport'.
peerKey :: (Socket, SockAddr) -> IO ConnectionKey
peerKey (sock,addr) = do
    peer <-
        sIsConnected sock >>= \c ->
            if c then getPeerName sock -- addr is normally socketName
                 else return addr -- Weird hack: addr is would-be peer name
    return $ PeerKey (peer `withPort` fromIntegral peerport)

-- | Build the key for a client connection from the supplied address.
clientKey (sock,addr) = return $ ClientKey addr

-- | Event loop: logs server events, forks the connection threads for every
-- new 'Connection', requests a connect back to the peer when a connection
-- is read-only, and pretty-prints every stanza received on any connection.
-- Never returns.
monitor sv params = do
    let chan = serverEvent sv
    stanzas <- atomically newTChan
    fix $ \loop -> do
        action <- atomically $ foldr1 orElse
            [ readTChan chan >>= \(k,e) -> return $ do
                case e of
                    Connection pingflag conread conwrite -> do
                        wlog $ tomsg k "Connection"
                        let (xsrc,xsnk) = xmlStream conread conwrite
                        forkConnection k pingflag xsrc xsnk stanzas
                        return ()
                    ConnectFailure addr -> do
                        wlog $ tomsg k "ConnectFailure"
                    EOF -> wlog $ tomsg k "EOF"
                    HalfConnection In -> do
                        wlog $ tomsg k "ReadOnly"
                        control sv (Connect (callBackAddress k) params)
                    HalfConnection Out -> wlog $ tomsg k "WriteOnly"
                    RequiresPing -> wlog $ tomsg k "RequiresPing"
                    _ -> return ()
            , readTChan stanzas >>= \stanza -> return $ do
                xs <- readUntilNothing (stanzaChan stanza)
                wlog ""
                prettyPrint "STANZA: " xs
            ]
        action
        loop
  where
    -- Right-align the event name in a 12 character column before the key.
    tomsg k str = printf "%12s %s" str (show k)
      where
        _ = str :: String

-- | Port used for peer (server-to-server) connections.
peerport = 5269

-- | Port intended for client connections (not listened on at present).
clientport = 5222

-- | Connect to the hard-coded test peer, start 'monitor', listen on
-- 'peerport', then wait for SIGTERM or SIGINT before exiting.
main :: IO ()
main = runResourceT $ do
    sv <- server
    lift $ do
        let peer_params = (connectionDefaults peerKey)
                            { pingInterval = 10000
                            , timeout = 10000
                            , duplex = False }
            client_params = connectionDefaults clientKey
        let testaddr0 = "fd97:ca88:fa7c:b94b:c8b8:fad4:1021:a54d"
        addrs <- getAddrInfo (Just $ defaultHints { addrFlags = [ AI_CANONNAME ]})
                             (Just testaddr0)
                             (Just "5269")
        testaddr <- case addrs of
            (info:_) -> return (addrAddress info)
            []       -> ioError . userError $
                            "no address found for " ++ testaddr0
        putStrLn $ "Connecting to "++show testaddr
        control sv (ConnectWithEndlessRetry testaddr peer_params 2000)
        forkIO $ monitor sv peer_params
        control sv (Listen peerport peer_params)
        -- control sv (Listen clientport client_params)

        -- atomically $ newEmptyTMVar >>= readTMVar -- Wait for control-c
        quitVar <- newEmptyTMVarIO
        installHandler sigTERM (CatchOnce (atomically $ putTMVar quitVar True)) Nothing
        installHandler sigINT (CatchOnce (atomically $ putTMVar quitVar True)) Nothing
        -- Block until one of the signal handlers fires.
        _ <- atomically $ takeTMVar quitVar

        wlog "goodbye."
        return ()