diff options
author | joe <joe@jerkface.net> | 2014-02-14 18:20:51 -0500 |
---|---|---|
committer | joe <joe@jerkface.net> | 2014-02-14 18:20:51 -0500 |
commit | fdd6100390a0c9e4d52cb068a093ea57b1ae0622 (patch) | |
tree | fa6bb7a9d094cbb1d6ced30683ccd14618494882 /Presence/XMPPServer.hs | |
parent | 418798796f77f1fa5cd2a52be62f8eb383cc25d8 (diff) |
copyToChan keeps a stack of closers
Diffstat (limited to 'Presence/XMPPServer.hs')
-rw-r--r-- | Presence/XMPPServer.hs | 49 |
1 files changed, 36 insertions, 13 deletions
diff --git a/Presence/XMPPServer.hs b/Presence/XMPPServer.hs index 95110301..a9e7e336 100644 --- a/Presence/XMPPServer.hs +++ b/Presence/XMPPServer.hs | |||
@@ -42,6 +42,10 @@ clientport = 5222 | |||
42 | 42 | ||
43 | 43 | ||
44 | -- TODO: http://xmpp.org/rfcs/rfc6120.html#rules-remote-error | 44 | -- TODO: http://xmpp.org/rfcs/rfc6120.html#rules-remote-error |
45 | -- client connection | ||
46 | -- socat script to send stanza fragment | ||
47 | -- copyToChannel can keep a stack of closers to append to finish-off a stanza | ||
48 | -- the TMVar () from forkConnection can be passed and with a stanza to detect interruption | ||
45 | 49 | ||
46 | addrToText :: SockAddr -> Text | 50 | addrToText :: SockAddr -> Text |
47 | addrToText (addr@(SockAddrInet _ _)) = Text.pack $ stripColon (show addr) | 51 | addrToText (addr@(SockAddrInet _ _)) = Text.pack $ stripColon (show addr) |
@@ -92,10 +96,19 @@ data Stanza | |||
92 | | PongStanza { -- stanzaId :: Maybe Text | 96 | | PongStanza { -- stanzaId :: Maybe Text |
93 | stanzaChan :: TChan (Maybe XML.Event) } | 97 | stanzaChan :: TChan (Maybe XML.Event) } |
94 | 98 | ||
95 | copyToChannel f chan = awaitForever copy | 99 | copyToChannel f chan closer_stack = awaitForever copy |
96 | where | 100 | where |
97 | copy x = do | 101 | copy x = do |
98 | liftIO . atomically $ writeTChan chan (f x) | 102 | liftIO . atomically $ writeTChan chan (f x) |
103 | case x of | ||
104 | EventBeginDocument {} -> do | ||
105 | let clsr = closerFor x | ||
106 | liftIO . atomically $ | ||
107 | modifyTVar' closer_stack (clsr:) | ||
108 | EventEndDocument {} -> do | ||
109 | liftIO . atomically $ | ||
110 | modifyTVar' closer_stack (drop 1) | ||
111 | _ -> return () | ||
99 | yield x | 112 | yield x |
100 | 113 | ||
101 | 114 | ||
@@ -123,19 +136,23 @@ xmppInbound :: ConnectionKey -> FlagCommand | |||
123 | -> Source IO XML.Event | 136 | -> Source IO XML.Event |
124 | -> TChan Stanza | 137 | -> TChan Stanza |
125 | -> TChan Stanza | 138 | -> TChan Stanza |
139 | -> TMVar () | ||
126 | -> Sink XML.Event IO () | 140 | -> Sink XML.Event IO () |
127 | xmppInbound k pingflag src stanzas output = doNestingXML $ do | 141 | xmppInbound k pingflag src stanzas output donevar = doNestingXML $ do |
128 | withXML $ \begindoc -> do | 142 | withXML $ \begindoc -> do |
129 | when (begindoc==EventBeginDocument) $ do | 143 | when (begindoc==EventBeginDocument) $ do |
130 | whenJust nextElement $ \xml -> do | 144 | whenJust nextElement $ \xml -> do |
131 | withJust (elementAttrs "stream" xml) $ \stream_attrs -> do | 145 | withJust (elementAttrs "stream" xml) $ \stream_attrs -> do |
132 | fix $ \loop -> do | 146 | fix $ \loop -> do |
133 | -- liftIO . wlog $ "waiting for stanza." | 147 | -- liftIO . wlog $ "waiting for stanza." |
134 | chan <- liftIO $ atomically newTChan | 148 | (chan,clsrs) <- liftIO . atomically $ |
149 | liftM2 (,) newTChan (newTVar []) | ||
135 | whenJust nextElement $ \stanza -> do | 150 | whenJust nextElement $ \stanza -> do |
136 | stanza_lvl <- nesting | 151 | stanza_lvl <- nesting |
137 | ioWriteChan chan (Just stanza) | 152 | liftIO . atomically $ do |
138 | copyToChannel Just chan =$= do | 153 | writeTChan chan (Just stanza) |
154 | modifyTVar' clsrs (closerFor stanza:) | ||
155 | copyToChannel Just chan clsrs =$= do | ||
139 | dispatch <- | 156 | dispatch <- |
140 | case () of | 157 | case () of |
141 | _ | stanza `isServerIQOf` "get" -> grockStanzaIQGet stanza | 158 | _ | stanza `isServerIQOf` "get" -> grockStanzaIQGet stanza |
@@ -146,10 +163,13 @@ xmppInbound k pingflag src stanzas output = doNestingXML $ do | |||
146 | let to = "todo" | 163 | let to = "todo" |
147 | from = "todo" | 164 | from = "todo" |
148 | let pong = peerPong (stanzaId d) to from | 165 | let pong = peerPong (stanzaId d) to from |
166 | liftIO $ wlog "got ping, sending pong..." | ||
149 | pongChan <- liftIO $ atomically newTChan | 167 | pongChan <- liftIO $ atomically newTChan |
150 | ioWriteChan output (PongStanza pongChan) | 168 | ioWriteChan output (PongStanza pongChan) |
151 | mapM_ (ioWriteChan pongChan . Just) pong | 169 | void . liftIO . forkIO $ do |
152 | ioWriteChan pongChan Nothing | 170 | mapM_ (ioWriteChan pongChan . Just) pong |
171 | ioWriteChan pongChan Nothing | ||
172 | liftIO $ wlog "finished pong stanza" | ||
153 | disp -> ioWriteChan stanzas disp | 173 | disp -> ioWriteChan stanzas disp |
154 | awaitCloser stanza_lvl | 174 | awaitCloser stanza_lvl |
155 | ioWriteChan chan Nothing | 175 | ioWriteChan chan Nothing |
@@ -279,8 +299,11 @@ forkConnection k pingflag src snk stanzas = do | |||
279 | from = "todo" -- Look it up from Server object | 299 | from = "todo" -- Look it up from Server object |
280 | -- or pass it with Connection event. | 300 | -- or pass it with Connection event. |
281 | mid = Just "ping" | 301 | mid = Just "ping" |
302 | ping = peerPing mid to from | ||
282 | mapM_ (atomically . Slotted.push slots (Just $ PingSlot)) | 303 | mapM_ (atomically . Slotted.push slots (Just $ PingSlot)) |
283 | (peerPing mid to from) | 304 | ping |
305 | wlog "" | ||
306 | prettyPrint "P<- " ping | ||
284 | loop | 307 | loop |
285 | ,readTMVar rdone >> return (return ()) | 308 | ,readTMVar rdone >> return (return ()) |
286 | ] | 309 | ] |
@@ -288,7 +311,7 @@ forkConnection k pingflag src snk stanzas = do | |||
288 | wlog $ "end pre-queue fork: " ++ show k | 311 | wlog $ "end pre-queue fork: " ++ show k |
289 | forkIO $ do | 312 | forkIO $ do |
290 | -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show) | 313 | -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show) |
291 | src $$ xmppInbound k pingflag src stanzas output | 314 | src $$ xmppInbound k pingflag src stanzas output rdone |
292 | atomically $ putTMVar rdone () | 315 | atomically $ putTMVar rdone () |
293 | wlog $ "end reader fork: " ++ show k | 316 | wlog $ "end reader fork: " ++ show k |
294 | return output | 317 | return output |
@@ -337,12 +360,12 @@ monitor sv params = do | |||
337 | wlog $ tomsg k "ReadOnly" | 360 | wlog $ tomsg k "ReadOnly" |
338 | control sv (Connect (callBackAddress k) params) | 361 | control sv (Connect (callBackAddress k) params) |
339 | HalfConnection Out -> wlog $ tomsg k "WriteOnly" | 362 | HalfConnection Out -> wlog $ tomsg k "WriteOnly" |
340 | RequiresPing -> wlog $ tomsg k "RequiresPing" | 363 | RequiresPing -> return () -- wlog $ tomsg k "RequiresPing" |
341 | _ -> return () | 364 | _ -> return () |
342 | , readTChan stanzas >>= \stanza -> return $ do | 365 | , readTChan stanzas >>= \stanza -> return $ do |
343 | xs <- readUntilNothing (stanzaChan stanza) | 366 | xs <- readUntilNothing (stanzaChan stanza) |
344 | wlog "" | 367 | wlog "" |
345 | prettyPrint "STANZA: " xs | 368 | prettyPrint "P-> " xs |
346 | ] | 369 | ] |
347 | action | 370 | action |
348 | loop | 371 | loop |
@@ -356,8 +379,8 @@ xmppServer :: (MonadResource m, MonadIO m) => m (Server ConnectionKey,Connection | |||
356 | xmppServer = do | 379 | xmppServer = do |
357 | sv <- server | 380 | sv <- server |
358 | let peer_params = (connectionDefaults peerKey) | 381 | let peer_params = (connectionDefaults peerKey) |
359 | { pingInterval = 10000 | 382 | { pingInterval = 5000 |
360 | , timeout = 10000 | 383 | , timeout = 1000 |
361 | , duplex = False } | 384 | , duplex = False } |
362 | client_params = connectionDefaults clientKey | 385 | client_params = connectionDefaults clientKey |
363 | liftIO $ do | 386 | liftIO $ do |