summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Presence/EventUtil.hs3
-rw-r--r--Presence/XMPPServer.hs49
2 files changed, 39 insertions, 13 deletions
diff --git a/Presence/EventUtil.hs b/Presence/EventUtil.hs
index bdea9fa2..a1c48e33 100644
--- a/Presence/EventUtil.hs
+++ b/Presence/EventUtil.hs
@@ -55,3 +55,6 @@ iqTypeError = "error"
55 55
56tagName (EventBeginElement n _) = n 56tagName (EventBeginElement n _) = n
57tagName _ = "" 57tagName _ = ""
58
59closerFor (EventBeginElement n _) = EventEndElement n
60closerFor _ = error "closerFor: unsupported event"
diff --git a/Presence/XMPPServer.hs b/Presence/XMPPServer.hs
index 95110301..a9e7e336 100644
--- a/Presence/XMPPServer.hs
+++ b/Presence/XMPPServer.hs
@@ -42,6 +42,10 @@ clientport = 5222
42 42
43 43
44-- TODO: http://xmpp.org/rfcs/rfc6120.html#rules-remote-error 44-- TODO: http://xmpp.org/rfcs/rfc6120.html#rules-remote-error
45-- client connection
46-- socat script to send stanza fragment
47-- copyToChannel can keep a stack of closers to append to finish-off a stanza
48-- the TMVar () from forkConnection can be passed and with a stanza to detect interruption
45 49
46addrToText :: SockAddr -> Text 50addrToText :: SockAddr -> Text
47addrToText (addr@(SockAddrInet _ _)) = Text.pack $ stripColon (show addr) 51addrToText (addr@(SockAddrInet _ _)) = Text.pack $ stripColon (show addr)
@@ -92,10 +96,19 @@ data Stanza
92 | PongStanza { -- stanzaId :: Maybe Text 96 | PongStanza { -- stanzaId :: Maybe Text
93 stanzaChan :: TChan (Maybe XML.Event) } 97 stanzaChan :: TChan (Maybe XML.Event) }
94 98
95copyToChannel f chan = awaitForever copy 99copyToChannel f chan closer_stack = awaitForever copy
96 where 100 where
97 copy x = do 101 copy x = do
98 liftIO . atomically $ writeTChan chan (f x) 102 liftIO . atomically $ writeTChan chan (f x)
103 case x of
104 EventBeginDocument {} -> do
105 let clsr = closerFor x
106 liftIO . atomically $
107 modifyTVar' closer_stack (clsr:)
108 EventEndDocument {} -> do
109 liftIO . atomically $
110 modifyTVar' closer_stack (drop 1)
111 _ -> return ()
99 yield x 112 yield x
100 113
101 114
@@ -123,19 +136,23 @@ xmppInbound :: ConnectionKey -> FlagCommand
123 -> Source IO XML.Event 136 -> Source IO XML.Event
124 -> TChan Stanza 137 -> TChan Stanza
125 -> TChan Stanza 138 -> TChan Stanza
139 -> TMVar ()
126 -> Sink XML.Event IO () 140 -> Sink XML.Event IO ()
127xmppInbound k pingflag src stanzas output = doNestingXML $ do 141xmppInbound k pingflag src stanzas output donevar = doNestingXML $ do
128 withXML $ \begindoc -> do 142 withXML $ \begindoc -> do
129 when (begindoc==EventBeginDocument) $ do 143 when (begindoc==EventBeginDocument) $ do
130 whenJust nextElement $ \xml -> do 144 whenJust nextElement $ \xml -> do
131 withJust (elementAttrs "stream" xml) $ \stream_attrs -> do 145 withJust (elementAttrs "stream" xml) $ \stream_attrs -> do
132 fix $ \loop -> do 146 fix $ \loop -> do
133 -- liftIO . wlog $ "waiting for stanza." 147 -- liftIO . wlog $ "waiting for stanza."
134 chan <- liftIO $ atomically newTChan 148 (chan,clsrs) <- liftIO . atomically $
149 liftM2 (,) newTChan (newTVar [])
135 whenJust nextElement $ \stanza -> do 150 whenJust nextElement $ \stanza -> do
136 stanza_lvl <- nesting 151 stanza_lvl <- nesting
137 ioWriteChan chan (Just stanza) 152 liftIO . atomically $ do
138 copyToChannel Just chan =$= do 153 writeTChan chan (Just stanza)
154 modifyTVar' clsrs (closerFor stanza:)
155 copyToChannel Just chan clsrs =$= do
139 dispatch <- 156 dispatch <-
140 case () of 157 case () of
141 _ | stanza `isServerIQOf` "get" -> grockStanzaIQGet stanza 158 _ | stanza `isServerIQOf` "get" -> grockStanzaIQGet stanza
@@ -146,10 +163,13 @@ xmppInbound k pingflag src stanzas output = doNestingXML $ do
146 let to = "todo" 163 let to = "todo"
147 from = "todo" 164 from = "todo"
148 let pong = peerPong (stanzaId d) to from 165 let pong = peerPong (stanzaId d) to from
166 liftIO $ wlog "got ping, sending pong..."
149 pongChan <- liftIO $ atomically newTChan 167 pongChan <- liftIO $ atomically newTChan
150 ioWriteChan output (PongStanza pongChan) 168 ioWriteChan output (PongStanza pongChan)
151 mapM_ (ioWriteChan pongChan . Just) pong 169 void . liftIO . forkIO $ do
152 ioWriteChan pongChan Nothing 170 mapM_ (ioWriteChan pongChan . Just) pong
171 ioWriteChan pongChan Nothing
172 liftIO $ wlog "finished pong stanza"
153 disp -> ioWriteChan stanzas disp 173 disp -> ioWriteChan stanzas disp
154 awaitCloser stanza_lvl 174 awaitCloser stanza_lvl
155 ioWriteChan chan Nothing 175 ioWriteChan chan Nothing
@@ -279,8 +299,11 @@ forkConnection k pingflag src snk stanzas = do
279 from = "todo" -- Look it up from Server object 299 from = "todo" -- Look it up from Server object
280 -- or pass it with Connection event. 300 -- or pass it with Connection event.
281 mid = Just "ping" 301 mid = Just "ping"
302 ping = peerPing mid to from
282 mapM_ (atomically . Slotted.push slots (Just $ PingSlot)) 303 mapM_ (atomically . Slotted.push slots (Just $ PingSlot))
283 (peerPing mid to from) 304 ping
305 wlog ""
306 prettyPrint "P<- " ping
284 loop 307 loop
285 ,readTMVar rdone >> return (return ()) 308 ,readTMVar rdone >> return (return ())
286 ] 309 ]
@@ -288,7 +311,7 @@ forkConnection k pingflag src snk stanzas = do
288 wlog $ "end pre-queue fork: " ++ show k 311 wlog $ "end pre-queue fork: " ++ show k
289 forkIO $ do 312 forkIO $ do
290 -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show) 313 -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show)
291 src $$ xmppInbound k pingflag src stanzas output 314 src $$ xmppInbound k pingflag src stanzas output rdone
292 atomically $ putTMVar rdone () 315 atomically $ putTMVar rdone ()
293 wlog $ "end reader fork: " ++ show k 316 wlog $ "end reader fork: " ++ show k
294 return output 317 return output
@@ -337,12 +360,12 @@ monitor sv params = do
337 wlog $ tomsg k "ReadOnly" 360 wlog $ tomsg k "ReadOnly"
338 control sv (Connect (callBackAddress k) params) 361 control sv (Connect (callBackAddress k) params)
339 HalfConnection Out -> wlog $ tomsg k "WriteOnly" 362 HalfConnection Out -> wlog $ tomsg k "WriteOnly"
340 RequiresPing -> wlog $ tomsg k "RequiresPing" 363 RequiresPing -> return () -- wlog $ tomsg k "RequiresPing"
341 _ -> return () 364 _ -> return ()
342 , readTChan stanzas >>= \stanza -> return $ do 365 , readTChan stanzas >>= \stanza -> return $ do
343 xs <- readUntilNothing (stanzaChan stanza) 366 xs <- readUntilNothing (stanzaChan stanza)
344 wlog "" 367 wlog ""
345 prettyPrint "STANZA: " xs 368 prettyPrint "P-> " xs
346 ] 369 ]
347 action 370 action
348 loop 371 loop
@@ -356,8 +379,8 @@ xmppServer :: (MonadResource m, MonadIO m) => m (Server ConnectionKey,Connection
356xmppServer = do 379xmppServer = do
357 sv <- server 380 sv <- server
358 let peer_params = (connectionDefaults peerKey) 381 let peer_params = (connectionDefaults peerKey)
359 { pingInterval = 10000 382 { pingInterval = 5000
360 , timeout = 10000 383 , timeout = 1000
361 , duplex = False } 384 , duplex = False }
362 client_params = connectionDefaults clientKey 385 client_params = connectionDefaults clientKey
363 liftIO $ do 386 liftIO $ do