summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2014-02-11 20:38:16 -0500
committerjoe <joe@jerkface.net>2014-02-11 20:38:16 -0500
commit44cc78636b564d479d76be989fbb36fd1e720e10 (patch)
treed95779bd2b36a4054b87bd0a80490b034d32b7a9
parent81cb17933a8dd8f69cc3acbd54c1688469eec0a3 (diff)
send-ping framework for xmppServer
-rw-r--r--Presence/Server.hs4
-rw-r--r--xmppServer.hs45
2 files changed, 38 insertions, 11 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs
index 4ee55821..1d586900 100644
--- a/Presence/Server.hs
+++ b/Presence/Server.hs
@@ -143,7 +143,7 @@ data InOrOut = In | Out
143data ConnectionEvent b 143data ConnectionEvent b
144 = Got b 144 = Got b
145 -- ^ Arrival of data from a socket 145 -- ^ Arrival of data from a socket
146 | Connection (IO Bool) (IO (Maybe ByteString)) (ByteString -> IO Bool) 146 | Connection (STM Bool) (IO (Maybe ByteString)) (ByteString -> IO Bool)
147 -- ^ A new connection was established 147 -- ^ A new connection was established
148 | HalfConnection InOrOut 148 | HalfConnection InOrOut
149 -- ^ Half of a half-duplex connection is avaliable. 149 -- ^ Half of a half-duplex connection is avaliable.
@@ -314,7 +314,7 @@ killListener (thread,sock) = do sClose sock
314 314
315conevent con = Connection pingflag read write 315conevent con = Connection pingflag read write
316 where 316 where
317 pingflag = atomically $ swapTVar (pingFlag (connPingTimer con)) False 317 pingflag = swapTVar (pingFlag (connPingTimer con)) False
318 read = connRead con 318 read = connRead con
319 write = connWrite con 319 write = connWrite con
320 320
diff --git a/xmppServer.hs b/xmppServer.hs
index 459d8f8d..48745e51 100644
--- a/xmppServer.hs
+++ b/xmppServer.hs
@@ -60,7 +60,7 @@ xmlStream conread conwrite = (xsrc,xsnk)
60 snk = awaitForever $ liftIO . conwrite 60 snk = awaitForever $ liftIO . conwrite
61 61
62 62
63type FlagCommand = IO Bool 63type FlagCommand = STM Bool
64type ReadCommand = IO (Maybe ByteString) 64type ReadCommand = IO (Maybe ByteString)
65type WriteCommand = ByteString -> IO Bool 65type WriteCommand = ByteString -> IO Bool
66 66
@@ -128,7 +128,7 @@ forkConnection :: ConnectionKey
128 -> Source IO XML.Event 128 -> Source IO XML.Event
129 -> Sink XML.Event IO () 129 -> Sink XML.Event IO ()
130 -> TChan Stanza 130 -> TChan Stanza
131 -> IO (Slotted.UpdateStream () XML.Event) 131 -> IO (TChan Stanza)
132forkConnection k pingflag src snk stanzas = do 132forkConnection k pingflag src snk stanzas = do
133 rdone <- atomically newEmptyTMVar 133 rdone <- atomically newEmptyTMVar
134 forkIO $ do 134 forkIO $ do
@@ -136,16 +136,42 @@ forkConnection k pingflag src snk stanzas = do
136 src $$ xmppInbound k pingflag src stanzas 136 src $$ xmppInbound k pingflag src stanzas
137 atomically $ putTMVar rdone () 137 atomically $ putTMVar rdone ()
138 wlog $ "end reader fork: " ++ show k 138 wlog $ "end reader fork: " ++ show k
139 output <- atomically $ Slotted.new isEventBeginElement isEventEndElement 139 slots <- atomically $ Slotted.new isEventBeginElement isEventEndElement
140 let _ = slots :: Slotted.UpdateStream () XML.Event
140 let slot_src = do 141 let slot_src = do
141 what <- lift . atomically $ orElse 142 what <- lift . atomically $ orElse
142 (Slotted.pull output >>= \x -> return $ do 143 (Slotted.pull slots >>= \x -> return $ do
143 yield x 144 yield x
144 slot_src) 145 slot_src)
145 (takeTMVar rdone >> return (return ())) 146 (readTMVar rdone >> return (return ()))
146 what 147 what
147 forkIO $ do slot_src $$ snk 148 forkIO $ do slot_src $$ snk
148 wlog $ "end writer fork: " ++ show k 149 wlog $ "end post-queue fork: " ++ show k
150 output <- atomically newTChan
151 forkIO $ do
152 fix $ \loop -> do
153 what <- atomically $ foldr1 orElse
154 [readTChan output >>= \stanza -> return $ do
155 let xchan = stanzaChan stanza
156 fix $ \inner -> do
157 what <- atomically $ orElse
158 (readTChan xchan >>= \mxml -> return $ do
159 case mxml of
160 Just xml -> do
161 atomically $ Slotted.push slots Nothing xml
162 inner
163 Nothing -> return ())
164 (readTMVar rdone >> return (return ()))
165 what
166 loop
167 ,do pingflag >>= check
168 return $ do
169 wlog $ "TODO: send ping"
170 loop
171 ,readTMVar rdone >> return (return ())
172 ]
173 what
174 wlog $ "end pre-queue fork: " ++ show k
149 return output 175 return output
150 176
151monitor sv params = do 177monitor sv params = do
@@ -168,7 +194,8 @@ monitor sv params = do
168 _ -> return () 194 _ -> return ()
169 , readTChan stanzas >>= \stanza -> return $ do 195 , readTChan stanzas >>= \stanza -> return $ do
170 xs <- chanContents (stanzaChan stanza) 196 xs <- chanContents (stanzaChan stanza)
171 prettyPrint "STANZA: " (catMaybes xs) ---- wlog $ "STANZA: "++ show (catMaybes xs) 197 wlog ""
198 prettyPrint "STANZA: " (catMaybes xs)
172 ] 199 ]
173 action 200 action
174 loop 201 loop
@@ -195,8 +222,8 @@ main = runResourceT $ do
195 sv <- server 222 sv <- server
196 lift $ do 223 lift $ do
197 peer_params <- return (connectionDefaults peerKey) 224 peer_params <- return (connectionDefaults peerKey)
198 { pingInterval = 0 225 { pingInterval = 5000
199 , timeout = 0 226 , timeout = 5000
200 , duplex = False } 227 , duplex = False }
201 client_params <- return $ connectionDefaults clientKey 228 client_params <- return $ connectionDefaults clientKey
202 forkIO $ monitor sv peer_params 229 forkIO $ monitor sv peer_params