summaryrefslogtreecommitdiff
path: root/Presence
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-11-16 21:01:52 -0500
committerjoe <joe@jerkface.net>2017-11-16 21:01:52 -0500
commitd0311af98728855128b6417a4a591186a42345f9 (patch)
tree0c0c7e7882f4f8bd52a702872f73c9502cb949e0 /Presence
parent0abae348d1bec63e964a52b2c5b513048225d4a4 (diff)
Changed Connection event to provide XML.Event streams.
Diffstat (limited to 'Presence')
-rw-r--r--Presence/Server.hs83
-rw-r--r--Presence/XMPPServer.hs42
2 files changed, 64 insertions, 61 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs
index 5f85c262..46b32b81 100644
--- a/Presence/Server.hs
+++ b/Presence/Server.hs
@@ -24,6 +24,7 @@ module Server where
24 24
25import Data.ByteString (ByteString,hGetNonBlocking) 25import Data.ByteString (ByteString,hGetNonBlocking)
26import qualified Data.ByteString.Char8 as S -- ( hPutStrLn, hPutStr, pack) 26import qualified Data.ByteString.Char8 as S -- ( hPutStrLn, hPutStr, pack)
27import Data.Conduit ( Source, Sink, Flush )
27#if MIN_VERSION_containers(0,5,0) 28#if MIN_VERSION_containers(0,5,0)
28import qualified Data.Map.Strict as Map 29import qualified Data.Map.Strict as Map
29import Data.Map.Strict (Map) 30import Data.Map.Strict (Map)
@@ -160,9 +161,7 @@ data InOrOut = In | Out
160-- | These events may be read from 'serverEvent' TChannel. 161-- | These events may be read from 'serverEvent' TChannel.
161-- 162--
162data ConnectionEvent b 163data ConnectionEvent b
163 = Got b 164 = Connection (STM Bool) (Source IO b) (Sink (Flush b) IO ())
164 -- ^ Arrival of data from a socket
165 | Connection (STM Bool) (IO (Maybe ByteString)) (ByteString -> IO Bool)
166 -- ^ A new connection was established 165 -- ^ A new connection was established
167 | ConnectFailure SockAddr 166 | ConnectFailure SockAddr
168 -- ^ A 'Connect' command failed. 167 -- ^ A 'Connect' command failed.
@@ -192,16 +191,16 @@ data ConnectionRecord u
192 191
193-- | This object accepts commands and signals events and maintains 192-- | This object accepts commands and signals events and maintains
194-- the list of currently listening ports and established connections. 193-- the list of currently listening ports and established connections.
195data Server a u releaseKey 194data Server a u releaseKey b
196 = Server { serverCommand :: TMVar (ServerInstruction a u) 195 = Server { serverCommand :: TMVar (ServerInstruction a u)
197 , serverEvent :: TChan ((a,u), ConnectionEvent ByteString) 196 , serverEvent :: TChan ((a,u), ConnectionEvent b)
198 , serverReleaseKey :: releaseKey 197 , serverReleaseKey :: releaseKey
199 , conmap :: TVar (Map a (ConnectionRecord u)) 198 , conmap :: TVar (Map a (ConnectionRecord u))
200 , listenmap :: TVar (Map PortNumber ServerHandle) 199 , listenmap :: TVar (Map PortNumber ServerHandle)
201 , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptibleDelay)) 200 , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptibleDelay))
202 } 201 }
203 202
204control :: Server a u releaseKey -> ServerInstruction a u -> IO () 203control :: Server a u releaseKey b -> ServerInstruction a u -> IO ()
205control sv = atomically . putTMVar (serverCommand sv) 204control sv = atomically . putTMVar (serverCommand sv)
206 205
207type Allocate releaseKey m = forall b. IO b -> (b -> IO ()) -> m (releaseKey, b) 206type Allocate releaseKey m = forall b. IO b -> (b -> IO ()) -> m (releaseKey, b)
@@ -226,8 +225,12 @@ noCleanUp io _ = ( (,) () ) `liftM` liftIO io
226-- > let loop = do 225-- > let loop = do
227-- > (_,event) <- atomically $ readTChan (serverEvent sv) 226-- > (_,event) <- atomically $ readTChan (serverEvent sv)
228-- > case event of 227-- > case event of
229-- > Got bytes -> putStrLn $ "got: " ++ show bytes 228-- > Connection getPingFlag readData writeData -> do
230-- > _ -> return () 229-- > forkIO $ do
230-- > fix $ \readLoop -> do
231-- > readData >>= mapM $ \bytes ->
232-- > putStrLn $ "got: " ++ show bytes
233-- > readLoop
231-- > case event of EOF -> return () 234-- > case event of EOF -> return ()
232-- > _ -> loop 235-- > _ -> loop
233-- > liftIO loop 236-- > liftIO loop
@@ -238,8 +241,10 @@ noCleanUp io _ = ( (,) () ) `liftM` liftIO io
238server :: 241server ::
239 -- forall (m :: * -> *) a u conkey releaseKey. 242 -- forall (m :: * -> *) a u conkey releaseKey.
240 (Show conkey, MonadIO m, Ord conkey) => 243 (Show conkey, MonadIO m, Ord conkey) =>
241 Allocate releaseKey m -> m (Server conkey u releaseKey) 244 Allocate releaseKey m
242server allocate = do 245 -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( Source IO x, Sink (Flush x) IO () ) )
246 -> m (Server conkey u releaseKey x)
247server allocate sessionConduits = do
243 (key,cmds) <- allocate (atomically newEmptyTMVar) 248 (key,cmds) <- allocate (atomically newEmptyTMVar)
244 (atomically . flip putTMVar Quit) 249 (atomically . flip putTMVar Quit)
245 server <- liftIO . atomically $ do 250 server <- liftIO . atomically $ do
@@ -288,6 +293,8 @@ server allocate = do
288 `fmap` atomically (readTVar $ listenmap server) 293 `fmap` atomically (readTVar $ listenmap server)
289 when (not listening) $ do 294 when (not listening) $ do
290 295
296 hPutStrLn stderr $ "Started listening on "++show port
297
291 let family = AF_INET6 298 let family = AF_INET6
292 address = case family of 299 address = case family of
293 AF_INET -> SockAddrInet port iNADDR_ANY 300 AF_INET -> SockAddrInet port iNADDR_ANY
@@ -297,13 +304,14 @@ server allocate = do
297 { serverWarn = hPutStrLn stderr 304 { serverWarn = hPutStrLn stderr
298 , serverSession = \sock _ h -> do 305 , serverSession = \sock _ h -> do
299 (conkey,u) <- makeConnKey params sock 306 (conkey,u) <- makeConnKey params sock
300 _ <- newConnection server params conkey u h In 307 _ <- newConnection server sessionConduits params conkey u h In
301 return () 308 return ()
302 } 309 }
303 310
304 atomically $ listenmap server `modifyTVar'` Map.insert port sserv 311 atomically $ listenmap server `modifyTVar'` Map.insert port sserv
305 312
306 doit server (Ignore port) = do 313 doit server (Ignore port) = do
314 hPutStrLn stderr $ "Stopping listen on "++show port
307 mb <- atomically $ do 315 mb <- atomically $ do
308 map <- readTVar $ listenmap server 316 map <- readTVar $ listenmap server
309 modifyTVar' (listenmap server) $ Map.delete port 317 modifyTVar' (listenmap server) $ Map.delete port
@@ -342,7 +350,7 @@ server allocate = do
342 connect sock addr 350 connect sock addr
343 (conkey,u) <- makeConnKey params (restrictSocket sock) 351 (conkey,u) <- makeConnKey params (restrictSocket sock)
344 h <- socketToHandle sock ReadWriteMode 352 h <- socketToHandle sock ReadWriteMode
345 newConnection server params conkey u h Out 353 newConnection server sessionConduits params conkey u h Out
346 return () 354 return ()
347 355
348 doit server (ConnectWithEndlessRetry addr params interval) = do 356 doit server (ConnectWithEndlessRetry addr params interval) = do
@@ -380,7 +388,7 @@ server allocate = do
380 connect sock addr 388 connect sock addr
381 (conkey,u) <- makeConnKey params (restrictSocket sock) 389 (conkey,u) <- makeConnKey params (restrictSocket sock)
382 h <- socketToHandle sock ReadWriteMode 390 h <- socketToHandle sock ReadWriteMode
383 threads <- newConnection server params conkey u h Out 391 threads <- newConnection server sessionConduits params conkey u h Out
384 atomically $ do threadsWait threads 392 atomically $ do threadsWait threads
385 readTVar retryVar 393 readTVar retryVar
386 fin_utc <- getCurrentTime 394 fin_utc <- getCurrentTime
@@ -428,30 +436,29 @@ socketFamily (SockAddrInet6 _ _ _ _) = AF_INET6
428socketFamily (SockAddrUnix _) = AF_UNIX 436socketFamily (SockAddrUnix _) = AF_UNIX
429 437
430 438
431conevent :: ConnectionState -> ConnectionEvent b 439conevent :: ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( Source IO x, Sink (Flush x) IO () ) )
432conevent con = Connection pingflag read write 440 -> ConnectionState
441 -> ConnectionEvent x
442conevent sessionConduits con = Connection pingflag read write
433 where 443 where
434 pingflag = swapTVar (pingFlag (connPingTimer con)) False 444 pingflag = swapTVar (pingFlag (connPingTimer con)) False
435 read = connRead con 445 (read,write) = sessionConduits (connRead con) (connWrite con)
436 write = connWrite con
437 446
438newConnection :: Ord a 447newConnection :: Ord a
439 => Server a u1 releaseKey 448 => Server a u1 releaseKey x
449 -> ( IO (Maybe ByteString) -> (ByteString -> IO Bool) -> ( Source IO x, Sink (Flush x) IO () ) )
440 -> ConnectionParameters conkey u 450 -> ConnectionParameters conkey u
441 -> a 451 -> a
442 -> u1 452 -> u1
443 -> Handle 453 -> Handle
444 -> InOrOut 454 -> InOrOut
445 -> IO ConnectionThreads 455 -> IO ConnectionThreads
446newConnection server params conkey u h inout = do 456newConnection server sessionConduits params conkey u h inout = do
447 hSetBuffering h NoBuffering 457 hSetBuffering h NoBuffering
448 let (forward,idle_ms,timeout_ms) = 458 let (idle_ms,timeout_ms) =
449 case (inout,duplex params) of 459 case (inout,duplex params) of
450 (Out,False) -> ( const $ return () 460 (Out,False) -> ( 0, 0 )
451 , 0 461 _ -> ( pingInterval params
452 , 0 )
453 _ -> ( announce . ((conkey,u),) . Got
454 , pingInterval params
455 , timeout params ) 462 , timeout params )
456 463
457 new <- do pinglogic <- forkPingMachine idle_ms timeout_ms 464 new <- do pinglogic <- forkPingMachine idle_ms timeout_ms
@@ -473,7 +480,7 @@ newConnection server params conkey u h inout = do
473 (newCon,e) <- return $ 480 (newCon,e) <- return $
474 if duplex params 481 if duplex params
475 then let newcon = SaneConnection new 482 then let newcon = SaneConnection new
476 in ( newcon, ((conkey,u), conevent newcon) ) 483 in ( newcon, ((conkey,u), conevent sessionConduits newcon) )
477 else ( case inout of 484 else ( case inout of
478 In -> ReadOnlyConnection new 485 In -> ReadOnlyConnection new
479 Out -> WriteOnlyConnection new 486 Out -> WriteOnlyConnection new
@@ -496,22 +503,6 @@ newConnection server params conkey u h inout = do
496 kont <- updateConMap conkey u new what 503 kont <- updateConMap conkey u new what
497 putTMVar started () 504 putTMVar started ()
498 return kont 505 return kont
499#ifdef TEST
500 -- enable this for 'Got' events
501 forkIO $ do -- inout==In || duplex params then forkIO $ do
502 warn $ "waiting gots thread: " <> bshow (conkey,inout)
503 atomically $ takeTMVar started
504 -- pingBump pinglogic -- start the ping timer
505 fix $ \loop -> do
506 -- warn $ "read thread: " <> bshow (conkey,inout)
507 mb <- threadsRead new
508 -- pingBump pinglogic
509 warn $ "got: " <> bshow (mb,(conkey,inout))
510 maybe (return ())
511 (atomically . forward >=> const loop)
512 mb
513 warn $ "quit-gots: " <> bshow (conkey,inout)
514#endif
515 return new 506 return new
516 where 507 where
517 508
@@ -562,17 +553,17 @@ newConnection server params conkey u h inout = do
562 announce ((conkey,u),EOF) 553 announce ((conkey,u),EOF)
563 connClose replaced 554 connClose replaced
564 let newcon = SaneConnection new 555 let newcon = SaneConnection new
565 announce $ ((conkey,u),conevent newcon) 556 announce $ ((conkey,u),conevent sessionConduits newcon)
566 return $ newcon 557 return $ newcon
567 else 558 else
568 case replaced of 559 case replaced of
569 WriteOnlyConnection w | inout==In -> 560 WriteOnlyConnection w | inout==In ->
570 do let newcon = ConnectionPair new w 561 do let newcon = ConnectionPair new w
571 announce ((conkey,u),conevent newcon) 562 announce ((conkey,u),conevent sessionConduits newcon)
572 return newcon 563 return newcon
573 ReadOnlyConnection r | inout==Out -> 564 ReadOnlyConnection r | inout==Out ->
574 do let newcon = ConnectionPair r new 565 do let newcon = ConnectionPair r new
575 announce ((conkey,u),conevent newcon) 566 announce ((conkey,u),conevent sessionConduits newcon)
576 return newcon 567 return newcon
577 _ -> do -- connFlush todo 568 _ -> do -- connFlush todo
578 announce ((conkey,u0), EOF) 569 announce ((conkey,u0), EOF)
diff --git a/Presence/XMPPServer.hs b/Presence/XMPPServer.hs
index 362cc8a8..b63f04c3 100644
--- a/Presence/XMPPServer.hs
+++ b/Presence/XMPPServer.hs
@@ -46,7 +46,12 @@ import Control.Monad.Trans (lift)
46import Control.Monad.IO.Class (MonadIO, liftIO) 46import Control.Monad.IO.Class (MonadIO, liftIO)
47import Control.Monad.Fix (fix) 47import Control.Monad.Fix (fix)
48import Control.Monad 48import Control.Monad
49import Control.Concurrent (forkIO) 49#ifdef THREAD_DEBUG
50import Control.Concurrent.Lifted.Instrument (forkIO,myThreadId,labelThread)
51#else
52import Control.Concurrent.Lifted (forkIO,myThreadId)
53import GHC.Conc (labelThread)
54#endif
50import Control.Concurrent.STM 55import Control.Concurrent.STM
51-- import Control.Concurrent.STM.TChan 56-- import Control.Concurrent.STM.TChan
52import Network.SocketLike 57import Network.SocketLike
@@ -80,6 +85,7 @@ import qualified System.Random
80import Data.Void (Void) 85import Data.Void (Void)
81import System.Endian (toBE32) 86import System.Endian (toBE32)
82import Control.Applicative 87import Control.Applicative
88import System.IO
83 89
84peerport :: PortNumber 90peerport :: PortNumber
85peerport = 5269 91peerport = 5269
@@ -873,7 +879,7 @@ makePong namespace mid to from =
873 ] 879 ]
874 880
875 881
876xmppInbound :: Server ConnectionKey SockAddr ReleaseKey 882xmppInbound :: Server ConnectionKey SockAddr ReleaseKey XML.Event
877 -> XMPPServerParameters 883 -> XMPPServerParameters
878 -> ConnectionKey 884 -> ConnectionKey
879 -> SockAddr 885 -> SockAddr
@@ -1175,7 +1181,7 @@ presenceStanza stanza_type type_attr me jid =
1175 ] 1181 ]
1176 , EventEndElement "{jabber:server}presence" ] 1182 , EventEndElement "{jabber:server}presence" ]
1177 1183
1178forkConnection :: Server ConnectionKey SockAddr ReleaseKey 1184forkConnection :: Server ConnectionKey SockAddr ReleaseKey XML.Event
1179 -> XMPPServerParameters 1185 -> XMPPServerParameters
1180 -> ConnectionKey 1186 -> ConnectionKey
1181 -> SockAddr 1187 -> SockAddr
@@ -1235,7 +1241,8 @@ forkConnection sv xmpp k laddr pingflag src snk stanzas = do
1235 ,readTMVar rdone >> return (return ()) 1241 ,readTMVar rdone >> return (return ())
1236 ] 1242 ]
1237 what 1243 what
1238 forkIO $ do (greet_src >> slot_src) $$ snk 1244 forkIO $ do myThreadId >>= flip labelThread ("post-queue."++show k)
1245 (greet_src >> slot_src) $$ snk
1239 last <- atomically $ readTVar lastStanza 1246 last <- atomically $ readTVar lastStanza
1240 es <- while (atomically . fmap not $ Slotted.isEmpty slots) 1247 es <- while (atomically . fmap not $ Slotted.isEmpty slots)
1241 (atomically . Slotted.pull $ slots) 1248 (atomically . Slotted.pull $ slots)
@@ -1259,11 +1266,13 @@ forkConnection sv xmpp k laddr pingflag src snk stanzas = do
1259 -- TODO: Probably some stanzas should be queued or saved for re-connect. 1266 -- TODO: Probably some stanzas should be queued or saved for re-connect.
1260 mapM_ fail $ filter notError (maybeToList last ++ es') 1267 mapM_ fail $ filter notError (maybeToList last ++ es')
1261 wlog $ "end post-queue fork: " ++ show k 1268 wlog $ "end post-queue fork: " ++ show k
1269
1262 output <- atomically newTChan 1270 output <- atomically newTChan
1263 hacks <- atomically $ newTVar Map.empty 1271 hacks <- atomically $ newTVar Map.empty
1264 msgids <- atomically $ newTVar [] 1272 msgids <- atomically $ newTVar []
1265 forkIO $ do 1273 forkIO $ do
1266 -- mapM_ (atomically . Slotted.push slots Nothing) greetPeer 1274 -- mapM_ (atomically . Slotted.push slots Nothing) greetPeer
1275 myThreadId >>= flip labelThread ("pre-queue."++show k)
1267 verbosity <- xmppVerbosity xmpp 1276 verbosity <- xmppVerbosity xmpp
1268 fix $ \loop -> do 1277 fix $ \loop -> do
1269 what <- atomically $ foldr1 orElse 1278 what <- atomically $ foldr1 orElse
@@ -1338,6 +1347,7 @@ forkConnection sv xmpp k laddr pingflag src snk stanzas = do
1338 what 1347 what
1339 wlog $ "end pre-queue fork: " ++ show k 1348 wlog $ "end pre-queue fork: " ++ show k
1340 forkIO $ do 1349 forkIO $ do
1350 myThreadId >>= flip labelThread ("reader."++show k)
1341 -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show) 1351 -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show)
1342 src $$ xmppInbound sv xmpp k laddr pingflag stanzas output rdone 1352 src $$ xmppInbound sv xmpp k laddr pingflag stanzas output rdone
1343 atomically $ putTMVar rdone () 1353 atomically $ putTMVar rdone ()
@@ -1437,7 +1447,7 @@ sendRoster query xmpp replyto = do
1437 -} 1447 -}
1438 1448
1439 1449
1440socketFromKey :: Server ConnectionKey SockAddr ReleaseKey -> ConnectionKey -> IO SockAddr 1450socketFromKey :: Server ConnectionKey SockAddr ReleaseKey XML.Event -> ConnectionKey -> IO SockAddr
1441socketFromKey sv k = do 1451socketFromKey sv k = do
1442 map <- atomically $ readTVar (conmap sv) 1452 map <- atomically $ readTVar (conmap sv)
1443 let mcd = Map.lookup k map 1453 let mcd = Map.lookup k map
@@ -1607,7 +1617,7 @@ makeErrorStanza stanza = do
1607 ] 1617 ]
1608 1618
1609monitor :: 1619monitor ::
1610 Server ConnectionKey SockAddr ReleaseKey 1620 Server ConnectionKey SockAddr ReleaseKey XML.Event
1611 -> ConnectionParameters ConnectionKey SockAddr 1621 -> ConnectionParameters ConnectionKey SockAddr
1612 -> XMPPServerParameters 1622 -> XMPPServerParameters
1613 -> IO b 1623 -> IO b
@@ -1619,12 +1629,10 @@ monitor sv params xmpp = do
1619 action <- atomically $ foldr1 orElse 1629 action <- atomically $ foldr1 orElse
1620 [ readTChan chan >>= \((k,u),e) -> return $ do 1630 [ readTChan chan >>= \((k,u),e) -> return $ do
1621 case e of 1631 case e of
1622 Connection pingflag conread conwrite -> do 1632 Connection pingflag xsrc xsnk -> do
1623 wlog $ tomsg k "Connection" 1633 wlog $ tomsg k "Connection"
1624 let (xsrc,xsnk) = xmlStream conread conwrite 1634 outs <- forkConnection sv xmpp k u pingflag xsrc xsnk stanzas
1625 outs <- forkConnection sv xmpp k u pingflag xsrc xsnk stanzas 1635 xmppNewConnection xmpp k u outs
1626 xmppNewConnection xmpp k u outs
1627 return ()
1628 ConnectFailure addr -> return () -- wlog $ tomsg k "ConnectFailure" 1636 ConnectFailure addr -> return () -- wlog $ tomsg k "ConnectFailure"
1629 EOF -> do wlog $ tomsg k "EOF" 1637 EOF -> do wlog $ tomsg k "EOF"
1630 xmppEOF xmpp k 1638 xmppEOF xmpp k
@@ -1775,7 +1783,7 @@ monitor sv params xmpp = do
1775 _ = str :: String 1783 _ = str :: String
1776 1784
1777data XMPPServer 1785data XMPPServer
1778 = XMPPServer { _xmpp_sv :: Server ConnectionKey SockAddr ReleaseKey 1786 = XMPPServer { _xmpp_sv :: Server ConnectionKey SockAddr ReleaseKey XML.Event
1779 , _xmpp_peer_params :: ConnectionParameters ConnectionKey SockAddr 1787 , _xmpp_peer_params :: ConnectionParameters ConnectionKey SockAddr
1780 } 1788 }
1781 1789
@@ -1787,7 +1795,7 @@ xmppServer :: ( MonadResource m
1787 , MonadIO m 1795 , MonadIO m
1788 ) => XMPPServerParameters -> m XMPPServer 1796 ) => XMPPServerParameters -> m XMPPServer
1789xmppServer xmpp = do 1797xmppServer xmpp = do
1790 sv <- server allocate 1798 sv <- server allocate xmlStream
1791 -- some fuzz helps avoid simultaneity 1799 -- some fuzz helps avoid simultaneity
1792 pingfuzz <- liftIO $ do 1800 pingfuzz <- liftIO $ do
1793 gen <- System.Random.getStdGen 1801 gen <- System.Random.getStdGen
@@ -1803,8 +1811,12 @@ xmppServer xmpp = do
1803 , timeout = 0 1811 , timeout = 0
1804 } 1812 }
1805 liftIO $ do 1813 liftIO $ do
1806 forkIO $ monitor sv peer_params xmpp 1814 forkIO $ do
1815 myThreadId >>= flip labelThread ("XMPP.monitor")
1816 monitor sv peer_params xmpp
1817 hPutStrLn stderr $ "Starting peer listen"
1807 control sv (Listen peerport peer_params) 1818 control sv (Listen peerport peer_params)
1819 hPutStrLn stderr $ "Starting client listen"
1808 control sv (Listen clientport client_params) 1820 control sv (Listen clientport client_params)
1809 return XMPPServer { _xmpp_sv = sv, _xmpp_peer_params = peer_params } 1821 return XMPPServer { _xmpp_sv = sv, _xmpp_peer_params = peer_params }
1810 1822