summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2014-02-11 21:40:09 -0500
committerjoe <joe@jerkface.net>2014-02-11 21:40:09 -0500
commit7c520e503992d5f8c6a9259ef2c3220ef36d5f24 (patch)
tree8eb83a3a4cdd97704bb0185b54ab20df728f6f83
parentdf8038e9c7341ef470318aaafe517074aa553349 (diff)
Added Chunk/Flush logic to outgoing connections in xmppServer
-rw-r--r--Presence/EventUtil.hs23
-rw-r--r--xmppServer.hs64
2 files changed, 69 insertions, 18 deletions
diff --git a/Presence/EventUtil.hs b/Presence/EventUtil.hs
new file mode 100644
index 00000000..e62f8afc
--- /dev/null
+++ b/Presence/EventUtil.hs
@@ -0,0 +1,23 @@
1{-# LANGUAGE OverloadedStrings #-}
2module EventUtil where
3
4import Control.Monad
5import Data.XML.Types as XML
6
7getStreamName (EventBeginElement name _) = name
8
9isEventBeginElement (EventBeginElement {}) = True
10isEventBeginElement _ = False
11
12isEventEndElement (EventEndElement {}) = True
13isEventEndElement _ = False
14
15-- Note: This function ignores name space qualification
16elementAttrs expected (EventBeginElement name attrs)
17 | nameLocalName name==expected
18 = return attrs
19elementAttrs _ _ = mzero
20
21streamP name = Name name (Just "http://etherx.jabber.org/streams") (Just "stream")
22
23attr name value = (name,[ContentText value])
diff --git a/xmppServer.hs b/xmppServer.hs
index e8864706..cbd30bda 100644
--- a/xmppServer.hs
+++ b/xmppServer.hs
@@ -18,6 +18,7 @@ import qualified Data.ByteString.Char8 as Strict8
18import Data.Conduit 18import Data.Conduit
19import qualified Data.Conduit.List as CL 19import qualified Data.Conduit.List as CL
20import qualified Data.Conduit.Binary as CB 20import qualified Data.Conduit.Binary as CB
21import Data.Conduit.Blaze (builderToByteStringFlush)
21 22
22import qualified Text.XML.Stream.Render as XML 23import qualified Text.XML.Stream.Render as XML
23import qualified Text.XML.Stream.Parse as XML 24import qualified Text.XML.Stream.Parse as XML
@@ -28,6 +29,7 @@ import Data.Monoid ( (<>) )
28import qualified Control.Concurrent.STM.UpdateStream as Slotted 29import qualified Control.Concurrent.STM.UpdateStream as Slotted
29import ControlMaybe 30import ControlMaybe
30import NestingXML 31import NestingXML
32import EventUtil
31import Server 33import Server
32 34
33 35
@@ -37,20 +39,24 @@ wlogb s = Strict8.putStrLn s
37 39
38control sv = atomically . putTMVar (serverCommand sv) 40control sv = atomically . putTMVar (serverCommand sv)
39 41
40-- Note: This function ignores name space qualification 42discardFlush :: Monad m => ConduitM (Flush a) a m ()
41elementAttrs expected (EventBeginElement name attrs) 43discardFlush = awaitForever $ \x -> do
42 | nameLocalName name==expected 44 let unchunk (Chunk a) = a
43 = return attrs 45 ischunk (Chunk _) = True
44elementAttrs _ _ = mzero 46 ischunk _ = False
47 when (ischunk x) $ yield (unchunk x)
45 48
46getStreamName (EventBeginElement name _) = name
47 49
48xmlStream :: ReadCommand -> WriteCommand -> ( Source IO XML.Event 50xmlStream :: ReadCommand -> WriteCommand -> ( Source IO XML.Event
49 , Sink XML.Event IO () ) 51 , Sink (Flush XML.Event) IO () )
50xmlStream conread conwrite = (xsrc,xsnk) 52xmlStream conread conwrite = (xsrc,xsnk)
51 where 53 where
52 xsrc = src $= XML.parseBytes XML.def 54 xsrc = src $= XML.parseBytes XML.def
53 xsnk = XML.renderBytes XML.def =$ snk 55 xsnk = -- XML.renderBytes XML.def =$ snk
56 XML.renderBuilderFlush XML.def
57 =$= builderToByteStringFlush
58 =$= discardFlush
59 =$ snk
54 60
55 src = do 61 src = do
56 v <- lift conread 62 v <- lift conread
@@ -117,16 +123,25 @@ chanContents ch = do
117 x 123 x
118 124
119 125
120isEventBeginElement (EventBeginElement {}) = True 126greetPeer =
121isEventBeginElement _ = False 127 [ EventBeginDocument
128 , EventBeginElement (streamP "stream")
129 [ attr "xmlns" "jabber:server"
130 , attr "version" "1.0"
131 ]
132 ]
133
134goodbyePeer =
135 [ EventEndElement (streamP "stream")
136 , EventEndDocument
137 ]
138
122 139
123isEventEndElement (EventEndElement {}) = True
124isEventEndElement _ = False
125 140
126forkConnection :: ConnectionKey 141forkConnection :: ConnectionKey
127 -> FlagCommand 142 -> FlagCommand
128 -> Source IO XML.Event 143 -> Source IO XML.Event
129 -> Sink XML.Event IO () 144 -> Sink (Flush XML.Event) IO ()
130 -> TChan Stanza 145 -> TChan Stanza
131 -> IO (TChan Stanza) 146 -> IO (TChan Stanza)
132forkConnection k pingflag src snk stanzas = do 147forkConnection k pingflag src snk stanzas = do
@@ -139,16 +154,29 @@ forkConnection k pingflag src snk stanzas = do
139 slots <- atomically $ Slotted.new isEventBeginElement isEventEndElement 154 slots <- atomically $ Slotted.new isEventBeginElement isEventEndElement
140 let _ = slots :: Slotted.UpdateStream () XML.Event 155 let _ = slots :: Slotted.UpdateStream () XML.Event
141 let slot_src = do 156 let slot_src = do
142 what <- lift . atomically $ orElse 157 CL.sourceList greetPeer =$= CL.map Chunk
143 (Slotted.pull slots >>= \x -> return $ do 158 yield Flush
144 yield x 159 needsFlush <- lift . atomically $ newTVar False
145 slot_src) 160 what <- lift . atomically $ foldr1 orElse
146 (readTMVar rdone >> return (return ())) 161 [Slotted.pull slots >>= \x -> do
162 writeTVar needsFlush True
163 return $ do
164 yield (Chunk x)
165 slot_src
166 ,do Slotted.isEmpty slots >>= check
167 readTVar needsFlush >>= check
168 writeTVar needsFlush False
169 return $ do
170 yield Flush
171 slot_src
172 ,readTMVar rdone >> return (return ())
173 ]
147 what 174 what
148 forkIO $ do slot_src $$ snk 175 forkIO $ do slot_src $$ snk
149 wlog $ "end post-queue fork: " ++ show k 176 wlog $ "end post-queue fork: " ++ show k
150 output <- atomically newTChan 177 output <- atomically newTChan
151 forkIO $ do 178 forkIO $ do
179 -- mapM_ (atomically . Slotted.push slots Nothing) greetPeer
152 fix $ \loop -> do 180 fix $ \loop -> do
153 what <- atomically $ foldr1 orElse 181 what <- atomically $ foldr1 orElse
154 [readTChan output >>= \stanza -> return $ do 182 [readTChan output >>= \stanza -> return $ do