{-# LANGUAGE CPP #-}
{-# LANGUAGE OverloadedStrings #-}
module XMPPServer
( xmppServer
, ConnectionKey(..)
, XMPPServerParameters(..)
, XMPPServer
, addPeer
, StanzaWrap(..)
, Stanza(..)
, StanzaType(..)
, StanzaOrigin(..)
, cloneStanza
, LangSpecificMessage(..)
, peerKeyToText
, peerKeyToResolvedName
, addrToText
, sendModifiedStanzaToPeer
, sendModifiedStanzaToClient
) where
import Debug.Trace
import Control.Monad.Trans.Resource (runResourceT)
import Control.Monad.Trans (lift)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Fix (fix)
import Control.Monad
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
-- import Control.Concurrent.STM.TChan
import Network.Socket
import XMPPTypes (withPort)
import Text.Printf
import System.Posix.Signals
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as Strict8
-- import qualified Data.ByteString.Lazy.Char8 as Lazy8
import Data.Int (Int8)
import Data.Conduit
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Binary as CB
import Data.Conduit.Blaze (builderToByteStringFlush)
import qualified Text.XML.Stream.Render as XML
import qualified Text.XML.Stream.Parse as XML
import Data.XML.Types as XML
import Data.Maybe
import Data.Monoid ( (<>) )
import Data.Text (Text)
import qualified Data.Text as Text (pack,unpack)
import Data.Char (toUpper)
import Data.Map (Map)
import qualified Data.Map as Map
import Data.Set (Set, (\\) )
import qualified Data.Set as Set
import qualified System.Random
import qualified Network.BSD as BSD
import GetHostByAddr (getHostByAddr)
import qualified Control.Concurrent.STM.UpdateStream as Slotted
import ControlMaybe
import Nesting
import EventUtil
import Server
peerport = 5269
clientport = 5222
my_uuid = "154ae29f-98f2-4af4-826d-a40c8a188574"
data ConnectionKey
= PeerKey { callBackAddress :: SockAddr }
| ClientKey { localAddress :: SockAddr }
deriving (Show, Ord, Eq)
data JabberShow = Offline
| ExtendedAway
| Away
| DoNotDisturb
| Available
| Chatty
deriving (Show,Enum,Ord,Eq,Read)
data MessageThread = MessageThread {
msgThreadParent :: Maybe Text,
msgThreadContent :: Text
}
deriving (Show,Eq)
data LangSpecificMessage =
LangSpecificMessage { msgBody :: Maybe Text
, msgSubject :: Maybe Text
}
deriving (Show,Eq)
data RosterEventType
= RequestedSubscription
| NewBuddy -- preceded by PresenceInformSubscription True
| RemovedBuddy -- preceded by PresenceInformSubscription False
| PendingSubscriber -- same as PresenceRequestSubscription
| NewSubscriber
| RejectSubscriber
deriving (Show,Read,Ord,Eq,Enum)
data StanzaType
= Unrecognized
| Ping
| Pong
| RequestResource (Maybe Text)
| SetResource
| SessionRequest
| UnrecognizedQuery Name
| RequestRoster
| Roster
| RosterEvent { rosterEventType :: RosterEventType
, rosterUser :: Text
, rosterContact :: Text }
| Error
| PresenceStatus { presenceShow :: JabberShow
, presencePriority :: Maybe Int8
, presenceStatus :: [(Lang,Text)]
}
| PresenceInformError
| PresenceInformSubscription Bool
| PresenceRequestStatus
| PresenceRequestSubscription Bool
| Message { msgThread :: Maybe MessageThread
, msgLangMap :: [(Lang,LangSpecificMessage)]
}
deriving Show
data StanzaOrigin = LocalPeer | NetworkOrigin ConnectionKey (TChan Stanza)
data StanzaWrap a = Stanza
{ stanzaType :: StanzaType
, stanzaId :: Maybe Text
, stanzaTo :: Maybe Text
, stanzaFrom :: Maybe Text
, stanzaChan :: a
, stanzaClosers :: TVar (Maybe [XML.Event])
, stanzaInterrupt :: TMVar ()
, stanzaOrigin :: StanzaOrigin
}
type Stanza = StanzaWrap (TChan XML.Event)
data XMPPServerParameters =
XMPPServerParameters
{ xmppChooseResourceName :: ConnectionKey -> SockAddr -> Maybe Text -> IO Text
, xmppTellMyNameToClient :: IO Text
, xmppTellMyNameToPeer :: SockAddr -> IO Text
, xmppTellClientHisName :: ConnectionKey -> IO Text
, xmppTellPeerHisName :: ConnectionKey -> IO Text
, xmppNewConnection :: ConnectionKey -> SockAddr -> TChan Stanza -> IO ()
, xmppEOF :: ConnectionKey -> IO ()
, xmppRosterBuddies :: ConnectionKey -> IO [Text]
, xmppRosterSubscribers :: ConnectionKey -> IO [Text]
, xmppRosterSolicited :: ConnectionKey -> IO [Text]
, xmppRosterOthers :: ConnectionKey -> IO [Text]
, xmppSubscribeToRoster :: ConnectionKey -> IO ()
-- , xmppLookupClientJID :: ConnectionKey -> IO Text
, xmppTellClientNameOfPeer :: ConnectionKey -> IO Text
, xmppDeliverMessage :: (IO ()) -> Stanza -> IO ()
, xmppInformClientPresence :: ConnectionKey -> Stanza -> IO ()
}
-- TODO: http://xmpp.org/rfcs/rfc6120.html#rules-remote-error
-- client connection
-- socat script to send stanza fragment
-- copyToChannel can keep a stack of closers to append to finish-off a stanza
-- the TMVar () from forkConnection can be passed and with a stanza to detect interruption
addrToText :: SockAddr -> Text
addrToText (addr@(SockAddrInet _ _)) = Text.pack $ stripColon (show addr)
where stripColon s = pre where (pre,port) = break (==':') s
addrToText (addr@(SockAddrInet6 _ _ _ _)) = Text.pack $ stripColon (show addr)
where stripColon s = if null bracket then pre else pre ++ "]"
where
(pre,bracket) = break (==']') s
peerKeyToText :: ConnectionKey -> Text
peerKeyToText (PeerKey { callBackAddress=addr }) = addrToText addr
peerKeyToText (ClientKey { localAddress=addr }) = "ErrorClIeNt0"
peerKeyToResolvedName :: ConnectionKey -> IO Text
peerKeyToResolvedName k@(ClientKey { localAddress=addr }) = return "ErrorClIeNt1"
peerKeyToResolvedName k@(PeerKey { callBackAddress=addr }) = do
mname <- handleIO_ (return Nothing) $ do
ent <- getHostByAddr addr -- AF_UNSPEC addr
let names = BSD.hostName ent : BSD.hostAliases ent
return $ listToMaybe names
return $ maybe (peerKeyToText k) Text.pack mname
wlog s = putStrLn s
where _ = s :: String
wlogb s = Strict8.putStrLn s
xmlStream :: ReadCommand -> WriteCommand -> ( Source IO XML.Event
, Sink (Flush XML.Event) IO () )
xmlStream conread conwrite = (xsrc,xsnk)
where
xsrc = src $= XML.parseBytes XML.def
xsnk = -- XML.renderBytes XML.def =$ snk
XML.renderBuilderFlush XML.def
=$= builderToByteStringFlush
=$= discardFlush
=$ snk
where
discardFlush :: Monad m => ConduitM (Flush a) a m ()
discardFlush = awaitForever $ \x -> do
let unchunk (Chunk a) = a
ischunk (Chunk _) = True
ischunk _ = False
when (ischunk x) $ yield (unchunk x)
src = do
v <- lift conread
maybe (return ()) -- lift . wlog $ "conread: Nothing")
(\v -> yield v >> src)
v
snk = awaitForever $ liftIO . conwrite
type FlagCommand = STM Bool
type ReadCommand = IO (Maybe ByteString)
type WriteCommand = ByteString -> IO Bool
cloneStanza stanza = do
dupped <- cloneTChan (stanzaChan stanza)
return stanza { stanzaChan = dupped }
copyToChannel f chan closer_stack = awaitForever copy
where
copy x = do
liftIO . atomically $ writeTChan chan (f x)
case x of
EventBeginDocument {} -> do
let clsr = closerFor x
liftIO . atomically $
modifyTVar' closer_stack (fmap (clsr:))
EventEndDocument {} -> do
liftIO . atomically $
modifyTVar' closer_stack (fmap (drop 1))
_ -> return ()
yield x
prettyPrint prefix =
XML.renderBytes (XML.def { XML.rsPretty=True })
=$= CB.lines
=$ CL.mapM_ (wlogb . (prefix <>))
swapNamespace old new = awaitForever swapit
where
swapit (EventBeginElement n as) | nameNamespace n==Just old =
yield $ EventBeginElement (n { nameNamespace = Just new }) as
swapit (EventEndElement n) | nameNamespace n==Just old =
yield $ EventEndElement (n { nameNamespace = Just new })
swapit x = yield x
fixHeaders Stanza { stanzaTo=mto, stanzaFrom=mfrom } = do
x <- await
maybe (return ()) f x
where
f (EventBeginElement n as) = do yield $ EventBeginElement n (update as)
awaitForever yield
f x = yield x >> awaitForever yield
update as = as''
where
as' = maybe as (\to->attr "to" to:as) mto
as'' = maybe as' (\from->attr "from" from:as') mfrom
sendModifiedStanzaToPeer stanza chan = do
(echan,clsrs,quitvar) <- conduitToChan c
ioWriteChan chan
stanza { stanzaChan = echan
, stanzaClosers = clsrs
, stanzaInterrupt = quitvar
-- TODO id? origin?
}
where
c = stanzaToConduit stanza =$= swapNamespace "jabber:client" "jabber:server" =$= fixHeaders stanza
sendModifiedStanzaToClient stanza chan = do
(echan,clsrs,quitvar) <- conduitToChan c
ioWriteChan chan
stanza { stanzaChan = echan
, stanzaClosers = clsrs
, stanzaInterrupt = quitvar
-- TODO id? origin?
}
where
c = stanzaToConduit stanza =$= swapNamespace "jabber:server" "jabber:client" =$= fixHeaders stanza
-- id,to, and from are taken as-is from reply list
sendReply donevar stype reply replychan = do
if null reply then return ()
else do
let stanzaTag = head reply
mid = lookupAttrib "id" (tagAttrs stanzaTag)
mfrom = lookupAttrib "from" (tagAttrs stanzaTag)
mto = lookupAttrib "to" (tagAttrs stanzaTag)
replyStanza <- liftIO . atomically $ do
replyChan <- newTChan
replyClsrs <- newTVar (Just [])
return Stanza { stanzaType = stype
, stanzaId = mid
, stanzaTo = mto -- as-is from reply list
, stanzaFrom = mfrom -- as-is from reply list
, stanzaChan = replyChan
, stanzaClosers = replyClsrs
, stanzaInterrupt = donevar
, stanzaOrigin = LocalPeer
}
ioWriteChan replychan replyStanza
void . liftIO . forkIO $ do
mapM_ (ioWriteChan $ stanzaChan replyStanza) reply
liftIO . atomically $ writeTVar (stanzaClosers replyStanza) Nothing
-- liftIO $ wlog "finished reply stanza"
grokStanzaIQGet :: Monad m => XML.Event -> NestingXML o m (Maybe StanzaType)
grokStanzaIQGet stanza = do
mtag <- nextElement
flip (maybe $ return Nothing) mtag $ \tag -> do
case tagName tag of
"{urn:xmpp:ping}ping" -> return $ Just Ping
"{jabber:iq:roster}query" -> return $ Just RequestRoster
name -> return . Just $ UnrecognizedQuery name
grokStanzaIQResult :: Monad m => XML.Event -> NestingXML o m (Maybe StanzaType)
grokStanzaIQResult stanza = do
mtag <- nextElement
flip (maybe $ return (Just Pong)) mtag $ \tag -> do
case tagName tag of
_ -> return Nothing
grokStanzaIQSet :: XML.Event -> NestingXML o IO (Maybe StanzaType)
grokStanzaIQSet stanza = do
mtag <- nextElement
flip (maybe $ return Nothing) mtag $ \tag -> do
case tagName tag of
"{urn:ietf:params:xml:ns:xmpp-bind}bind" -> do
mchild <- nextElement
case fmap tagName mchild of
Just "{urn:ietf:params:xml:ns:xmpp-bind}resource" -> do
rsc <- XML.content -- TODO: MonadThrow???
return . Just $ RequestResource (Just rsc)
Just _ -> return Nothing
Nothing -> return . Just $ RequestResource Nothing
"{urn:ietf:params:xml:ns:xmpp-session}session" -> do
return $ Just SessionRequest
_ -> return Nothing
{-
C->Unrecognized Unrecognized type="set"
C->Unrecognized id="purpleae62d88f"
C->Unrecognized xmlns="jabber:client">
C->Unrecognized
C->Unrecognized
-}
ioWriteChan c v = liftIO . atomically $ writeTChan c v
parsePresenceStatus ns = do
let toStat "away" = Away
toStat "xa" = ExtendedAway
toStat "dnd" = DoNotDisturb
toStat "chat" = Chatty
showv <- liftIO . atomically $ newTVar Available
priov <- liftIO . atomically $ newTVar Nothing
statusv <- liftIO . atomically $ newTChan
fix $ \loop -> do
mtag <- nextElement
flip (maybe $ return ()) mtag $ \tag -> do
when (nameNamespace (tagName tag) == Just ns) $ do
case nameLocalName (tagName tag) of
"show" -> do t <- XML.content
liftIO . atomically $ writeTVar showv (toStat t)
"priority" -> do t <- XML.content
liftIO . handleIO_ (return ()) $ do
prio <- readIO (Text.unpack t)
atomically $ writeTVar priov (Just prio)
"status" -> do t <- XML.content
lang <- xmlLang
ioWriteChan statusv (maybe "" id lang,t)
_ -> return ()
loop
show <- liftIO . atomically $ readTVar showv
prio <- liftIO . atomically $ readTVar priov
status <- liftIO $ chanContents statusv -- Could use unsafeInterleaveIO to
-- avoid multiple passes, but whatever.
return . Just $ PresenceStatus { presenceShow = show
, presencePriority = prio
, presenceStatus = status
}
grokPresence ns stanzaTag = do
let typ = lookupAttrib "type" (tagAttrs stanzaTag)
case typ of
Nothing -> parsePresenceStatus ns
Just "unavailable" -> fmap (fmap (\p -> p {presenceShow=Offline}))
$ parsePresenceStatus ns
Just "error" -> return . Just $ PresenceInformError
Just "unsubscribed" -> return . Just $ PresenceInformSubscription False
Just "subscribed" -> return . Just $ PresenceInformSubscription True
Just "probe" -> return . Just $ PresenceRequestStatus
Just "unsubscribe" -> return . Just $ PresenceRequestSubscription False
Just "subscribe" -> return . Just $ PresenceRequestSubscription True
_ -> return Nothing
parseMessage ns stanza = do
let bodytag = Name { nameNamespace = Just ns
, nameLocalName = "body"
, namePrefix = Nothing }
subjecttag = Name { nameNamespace = Just ns
, nameLocalName = "subject"
, namePrefix = Nothing }
threadtag = Name { nameNamespace = Just ns
, nameLocalName = "thread"
, namePrefix = Nothing }
let emptyMsg = LangSpecificMessage { msgBody=Nothing, msgSubject=Nothing }
parseChildren (th,cmap) = do
child <- nextElement
lvl <- nesting
xmllang <- xmlLang
let lang = maybe "" id xmllang
let c = maybe emptyMsg id (Map.lookup lang cmap)
-- log $ " child: "<> bshow child
case child of
Just tag | tagName tag==bodytag
-> do
txt <- XML.content
awaitCloser lvl
parseChildren (th,Map.insert lang (c { msgBody=Just txt }) cmap)
Just tag | tagName tag==subjecttag
-> do
txt <- XML.content
awaitCloser lvl
parseChildren (th,Map.insert lang (c { msgSubject=Just txt }) cmap)
Just tag | tagName tag==threadtag
-> do
txt <- XML.content
awaitCloser lvl
parseChildren (th {msgThreadContent=txt},cmap)
Just tag -> do
-- let nm = tagName tag
-- attrs = tagAttrs tag
-- -- elems = msgElements c
-- txt <- XML.content
awaitCloser lvl
parseChildren (th,Map.insert lang c cmap)
Nothing -> return (th,cmap)
(th,langmap) <- parseChildren ( MessageThread {msgThreadParent=Nothing, msgThreadContent=""}
, Map.empty )
return Message {
msgLangMap = Map.toList langmap,
msgThread = if msgThreadContent th/="" then Just th else Nothing
}
grokMessage ns stanzaTag = do
t <- parseMessage ns stanzaTag
return $ Just t
grokStanza "jabber:server" stanzaTag =
case () of
_ | stanzaTag `isServerIQOf` "get" -> grokStanzaIQGet stanzaTag
_ | stanzaTag `isServerIQOf` "result" -> grokStanzaIQResult stanzaTag
_ | tagName stanzaTag == "{jabber:server}presence" -> grokPresence "jabber:server" stanzaTag
_ | tagName stanzaTag == "{jabber:server}message" -> grokMessage "jabber:server" stanzaTag
_ -> return $ Just Unrecognized
grokStanza "jabber:client" stanzaTag =
case () of
_ | stanzaTag `isClientIQOf` "get" -> grokStanzaIQGet stanzaTag
_ | stanzaTag `isClientIQOf` "set" -> grokStanzaIQSet stanzaTag
_ | stanzaTag `isClientIQOf` "result" -> grokStanzaIQResult stanzaTag
_ | tagName stanzaTag == "{jabber:client}presence" -> grokPresence "jabber:client" stanzaTag
_ | tagName stanzaTag == "{jabber:client}message" -> grokMessage "jabber:client" stanzaTag
_ -> return $ Just Unrecognized
xmppInbound :: Server ConnectionKey SockAddr
-> XMPPServerParameters
-> ConnectionKey
-> SockAddr
-> FlagCommand
-> Source IO XML.Event
-> TChan Stanza
-> TChan Stanza
-> TMVar ()
-> Sink XML.Event IO ()
xmppInbound sv xmpp k laddr pingflag src stanzas output donevar = doNestingXML $ do
let (namespace,tellmyname,tellyourname) = case k of
ClientKey {} -> ( "jabber:client"
, xmppTellMyNameToClient xmpp
, xmppTellClientHisName xmpp k
)
PeerKey {} -> ( "jabber:server"
, xmppTellMyNameToPeer xmpp laddr
, xmppTellPeerHisName xmpp k
)
me <- liftIO tellmyname
you <- liftIO tellyourname
withXML $ \begindoc -> do
when (begindoc==EventBeginDocument) $ do
whenJust nextElement $ \xml -> do
withJust (elementAttrs "stream" xml) $ \stream_attrs -> do
fix $ \loop -> do
-- liftIO . wlog $ "waiting for stanza."
(chan,clsrs) <- liftIO . atomically $
liftM2 (,) newTChan (newTVar (Just []))
whenJust nextElement $ \stanzaTag -> do
stanza_lvl <- nesting
liftIO . atomically $ do
writeTChan chan stanzaTag
modifyTVar' clsrs (fmap (closerFor stanzaTag:))
copyToChannel id chan clsrs =$= do
let mid = lookupAttrib "id" (tagAttrs stanzaTag)
mfrom = lookupAttrib "from" (tagAttrs stanzaTag)
mto = lookupAttrib "to" (tagAttrs stanzaTag)
dispatch <- grokStanza namespace stanzaTag
let unrecog = do
let stype = Unrecognized
s <- liftIO . atomically $ do
return Stanza
{ stanzaType = stype
, stanzaId = mid
, stanzaTo = mto
, stanzaFrom = mfrom
, stanzaChan = chan
, stanzaClosers = clsrs
, stanzaInterrupt = donevar
, stanzaOrigin = NetworkOrigin k output
}
ioWriteChan stanzas s
flip (maybe $ unrecog) dispatch $ \dispatch ->
case dispatch of
-- Checking that the to-address matches this server.
-- Otherwise it could be a client-to-client ping or a
-- client-to-server for some other server.
-- For now, assuming its for the immediate connection.
Ping | mto==Just me || mto==Nothing -> do
let pongto = maybe you id mfrom
pongfrom = maybe me id mto
pong = makePong namespace mid pongto pongfrom
sendReply donevar Pong pong output
#ifdef PINGNOISE
-- TODO: Remove this, it is only to generate a debug print
ioWriteChan stanzas Stanza
{ stanzaType = Ping
, stanzaId = mid
, stanzaTo = mto
, stanzaFrom = mfrom
, stanzaChan = chan
, stanzaClosers = clsrs
, stanzaInterrupt = donevar
, stanzaOrigin = NetworkOrigin k output
}
#endif
stype -> ioWriteChan stanzas Stanza
{ stanzaType = stype
, stanzaId = mid
, stanzaTo = mto
, stanzaFrom = mfrom
, stanzaChan = chan
, stanzaClosers = clsrs
, stanzaInterrupt = donevar
, stanzaOrigin = NetworkOrigin k output
}
awaitCloser stanza_lvl
liftIO . atomically $ writeTVar clsrs Nothing
loop
chanContents :: TChan x -> IO [x]
chanContents ch = do
x <- atomically $ do
bempty <- isEmptyTChan ch
if bempty
then return Nothing
else fmap Just $ readTChan ch
maybe (return [])
(\x -> do
xs <- chanContents ch
return (x:xs))
x
while :: IO Bool -> IO a -> IO [a]
while cond body = do
b <- cond
if b then do x <- body
xs <- while cond body
return (x:xs)
else return []
readUntilNothing :: TChan (Maybe x) -> IO [x]
readUntilNothing ch = do
x <- atomically $ readTChan ch
maybe (return [])
(\x -> do
xs <- readUntilNothing ch
return (x:xs))
x
streamFeatures "jabber:client" =
[ EventBeginElement (streamP "features") []
, EventBeginElement "{urn:ietf:params:xml:ns:xmpp-bind}bind" []
, EventEndElement "{urn:ietf:params:xml:ns:xmpp-bind}bind"
{-
-- , " "
, " "
-- , " DIGEST-MD5"
, " PLAIN"
, " "
-}
, EventEndElement (streamP "features")
]
streamFeatures "jabber:server" =
[]
greet' namespace host =
[ EventBeginDocument
, EventBeginElement (streamP "stream")
[("from",[ContentText host])
,("id",[ContentText "someid"])
,("xmlns",[ContentText namespace])
,("xmlns:stream",[ContentText "http://etherx.jabber.org/streams"])
,("version",[ContentText "1.0"])
]
] ++ streamFeatures namespace
consid Nothing = id
consid (Just sid) = (("id",[ContentText sid]):)
data XMPPState
= PingSlot
deriving (Eq,Ord)
mkname namespace name = (Name name (Just namespace) Nothing)
makePing :: Text -> Maybe Text -> Text -> Text -> [XML.Event]
makePing namespace mid to from =
[ EventBeginElement (mkname namespace "iq")
$ (case mid of
Just c -> (("id",[ContentText c]):)
_ -> id )
[ ("type",[ContentText "get"])
, attr "to" to
, attr "from" from
]
, EventBeginElement "{urn:xmpp:ping}ping" []
, EventEndElement "{urn:xmpp:ping}ping"
, EventEndElement $ mkname namespace "iq"]
makePong namespace mid to from =
-- Note: similar to session reply
[ EventBeginElement (mkname namespace "iq")
$(case mid of
Just c -> (("id",[ContentText c]):)
_ -> id)
[ attr "type" "result"
, attr "to" to
, attr "from" from
]
, EventEndElement (mkname namespace "iq")
]
iq_bind_reply :: Maybe Text -> Text -> [XML.Event]
iq_bind_reply mid jid =
[ EventBeginElement "{jabber:client}iq" (consid mid [("type",[ContentText "result"])])
, EventBeginElement "{urn:ietf:params:xml:ns:xmpp-bind}bind"
[("xmlns",[ContentText "urn:ietf:params:xml:ns:xmpp-bind"])]
, EventBeginElement "{urn:ietf:params:xml:ns:xmpp-bind}jid" []
, EventContent (ContentText jid)
, EventEndElement "{urn:ietf:params:xml:ns:xmpp-bind}jid"
, EventEndElement "{urn:ietf:params:xml:ns:xmpp-bind}bind"
, EventEndElement "{jabber:client}iq"
]
iq_session_reply mid host =
-- Note: similar to Pong
[ EventBeginElement "{jabber:client}iq"
(consid mid [("from",[ContentText host])
,("type",[ContentText "result"])
])
, EventEndElement "{jabber:client}iq"
]
iq_service_unavailable mid host {- mjid -} req =
[ EventBeginElement "{jabber:client}iq"
(consid mid [("type",[ContentText "error"])
-- , TODO: set "from" if isJust mjid
])
, EventBeginElement req []
, EventEndElement req
, EventBeginElement "{jabber:client}error" [("type",[ContentText "cancel"])]
, EventBeginElement "{urn:ietf:params:xml:ns:xmpp-stanzas}service-unavailable" []
, EventEndElement "{urn:ietf:params:xml:ns:xmpp-stanzas}service-unavailable"
, EventEndElement "{jabber:client}error"
, EventEndElement "{jabber:client}iq"
]
wrapStanzaList :: [XML.Event] -> STM [Either (StanzaWrap XML.Event) XML.Event]
wrapStanzaList xs = do
wrap <- do
clsrs <- newTVar Nothing
donev <- newTMVar ()
return $ \ x ->
Stanza { stanzaType = Unrecognized
, stanzaId = mid
, stanzaTo = mto
, stanzaFrom = mfrom
, stanzaClosers = clsrs
, stanzaInterrupt = donev
, stanzaOrigin = LocalPeer
, stanzaChan = x
}
return $ map (Left . wrap) (take 1 xs) ++ map Right (drop 1 xs)
where
m = listToMaybe xs
mto = m >>= lookupAttrib "to" . tagAttrs
mfrom = m >>= lookupAttrib "from" . tagAttrs
mid = m >>= lookupAttrib "id" . tagAttrs
{-
greet namespace =
[ EventBeginDocument
, EventBeginElement (streamP "stream")
[ attr "xmlns" namespace
, attr "version" "1.0"
]
]
-}
goodbye =
[ EventEndElement (streamP "stream")
, EventEndDocument
]
forkConnection :: Server ConnectionKey SockAddr
-> XMPPServerParameters
-> ConnectionKey
-> SockAddr
-> FlagCommand
-> Source IO XML.Event
-> Sink (Flush XML.Event) IO ()
-> TChan Stanza
-> IO (TChan Stanza)
forkConnection sv xmpp k laddr pingflag src snk stanzas = do
let (namespace,tellmyname) = case k of
ClientKey {} -> ("jabber:client", xmppTellMyNameToClient xmpp)
PeerKey {} -> ("jabber:server",xmppTellMyNameToPeer xmpp laddr)
me <- tellmyname
rdone <- atomically newEmptyTMVar
let isStarter (Left _) = True
isStarter (Right e) | isEventBeginElement e = True
isStarter _ = False
isStopper (Left _) = False
isStopper (Right e) | isEventEndElement e = True
isStopper _ = False
slots <- atomically $ Slotted.new isStarter isStopper
needsFlush <- atomically $ newTVar False
lastStanza <- atomically $ newTVar Nothing
nesting <- atomically $ newTVar 0
let _ = slots :: Slotted.UpdateStream XMPPState (Either (StanzaWrap XML.Event) XML.Event)
let greet_src = do
CL.sourceList (greet' namespace me) =$= CL.map Chunk
yield Flush
slot_src = do
what <- lift . atomically $ foldr1 orElse
[Slotted.pull slots >>= \x -> do
x <- case x of
Left wrapped -> do
writeTVar nesting 1
writeTVar lastStanza (Just wrapped)
return $ stanzaChan wrapped
Right x -> do
when (isEventBeginElement x)
$ modifyTVar' nesting (+1)
when (isEventEndElement x) $ do
n <- readTVar nesting
when (n==1) $ writeTVar lastStanza Nothing
modifyTVar' nesting (subtract 1)
return x
writeTVar needsFlush True
return $ do
-- liftIO $ wlog $ "yielding Chunk: " ++ show x
yield (Chunk x)
slot_src
,do Slotted.isEmpty slots >>= check
readTVar needsFlush >>= check
writeTVar needsFlush False
return $ do
-- liftIO $ wlog "yielding Flush"
yield Flush
slot_src
,readTMVar rdone >> return (return ())
]
what
forkIO $ do (greet_src >> slot_src) $$ snk
last <- atomically $ readTVar lastStanza
es <- while (atomically . fmap not $ Slotted.isEmpty slots)
(atomically . Slotted.pull $ slots)
let es' = mapMaybe metadata es
metadata (Left s) = Just s
metadata _ = Nothing
let fail s = wlog $ "failed delivery: " ++ show (stanzaId s)
maybe (return ()) fail last
mapM_ fail es' -- TODO: queue or save these for re-connect?
wlog $ "end post-queue fork: " ++ show k
output <- atomically newTChan
forkIO $ do
-- mapM_ (atomically . Slotted.push slots Nothing) greetPeer
fix $ \loop -> do
what <- atomically $ foldr1 orElse
[readTChan output >>= \stanza -> return $ do
#ifndef PINGNOISE
let notping f = case stanzaType stanza of Pong -> return ()
_ -> f
#else
let notping f = f
#endif
notping $ do
dup <- atomically $ cloneStanza stanza
let typ = Strict8.pack $ c ++ "<-"++(concat . take 1 . words $ show (stanzaType dup))++" "
c = case k of
ClientKey {} -> "C"
PeerKey {} -> "P"
wlog ""
stanzaToConduit dup $$ prettyPrint typ
stanzaToConduit stanza
$$ awaitForever
$ liftIO . atomically . Slotted.push slots Nothing . Right
loop
,do pingflag >>= check
return $ do
to <- xmppTellPeerHisName xmpp k -- addrToText (callBackAddress k)
let from = me -- Look it up from Server object
-- or pass it with Connection event.
mid = Just "ping"
ping = makePing namespace mid to from
ping <- atomically $ wrapStanzaList ping
mapM_ (atomically . Slotted.push slots (Just $ PingSlot))
ping
#ifdef PINGNOISE
wlog ""
CL.sourceList ping $$ prettyPrint $ case k of
ClientKey {} -> "C<-Ping"
PeerKey {} -> "P<-Ping "
#endif
loop
,readTMVar rdone >> return (return ())
]
what
wlog $ "end pre-queue fork: " ++ show k
forkIO $ do
-- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show)
src $$ xmppInbound sv xmpp k laddr pingflag src stanzas output rdone
atomically $ putTMVar rdone ()
wlog $ "end reader fork: " ++ show k
return output
{-
data Peer = Peer
{ peerWanted :: TVar Bool -- ^ False when this peer is on a you-call-me basis
, peerState :: TVar PeerState
}
data PeerState
= PeerPendingConnect UTCTime
| PeerPendingAccept UTCTime
| PeerConnected (TChan Stanza)
-}
peerKey (sock,addr) = do
peer <-
sIsConnected sock >>= \c ->
if c then getPeerName sock -- addr is normally socketName
else return addr -- Weird hack: addr is would-be peer name
laddr <- getSocketName sock
return $ (PeerKey (peer `withPort` fromIntegral peerport),laddr)
clientKey (sock,addr) = do
paddr <- getPeerName sock
return $ (ClientKey addr,paddr)
stanzaToConduit :: MonadIO m => Stanza -> ConduitM i Event m ()
stanzaToConduit stanza = do
let xchan = stanzaChan stanza
xfin = stanzaClosers stanza
rdone = stanzaInterrupt stanza
loop = return ()
fix $ \inner -> do
what <- liftIO . atomically $ foldr1 orElse
[readTChan xchan >>= \xml -> return $ do
yield xml -- atomically $ Slotted.push slots Nothing xml
inner
,do mb <- readTVar xfin
cempty <- isEmptyTChan xchan
if isNothing mb
then if cempty then return loop else retry
else do done <- tryReadTMVar rdone
check (isJust done)
trace "todo: send closers" retry
,do isEmptyTChan xchan >>= check
readTMVar rdone
return (return ())]
what
xmlifyRosterItems :: Monad m => Set Text -> Text -> Set Text -> ConduitM i Event m ()
xmlifyRosterItems solicited stype set = mapM_ item (Set.toList set)
where
item jid = do yield $ EventBeginElement "{jabber:iq:roster}item"
([ attr "jid" jid
, attr "subscription" stype
]++if Set.member jid solicited
then [attr "ask" "subscribe"]
else [] )
yield $ EventEndElement "{jabber:iq:roster}item"
conduitToChan c = do
chan <- atomically newTChan
clsrs <- atomically $ newTVar (Just [])
quitvar <- atomically $ newEmptyTMVar
forkIO $ do
c =$= copyToChannel id chan clsrs $$ awaitForever (const $ return ())
atomically $ writeTVar clsrs Nothing
return (chan,clsrs,quitvar)
sendRoster query xmpp replyto = do
let k = case stanzaOrigin query of
NetworkOrigin k _ -> Just k
LocalPeer -> Nothing -- local peer requested roster?
flip (maybe $ return ()) k $ \k -> do
jid <- case k of
ClientKey {} -> xmppTellClientHisName xmpp k -- LookupClientJID xmpp k
PeerKey {} -> xmppTellClientNameOfPeer xmpp k
let getlist f = do
bs <- f xmpp k
-- js <- mapM parseHostNameJID bs
return (Set.fromList bs) -- js)
buddies <- getlist xmppRosterBuddies
subscribers <- getlist xmppRosterSubscribers
solicited <- getlist xmppRosterSolicited
subnone0 <- getlist xmppRosterOthers
let subnone = subnone0 \\ (Set.union buddies subscribers)
let subto = buddies \\ subscribers
let subfrom = subscribers \\ buddies
let subboth = Set.intersection buddies subscribers
let roster = do
yield $ EventBeginElement "{jabber:client}iq"
(consid (stanzaId query)
[ attr "to" jid
, attr "type" "result" ])
yield $ EventBeginElement "{jabber:iq:roster}query" [] -- todo: ver?
xmlifyRosterItems solicited "to" subto
xmlifyRosterItems solicited "from" subfrom
xmlifyRosterItems solicited "both" subboth
xmlifyRosterItems solicited "none" subnone
yield $ EventEndElement "{jabber:iq:roster}query"
yield $ EventEndElement "{jabber:client}iq"
(chan,clsrs,quitvar) <- conduitToChan roster
ioWriteChan replyto
Stanza { stanzaType = Roster
, stanzaId = (stanzaId query)
, stanzaTo = Just jid
, stanzaFrom = Nothing
, stanzaChan = chan
, stanzaClosers = clsrs
, stanzaInterrupt = quitvar
, stanzaOrigin = LocalPeer
}
socketFromKey :: Server ConnectionKey SockAddr -> ConnectionKey -> IO SockAddr
socketFromKey sv k = do
map <- atomically $ readTVar (conmap sv)
let mcd = Map.lookup k map
case mcd of
Nothing -> case k of
ClientKey addr -> return addr
PeerKey addr -> return addr
-- XXX: ? wrong address
-- Shouldnt happen anyway.
Just cd -> return $ cdata cd
monitor sv params xmpp = do
chan <- return $ serverEvent sv
stanzas <- atomically newTChan
quitVar <- atomically newEmptyTMVar
fix $ \loop -> do
action <- atomically $ foldr1 orElse
[ readTChan chan >>= \((k,u),e) -> return $ do
case e of
Connection pingflag conread conwrite -> do
wlog $ tomsg k "Connection"
let (xsrc,xsnk) = xmlStream conread conwrite
outs <- forkConnection sv xmpp k u pingflag xsrc xsnk stanzas
xmppNewConnection xmpp k u outs
return ()
ConnectFailure addr -> return () -- wlog $ tomsg k "ConnectFailure"
EOF -> do wlog $ tomsg k "EOF"
xmppEOF xmpp k
HalfConnection In -> do
wlog $ tomsg k "ReadOnly"
control sv (Connect (callBackAddress k) params)
HalfConnection Out -> wlog $ tomsg k "WriteOnly"
RequiresPing -> return () -- wlog $ tomsg k "RequiresPing"
_ -> return ()
, readTChan stanzas >>= \stanza -> return $ do
dup <- case stanzaType stanza of
Message {} -> do
dup <- atomically $ cloneStanza stanza -- dupped so we can make debug print
return dup
_ -> return stanza
forkIO $ do
case stanzaType stanza of
Message {} -> do
let fail = wlog $ "Failed delivery id="++show (stanzaId stanza) -- TODO
xmppDeliverMessage xmpp fail stanza
_ -> return ()
case stanzaOrigin stanza of
NetworkOrigin k@(ClientKey {}) replyto ->
case stanzaType stanza of
RequestResource wanted -> do
sockaddr <- socketFromKey sv k
rsc <- xmppChooseResourceName xmpp k sockaddr wanted
let reply = iq_bind_reply (stanzaId stanza) rsc
sendReply quitVar SetResource reply replyto
SessionRequest -> do
me <- xmppTellMyNameToClient xmpp
let reply = iq_session_reply (stanzaId stanza) me
sendReply quitVar Pong reply replyto
RequestRoster -> do
sendRoster stanza xmpp replyto
xmppSubscribeToRoster xmpp k
PresenceStatus {} -> do
xmppInformClientPresence xmpp k stanza
UnrecognizedQuery query -> do
me <- xmppTellMyNameToClient xmpp
let reply = iq_service_unavailable (stanzaId stanza) me query
sendReply quitVar Error reply replyto
_ -> return ()
_ -> return ()
-- We need to clone in the case the stanza is passed on as for Message.
#ifndef PINGNOISE
let notping f = case stanzaType stanza of Pong -> return ()
_ -> f
notping $ do
#endif
let typ = Strict8.pack $ c ++ "->"++(concat . take 1 . words $ show (stanzaType stanza))++" "
c = case stanzaOrigin stanza of
LocalPeer -> "*"
NetworkOrigin (ClientKey {}) _ -> "C"
NetworkOrigin (PeerKey {}) _ -> "P"
wlog ""
stanzaToConduit dup $$ prettyPrint typ
]
action
loop
where
tomsg k str = printf "%12s %s" str (show k)
where
_ = str :: String
data XMPPServer
= XMPPServer { _xmpp_sv :: Server ConnectionKey SockAddr
, _xmpp_peer_params :: ConnectionParameters ConnectionKey SockAddr
}
addPeer :: XMPPServer -> SockAddr -> IO ()
addPeer sv addr = do
control (_xmpp_sv sv) (ConnectWithEndlessRetry addr (_xmpp_peer_params sv) 10000)
xmppServer :: ( MonadResource m
, MonadIO m
) => XMPPServerParameters -> m XMPPServer
xmppServer xmpp = do
sv <- server
-- some fuzz helps avoid simultaneity
pingfuzz <- liftIO $ do
gen <- System.Random.getStdGen
let (r,gen') = System.Random.next gen
return $ r `mod` 2000 -- maximum 2 seconds of fuzz
liftIO . wlog $ "pingfuzz = " ++ show pingfuzz
let peer_params = (connectionDefaults peerKey)
{ pingInterval = 15000 + pingfuzz
, timeout = 2000
, duplex = False }
client_params = (connectionDefaults clientKey)
{ pingInterval = 0
, timeout = 0
}
liftIO $ do
forkIO $ monitor sv peer_params xmpp
control sv (Listen peerport peer_params)
control sv (Listen clientport client_params)
return XMPPServer { _xmpp_sv = sv, _xmpp_peer_params = peer_params }
#if MIN_VERSION_stm(2,4,0)
#else
-- |Clone a 'TChan': similar to dupTChan, but the cloned channel starts with the
-- same content available as the original channel.
--
-- Terrible inefficient implementation provided to build against older libraries.
cloneTChan :: TChan a -> STM (TChan a)
cloneTChan chan = do
contents <- chanContents' chan
chan2 <- dupTChan chan
mapM_ (writeTChan chan) contents
return chan2
where
chanContents' chan = do
b <- isEmptyTChan chan
if b then return [] else do
x <- readTChan chan
xs <- chanContents' chan
return (x:xs)
#endif