{-# LANGUAGE CPP #-} {-# LANGUAGE OverloadedStrings #-} module XMPPServer ( xmppServer , ConnectionKey(..) , XMPPServerParameters(..) , XMPPServer , addPeer , StanzaWrap(..) , Stanza(..) , StanzaType(..) , StanzaOrigin(..) , cloneStanza , LangSpecificMessage(..) , peerKeyToText , peerKeyToResolvedName , addrToText , sendModifiedStanzaToPeer , sendModifiedStanzaToClient ) where import Debug.Trace import Control.Monad.Trans.Resource (runResourceT) import Control.Monad.Trans (lift) import Control.Monad.IO.Class (MonadIO, liftIO) import Control.Monad.Fix (fix) import Control.Monad import Control.Concurrent (forkIO) import Control.Concurrent.STM -- import Control.Concurrent.STM.TChan import Network.Socket import XMPPTypes (withPort) import Text.Printf import System.Posix.Signals import Data.ByteString (ByteString) import qualified Data.ByteString.Char8 as Strict8 -- import qualified Data.ByteString.Lazy.Char8 as Lazy8 import Data.Int (Int8) import Data.Conduit import qualified Data.Conduit.List as CL import qualified Data.Conduit.Binary as CB import Data.Conduit.Blaze (builderToByteStringFlush) import qualified Text.XML.Stream.Render as XML import qualified Text.XML.Stream.Parse as XML import Data.XML.Types as XML import Data.Maybe import Data.Monoid ( (<>) ) import Data.Text (Text) import qualified Data.Text as Text (pack,unpack) import Data.Char (toUpper) import Data.Map (Map) import qualified Data.Map as Map import Data.Set (Set, (\\) ) import qualified Data.Set as Set import qualified System.Random import qualified Network.BSD as BSD import GetHostByAddr (getHostByAddr) import qualified Control.Concurrent.STM.UpdateStream as Slotted import ControlMaybe import Nesting import EventUtil import Server peerport = 5269 clientport = 5222 my_uuid = "154ae29f-98f2-4af4-826d-a40c8a188574" data ConnectionKey = PeerKey { callBackAddress :: SockAddr } | ClientKey { localAddress :: SockAddr } deriving (Show, Ord, Eq) data JabberShow = Offline | ExtendedAway | Away | DoNotDisturb | Available | Chatty deriving (Show,Enum,Ord,Eq,Read) data MessageThread = MessageThread { msgThreadParent :: Maybe Text, msgThreadContent :: Text } deriving (Show,Eq) data LangSpecificMessage = LangSpecificMessage { msgBody :: Maybe Text , msgSubject :: Maybe Text } deriving (Show,Eq) data RosterEventType = RequestedSubscription | NewBuddy -- preceded by PresenceInformSubscription True | RemovedBuddy -- preceded by PresenceInformSubscription False | PendingSubscriber -- same as PresenceRequestSubscription | NewSubscriber | RejectSubscriber deriving (Show,Read,Ord,Eq,Enum) data StanzaType = Unrecognized | Ping | Pong | RequestResource (Maybe Text) | SetResource | SessionRequest | UnrecognizedQuery Name | RequestRoster | Roster | RosterEvent { rosterEventType :: RosterEventType , rosterUser :: Text , rosterContact :: Text } | Error | PresenceStatus { presenceShow :: JabberShow , presencePriority :: Maybe Int8 , presenceStatus :: [(Lang,Text)] } | PresenceInformError | PresenceInformSubscription Bool | PresenceRequestStatus | PresenceRequestSubscription Bool | Message { msgThread :: Maybe MessageThread , msgLangMap :: [(Lang,LangSpecificMessage)] } deriving Show data StanzaOrigin = LocalPeer | NetworkOrigin ConnectionKey (TChan Stanza) data StanzaWrap a = Stanza { stanzaType :: StanzaType , stanzaId :: Maybe Text , stanzaTo :: Maybe Text , stanzaFrom :: Maybe Text , stanzaChan :: a , stanzaClosers :: TVar (Maybe [XML.Event]) , stanzaInterrupt :: TMVar () , stanzaOrigin :: StanzaOrigin } type Stanza = StanzaWrap (TChan XML.Event) data XMPPServerParameters = XMPPServerParameters { xmppChooseResourceName :: ConnectionKey -> SockAddr -> Maybe Text -> IO Text , xmppTellMyNameToClient :: IO Text , xmppTellMyNameToPeer :: SockAddr -> IO Text , xmppTellClientHisName :: ConnectionKey -> IO Text , xmppTellPeerHisName :: ConnectionKey -> IO Text , xmppNewConnection :: ConnectionKey -> SockAddr -> TChan Stanza -> IO () , xmppEOF :: ConnectionKey -> IO () , xmppRosterBuddies :: ConnectionKey -> IO [Text] , xmppRosterSubscribers :: ConnectionKey -> IO [Text] , xmppRosterSolicited :: ConnectionKey -> IO [Text] , xmppRosterOthers :: ConnectionKey -> IO [Text] , xmppSubscribeToRoster :: ConnectionKey -> IO () -- , xmppLookupClientJID :: ConnectionKey -> IO Text , xmppTellClientNameOfPeer :: ConnectionKey -> IO Text , xmppDeliverMessage :: (IO ()) -> Stanza -> IO () , xmppInformClientPresence :: ConnectionKey -> Stanza -> IO () } -- TODO: http://xmpp.org/rfcs/rfc6120.html#rules-remote-error -- client connection -- socat script to send stanza fragment -- copyToChannel can keep a stack of closers to append to finish-off a stanza -- the TMVar () from forkConnection can be passed and with a stanza to detect interruption addrToText :: SockAddr -> Text addrToText (addr@(SockAddrInet _ _)) = Text.pack $ stripColon (show addr) where stripColon s = pre where (pre,port) = break (==':') s addrToText (addr@(SockAddrInet6 _ _ _ _)) = Text.pack $ stripColon (show addr) where stripColon s = if null bracket then pre else pre ++ "]" where (pre,bracket) = break (==']') s peerKeyToText :: ConnectionKey -> Text peerKeyToText (PeerKey { callBackAddress=addr }) = addrToText addr peerKeyToText (ClientKey { localAddress=addr }) = "ErrorClIeNt0" peerKeyToResolvedName :: ConnectionKey -> IO Text peerKeyToResolvedName k@(ClientKey { localAddress=addr }) = return "ErrorClIeNt1" peerKeyToResolvedName k@(PeerKey { callBackAddress=addr }) = do mname <- handleIO_ (return Nothing) $ do ent <- getHostByAddr addr -- AF_UNSPEC addr let names = BSD.hostName ent : BSD.hostAliases ent return $ listToMaybe names return $ maybe (peerKeyToText k) Text.pack mname wlog s = putStrLn s where _ = s :: String wlogb s = Strict8.putStrLn s xmlStream :: ReadCommand -> WriteCommand -> ( Source IO XML.Event , Sink (Flush XML.Event) IO () ) xmlStream conread conwrite = (xsrc,xsnk) where xsrc = src $= XML.parseBytes XML.def xsnk = -- XML.renderBytes XML.def =$ snk XML.renderBuilderFlush XML.def =$= builderToByteStringFlush =$= discardFlush =$ snk where discardFlush :: Monad m => ConduitM (Flush a) a m () discardFlush = awaitForever $ \x -> do let unchunk (Chunk a) = a ischunk (Chunk _) = True ischunk _ = False when (ischunk x) $ yield (unchunk x) src = do v <- lift conread maybe (return ()) -- lift . wlog $ "conread: Nothing") (\v -> yield v >> src) v snk = awaitForever $ liftIO . conwrite type FlagCommand = STM Bool type ReadCommand = IO (Maybe ByteString) type WriteCommand = ByteString -> IO Bool cloneStanza stanza = do dupped <- cloneTChan (stanzaChan stanza) return stanza { stanzaChan = dupped } copyToChannel f chan closer_stack = awaitForever copy where copy x = do liftIO . atomically $ writeTChan chan (f x) case x of EventBeginDocument {} -> do let clsr = closerFor x liftIO . atomically $ modifyTVar' closer_stack (fmap (clsr:)) EventEndDocument {} -> do liftIO . atomically $ modifyTVar' closer_stack (fmap (drop 1)) _ -> return () yield x prettyPrint prefix = XML.renderBytes (XML.def { XML.rsPretty=True }) =$= CB.lines =$ CL.mapM_ (wlogb . (prefix <>)) swapNamespace old new = awaitForever swapit where swapit (EventBeginElement n as) | nameNamespace n==Just old = yield $ EventBeginElement (n { nameNamespace = Just new }) as swapit (EventEndElement n) | nameNamespace n==Just old = yield $ EventEndElement (n { nameNamespace = Just new }) swapit x = yield x fixHeaders Stanza { stanzaTo=mto, stanzaFrom=mfrom } = do x <- await maybe (return ()) f x where f (EventBeginElement n as) = do yield $ EventBeginElement n (update as) awaitForever yield f x = yield x >> awaitForever yield update as = as'' where as' = maybe as (\to->attr "to" to:as) mto as'' = maybe as' (\from->attr "from" from:as') mfrom sendModifiedStanzaToPeer stanza chan = do (echan,clsrs,quitvar) <- conduitToChan c ioWriteChan chan stanza { stanzaChan = echan , stanzaClosers = clsrs , stanzaInterrupt = quitvar -- TODO id? origin? } where c = stanzaToConduit stanza =$= swapNamespace "jabber:client" "jabber:server" =$= fixHeaders stanza sendModifiedStanzaToClient stanza chan = do (echan,clsrs,quitvar) <- conduitToChan c ioWriteChan chan stanza { stanzaChan = echan , stanzaClosers = clsrs , stanzaInterrupt = quitvar -- TODO id? origin? } where c = stanzaToConduit stanza =$= swapNamespace "jabber:server" "jabber:client" =$= fixHeaders stanza -- id,to, and from are taken as-is from reply list sendReply donevar stype reply replychan = do if null reply then return () else do let stanzaTag = head reply mid = lookupAttrib "id" (tagAttrs stanzaTag) mfrom = lookupAttrib "from" (tagAttrs stanzaTag) mto = lookupAttrib "to" (tagAttrs stanzaTag) replyStanza <- liftIO . atomically $ do replyChan <- newTChan replyClsrs <- newTVar (Just []) return Stanza { stanzaType = stype , stanzaId = mid , stanzaTo = mto -- as-is from reply list , stanzaFrom = mfrom -- as-is from reply list , stanzaChan = replyChan , stanzaClosers = replyClsrs , stanzaInterrupt = donevar , stanzaOrigin = LocalPeer } ioWriteChan replychan replyStanza void . liftIO . forkIO $ do mapM_ (ioWriteChan $ stanzaChan replyStanza) reply liftIO . atomically $ writeTVar (stanzaClosers replyStanza) Nothing -- liftIO $ wlog "finished reply stanza" grokStanzaIQGet :: Monad m => XML.Event -> NestingXML o m (Maybe StanzaType) grokStanzaIQGet stanza = do mtag <- nextElement flip (maybe $ return Nothing) mtag $ \tag -> do case tagName tag of "{urn:xmpp:ping}ping" -> return $ Just Ping "{jabber:iq:roster}query" -> return $ Just RequestRoster name -> return . Just $ UnrecognizedQuery name grokStanzaIQResult :: Monad m => XML.Event -> NestingXML o m (Maybe StanzaType) grokStanzaIQResult stanza = do mtag <- nextElement flip (maybe $ return (Just Pong)) mtag $ \tag -> do case tagName tag of _ -> return Nothing grokStanzaIQSet :: XML.Event -> NestingXML o IO (Maybe StanzaType) grokStanzaIQSet stanza = do mtag <- nextElement flip (maybe $ return Nothing) mtag $ \tag -> do case tagName tag of "{urn:ietf:params:xml:ns:xmpp-bind}bind" -> do mchild <- nextElement case fmap tagName mchild of Just "{urn:ietf:params:xml:ns:xmpp-bind}resource" -> do rsc <- XML.content -- TODO: MonadThrow??? return . Just $ RequestResource (Just rsc) Just _ -> return Nothing Nothing -> return . Just $ RequestResource Nothing "{urn:ietf:params:xml:ns:xmpp-session}session" -> do return $ Just SessionRequest _ -> return Nothing {- C->Unrecognized Unrecognized type="set" C->Unrecognized id="purpleae62d88f" C->Unrecognized xmlns="jabber:client"> C->Unrecognized C->Unrecognized -} ioWriteChan c v = liftIO . atomically $ writeTChan c v parsePresenceStatus ns = do let toStat "away" = Away toStat "xa" = ExtendedAway toStat "dnd" = DoNotDisturb toStat "chat" = Chatty showv <- liftIO . atomically $ newTVar Available priov <- liftIO . atomically $ newTVar Nothing statusv <- liftIO . atomically $ newTChan fix $ \loop -> do mtag <- nextElement flip (maybe $ return ()) mtag $ \tag -> do when (nameNamespace (tagName tag) == Just ns) $ do case nameLocalName (tagName tag) of "show" -> do t <- XML.content liftIO . atomically $ writeTVar showv (toStat t) "priority" -> do t <- XML.content liftIO . handleIO_ (return ()) $ do prio <- readIO (Text.unpack t) atomically $ writeTVar priov (Just prio) "status" -> do t <- XML.content lang <- xmlLang ioWriteChan statusv (maybe "" id lang,t) _ -> return () loop show <- liftIO . atomically $ readTVar showv prio <- liftIO . atomically $ readTVar priov status <- liftIO $ chanContents statusv -- Could use unsafeInterleaveIO to -- avoid multiple passes, but whatever. return . Just $ PresenceStatus { presenceShow = show , presencePriority = prio , presenceStatus = status } grokPresence ns stanzaTag = do let typ = lookupAttrib "type" (tagAttrs stanzaTag) case typ of Nothing -> parsePresenceStatus ns Just "unavailable" -> fmap (fmap (\p -> p {presenceShow=Offline})) $ parsePresenceStatus ns Just "error" -> return . Just $ PresenceInformError Just "unsubscribed" -> return . Just $ PresenceInformSubscription False Just "subscribed" -> return . Just $ PresenceInformSubscription True Just "probe" -> return . Just $ PresenceRequestStatus Just "unsubscribe" -> return . Just $ PresenceRequestSubscription False Just "subscribe" -> return . Just $ PresenceRequestSubscription True _ -> return Nothing parseMessage ns stanza = do let bodytag = Name { nameNamespace = Just ns , nameLocalName = "body" , namePrefix = Nothing } subjecttag = Name { nameNamespace = Just ns , nameLocalName = "subject" , namePrefix = Nothing } threadtag = Name { nameNamespace = Just ns , nameLocalName = "thread" , namePrefix = Nothing } let emptyMsg = LangSpecificMessage { msgBody=Nothing, msgSubject=Nothing } parseChildren (th,cmap) = do child <- nextElement lvl <- nesting xmllang <- xmlLang let lang = maybe "" id xmllang let c = maybe emptyMsg id (Map.lookup lang cmap) -- log $ " child: "<> bshow child case child of Just tag | tagName tag==bodytag -> do txt <- XML.content awaitCloser lvl parseChildren (th,Map.insert lang (c { msgBody=Just txt }) cmap) Just tag | tagName tag==subjecttag -> do txt <- XML.content awaitCloser lvl parseChildren (th,Map.insert lang (c { msgSubject=Just txt }) cmap) Just tag | tagName tag==threadtag -> do txt <- XML.content awaitCloser lvl parseChildren (th {msgThreadContent=txt},cmap) Just tag -> do -- let nm = tagName tag -- attrs = tagAttrs tag -- -- elems = msgElements c -- txt <- XML.content awaitCloser lvl parseChildren (th,Map.insert lang c cmap) Nothing -> return (th,cmap) (th,langmap) <- parseChildren ( MessageThread {msgThreadParent=Nothing, msgThreadContent=""} , Map.empty ) return Message { msgLangMap = Map.toList langmap, msgThread = if msgThreadContent th/="" then Just th else Nothing } grokMessage ns stanzaTag = do t <- parseMessage ns stanzaTag return $ Just t grokStanza "jabber:server" stanzaTag = case () of _ | stanzaTag `isServerIQOf` "get" -> grokStanzaIQGet stanzaTag _ | stanzaTag `isServerIQOf` "result" -> grokStanzaIQResult stanzaTag _ | tagName stanzaTag == "{jabber:server}presence" -> grokPresence "jabber:server" stanzaTag _ | tagName stanzaTag == "{jabber:server}message" -> grokMessage "jabber:server" stanzaTag _ -> return $ Just Unrecognized grokStanza "jabber:client" stanzaTag = case () of _ | stanzaTag `isClientIQOf` "get" -> grokStanzaIQGet stanzaTag _ | stanzaTag `isClientIQOf` "set" -> grokStanzaIQSet stanzaTag _ | stanzaTag `isClientIQOf` "result" -> grokStanzaIQResult stanzaTag _ | tagName stanzaTag == "{jabber:client}presence" -> grokPresence "jabber:client" stanzaTag _ | tagName stanzaTag == "{jabber:client}message" -> grokMessage "jabber:client" stanzaTag _ -> return $ Just Unrecognized xmppInbound :: Server ConnectionKey SockAddr -> XMPPServerParameters -> ConnectionKey -> SockAddr -> FlagCommand -> Source IO XML.Event -> TChan Stanza -> TChan Stanza -> TMVar () -> Sink XML.Event IO () xmppInbound sv xmpp k laddr pingflag src stanzas output donevar = doNestingXML $ do let (namespace,tellmyname,tellyourname) = case k of ClientKey {} -> ( "jabber:client" , xmppTellMyNameToClient xmpp , xmppTellClientHisName xmpp k ) PeerKey {} -> ( "jabber:server" , xmppTellMyNameToPeer xmpp laddr , xmppTellPeerHisName xmpp k ) me <- liftIO tellmyname you <- liftIO tellyourname withXML $ \begindoc -> do when (begindoc==EventBeginDocument) $ do whenJust nextElement $ \xml -> do withJust (elementAttrs "stream" xml) $ \stream_attrs -> do fix $ \loop -> do -- liftIO . wlog $ "waiting for stanza." (chan,clsrs) <- liftIO . atomically $ liftM2 (,) newTChan (newTVar (Just [])) whenJust nextElement $ \stanzaTag -> do stanza_lvl <- nesting liftIO . atomically $ do writeTChan chan stanzaTag modifyTVar' clsrs (fmap (closerFor stanzaTag:)) copyToChannel id chan clsrs =$= do let mid = lookupAttrib "id" (tagAttrs stanzaTag) mfrom = lookupAttrib "from" (tagAttrs stanzaTag) mto = lookupAttrib "to" (tagAttrs stanzaTag) dispatch <- grokStanza namespace stanzaTag let unrecog = do let stype = Unrecognized s <- liftIO . atomically $ do return Stanza { stanzaType = stype , stanzaId = mid , stanzaTo = mto , stanzaFrom = mfrom , stanzaChan = chan , stanzaClosers = clsrs , stanzaInterrupt = donevar , stanzaOrigin = NetworkOrigin k output } ioWriteChan stanzas s flip (maybe $ unrecog) dispatch $ \dispatch -> case dispatch of -- Checking that the to-address matches this server. -- Otherwise it could be a client-to-client ping or a -- client-to-server for some other server. -- For now, assuming its for the immediate connection. Ping | mto==Just me || mto==Nothing -> do let pongto = maybe you id mfrom pongfrom = maybe me id mto pong = makePong namespace mid pongto pongfrom sendReply donevar Pong pong output #ifdef PINGNOISE -- TODO: Remove this, it is only to generate a debug print ioWriteChan stanzas Stanza { stanzaType = Ping , stanzaId = mid , stanzaTo = mto , stanzaFrom = mfrom , stanzaChan = chan , stanzaClosers = clsrs , stanzaInterrupt = donevar , stanzaOrigin = NetworkOrigin k output } #endif stype -> ioWriteChan stanzas Stanza { stanzaType = stype , stanzaId = mid , stanzaTo = mto , stanzaFrom = mfrom , stanzaChan = chan , stanzaClosers = clsrs , stanzaInterrupt = donevar , stanzaOrigin = NetworkOrigin k output } awaitCloser stanza_lvl liftIO . atomically $ writeTVar clsrs Nothing loop chanContents :: TChan x -> IO [x] chanContents ch = do x <- atomically $ do bempty <- isEmptyTChan ch if bempty then return Nothing else fmap Just $ readTChan ch maybe (return []) (\x -> do xs <- chanContents ch return (x:xs)) x while :: IO Bool -> IO a -> IO [a] while cond body = do b <- cond if b then do x <- body xs <- while cond body return (x:xs) else return [] readUntilNothing :: TChan (Maybe x) -> IO [x] readUntilNothing ch = do x <- atomically $ readTChan ch maybe (return []) (\x -> do xs <- readUntilNothing ch return (x:xs)) x streamFeatures "jabber:client" = [ EventBeginElement (streamP "features") [] , EventBeginElement "{urn:ietf:params:xml:ns:xmpp-bind}bind" [] , EventEndElement "{urn:ietf:params:xml:ns:xmpp-bind}bind" {- -- , " " , " " -- , " DIGEST-MD5" , " PLAIN" , " " -} , EventEndElement (streamP "features") ] streamFeatures "jabber:server" = [] greet' namespace host = [ EventBeginDocument , EventBeginElement (streamP "stream") [("from",[ContentText host]) ,("id",[ContentText "someid"]) ,("xmlns",[ContentText namespace]) ,("xmlns:stream",[ContentText "http://etherx.jabber.org/streams"]) ,("version",[ContentText "1.0"]) ] ] ++ streamFeatures namespace consid Nothing = id consid (Just sid) = (("id",[ContentText sid]):) data XMPPState = PingSlot deriving (Eq,Ord) mkname namespace name = (Name name (Just namespace) Nothing) makePing :: Text -> Maybe Text -> Text -> Text -> [XML.Event] makePing namespace mid to from = [ EventBeginElement (mkname namespace "iq") $ (case mid of Just c -> (("id",[ContentText c]):) _ -> id ) [ ("type",[ContentText "get"]) , attr "to" to , attr "from" from ] , EventBeginElement "{urn:xmpp:ping}ping" [] , EventEndElement "{urn:xmpp:ping}ping" , EventEndElement $ mkname namespace "iq"] makePong namespace mid to from = -- Note: similar to session reply [ EventBeginElement (mkname namespace "iq") $(case mid of Just c -> (("id",[ContentText c]):) _ -> id) [ attr "type" "result" , attr "to" to , attr "from" from ] , EventEndElement (mkname namespace "iq") ] iq_bind_reply :: Maybe Text -> Text -> [XML.Event] iq_bind_reply mid jid = [ EventBeginElement "{jabber:client}iq" (consid mid [("type",[ContentText "result"])]) , EventBeginElement "{urn:ietf:params:xml:ns:xmpp-bind}bind" [("xmlns",[ContentText "urn:ietf:params:xml:ns:xmpp-bind"])] , EventBeginElement "{urn:ietf:params:xml:ns:xmpp-bind}jid" [] , EventContent (ContentText jid) , EventEndElement "{urn:ietf:params:xml:ns:xmpp-bind}jid" , EventEndElement "{urn:ietf:params:xml:ns:xmpp-bind}bind" , EventEndElement "{jabber:client}iq" ] iq_session_reply mid host = -- Note: similar to Pong [ EventBeginElement "{jabber:client}iq" (consid mid [("from",[ContentText host]) ,("type",[ContentText "result"]) ]) , EventEndElement "{jabber:client}iq" ] iq_service_unavailable mid host {- mjid -} req = [ EventBeginElement "{jabber:client}iq" (consid mid [("type",[ContentText "error"]) -- , TODO: set "from" if isJust mjid ]) , EventBeginElement req [] , EventEndElement req , EventBeginElement "{jabber:client}error" [("type",[ContentText "cancel"])] , EventBeginElement "{urn:ietf:params:xml:ns:xmpp-stanzas}service-unavailable" [] , EventEndElement "{urn:ietf:params:xml:ns:xmpp-stanzas}service-unavailable" , EventEndElement "{jabber:client}error" , EventEndElement "{jabber:client}iq" ] wrapStanzaList :: [XML.Event] -> STM [Either (StanzaWrap XML.Event) XML.Event] wrapStanzaList xs = do wrap <- do clsrs <- newTVar Nothing donev <- newTMVar () return $ \ x -> Stanza { stanzaType = Unrecognized , stanzaId = mid , stanzaTo = mto , stanzaFrom = mfrom , stanzaClosers = clsrs , stanzaInterrupt = donev , stanzaOrigin = LocalPeer , stanzaChan = x } return $ map (Left . wrap) (take 1 xs) ++ map Right (drop 1 xs) where m = listToMaybe xs mto = m >>= lookupAttrib "to" . tagAttrs mfrom = m >>= lookupAttrib "from" . tagAttrs mid = m >>= lookupAttrib "id" . tagAttrs {- greet namespace = [ EventBeginDocument , EventBeginElement (streamP "stream") [ attr "xmlns" namespace , attr "version" "1.0" ] ] -} goodbye = [ EventEndElement (streamP "stream") , EventEndDocument ] forkConnection :: Server ConnectionKey SockAddr -> XMPPServerParameters -> ConnectionKey -> SockAddr -> FlagCommand -> Source IO XML.Event -> Sink (Flush XML.Event) IO () -> TChan Stanza -> IO (TChan Stanza) forkConnection sv xmpp k laddr pingflag src snk stanzas = do let (namespace,tellmyname) = case k of ClientKey {} -> ("jabber:client", xmppTellMyNameToClient xmpp) PeerKey {} -> ("jabber:server",xmppTellMyNameToPeer xmpp laddr) me <- tellmyname rdone <- atomically newEmptyTMVar let isStarter (Left _) = True isStarter (Right e) | isEventBeginElement e = True isStarter _ = False isStopper (Left _) = False isStopper (Right e) | isEventEndElement e = True isStopper _ = False slots <- atomically $ Slotted.new isStarter isStopper needsFlush <- atomically $ newTVar False lastStanza <- atomically $ newTVar Nothing nesting <- atomically $ newTVar 0 let _ = slots :: Slotted.UpdateStream XMPPState (Either (StanzaWrap XML.Event) XML.Event) let greet_src = do CL.sourceList (greet' namespace me) =$= CL.map Chunk yield Flush slot_src = do what <- lift . atomically $ foldr1 orElse [Slotted.pull slots >>= \x -> do x <- case x of Left wrapped -> do writeTVar nesting 1 writeTVar lastStanza (Just wrapped) return $ stanzaChan wrapped Right x -> do when (isEventBeginElement x) $ modifyTVar' nesting (+1) when (isEventEndElement x) $ do n <- readTVar nesting when (n==1) $ writeTVar lastStanza Nothing modifyTVar' nesting (subtract 1) return x writeTVar needsFlush True return $ do -- liftIO $ wlog $ "yielding Chunk: " ++ show x yield (Chunk x) slot_src ,do Slotted.isEmpty slots >>= check readTVar needsFlush >>= check writeTVar needsFlush False return $ do -- liftIO $ wlog "yielding Flush" yield Flush slot_src ,readTMVar rdone >> return (return ()) ] what forkIO $ do (greet_src >> slot_src) $$ snk last <- atomically $ readTVar lastStanza es <- while (atomically . fmap not $ Slotted.isEmpty slots) (atomically . Slotted.pull $ slots) let es' = mapMaybe metadata es metadata (Left s) = Just s metadata _ = Nothing let fail s = wlog $ "failed delivery: " ++ show (stanzaId s) maybe (return ()) fail last mapM_ fail es' -- TODO: queue or save these for re-connect? wlog $ "end post-queue fork: " ++ show k output <- atomically newTChan forkIO $ do -- mapM_ (atomically . Slotted.push slots Nothing) greetPeer fix $ \loop -> do what <- atomically $ foldr1 orElse [readTChan output >>= \stanza -> return $ do #ifndef PINGNOISE let notping f = case stanzaType stanza of Pong -> return () _ -> f #else let notping f = f #endif notping $ do dup <- atomically $ cloneStanza stanza let typ = Strict8.pack $ c ++ "<-"++(concat . take 1 . words $ show (stanzaType dup))++" " c = case k of ClientKey {} -> "C" PeerKey {} -> "P" wlog "" stanzaToConduit dup $$ prettyPrint typ stanzaToConduit stanza $$ awaitForever $ liftIO . atomically . Slotted.push slots Nothing . Right loop ,do pingflag >>= check return $ do to <- xmppTellPeerHisName xmpp k -- addrToText (callBackAddress k) let from = me -- Look it up from Server object -- or pass it with Connection event. mid = Just "ping" ping = makePing namespace mid to from ping <- atomically $ wrapStanzaList ping mapM_ (atomically . Slotted.push slots (Just $ PingSlot)) ping #ifdef PINGNOISE wlog "" CL.sourceList ping $$ prettyPrint $ case k of ClientKey {} -> "C<-Ping" PeerKey {} -> "P<-Ping " #endif loop ,readTMVar rdone >> return (return ()) ] what wlog $ "end pre-queue fork: " ++ show k forkIO $ do -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show) src $$ xmppInbound sv xmpp k laddr pingflag src stanzas output rdone atomically $ putTMVar rdone () wlog $ "end reader fork: " ++ show k return output {- data Peer = Peer { peerWanted :: TVar Bool -- ^ False when this peer is on a you-call-me basis , peerState :: TVar PeerState } data PeerState = PeerPendingConnect UTCTime | PeerPendingAccept UTCTime | PeerConnected (TChan Stanza) -} peerKey (sock,addr) = do peer <- sIsConnected sock >>= \c -> if c then getPeerName sock -- addr is normally socketName else return addr -- Weird hack: addr is would-be peer name laddr <- getSocketName sock return $ (PeerKey (peer `withPort` fromIntegral peerport),laddr) clientKey (sock,addr) = do paddr <- getPeerName sock return $ (ClientKey addr,paddr) stanzaToConduit :: MonadIO m => Stanza -> ConduitM i Event m () stanzaToConduit stanza = do let xchan = stanzaChan stanza xfin = stanzaClosers stanza rdone = stanzaInterrupt stanza loop = return () fix $ \inner -> do what <- liftIO . atomically $ foldr1 orElse [readTChan xchan >>= \xml -> return $ do yield xml -- atomically $ Slotted.push slots Nothing xml inner ,do mb <- readTVar xfin cempty <- isEmptyTChan xchan if isNothing mb then if cempty then return loop else retry else do done <- tryReadTMVar rdone check (isJust done) trace "todo: send closers" retry ,do isEmptyTChan xchan >>= check readTMVar rdone return (return ())] what xmlifyRosterItems :: Monad m => Set Text -> Text -> Set Text -> ConduitM i Event m () xmlifyRosterItems solicited stype set = mapM_ item (Set.toList set) where item jid = do yield $ EventBeginElement "{jabber:iq:roster}item" ([ attr "jid" jid , attr "subscription" stype ]++if Set.member jid solicited then [attr "ask" "subscribe"] else [] ) yield $ EventEndElement "{jabber:iq:roster}item" conduitToChan c = do chan <- atomically newTChan clsrs <- atomically $ newTVar (Just []) quitvar <- atomically $ newEmptyTMVar forkIO $ do c =$= copyToChannel id chan clsrs $$ awaitForever (const $ return ()) atomically $ writeTVar clsrs Nothing return (chan,clsrs,quitvar) sendRoster query xmpp replyto = do let k = case stanzaOrigin query of NetworkOrigin k _ -> Just k LocalPeer -> Nothing -- local peer requested roster? flip (maybe $ return ()) k $ \k -> do jid <- case k of ClientKey {} -> xmppTellClientHisName xmpp k -- LookupClientJID xmpp k PeerKey {} -> xmppTellClientNameOfPeer xmpp k let getlist f = do bs <- f xmpp k -- js <- mapM parseHostNameJID bs return (Set.fromList bs) -- js) buddies <- getlist xmppRosterBuddies subscribers <- getlist xmppRosterSubscribers solicited <- getlist xmppRosterSolicited subnone0 <- getlist xmppRosterOthers let subnone = subnone0 \\ (Set.union buddies subscribers) let subto = buddies \\ subscribers let subfrom = subscribers \\ buddies let subboth = Set.intersection buddies subscribers let roster = do yield $ EventBeginElement "{jabber:client}iq" (consid (stanzaId query) [ attr "to" jid , attr "type" "result" ]) yield $ EventBeginElement "{jabber:iq:roster}query" [] -- todo: ver? xmlifyRosterItems solicited "to" subto xmlifyRosterItems solicited "from" subfrom xmlifyRosterItems solicited "both" subboth xmlifyRosterItems solicited "none" subnone yield $ EventEndElement "{jabber:iq:roster}query" yield $ EventEndElement "{jabber:client}iq" (chan,clsrs,quitvar) <- conduitToChan roster ioWriteChan replyto Stanza { stanzaType = Roster , stanzaId = (stanzaId query) , stanzaTo = Just jid , stanzaFrom = Nothing , stanzaChan = chan , stanzaClosers = clsrs , stanzaInterrupt = quitvar , stanzaOrigin = LocalPeer } socketFromKey :: Server ConnectionKey SockAddr -> ConnectionKey -> IO SockAddr socketFromKey sv k = do map <- atomically $ readTVar (conmap sv) let mcd = Map.lookup k map case mcd of Nothing -> case k of ClientKey addr -> return addr PeerKey addr -> return addr -- XXX: ? wrong address -- Shouldnt happen anyway. Just cd -> return $ cdata cd monitor sv params xmpp = do chan <- return $ serverEvent sv stanzas <- atomically newTChan quitVar <- atomically newEmptyTMVar fix $ \loop -> do action <- atomically $ foldr1 orElse [ readTChan chan >>= \((k,u),e) -> return $ do case e of Connection pingflag conread conwrite -> do wlog $ tomsg k "Connection" let (xsrc,xsnk) = xmlStream conread conwrite outs <- forkConnection sv xmpp k u pingflag xsrc xsnk stanzas xmppNewConnection xmpp k u outs return () ConnectFailure addr -> return () -- wlog $ tomsg k "ConnectFailure" EOF -> do wlog $ tomsg k "EOF" xmppEOF xmpp k HalfConnection In -> do wlog $ tomsg k "ReadOnly" control sv (Connect (callBackAddress k) params) HalfConnection Out -> wlog $ tomsg k "WriteOnly" RequiresPing -> return () -- wlog $ tomsg k "RequiresPing" _ -> return () , readTChan stanzas >>= \stanza -> return $ do dup <- case stanzaType stanza of Message {} -> do dup <- atomically $ cloneStanza stanza -- dupped so we can make debug print return dup _ -> return stanza forkIO $ do case stanzaType stanza of Message {} -> do let fail = wlog $ "Failed delivery id="++show (stanzaId stanza) -- TODO xmppDeliverMessage xmpp fail stanza _ -> return () case stanzaOrigin stanza of NetworkOrigin k@(ClientKey {}) replyto -> case stanzaType stanza of RequestResource wanted -> do sockaddr <- socketFromKey sv k rsc <- xmppChooseResourceName xmpp k sockaddr wanted let reply = iq_bind_reply (stanzaId stanza) rsc sendReply quitVar SetResource reply replyto SessionRequest -> do me <- xmppTellMyNameToClient xmpp let reply = iq_session_reply (stanzaId stanza) me sendReply quitVar Pong reply replyto RequestRoster -> do sendRoster stanza xmpp replyto xmppSubscribeToRoster xmpp k PresenceStatus {} -> do xmppInformClientPresence xmpp k stanza UnrecognizedQuery query -> do me <- xmppTellMyNameToClient xmpp let reply = iq_service_unavailable (stanzaId stanza) me query sendReply quitVar Error reply replyto _ -> return () _ -> return () -- We need to clone in the case the stanza is passed on as for Message. #ifndef PINGNOISE let notping f = case stanzaType stanza of Pong -> return () _ -> f notping $ do #endif let typ = Strict8.pack $ c ++ "->"++(concat . take 1 . words $ show (stanzaType stanza))++" " c = case stanzaOrigin stanza of LocalPeer -> "*" NetworkOrigin (ClientKey {}) _ -> "C" NetworkOrigin (PeerKey {}) _ -> "P" wlog "" stanzaToConduit dup $$ prettyPrint typ ] action loop where tomsg k str = printf "%12s %s" str (show k) where _ = str :: String data XMPPServer = XMPPServer { _xmpp_sv :: Server ConnectionKey SockAddr , _xmpp_peer_params :: ConnectionParameters ConnectionKey SockAddr } addPeer :: XMPPServer -> SockAddr -> IO () addPeer sv addr = do control (_xmpp_sv sv) (ConnectWithEndlessRetry addr (_xmpp_peer_params sv) 10000) xmppServer :: ( MonadResource m , MonadIO m ) => XMPPServerParameters -> m XMPPServer xmppServer xmpp = do sv <- server -- some fuzz helps avoid simultaneity pingfuzz <- liftIO $ do gen <- System.Random.getStdGen let (r,gen') = System.Random.next gen return $ r `mod` 2000 -- maximum 2 seconds of fuzz liftIO . wlog $ "pingfuzz = " ++ show pingfuzz let peer_params = (connectionDefaults peerKey) { pingInterval = 15000 + pingfuzz , timeout = 2000 , duplex = False } client_params = (connectionDefaults clientKey) { pingInterval = 0 , timeout = 0 } liftIO $ do forkIO $ monitor sv peer_params xmpp control sv (Listen peerport peer_params) control sv (Listen clientport client_params) return XMPPServer { _xmpp_sv = sv, _xmpp_peer_params = peer_params } #if MIN_VERSION_stm(2,4,0) #else -- |Clone a 'TChan': similar to dupTChan, but the cloned channel starts with the -- same content available as the original channel. -- -- Terrible inefficient implementation provided to build against older libraries. cloneTChan :: TChan a -> STM (TChan a) cloneTChan chan = do contents <- chanContents' chan chan2 <- dupTChan chan mapM_ (writeTChan chan) contents return chan2 where chanContents' chan = do b <- isEmptyTChan chan if b then return [] else do x <- readTChan chan xs <- chanContents' chan return (x:xs) #endif