diff options
author | joe <joe@jerkface.net> | 2017-11-16 21:01:52 -0500 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-11-16 21:01:52 -0500 |
commit | d0311af98728855128b6417a4a591186a42345f9 (patch) | |
tree | 0c0c7e7882f4f8bd52a702872f73c9502cb949e0 /Presence/XMPPServer.hs | |
parent | 0abae348d1bec63e964a52b2c5b513048225d4a4 (diff) |
Changed Connection event to provide XML.Event streams.
Diffstat (limited to 'Presence/XMPPServer.hs')
-rw-r--r-- | Presence/XMPPServer.hs | 42 |
1 files changed, 27 insertions, 15 deletions
diff --git a/Presence/XMPPServer.hs b/Presence/XMPPServer.hs index 362cc8a8..b63f04c3 100644 --- a/Presence/XMPPServer.hs +++ b/Presence/XMPPServer.hs | |||
@@ -46,7 +46,12 @@ import Control.Monad.Trans (lift) | |||
46 | import Control.Monad.IO.Class (MonadIO, liftIO) | 46 | import Control.Monad.IO.Class (MonadIO, liftIO) |
47 | import Control.Monad.Fix (fix) | 47 | import Control.Monad.Fix (fix) |
48 | import Control.Monad | 48 | import Control.Monad |
49 | import Control.Concurrent (forkIO) | 49 | #ifdef THREAD_DEBUG |
50 | import Control.Concurrent.Lifted.Instrument (forkIO,myThreadId,labelThread) | ||
51 | #else | ||
52 | import Control.Concurrent.Lifted (forkIO,myThreadId) | ||
53 | import GHC.Conc (labelThread) | ||
54 | #endif | ||
50 | import Control.Concurrent.STM | 55 | import Control.Concurrent.STM |
51 | -- import Control.Concurrent.STM.TChan | 56 | -- import Control.Concurrent.STM.TChan |
52 | import Network.SocketLike | 57 | import Network.SocketLike |
@@ -80,6 +85,7 @@ import qualified System.Random | |||
80 | import Data.Void (Void) | 85 | import Data.Void (Void) |
81 | import System.Endian (toBE32) | 86 | import System.Endian (toBE32) |
82 | import Control.Applicative | 87 | import Control.Applicative |
88 | import System.IO | ||
83 | 89 | ||
84 | peerport :: PortNumber | 90 | peerport :: PortNumber |
85 | peerport = 5269 | 91 | peerport = 5269 |
@@ -873,7 +879,7 @@ makePong namespace mid to from = | |||
873 | ] | 879 | ] |
874 | 880 | ||
875 | 881 | ||
876 | xmppInbound :: Server ConnectionKey SockAddr ReleaseKey | 882 | xmppInbound :: Server ConnectionKey SockAddr ReleaseKey XML.Event |
877 | -> XMPPServerParameters | 883 | -> XMPPServerParameters |
878 | -> ConnectionKey | 884 | -> ConnectionKey |
879 | -> SockAddr | 885 | -> SockAddr |
@@ -1175,7 +1181,7 @@ presenceStanza stanza_type type_attr me jid = | |||
1175 | ] | 1181 | ] |
1176 | , EventEndElement "{jabber:server}presence" ] | 1182 | , EventEndElement "{jabber:server}presence" ] |
1177 | 1183 | ||
1178 | forkConnection :: Server ConnectionKey SockAddr ReleaseKey | 1184 | forkConnection :: Server ConnectionKey SockAddr ReleaseKey XML.Event |
1179 | -> XMPPServerParameters | 1185 | -> XMPPServerParameters |
1180 | -> ConnectionKey | 1186 | -> ConnectionKey |
1181 | -> SockAddr | 1187 | -> SockAddr |
@@ -1235,7 +1241,8 @@ forkConnection sv xmpp k laddr pingflag src snk stanzas = do | |||
1235 | ,readTMVar rdone >> return (return ()) | 1241 | ,readTMVar rdone >> return (return ()) |
1236 | ] | 1242 | ] |
1237 | what | 1243 | what |
1238 | forkIO $ do (greet_src >> slot_src) $$ snk | 1244 | forkIO $ do myThreadId >>= flip labelThread ("post-queue."++show k) |
1245 | (greet_src >> slot_src) $$ snk | ||
1239 | last <- atomically $ readTVar lastStanza | 1246 | last <- atomically $ readTVar lastStanza |
1240 | es <- while (atomically . fmap not $ Slotted.isEmpty slots) | 1247 | es <- while (atomically . fmap not $ Slotted.isEmpty slots) |
1241 | (atomically . Slotted.pull $ slots) | 1248 | (atomically . Slotted.pull $ slots) |
@@ -1259,11 +1266,13 @@ forkConnection sv xmpp k laddr pingflag src snk stanzas = do | |||
1259 | -- TODO: Probably some stanzas should be queued or saved for re-connect. | 1266 | -- TODO: Probably some stanzas should be queued or saved for re-connect. |
1260 | mapM_ fail $ filter notError (maybeToList last ++ es') | 1267 | mapM_ fail $ filter notError (maybeToList last ++ es') |
1261 | wlog $ "end post-queue fork: " ++ show k | 1268 | wlog $ "end post-queue fork: " ++ show k |
1269 | |||
1262 | output <- atomically newTChan | 1270 | output <- atomically newTChan |
1263 | hacks <- atomically $ newTVar Map.empty | 1271 | hacks <- atomically $ newTVar Map.empty |
1264 | msgids <- atomically $ newTVar [] | 1272 | msgids <- atomically $ newTVar [] |
1265 | forkIO $ do | 1273 | forkIO $ do |
1266 | -- mapM_ (atomically . Slotted.push slots Nothing) greetPeer | 1274 | -- mapM_ (atomically . Slotted.push slots Nothing) greetPeer |
1275 | myThreadId >>= flip labelThread ("pre-queue."++show k) | ||
1267 | verbosity <- xmppVerbosity xmpp | 1276 | verbosity <- xmppVerbosity xmpp |
1268 | fix $ \loop -> do | 1277 | fix $ \loop -> do |
1269 | what <- atomically $ foldr1 orElse | 1278 | what <- atomically $ foldr1 orElse |
@@ -1338,6 +1347,7 @@ forkConnection sv xmpp k laddr pingflag src snk stanzas = do | |||
1338 | what | 1347 | what |
1339 | wlog $ "end pre-queue fork: " ++ show k | 1348 | wlog $ "end pre-queue fork: " ++ show k |
1340 | forkIO $ do | 1349 | forkIO $ do |
1350 | myThreadId >>= flip labelThread ("reader."++show k) | ||
1341 | -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show) | 1351 | -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show) |
1342 | src $$ xmppInbound sv xmpp k laddr pingflag stanzas output rdone | 1352 | src $$ xmppInbound sv xmpp k laddr pingflag stanzas output rdone |
1343 | atomically $ putTMVar rdone () | 1353 | atomically $ putTMVar rdone () |
@@ -1437,7 +1447,7 @@ sendRoster query xmpp replyto = do | |||
1437 | -} | 1447 | -} |
1438 | 1448 | ||
1439 | 1449 | ||
1440 | socketFromKey :: Server ConnectionKey SockAddr ReleaseKey -> ConnectionKey -> IO SockAddr | 1450 | socketFromKey :: Server ConnectionKey SockAddr ReleaseKey XML.Event -> ConnectionKey -> IO SockAddr |
1441 | socketFromKey sv k = do | 1451 | socketFromKey sv k = do |
1442 | map <- atomically $ readTVar (conmap sv) | 1452 | map <- atomically $ readTVar (conmap sv) |
1443 | let mcd = Map.lookup k map | 1453 | let mcd = Map.lookup k map |
@@ -1607,7 +1617,7 @@ makeErrorStanza stanza = do | |||
1607 | ] | 1617 | ] |
1608 | 1618 | ||
1609 | monitor :: | 1619 | monitor :: |
1610 | Server ConnectionKey SockAddr ReleaseKey | 1620 | Server ConnectionKey SockAddr ReleaseKey XML.Event |
1611 | -> ConnectionParameters ConnectionKey SockAddr | 1621 | -> ConnectionParameters ConnectionKey SockAddr |
1612 | -> XMPPServerParameters | 1622 | -> XMPPServerParameters |
1613 | -> IO b | 1623 | -> IO b |
@@ -1619,12 +1629,10 @@ monitor sv params xmpp = do | |||
1619 | action <- atomically $ foldr1 orElse | 1629 | action <- atomically $ foldr1 orElse |
1620 | [ readTChan chan >>= \((k,u),e) -> return $ do | 1630 | [ readTChan chan >>= \((k,u),e) -> return $ do |
1621 | case e of | 1631 | case e of |
1622 | Connection pingflag conread conwrite -> do | 1632 | Connection pingflag xsrc xsnk -> do |
1623 | wlog $ tomsg k "Connection" | 1633 | wlog $ tomsg k "Connection" |
1624 | let (xsrc,xsnk) = xmlStream conread conwrite | 1634 | outs <- forkConnection sv xmpp k u pingflag xsrc xsnk stanzas |
1625 | outs <- forkConnection sv xmpp k u pingflag xsrc xsnk stanzas | 1635 | xmppNewConnection xmpp k u outs |
1626 | xmppNewConnection xmpp k u outs | ||
1627 | return () | ||
1628 | ConnectFailure addr -> return () -- wlog $ tomsg k "ConnectFailure" | 1636 | ConnectFailure addr -> return () -- wlog $ tomsg k "ConnectFailure" |
1629 | EOF -> do wlog $ tomsg k "EOF" | 1637 | EOF -> do wlog $ tomsg k "EOF" |
1630 | xmppEOF xmpp k | 1638 | xmppEOF xmpp k |
@@ -1775,7 +1783,7 @@ monitor sv params xmpp = do | |||
1775 | _ = str :: String | 1783 | _ = str :: String |
1776 | 1784 | ||
1777 | data XMPPServer | 1785 | data XMPPServer |
1778 | = XMPPServer { _xmpp_sv :: Server ConnectionKey SockAddr ReleaseKey | 1786 | = XMPPServer { _xmpp_sv :: Server ConnectionKey SockAddr ReleaseKey XML.Event |
1779 | , _xmpp_peer_params :: ConnectionParameters ConnectionKey SockAddr | 1787 | , _xmpp_peer_params :: ConnectionParameters ConnectionKey SockAddr |
1780 | } | 1788 | } |
1781 | 1789 | ||
@@ -1787,7 +1795,7 @@ xmppServer :: ( MonadResource m | |||
1787 | , MonadIO m | 1795 | , MonadIO m |
1788 | ) => XMPPServerParameters -> m XMPPServer | 1796 | ) => XMPPServerParameters -> m XMPPServer |
1789 | xmppServer xmpp = do | 1797 | xmppServer xmpp = do |
1790 | sv <- server allocate | 1798 | sv <- server allocate xmlStream |
1791 | -- some fuzz helps avoid simultaneity | 1799 | -- some fuzz helps avoid simultaneity |
1792 | pingfuzz <- liftIO $ do | 1800 | pingfuzz <- liftIO $ do |
1793 | gen <- System.Random.getStdGen | 1801 | gen <- System.Random.getStdGen |
@@ -1803,8 +1811,12 @@ xmppServer xmpp = do | |||
1803 | , timeout = 0 | 1811 | , timeout = 0 |
1804 | } | 1812 | } |
1805 | liftIO $ do | 1813 | liftIO $ do |
1806 | forkIO $ monitor sv peer_params xmpp | 1814 | forkIO $ do |
1815 | myThreadId >>= flip labelThread ("XMPP.monitor") | ||
1816 | monitor sv peer_params xmpp | ||
1817 | hPutStrLn stderr $ "Starting peer listen" | ||
1807 | control sv (Listen peerport peer_params) | 1818 | control sv (Listen peerport peer_params) |
1819 | hPutStrLn stderr $ "Starting client listen" | ||
1808 | control sv (Listen clientport client_params) | 1820 | control sv (Listen clientport client_params) |
1809 | return XMPPServer { _xmpp_sv = sv, _xmpp_peer_params = peer_params } | 1821 | return XMPPServer { _xmpp_sv = sv, _xmpp_peer_params = peer_params } |
1810 | 1822 | ||