{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE ViewPatterns #-}
module XMPP
( module XMPPTypes
, module SocketLike
, listenForXmppClients
, listenForRemotePeers
, newServerConnections
, seekRemotePeers
, quitListening
, OutBoundMessage(..)
, sendMessage
) where
import ServerC
import XMPPTypes
import SocketLike
import ByteStringOperators
import ControlMaybe
import Data.HList
import Network.Socket
( Family
, connect
, socketToHandle
, sClose
, Socket(..)
, socket
, SocketType(..)
)
import Network.BSD
( PortNumber
, getHostName
, hostName
, hostAliases
, getProtocolNumber
)
import System.IO
( BufferMode(..)
, IOMode(..)
, hSetBuffering
)
import Control.Concurrent.STM
import Data.Conduit
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Binary as CB
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as S (pack,putStr,putStrLn,append)
import qualified Data.ByteString.Lazy.Char8 as L
( putStrLn
, fromChunks
, unlines
, hPutStrLn
)
import Control.Concurrent (forkIO,killThread)
import Control.Concurrent.Async
import Control.Exception
( handle
-- , SomeException(..)
, finally
, bracketOnError )
import GHC.IO.Exception (IOException(..))
import Control.Monad.IO.Class
import Control.Monad.Trans.Class
import Control.Monad.Trans.Maybe
import Todo
import Control.Monad as Monad
import Text.XML.Stream.Parse (parseBytes,content)
import Text.XML.Stream.Render
import Data.XML.Types as XML
import Data.Text.Encoding as S (decodeUtf8,encodeUtf8)
import Data.Text.Lazy.Encoding as L (decodeUtf8,encodeUtf8)
import Data.Text.Lazy (toStrict)
import GetHostByAddr
import Data.Monoid
import qualified Data.Sequence as Seq
import Data.Foldable (toList)
#ifdef RENDERFLUSH
import Data.Conduit.Blaze
#endif
import Data.List (find)
import qualified Text.Show.ByteString as L
import NestingXML
import Data.Set as Set (Set)
import qualified Data.Set as Set
import qualified Data.Map as Map
import Data.Map as Map (Map)
import GHC.Conc
( threadStatus
, ThreadStatus(..)
, ThreadId
)
data Commands = Send [XML.Event] | QuitThread
deriving Prelude.Show
getNamesForPeer :: Peer -> IO [ByteString]
getNamesForPeer LocalHost = fmap ((:[]) . S.pack) getHostName
getNamesForPeer peer@(RemotePeer addr) = do
ent <- getHostByAddr addr -- AF_UNSPEC addr
let names = hostName ent : hostAliases ent
return . map S.pack $ names
xmlifyPresenceForClient :: Presence -> IO [XML.Event]
xmlifyPresenceForClient (Presence jid stat) = do
let n = name jid
rsc = resource jid
names <- getNamesForPeer (peer jid)
let tostr p = L.decodeUtf8 $ n <$++> "@" ++> L.fromChunks [p] <++?> "/" <++$> rsc
jidstrs = fmap (toStrict . tostr) names
return (concatMap presenceEvents jidstrs)
where
presenceEvents jidstr =
[ EventBeginElement "{jabber:client}presence" (("from",[ContentText jidstr]):typ stat)
, EventBeginElement "{jabber:client}show" []
, EventContent (ContentText . shw $ stat)
, EventEndElement "{jabber:client}show"
, EventEndElement "{jabber:client}presence"
]
typ Offline = [("type",[ContentText "unavailable"])]
typ _ = []
shw Available = "chat"
shw Away = "away"
shw Offline = "away" -- Is this right?
prefix ## name = Name name Nothing (Just prefix)
streamP name = Name name (Just "http://etherx.jabber.org/streams") (Just "stream")
greet host =
[ EventBeginDocument
, EventBeginElement (streamP "stream")
[("from",[ContentText host])
,("id",[ContentText "someid"])
,("xmlns",[ContentText "jabber:client"])
,("xmlns:stream",[ContentText "http://etherx.jabber.org/streams"])
,("version",[ContentText "1.0"])
]
, EventBeginElement (streamP "features") []
, EventBeginElement "{urn:ietf:params:xml:ns:xmpp-bind}bind" []
, EventEndElement "{urn:ietf:params:xml:ns:xmpp-bind}bind"
{-
-- , " "
, " "
-- , " DIGEST-MD5"
, " PLAIN"
, " "
-}
, EventEndElement (streamP "features")
]
-- type Consumer i m r = forall o. ConduitM i o m r
mawait :: Monad m => MaybeT (ConduitM i o m) i
mawait = MaybeT await
-- Note: This function ignores name space qualification
elementAttrs expected (EventBeginElement name attrs)
| nameLocalName name==expected
= return attrs
elementAttrs _ _ = mzero
eventIsBeginElement (EventBeginElement _ _) = True
eventIsBeginElement _ = False
eventIsEndElement (EventEndElement _) = True
eventIsEndElement _ = False
filterMapElement::
(Monad m, MonadPlus mp) =>
(Event -> mp a) -> Event -> mp a -> MaybeT (ConduitM Event o m) (mp a)
filterMapElement ret opentag empty = loop (empty `mplus` ret opentag) 1
where
loop ts 0 = return ts
loop ts cnt = do
tag <- mawait
let ts' = mplus ts (ret tag)
case () of
_ | eventIsEndElement tag -> loop ts' (cnt-1)
_ | eventIsBeginElement tag -> loop ts' (cnt+1)
_ -> loop ts' cnt
gatherElement ::
(Monad m, MonadPlus mp) =>
Event -> mp Event -> NestingXML o m (mp Event)
gatherElement opentag empty = loop (empty `mplus` return opentag) 1
where
loop ts 0 = return ts
loop ts cnt = do
maybeXML (return ts) $ \tag -> do
let ts' = mplus ts (return tag)
case () of
_ | eventIsEndElement tag -> loop ts' (cnt-1)
_ | eventIsBeginElement tag -> loop ts' (cnt+1)
_ -> loop ts' cnt
{-
sourceStanza :: Monad m => Event -> ConduitM Event Event m ()
sourceStanza opentag = yield opentag >> loop 1
where
loop 0 = return ()
loop cnt = do
e <- await
let go tag cnt = yield tag >> loop cnt
case e of
Just tag | eventIsEndElement tag -> go tag (cnt-1)
Just tag | eventIsBeginElement tag -> go tag (cnt+1)
Just tag -> go tag cnt
Nothing -> return ()
-}
voidMaybeT body = (>> return ()) . runMaybeT $ body
fixMaybeT f = (>> return ()) . runMaybeT . fix $ f
iq_bind_reply id jid =
[ EventBeginElement "{jabber:client}iq" [("type",[ContentText "result"]),("id",[ContentText id])]
, EventBeginElement "{urn:ietf:params:xml:ns:xmpp-bind}bind"
[("xmlns",[ContentText "urn:ietf:params:xml:ns:xmpp-bind"])]
, EventBeginElement "{urn:ietf:params:xml:ns:xmpp-bind}jid" []
, EventContent (ContentText jid)
, EventEndElement "{urn:ietf:params:xml:ns:xmpp-bind}jid"
, EventEndElement "{urn:ietf:params:xml:ns:xmpp-bind}bind"
, EventEndElement "{jabber:client}iq"
]
uncontent cs = head $ map getText cs
where
getText (ContentText x) = x
getText (ContentEntity x ) = x
tagAttrs (EventBeginElement _ xs) = xs
tagAttrs _ = []
tagName (EventBeginElement n _) = n
tagName _ = ""
handleIQSetBind session cmdChan stanza_id = do
mchild <- nextElement
rsc <- case mchild of
Just child -> do
let unhandledBind = do
liftIO $ putStrLn $ "unhandled-bind: "++show child
return ""
case tagName child of
"{urn:ietf:params:xml:ns:xmpp-bind}resource"
-> do
rsc <- lift content
return $ L.fromChunks [S.encodeUtf8 rsc]
_ -> unhandledBind
Nothing -> do
liftIO $ putStrLn $ "empty bind request!"
return ""
liftIO $ do
L.putStrLn $ "iq-set-bind-resource " <++> rsc
setResource session rsc
jid <- getJID session
atomically $ writeTChan cmdChan (Send $ iq_bind_reply stanza_id (toStrict $ L.decodeUtf8 $ L.show jid) )
forCachedPresence session $ \presence -> do
xs <- xmlifyPresenceForClient presence
atomically . writeTChan cmdChan . Send $ xs
iq_session_reply host stanza_id =
[ EventBeginElement "{jabber:client}iq"
[("id",[ContentText stanza_id])
,("from",[ContentText host])
,("type",[ContentText "result"])
]
, EventEndElement "{jabber:client}iq"
]
handleIQSetSession session cmdChan stanza_id = do
host <- liftIO $ do
jid <- getJID session
names <- getNamesForPeer (peer jid)
return (S.decodeUtf8 . head $ names)
liftIO . atomically . writeTChan cmdChan . Send $ iq_session_reply host stanza_id
handleIQSet session cmdChan tag = do
withJust (lookupAttrib "id" (tagAttrs tag)) $ \stanza_id -> do
whenJust nextElement $ \child -> do
let unhandledSet = liftIO $ putStrLn ("iq-set: "++show (stanza_id,child))
case tagName child of
"{urn:ietf:params:xml:ns:xmpp-bind}bind"
-> handleIQSetBind session cmdChan stanza_id
"{urn:ietf:params:xml:ns:xmpp-session}session"
-> handleIQSetSession session cmdChan stanza_id
_ -> unhandledSet
matchAttrib name value attrs =
case find ( (==name) . fst) attrs of
Just (_,[ContentText x]) | x==value -> True
Just (_,[ContentEntity x]) | x==value -> True
_ -> False
lookupAttrib name attrs =
case find ( (==name) . fst) attrs of
Just (_,[ContentText x]) -> Just x
Just (_,[ContentEntity x]) -> Just x
_ -> Nothing
iqTypeSet = "set"
iqTypeGet = "get"
iqTypeResult = "result"
iqTypeError = "error"
isIQOf (EventBeginElement name attrs) testType
| name=="{jabber:client}iq"
&& matchAttrib "type" testType attrs
= True
isIQOf _ _ = False
iq_service_unavailable host iq_id mjid req =
[ EventBeginElement "{jabber:client}iq"
[("type",[ContentText "error"])
,("id",[ContentText iq_id])
-- , TODO: set "from" if isJust mjid
]
, EventBeginElement req []
, EventEndElement req
, EventBeginElement "{jabber:client}error" [("type",[ContentText "cancel"])]
, EventBeginElement "{urn:ietf:params:xml:ns:xmpp-stanzas}service-unavailable" []
, EventEndElement "{urn:ietf:params:xml:ns:xmpp-stanzas}service-unavailable"
, EventEndElement "{jabber:client}error"
, EventEndElement "{jabber:client}iq"
]
getRoster session = do
budies <- getMyBuddies session
subscribers <- getMySubscribers session
return ([]::[Event]) -- TODO
handleIQGet session cmdChan tag = do
withJust (lookupAttrib "id" (tagAttrs tag)) $ \stanza_id -> do
whenJust nextElement $ \child -> do
host <- liftIO $ do
jid <- getJID session
names <- getNamesForPeer (peer jid)
return (S.decodeUtf8 . head $ names)
let unhandledGet req = do
liftIO $ putStrLn ("iq-get: "++show (stanza_id,child))
liftIO . atomically . writeTChan cmdChan . Send $ iq_service_unavailable host stanza_id Nothing req
case tagName child of
-- "{http://jabber.org/protocol/disco#items}query" -> liftIO $ putStrLn "iq-get-query-items"
"{urn:xmpp:ping}ping" -> liftIO $ do
let mjid = lookupAttrib "from" (tagAttrs tag)
let pong = [ EventBeginElement "{jabber:client}iq"
$ (case mjid of
Just jid -> (("to",[ContentText jid]):)
_ -> id)
[("type",[ContentText "result"])
,("id",[ContentText stanza_id])
,("from",[ContentText host])
]
, EventEndElement "{jabber:client}iq"
]
atomically . writeTChan cmdChan . Send $ pong
"{jabber:iq:roster}query" -> liftIO $ do
putStrLn $ "REQUESTED ROSTER " ++ show tag
-- REQUESTED ROSTER EventBeginElement
-- (Name {nameLocalName = "iq", nameNamespace = Just "jabber:client", namePrefix = Nothing})
-- [(Name { nameLocalName = "id"
-- , nameNamespace = Nothing
-- , namePrefix = Nothing},
-- [ContentText "32a337c2-7b22-45b6-9d21-15ded0d079ec"])
-- ,(Name {nameLocalName = "type", nameNamespace = Nothing, namePrefix = Nothing},
-- [ContentText "get"])]
roster <- getRoster session
atomically . writeTChan cmdChan . Send $ roster
req -> unhandledGet req
fromClient :: (MonadThrow m,MonadIO m, JabberClientSession session) =>
session -> TChan Commands -> Sink XML.Event m ()
fromClient session cmdChan = doNestingXML $ do
let log = liftIO . L.putStrLn . ("(C) " <++>)
send = liftIO . atomically . writeTChan cmdChan . Send
withXML $ \begindoc -> do
when (begindoc==EventBeginDocument) $ do
log "begin-doc"
withXML $ \xml -> do
withJust (elementAttrs "stream" xml) $ \stream_attrs -> do
log $ "stream atributes: " <++> bshow stream_attrs
host <- liftIO $ do
jid <- getJID session
names <- getNamesForPeer (peer jid)
return (S.decodeUtf8 . head $ names)
send $ greet host
fix $ \loop -> do
log "waiting for stanza."
whenJust nextElement $ \stanza -> do
stanza_lvl <- nesting
let unhandledStanza = do
xs <- gatherElement stanza Seq.empty
prettyPrint "unhandled-C: " (toList xs)
case () of
_ | stanza `isIQOf` iqTypeSet -> handleIQSet session cmdChan stanza
_ | stanza `isIQOf` iqTypeGet -> handleIQGet session cmdChan stanza
_ | otherwise -> unhandledStanza
awaitCloser stanza_lvl
loop
log $ "end of stream"
withXML $ \xml -> do
log $ "end-of-document: " <++> bshow xml
prettyPrint prefix xs =
liftIO $ do
CL.sourceList xs $= renderBytes (def { rsPretty=True }) =$= CB.lines $$ CL.mapM_ (S.putStrLn . (prefix `S.append`))
toClient :: MonadIO m => TChan Presence -> TChan Commands -> Source m [XML.Event]
toClient pchan cmdChan = fix $ \loop -> do
let send xs = yield xs >> prettyPrint ">C: " xs
event <- liftIO . atomically $
orElse (fmap Left $ readTChan pchan)
(fmap Right $ readTChan cmdChan)
case event of
Right QuitThread -> return ()
Right (Send xs) -> send xs >> loop
Left presence -> do
xs <- liftIO $ xmlifyPresenceForClient presence
send xs
loop
handleClient
:: (SocketLike sock, HHead l (XMPPClass session),
JabberClientSession session) =>
HCons sock (HCons t l) -> Source IO ByteString -> Sink ByteString IO () -> IO ()
handleClient st src snk = do
let HCons sock (HCons _ st') = st
session_factory = hHead st'
pname <- getPeerName sock
session <- newSession session_factory sock
Prelude.putStrLn $ "PEER NAME: "++Prelude.show pname
pchan <- subscribe session Nothing
cmdChan <- atomically newTChan
#ifdef RENDERFLUSH
writer <- async ( toClient pchan cmdChan
$$ flushList
=$= renderBuilderFlush def
=$= builderToByteStringFlush
=$= discardFlush
=$ snk )
#else
writer <- async ( toClient pchan cmdChan $$ renderChunks =$ snk )
#endif
finally ( src $= parseBytes def $$ fromClient session cmdChan )
$ do
atomically $ writeTChan cmdChan QuitThread
wait writer
closeSession session
listenForXmppClients ::
(HList l, HHead l (XMPPClass session), HExtend e1 l2 l1,
HExtend e l1 (HCons PortNumber l), JabberClientSession session) =>
Family -> e1 -> e -> l2 -> IO ServerHandle
listenForXmppClients addr_family session_factory port st = do
doServer (addr_family .*. port .*. session_factory .*. st) handleClient
#ifdef RENDERFLUSH
flushList :: Monad m => ConduitM [a] (Flush a) m ()
flushList = fixMaybeT $ \loop -> do
xs <- mawait
lift ( CL.sourceList xs $$ CL.mapM_ (yield . Chunk) )
lift ( yield Flush )
loop
discardFlush :: Monad m => ConduitM (Flush a) a m ()
discardFlush = fixMaybeT $ \loop -> do
x <- mawait
let unchunk (Chunk a) = a
ischunk (Chunk _) = True
ischunk _ = False
lift . when (ischunk x) $ yield (unchunk x)
loop
#else
renderChunks :: (MonadUnsafeIO m, MonadIO m) => ConduitM [Event] ByteString m ()
renderChunks = fixMaybeT $ \loop -> do
xs <- mawait
lift . when (not . null $ xs) $ ( CL.sourceList xs $= renderBytes def $$ CL.mapM_ yield )
loop
#endif
listenForRemotePeers
:: (HList l, HHead l (XMPPPeerClass session),
HExtend e l1 (HCons PortNumber l), HExtend e1 l2 l1,
JabberPeerSession session) =>
Family -> e1 -> e -> l2 -> IO ServerHandle
listenForRemotePeers addrfamily session_factory port st = do
doServer (addrfamily .*. port .*. session_factory .*. st) handlePeer
handlePeer
:: (SocketLike sock, HHead l (XMPPPeerClass session),
JabberPeerSession session) =>
HCons sock (HCons t1 l) -> Source IO ByteString -> t -> IO ()
handlePeer st src snk = do
let HCons sock (HCons _ st') = st
session_factory = hHead st'
name <- fmap bshow $ getPeerName sock
L.putStrLn $ "(P) connected " <++> name
session <- newPeerSession session_factory sock
finally ( src $= parseBytes def $$ fromPeer session )
$ do
L.putStrLn $ "(P) disconnected " <++> name
closePeerSession session
handlePeerPresence session stanza False = do
-- Offline
withJust (lookupAttrib "from" (tagAttrs stanza)) $ \jid -> do
peer_jid <- liftIO $ parseAddressJID (L.fromChunks [S.encodeUtf8 jid])
liftIO $ announcePresence session (Presence peer_jid Offline)
handlePeerPresence session stanza True = do
-- online (Available or Away)
let log = liftIO . L.putStrLn . ("(P) " <++>)
withJust (lookupAttrib "from" (tagAttrs stanza)) $ \jid -> do
pjid <- liftIO $ parseAddressJID (L.fromChunks [S.encodeUtf8 jid])
-- stat <- show element content
let parseChildren stat = do
child <- nextElement
case child of
Just tag | tagName tag=="{jabber:server}show"
-> fmap toStat (lift content)
Just tag | otherwise -> parseChildren stat
Nothing -> return stat
toStat "away" = Away
toStat "xa" = Away -- TODO: xa
toStat "dnd" = Away -- TODO: dnd
toStat "chat" = Available
stat' <- parseChildren Available
liftIO $ announcePresence session (Presence pjid stat')
log $ bshow (Presence pjid stat')
matchAttribMaybe name (Just value) attrs =
case find ( (==name) . fst) attrs of
Just (_,[ContentText x]) | x==value -> True
Just (_,[ContentEntity x]) | x==value -> True
_ -> False
matchAttribMaybe name Nothing attrs
| find ( (==name) . fst) attrs==Nothing
= True
matchAttribMaybe name Nothing attrs
| otherwise
= False
presenceTypeOffline = Just "unavailable"
presenceTypeOnline = Nothing
presenceTypeProbe = Just "probe"
isPresenceOf (EventBeginElement name attrs) testType
| name=="{jabber:server}presence"
&& matchAttribMaybe "type" testType attrs
= True
isPresenceOf _ _ = False
handlePresenceProbe session stanza = do
withJust (lookupAttrib "to" (tagAttrs stanza)) $ \to -> do
-- withJust (lookupAttrib "from" (tagAttrs stanza)) $ \from -> do
jid <- liftIO $ parseAddressJID $ L.fromChunks [S.encodeUtf8 to]
withJust (name jid) $ \user -> do
liftIO $ L.putStrLn $ "RECEIVED PROBE "<++>bshow (peerAddress session,to)
liftIO $ do
subs <- getSubscribers (peerSessionFactory session) user
liftIO $ L.putStrLn $ "subscribers for "<++>bshow user<++>": " <++>bshow subs
forM_ subs $ \jidstr -> do
handle (\(IOError _ _ _ _ _ _) -> return ()) $ do
-- handle (\(SomeException _) -> return ()) $ do
L.putStrLn $ "parsing " <++>jidstr
sub <- parseHostNameJID jidstr
putStrLn $ "comparing " ++show (peer sub , peerAddress session)
when (peer sub == peerAddress session) $ do
ps <- userStatus session user
mapM_ (announcePresence session) ps
return ()
fromPeer :: (MonadThrow m,MonadIO m, JabberPeerSession session) =>
session -> Sink XML.Event m ()
fromPeer session = doNestingXML $ do
let log = liftIO . L.putStrLn . ("(P) " <++>)
withXML $ \begindoc -> do
when (begindoc==EventBeginDocument) $ do
log "begin-doc"
withXML $ \xml -> do
withJust (elementAttrs "stream" xml) $ \stream_attrs -> do
log $ "stream atributes: " <++> bshow stream_attrs
fix $ \loop -> do
log "waiting for stanza."
whenJust nextElement $ \stanza -> do
stanza_lvl <- nesting
let unhandledStanza = do
xs <- gatherElement stanza Seq.empty
prettyPrint "P: " (toList xs)
case () of
_ | stanza `isPresenceOf` presenceTypeOnline
-> handlePeerPresence session stanza True
_ | stanza `isPresenceOf` presenceTypeOffline
-> handlePeerPresence session stanza False
_ | stanza `isPresenceOf` presenceTypeProbe
-> handlePresenceProbe session stanza
_ -> unhandledStanza
awaitCloser stanza_lvl
loop
log $ "end of stream"
withXML $ \xml -> do
log $ "end-of-document: " <++> bshow xml
data OutBoundMessage = OutBoundPresence Presence | PresenceProbe JID JID
deriving Prelude.Show
newServerConnections = newTVar Map.empty
data CachedMessages = CachedMessages
{ presences :: Map JID JabberShow
, probes :: Map JID (Set JID)
}
connect_to_server chan peer = (>> return ()) . runMaybeT $ do
let port = 5269 :: Int
-- We'll cache Presence notifications until the socket
-- is ready.
cached <- liftIO $ newIORef (CachedMessages Map.empty Map.empty)
let cacheCmd (OutBoundPresence (Presence jid Offline)) cached = do
cache <- readIORef cached
writeIORef cached (cache { presences=Map.delete jid . presences $ cache })
cacheCmd (OutBoundPresence p@(Presence jid st)) cached = do
cache <- readIORef cached
writeIORef cached (cache { presences=Map.insert jid st . presences $ cache })
cacheCmd (PresenceProbe from to) cached = do
cache <- readIORef cached
let probes' = Map.adjust (Set.insert from) to $ probes cache
writeIORef cached (cache { probes=probes' })
fix $ \sendmsgs -> do
connected <- liftIO . async $ connect' (peerAddr peer) port
sock <- MaybeT . fix $ \loop -> do
e <- atomically $ orElse
(fmap Right $ waitSTM connected)
(fmap Left $ readTChan chan)
case e of
Left cmd -> cacheCmd cmd cached >> loop
Right sock -> return sock
retry <- do
(cache,snk) <- liftIO $ do
h <- socketToHandle sock ReadWriteMode
hSetBuffering h NoBuffering
cache <- readIORef $ cached
-- hint garbage collector: we're done with this...
writeIORef cached (CachedMessages Map.empty Map.empty)
return (cache,packetSink h)
MaybeT $ handleOutgoingToPeer (restrictSocket sock) cache chan snk
liftIO $ cacheCmd retry cached
liftIO $ putStrLn $ "retrying " ++ show retry
sendmsgs
greetPeer =
[ EventBeginDocument
, EventBeginElement (streamP "stream")
[("xmlns",[ContentText "jabber:server"])
,("version",[ContentText "1.0"])
]
]
goodbyePeer =
[ EventEndElement (streamP "stream")
, EventEndDocument
]
presenceProbe sock fromjid tojid = do
addr <- getSocketName sock
let jidstr jid = toStrict . L.decodeUtf8
$ name jid <$++> "@"
++> showPeer (RemotePeer addr)
<++?> "/" <++$> resource jid
from = jidstr fromjid
to = toStrict . L.decodeUtf8
$ name tojid <$++> "@"
++> showPeer (peer tojid)
return
[ EventBeginElement "{jabber:server}presence"
[("from",[ContentText from])
,("to",[ContentText to])
,("type",[ContentText "probe"])
]
, EventEndElement "{jabber:server}presence"
]
{-
toPeerChain
:: SocketLike sock =>
sock
-> CachedMessages
-> TChan OutBoundMessage
-> Sink ByteString IO b
-> IO b
toPeerChain sock cache chan snk = toPeer sock cache chan $$ renderChunks =$ snk
-}
toPeer
:: SocketLike sock =>
sock
-> CachedMessages
-> TChan OutBoundMessage
-> (Maybe OutBoundMessage -> IO ())
-> ConduitM i [Event] IO ()
toPeer sock cache chan fail = do
let -- log = liftIO . L.putStrLn . ("(>P) " <++>)
send xs = yield xs >> prettyPrint ">P: " xs -- >> return (3::Int)
checkConnection cmd = do
liftIO $ catch (getPeerName sock >> return ())
(\_ -> fail . Just $ cmd)
sendPresence presence = do
r <- lift $ xmlifyPresenceForPeer sock presence
{-
liftIO $ do
p' <- catch (fmap (Just . RemotePeer) $ getPeerName sock)
(\_ -> (fail . Just . OutBoundPresence $ presence) >> return Nothing)
L.putStrLn $ "sending Presence to " <++?> fmap showPeer p'
-}
let cmd = OutBoundPresence presence
checkConnection cmd
yieldOr r (fail . Just $ cmd)
prettyPrint ">P: " r
sendProbe from to = do
r <- liftIO $ presenceProbe sock from to
let cmd = PresenceProbe from to
checkConnection cmd
yieldOr r (fail . Just $ cmd)
prettyPrint ">P: " r
send greetPeer
forM_ (Map.assocs . presences $ cache) $ \(jid,st) -> do
sendPresence (Presence jid st)
forM_ (Map.assocs . probes $ cache) $ \(to,froms) -> do
forM_ (Set.toList froms) $ \from -> do
liftIO $ L.putStrLn "sending cached probe..."
sendProbe from to
fix $ \loop -> do
event <- lift . atomically $ readTChan chan
case event of
OutBoundPresence p -> sendPresence p
PresenceProbe from to -> do
liftIO $ L.putStrLn "sending live probe..."
sendProbe from to
loop
send goodbyePeer
handleOutgoingToPeer
:: SocketLike sock =>
sock
-> CachedMessages
-> TChan OutBoundMessage
-> Sink ByteString IO ()
-> IO (Maybe OutBoundMessage)
handleOutgoingToPeer sock cache chan snk = do
p <- getPeerName sock
L.putStrLn $ "(>P) connected " <++> showPeer (RemotePeer p)
failed <- newIORef Nothing
let failure cmd = do
writeIORef failed cmd
putStrLn $ "Failed: " ++ show cmd
finally (
#ifdef RENDERFLUSH
handle (\(IOError _ _ _ _ _ _) -> return ()) $
toPeer sock cache chan failure
$$ flushList
=$= renderBuilderFlush def
=$= builderToByteStringFlush
=$= discardFlush
=$ snk
#else
handle (\(IOError _ _ _ _ _ _) -> return ()) $ toPeer sock cache chan failure $$ renderChunks =$ snk
#endif
) $ L.putStrLn $ "(>P) disconnected " <++> showPeer (RemotePeer p)
readIORef failed
connect' :: SockAddr -> Int -> IO (Maybe Socket)
connect' addr port = do
proto <- getProtocolNumber "tcp"
{-
-- Given (host :: HostName) ...
let hints = defaultHints { addrFlags = [AI_ADDRCONFIG]
, addrProtocol = proto
, addrSocketType = Stream }
addrs <- getAddrInfo (Just hints) (Just host) (Just serv)
firstSuccessful $ map tryToConnect addrs
-}
let getport (SockAddrInet port _) = port
getport (SockAddrInet6 port _ _ _) = port
let doException e@(IOError _ _ _ _ _ _) = do
-- let doException (SomeException e) = do
L.putStrLn $ "\nFailed to reach "<++> showPeer (RemotePeer addr) <++> " on port "<++>bshow port<++>": " <++> bshow e
return Nothing
handle doException
$ tryToConnect proto (addr `withPort` port)
where
tryToConnect proto addr =
bracketOnError
(socket (socketFamily addr) Stream proto)
(sClose ) -- only done if there's an error
(\sock -> do
connect sock addr
return (Just sock) -- socketToHandle sock ReadWriteMode
)
sendMessage :: TVar (Map Peer (TChan OutBoundMessage, ThreadId)) -> OutBoundMessage -> Peer -> IO ()
sendMessage cons msg peer = do
found <- atomically $ do
consmap <- readTVar cons
return (Map.lookup peer consmap)
let newEntry = do
chan <- atomically newTChan
t <- forkIO $ connect_to_server chan peer
-- L.putStrLn $ "remote-map new: " <++> showPeer peer
return (True,(chan,t))
(is_new,entry) <- maybe newEntry
( \(chan,t) -> do
st <- threadStatus t
let running = do
-- L.putStrLn $ "remote-map, thread running: " <++> showPeer peer
return (False,(chan,t))
died = do
-- L.putStrLn $ "remote-map, thread died("<++>bshow st<++>"): " <++> showPeer peer
newEntry
case st of
ThreadRunning -> running
ThreadBlocked _ -> running
ThreadDied -> died
ThreadFinished -> died
)
found
-- L.putStrLn $ "sendMessage ->"<++>showPeer peer<++>": "<++>bshow msg
atomically $ writeTChan (fst entry) msg
when is_new . atomically $
readTVar cons >>= writeTVar cons . Map.insert peer entry
seekRemotePeers :: JabberPeerSession config =>
XMPPPeerClass config -> TChan Presence -> TVar (Map Peer (TChan OutBoundMessage, ThreadId)) -> IO b0
seekRemotePeers config chan server_connections = do
fix $ \loop -> do
event <- atomically $ readTChan chan
case event of
p@(Presence jid stat) | not (is_remote (peer jid)) -> do
-- L.putStrLn $ "seekRemotePeers: " <++> L.show jid <++> " " <++> bshow stat
runMaybeT $ do
u <- MaybeT . return $ name jid
subscribers <- liftIO $ do
subs <- getSubscribers config u
mapM parseHostNameJID subs
-- liftIO . L.putStrLn $ "subscribers: " <++> bshow subscribers
let peers = Set.map peer (Set.fromList subscribers)
forM_ (Set.toList peers) $ \peer -> do
when (is_remote peer) $
liftIO $ sendMessage server_connections (OutBoundPresence p) peer
-- TODO: send presence probes for buddies
-- TODO: cache remote presences for clients
_ -> return (Just ())
loop
xmlifyPresenceForPeer sock (Presence jid stat) = do
-- TODO: accept socket argument and determine local ip address
-- connected to this peer.
addr <- getSocketName sock
let n = name jid
rsc = resource jid
jidstr = toStrict . L.decodeUtf8
$ n <$++> "@" ++> showPeer (RemotePeer addr) <++?> "/" <++$> rsc
return
[ EventBeginElement "{jabber:server}presence"
(("from",[ContentText jidstr]):typ stat)
, EventBeginElement "{jabber:server}show" []
, EventContent (ContentText . shw $ stat)
, EventEndElement "{jabber:server}show"
, EventEndElement "{jabber:server}presence"
]
where
typ Offline = [("type",[ContentText "unavailable"])]
typ _ = []
shw Available = "chat"
shw Away = "away"
shw Offline = "away" -- Is this right?