summaryrefslogtreecommitdiff
path: root/Presence/XMPPServer.hs
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-11-16 21:01:52 -0500
committerjoe <joe@jerkface.net>2017-11-16 21:01:52 -0500
commitd0311af98728855128b6417a4a591186a42345f9 (patch)
tree0c0c7e7882f4f8bd52a702872f73c9502cb949e0 /Presence/XMPPServer.hs
parent0abae348d1bec63e964a52b2c5b513048225d4a4 (diff)
Changed Connection event to provide XML.Event streams.
Diffstat (limited to 'Presence/XMPPServer.hs')
-rw-r--r--Presence/XMPPServer.hs42
1 files changed, 27 insertions, 15 deletions
diff --git a/Presence/XMPPServer.hs b/Presence/XMPPServer.hs
index 362cc8a8..b63f04c3 100644
--- a/Presence/XMPPServer.hs
+++ b/Presence/XMPPServer.hs
@@ -46,7 +46,12 @@ import Control.Monad.Trans (lift)
46import Control.Monad.IO.Class (MonadIO, liftIO) 46import Control.Monad.IO.Class (MonadIO, liftIO)
47import Control.Monad.Fix (fix) 47import Control.Monad.Fix (fix)
48import Control.Monad 48import Control.Monad
49import Control.Concurrent (forkIO) 49#ifdef THREAD_DEBUG
50import Control.Concurrent.Lifted.Instrument (forkIO,myThreadId,labelThread)
51#else
52import Control.Concurrent.Lifted (forkIO,myThreadId)
53import GHC.Conc (labelThread)
54#endif
50import Control.Concurrent.STM 55import Control.Concurrent.STM
51-- import Control.Concurrent.STM.TChan 56-- import Control.Concurrent.STM.TChan
52import Network.SocketLike 57import Network.SocketLike
@@ -80,6 +85,7 @@ import qualified System.Random
80import Data.Void (Void) 85import Data.Void (Void)
81import System.Endian (toBE32) 86import System.Endian (toBE32)
82import Control.Applicative 87import Control.Applicative
88import System.IO
83 89
84peerport :: PortNumber 90peerport :: PortNumber
85peerport = 5269 91peerport = 5269
@@ -873,7 +879,7 @@ makePong namespace mid to from =
873 ] 879 ]
874 880
875 881
876xmppInbound :: Server ConnectionKey SockAddr ReleaseKey 882xmppInbound :: Server ConnectionKey SockAddr ReleaseKey XML.Event
877 -> XMPPServerParameters 883 -> XMPPServerParameters
878 -> ConnectionKey 884 -> ConnectionKey
879 -> SockAddr 885 -> SockAddr
@@ -1175,7 +1181,7 @@ presenceStanza stanza_type type_attr me jid =
1175 ] 1181 ]
1176 , EventEndElement "{jabber:server}presence" ] 1182 , EventEndElement "{jabber:server}presence" ]
1177 1183
1178forkConnection :: Server ConnectionKey SockAddr ReleaseKey 1184forkConnection :: Server ConnectionKey SockAddr ReleaseKey XML.Event
1179 -> XMPPServerParameters 1185 -> XMPPServerParameters
1180 -> ConnectionKey 1186 -> ConnectionKey
1181 -> SockAddr 1187 -> SockAddr
@@ -1235,7 +1241,8 @@ forkConnection sv xmpp k laddr pingflag src snk stanzas = do
1235 ,readTMVar rdone >> return (return ()) 1241 ,readTMVar rdone >> return (return ())
1236 ] 1242 ]
1237 what 1243 what
1238 forkIO $ do (greet_src >> slot_src) $$ snk 1244 forkIO $ do myThreadId >>= flip labelThread ("post-queue."++show k)
1245 (greet_src >> slot_src) $$ snk
1239 last <- atomically $ readTVar lastStanza 1246 last <- atomically $ readTVar lastStanza
1240 es <- while (atomically . fmap not $ Slotted.isEmpty slots) 1247 es <- while (atomically . fmap not $ Slotted.isEmpty slots)
1241 (atomically . Slotted.pull $ slots) 1248 (atomically . Slotted.pull $ slots)
@@ -1259,11 +1266,13 @@ forkConnection sv xmpp k laddr pingflag src snk stanzas = do
1259 -- TODO: Probably some stanzas should be queued or saved for re-connect. 1266 -- TODO: Probably some stanzas should be queued or saved for re-connect.
1260 mapM_ fail $ filter notError (maybeToList last ++ es') 1267 mapM_ fail $ filter notError (maybeToList last ++ es')
1261 wlog $ "end post-queue fork: " ++ show k 1268 wlog $ "end post-queue fork: " ++ show k
1269
1262 output <- atomically newTChan 1270 output <- atomically newTChan
1263 hacks <- atomically $ newTVar Map.empty 1271 hacks <- atomically $ newTVar Map.empty
1264 msgids <- atomically $ newTVar [] 1272 msgids <- atomically $ newTVar []
1265 forkIO $ do 1273 forkIO $ do
1266 -- mapM_ (atomically . Slotted.push slots Nothing) greetPeer 1274 -- mapM_ (atomically . Slotted.push slots Nothing) greetPeer
1275 myThreadId >>= flip labelThread ("pre-queue."++show k)
1267 verbosity <- xmppVerbosity xmpp 1276 verbosity <- xmppVerbosity xmpp
1268 fix $ \loop -> do 1277 fix $ \loop -> do
1269 what <- atomically $ foldr1 orElse 1278 what <- atomically $ foldr1 orElse
@@ -1338,6 +1347,7 @@ forkConnection sv xmpp k laddr pingflag src snk stanzas = do
1338 what 1347 what
1339 wlog $ "end pre-queue fork: " ++ show k 1348 wlog $ "end pre-queue fork: " ++ show k
1340 forkIO $ do 1349 forkIO $ do
1350 myThreadId >>= flip labelThread ("reader."++show k)
1341 -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show) 1351 -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show)
1342 src $$ xmppInbound sv xmpp k laddr pingflag stanzas output rdone 1352 src $$ xmppInbound sv xmpp k laddr pingflag stanzas output rdone
1343 atomically $ putTMVar rdone () 1353 atomically $ putTMVar rdone ()
@@ -1437,7 +1447,7 @@ sendRoster query xmpp replyto = do
1437 -} 1447 -}
1438 1448
1439 1449
1440socketFromKey :: Server ConnectionKey SockAddr ReleaseKey -> ConnectionKey -> IO SockAddr 1450socketFromKey :: Server ConnectionKey SockAddr ReleaseKey XML.Event -> ConnectionKey -> IO SockAddr
1441socketFromKey sv k = do 1451socketFromKey sv k = do
1442 map <- atomically $ readTVar (conmap sv) 1452 map <- atomically $ readTVar (conmap sv)
1443 let mcd = Map.lookup k map 1453 let mcd = Map.lookup k map
@@ -1607,7 +1617,7 @@ makeErrorStanza stanza = do
1607 ] 1617 ]
1608 1618
1609monitor :: 1619monitor ::
1610 Server ConnectionKey SockAddr ReleaseKey 1620 Server ConnectionKey SockAddr ReleaseKey XML.Event
1611 -> ConnectionParameters ConnectionKey SockAddr 1621 -> ConnectionParameters ConnectionKey SockAddr
1612 -> XMPPServerParameters 1622 -> XMPPServerParameters
1613 -> IO b 1623 -> IO b
@@ -1619,12 +1629,10 @@ monitor sv params xmpp = do
1619 action <- atomically $ foldr1 orElse 1629 action <- atomically $ foldr1 orElse
1620 [ readTChan chan >>= \((k,u),e) -> return $ do 1630 [ readTChan chan >>= \((k,u),e) -> return $ do
1621 case e of 1631 case e of
1622 Connection pingflag conread conwrite -> do 1632 Connection pingflag xsrc xsnk -> do
1623 wlog $ tomsg k "Connection" 1633 wlog $ tomsg k "Connection"
1624 let (xsrc,xsnk) = xmlStream conread conwrite 1634 outs <- forkConnection sv xmpp k u pingflag xsrc xsnk stanzas
1625 outs <- forkConnection sv xmpp k u pingflag xsrc xsnk stanzas 1635 xmppNewConnection xmpp k u outs
1626 xmppNewConnection xmpp k u outs
1627 return ()
1628 ConnectFailure addr -> return () -- wlog $ tomsg k "ConnectFailure" 1636 ConnectFailure addr -> return () -- wlog $ tomsg k "ConnectFailure"
1629 EOF -> do wlog $ tomsg k "EOF" 1637 EOF -> do wlog $ tomsg k "EOF"
1630 xmppEOF xmpp k 1638 xmppEOF xmpp k
@@ -1775,7 +1783,7 @@ monitor sv params xmpp = do
1775 _ = str :: String 1783 _ = str :: String
1776 1784
1777data XMPPServer 1785data XMPPServer
1778 = XMPPServer { _xmpp_sv :: Server ConnectionKey SockAddr ReleaseKey 1786 = XMPPServer { _xmpp_sv :: Server ConnectionKey SockAddr ReleaseKey XML.Event
1779 , _xmpp_peer_params :: ConnectionParameters ConnectionKey SockAddr 1787 , _xmpp_peer_params :: ConnectionParameters ConnectionKey SockAddr
1780 } 1788 }
1781 1789
@@ -1787,7 +1795,7 @@ xmppServer :: ( MonadResource m
1787 , MonadIO m 1795 , MonadIO m
1788 ) => XMPPServerParameters -> m XMPPServer 1796 ) => XMPPServerParameters -> m XMPPServer
1789xmppServer xmpp = do 1797xmppServer xmpp = do
1790 sv <- server allocate 1798 sv <- server allocate xmlStream
1791 -- some fuzz helps avoid simultaneity 1799 -- some fuzz helps avoid simultaneity
1792 pingfuzz <- liftIO $ do 1800 pingfuzz <- liftIO $ do
1793 gen <- System.Random.getStdGen 1801 gen <- System.Random.getStdGen
@@ -1803,8 +1811,12 @@ xmppServer xmpp = do
1803 , timeout = 0 1811 , timeout = 0
1804 } 1812 }
1805 liftIO $ do 1813 liftIO $ do
1806 forkIO $ monitor sv peer_params xmpp 1814 forkIO $ do
1815 myThreadId >>= flip labelThread ("XMPP.monitor")
1816 monitor sv peer_params xmpp
1817 hPutStrLn stderr $ "Starting peer listen"
1807 control sv (Listen peerport peer_params) 1818 control sv (Listen peerport peer_params)
1819 hPutStrLn stderr $ "Starting client listen"
1808 control sv (Listen clientport client_params) 1820 control sv (Listen clientport client_params)
1809 return XMPPServer { _xmpp_sv = sv, _xmpp_peer_params = peer_params } 1821 return XMPPServer { _xmpp_sv = sv, _xmpp_peer_params = peer_params }
1810 1822