diff options
author | joe <joe@jerkface.net> | 2017-11-17 09:21:18 -0500 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-11-17 09:21:18 -0500 |
commit | 6cb062187e94a322863363acc34e58d9d4070de1 (patch) | |
tree | d7edcad7ea24bd870988b4ad3a9e3d4c1307df05 | |
parent | d0311af98728855128b6417a4a591186a42345f9 (diff) |
Factored slotsToSource out of forkConnection
-rw-r--r-- | Presence/XMPPServer.hs | 71 |
1 files changed, 39 insertions, 32 deletions
diff --git a/Presence/XMPPServer.hs b/Presence/XMPPServer.hs index b63f04c3..3505a0a2 100644 --- a/Presence/XMPPServer.hs +++ b/Presence/XMPPServer.hs | |||
@@ -1181,6 +1181,44 @@ presenceStanza stanza_type type_attr me jid = | |||
1181 | ] | 1181 | ] |
1182 | , EventEndElement "{jabber:server}presence" ] | 1182 | , EventEndElement "{jabber:server}presence" ] |
1183 | 1183 | ||
1184 | slotsToSource :: | ||
1185 | Slotted.UpdateStream XMPPState (Either (StanzaWrap XML.Event) XML.Event) | ||
1186 | -> TVar Int | ||
1187 | -> TVar (Maybe (StanzaWrap XML.Event)) | ||
1188 | -> TVar Bool | ||
1189 | -> TMVar () | ||
1190 | -> Source IO (Flush XML.Event) | ||
1191 | slotsToSource slots nesting lastStanza needsFlush rdone = | ||
1192 | fix $ \slot_src -> join . lift . atomically $ foldr1 orElse | ||
1193 | [Slotted.pull slots >>= \x -> do | ||
1194 | x <- case x of | ||
1195 | Left wrapped -> do | ||
1196 | writeTVar nesting 1 | ||
1197 | writeTVar lastStanza (Just wrapped) | ||
1198 | return $ stanzaChan wrapped | ||
1199 | Right x -> do | ||
1200 | when (isEventBeginElement x) | ||
1201 | $ modifyTVar' nesting (+1) | ||
1202 | when (isEventEndElement x) $ do | ||
1203 | n <- readTVar nesting | ||
1204 | when (n==1) $ writeTVar lastStanza Nothing | ||
1205 | modifyTVar' nesting (subtract 1) | ||
1206 | return x | ||
1207 | writeTVar needsFlush True | ||
1208 | return $ do | ||
1209 | -- liftIO $ wlog $ "yielding Chunk: " ++ show x | ||
1210 | yield (Chunk x) | ||
1211 | slot_src | ||
1212 | ,do Slotted.isEmpty slots >>= check | ||
1213 | readTVar needsFlush >>= check | ||
1214 | writeTVar needsFlush False | ||
1215 | return $ do | ||
1216 | -- liftIO $ wlog "yielding Flush" | ||
1217 | yield Flush | ||
1218 | slot_src | ||
1219 | ,readTMVar rdone >> return (return ()) | ||
1220 | ] | ||
1221 | |||
1184 | forkConnection :: Server ConnectionKey SockAddr ReleaseKey XML.Event | 1222 | forkConnection :: Server ConnectionKey SockAddr ReleaseKey XML.Event |
1185 | -> XMPPServerParameters | 1223 | -> XMPPServerParameters |
1186 | -> ConnectionKey | 1224 | -> ConnectionKey |
@@ -1210,37 +1248,7 @@ forkConnection sv xmpp k laddr pingflag src snk stanzas = do | |||
1210 | let greet_src = do | 1248 | let greet_src = do |
1211 | CL.sourceList (greet' namespace me) =$= CL.map Chunk | 1249 | CL.sourceList (greet' namespace me) =$= CL.map Chunk |
1212 | yield Flush | 1250 | yield Flush |
1213 | slot_src = do | 1251 | slot_src = slotsToSource slots nesting lastStanza needsFlush rdone |
1214 | what <- lift . atomically $ foldr1 orElse | ||
1215 | [Slotted.pull slots >>= \x -> do | ||
1216 | x <- case x of | ||
1217 | Left wrapped -> do | ||
1218 | writeTVar nesting 1 | ||
1219 | writeTVar lastStanza (Just wrapped) | ||
1220 | return $ stanzaChan wrapped | ||
1221 | Right x -> do | ||
1222 | when (isEventBeginElement x) | ||
1223 | $ modifyTVar' nesting (+1) | ||
1224 | when (isEventEndElement x) $ do | ||
1225 | n <- readTVar nesting | ||
1226 | when (n==1) $ writeTVar lastStanza Nothing | ||
1227 | modifyTVar' nesting (subtract 1) | ||
1228 | return x | ||
1229 | writeTVar needsFlush True | ||
1230 | return $ do | ||
1231 | -- liftIO $ wlog $ "yielding Chunk: " ++ show x | ||
1232 | yield (Chunk x) | ||
1233 | slot_src | ||
1234 | ,do Slotted.isEmpty slots >>= check | ||
1235 | readTVar needsFlush >>= check | ||
1236 | writeTVar needsFlush False | ||
1237 | return $ do | ||
1238 | -- liftIO $ wlog "yielding Flush" | ||
1239 | yield Flush | ||
1240 | slot_src | ||
1241 | ,readTMVar rdone >> return (return ()) | ||
1242 | ] | ||
1243 | what | ||
1244 | forkIO $ do myThreadId >>= flip labelThread ("post-queue."++show k) | 1252 | forkIO $ do myThreadId >>= flip labelThread ("post-queue."++show k) |
1245 | (greet_src >> slot_src) $$ snk | 1253 | (greet_src >> slot_src) $$ snk |
1246 | last <- atomically $ readTVar lastStanza | 1254 | last <- atomically $ readTVar lastStanza |
@@ -1641,7 +1649,6 @@ monitor sv params xmpp = do | |||
1641 | control sv (Connect (callBackAddress k) params) | 1649 | control sv (Connect (callBackAddress k) params) |
1642 | HalfConnection Out -> wlog $ tomsg k "WriteOnly" | 1650 | HalfConnection Out -> wlog $ tomsg k "WriteOnly" |
1643 | RequiresPing -> return () -- wlog $ tomsg k "RequiresPing" | 1651 | RequiresPing -> return () -- wlog $ tomsg k "RequiresPing" |
1644 | _ -> return () | ||
1645 | , readTChan stanzas >>= \stanza -> return $ do | 1652 | , readTChan stanzas >>= \stanza -> return $ do |
1646 | {- | 1653 | {- |
1647 | dup <- case stanzaType stanza of | 1654 | dup <- case stanzaType stanza of |