diff options
author | joe <joe@jerkface.net> | 2014-02-11 21:40:09 -0500 |
---|---|---|
committer | joe <joe@jerkface.net> | 2014-02-11 21:40:09 -0500 |
commit | 7c520e503992d5f8c6a9259ef2c3220ef36d5f24 (patch) | |
tree | 8eb83a3a4cdd97704bb0185b54ab20df728f6f83 | |
parent | df8038e9c7341ef470318aaafe517074aa553349 (diff) |
Added Chunk/Flush logic to outgoing connections in xmppServer
-rw-r--r-- | Presence/EventUtil.hs | 23 | ||||
-rw-r--r-- | xmppServer.hs | 64 |
2 files changed, 69 insertions, 18 deletions
diff --git a/Presence/EventUtil.hs b/Presence/EventUtil.hs new file mode 100644 index 00000000..e62f8afc --- /dev/null +++ b/Presence/EventUtil.hs | |||
@@ -0,0 +1,23 @@ | |||
1 | {-# LANGUAGE OverloadedStrings #-} | ||
2 | module EventUtil where | ||
3 | |||
4 | import Control.Monad | ||
5 | import Data.XML.Types as XML | ||
6 | |||
7 | getStreamName (EventBeginElement name _) = name | ||
8 | |||
9 | isEventBeginElement (EventBeginElement {}) = True | ||
10 | isEventBeginElement _ = False | ||
11 | |||
12 | isEventEndElement (EventEndElement {}) = True | ||
13 | isEventEndElement _ = False | ||
14 | |||
15 | -- Note: This function ignores name space qualification | ||
16 | elementAttrs expected (EventBeginElement name attrs) | ||
17 | | nameLocalName name==expected | ||
18 | = return attrs | ||
19 | elementAttrs _ _ = mzero | ||
20 | |||
21 | streamP name = Name name (Just "http://etherx.jabber.org/streams") (Just "stream") | ||
22 | |||
23 | attr name value = (name,[ContentText value]) | ||
diff --git a/xmppServer.hs b/xmppServer.hs index e8864706..cbd30bda 100644 --- a/xmppServer.hs +++ b/xmppServer.hs | |||
@@ -18,6 +18,7 @@ import qualified Data.ByteString.Char8 as Strict8 | |||
18 | import Data.Conduit | 18 | import Data.Conduit |
19 | import qualified Data.Conduit.List as CL | 19 | import qualified Data.Conduit.List as CL |
20 | import qualified Data.Conduit.Binary as CB | 20 | import qualified Data.Conduit.Binary as CB |
21 | import Data.Conduit.Blaze (builderToByteStringFlush) | ||
21 | 22 | ||
22 | import qualified Text.XML.Stream.Render as XML | 23 | import qualified Text.XML.Stream.Render as XML |
23 | import qualified Text.XML.Stream.Parse as XML | 24 | import qualified Text.XML.Stream.Parse as XML |
@@ -28,6 +29,7 @@ import Data.Monoid ( (<>) ) | |||
28 | import qualified Control.Concurrent.STM.UpdateStream as Slotted | 29 | import qualified Control.Concurrent.STM.UpdateStream as Slotted |
29 | import ControlMaybe | 30 | import ControlMaybe |
30 | import NestingXML | 31 | import NestingXML |
32 | import EventUtil | ||
31 | import Server | 33 | import Server |
32 | 34 | ||
33 | 35 | ||
@@ -37,20 +39,24 @@ wlogb s = Strict8.putStrLn s | |||
37 | 39 | ||
38 | control sv = atomically . putTMVar (serverCommand sv) | 40 | control sv = atomically . putTMVar (serverCommand sv) |
39 | 41 | ||
40 | -- Note: This function ignores name space qualification | 42 | discardFlush :: Monad m => ConduitM (Flush a) a m () |
41 | elementAttrs expected (EventBeginElement name attrs) | 43 | discardFlush = awaitForever $ \x -> do |
42 | | nameLocalName name==expected | 44 | let unchunk (Chunk a) = a |
43 | = return attrs | 45 | ischunk (Chunk _) = True |
44 | elementAttrs _ _ = mzero | 46 | ischunk _ = False |
47 | when (ischunk x) $ yield (unchunk x) | ||
45 | 48 | ||
46 | getStreamName (EventBeginElement name _) = name | ||
47 | 49 | ||
48 | xmlStream :: ReadCommand -> WriteCommand -> ( Source IO XML.Event | 50 | xmlStream :: ReadCommand -> WriteCommand -> ( Source IO XML.Event |
49 | , Sink XML.Event IO () ) | 51 | , Sink (Flush XML.Event) IO () ) |
50 | xmlStream conread conwrite = (xsrc,xsnk) | 52 | xmlStream conread conwrite = (xsrc,xsnk) |
51 | where | 53 | where |
52 | xsrc = src $= XML.parseBytes XML.def | 54 | xsrc = src $= XML.parseBytes XML.def |
53 | xsnk = XML.renderBytes XML.def =$ snk | 55 | xsnk = -- XML.renderBytes XML.def =$ snk |
56 | XML.renderBuilderFlush XML.def | ||
57 | =$= builderToByteStringFlush | ||
58 | =$= discardFlush | ||
59 | =$ snk | ||
54 | 60 | ||
55 | src = do | 61 | src = do |
56 | v <- lift conread | 62 | v <- lift conread |
@@ -117,16 +123,25 @@ chanContents ch = do | |||
117 | x | 123 | x |
118 | 124 | ||
119 | 125 | ||
120 | isEventBeginElement (EventBeginElement {}) = True | 126 | greetPeer = |
121 | isEventBeginElement _ = False | 127 | [ EventBeginDocument |
128 | , EventBeginElement (streamP "stream") | ||
129 | [ attr "xmlns" "jabber:server" | ||
130 | , attr "version" "1.0" | ||
131 | ] | ||
132 | ] | ||
133 | |||
134 | goodbyePeer = | ||
135 | [ EventEndElement (streamP "stream") | ||
136 | , EventEndDocument | ||
137 | ] | ||
138 | |||
122 | 139 | ||
123 | isEventEndElement (EventEndElement {}) = True | ||
124 | isEventEndElement _ = False | ||
125 | 140 | ||
126 | forkConnection :: ConnectionKey | 141 | forkConnection :: ConnectionKey |
127 | -> FlagCommand | 142 | -> FlagCommand |
128 | -> Source IO XML.Event | 143 | -> Source IO XML.Event |
129 | -> Sink XML.Event IO () | 144 | -> Sink (Flush XML.Event) IO () |
130 | -> TChan Stanza | 145 | -> TChan Stanza |
131 | -> IO (TChan Stanza) | 146 | -> IO (TChan Stanza) |
132 | forkConnection k pingflag src snk stanzas = do | 147 | forkConnection k pingflag src snk stanzas = do |
@@ -139,16 +154,29 @@ forkConnection k pingflag src snk stanzas = do | |||
139 | slots <- atomically $ Slotted.new isEventBeginElement isEventEndElement | 154 | slots <- atomically $ Slotted.new isEventBeginElement isEventEndElement |
140 | let _ = slots :: Slotted.UpdateStream () XML.Event | 155 | let _ = slots :: Slotted.UpdateStream () XML.Event |
141 | let slot_src = do | 156 | let slot_src = do |
142 | what <- lift . atomically $ orElse | 157 | CL.sourceList greetPeer =$= CL.map Chunk |
143 | (Slotted.pull slots >>= \x -> return $ do | 158 | yield Flush |
144 | yield x | 159 | needsFlush <- lift . atomically $ newTVar False |
145 | slot_src) | 160 | what <- lift . atomically $ foldr1 orElse |
146 | (readTMVar rdone >> return (return ())) | 161 | [Slotted.pull slots >>= \x -> do |
162 | writeTVar needsFlush True | ||
163 | return $ do | ||
164 | yield (Chunk x) | ||
165 | slot_src | ||
166 | ,do Slotted.isEmpty slots >>= check | ||
167 | readTVar needsFlush >>= check | ||
168 | writeTVar needsFlush False | ||
169 | return $ do | ||
170 | yield Flush | ||
171 | slot_src | ||
172 | ,readTMVar rdone >> return (return ()) | ||
173 | ] | ||
147 | what | 174 | what |
148 | forkIO $ do slot_src $$ snk | 175 | forkIO $ do slot_src $$ snk |
149 | wlog $ "end post-queue fork: " ++ show k | 176 | wlog $ "end post-queue fork: " ++ show k |
150 | output <- atomically newTChan | 177 | output <- atomically newTChan |
151 | forkIO $ do | 178 | forkIO $ do |
179 | -- mapM_ (atomically . Slotted.push slots Nothing) greetPeer | ||
152 | fix $ \loop -> do | 180 | fix $ \loop -> do |
153 | what <- atomically $ foldr1 orElse | 181 | what <- atomically $ foldr1 orElse |
154 | [readTChan output >>= \stanza -> return $ do | 182 | [readTChan output >>= \stanza -> return $ do |