diff options
-rw-r--r-- | Presence/Server.hs | 4 | ||||
-rw-r--r-- | xmppServer.hs | 45 |
2 files changed, 38 insertions, 11 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs index 4ee55821..1d586900 100644 --- a/Presence/Server.hs +++ b/Presence/Server.hs | |||
@@ -143,7 +143,7 @@ data InOrOut = In | Out | |||
143 | data ConnectionEvent b | 143 | data ConnectionEvent b |
144 | = Got b | 144 | = Got b |
145 | -- ^ Arrival of data from a socket | 145 | -- ^ Arrival of data from a socket |
146 | | Connection (IO Bool) (IO (Maybe ByteString)) (ByteString -> IO Bool) | 146 | | Connection (STM Bool) (IO (Maybe ByteString)) (ByteString -> IO Bool) |
147 | -- ^ A new connection was established | 147 | -- ^ A new connection was established |
148 | | HalfConnection InOrOut | 148 | | HalfConnection InOrOut |
149 | -- ^ Half of a half-duplex connection is avaliable. | 149 | -- ^ Half of a half-duplex connection is avaliable. |
@@ -314,7 +314,7 @@ killListener (thread,sock) = do sClose sock | |||
314 | 314 | ||
315 | conevent con = Connection pingflag read write | 315 | conevent con = Connection pingflag read write |
316 | where | 316 | where |
317 | pingflag = atomically $ swapTVar (pingFlag (connPingTimer con)) False | 317 | pingflag = swapTVar (pingFlag (connPingTimer con)) False |
318 | read = connRead con | 318 | read = connRead con |
319 | write = connWrite con | 319 | write = connWrite con |
320 | 320 | ||
diff --git a/xmppServer.hs b/xmppServer.hs index 459d8f8d..48745e51 100644 --- a/xmppServer.hs +++ b/xmppServer.hs | |||
@@ -60,7 +60,7 @@ xmlStream conread conwrite = (xsrc,xsnk) | |||
60 | snk = awaitForever $ liftIO . conwrite | 60 | snk = awaitForever $ liftIO . conwrite |
61 | 61 | ||
62 | 62 | ||
63 | type FlagCommand = IO Bool | 63 | type FlagCommand = STM Bool |
64 | type ReadCommand = IO (Maybe ByteString) | 64 | type ReadCommand = IO (Maybe ByteString) |
65 | type WriteCommand = ByteString -> IO Bool | 65 | type WriteCommand = ByteString -> IO Bool |
66 | 66 | ||
@@ -128,7 +128,7 @@ forkConnection :: ConnectionKey | |||
128 | -> Source IO XML.Event | 128 | -> Source IO XML.Event |
129 | -> Sink XML.Event IO () | 129 | -> Sink XML.Event IO () |
130 | -> TChan Stanza | 130 | -> TChan Stanza |
131 | -> IO (Slotted.UpdateStream () XML.Event) | 131 | -> IO (TChan Stanza) |
132 | forkConnection k pingflag src snk stanzas = do | 132 | forkConnection k pingflag src snk stanzas = do |
133 | rdone <- atomically newEmptyTMVar | 133 | rdone <- atomically newEmptyTMVar |
134 | forkIO $ do | 134 | forkIO $ do |
@@ -136,16 +136,42 @@ forkConnection k pingflag src snk stanzas = do | |||
136 | src $$ xmppInbound k pingflag src stanzas | 136 | src $$ xmppInbound k pingflag src stanzas |
137 | atomically $ putTMVar rdone () | 137 | atomically $ putTMVar rdone () |
138 | wlog $ "end reader fork: " ++ show k | 138 | wlog $ "end reader fork: " ++ show k |
139 | output <- atomically $ Slotted.new isEventBeginElement isEventEndElement | 139 | slots <- atomically $ Slotted.new isEventBeginElement isEventEndElement |
140 | let _ = slots :: Slotted.UpdateStream () XML.Event | ||
140 | let slot_src = do | 141 | let slot_src = do |
141 | what <- lift . atomically $ orElse | 142 | what <- lift . atomically $ orElse |
142 | (Slotted.pull output >>= \x -> return $ do | 143 | (Slotted.pull slots >>= \x -> return $ do |
143 | yield x | 144 | yield x |
144 | slot_src) | 145 | slot_src) |
145 | (takeTMVar rdone >> return (return ())) | 146 | (readTMVar rdone >> return (return ())) |
146 | what | 147 | what |
147 | forkIO $ do slot_src $$ snk | 148 | forkIO $ do slot_src $$ snk |
148 | wlog $ "end writer fork: " ++ show k | 149 | wlog $ "end post-queue fork: " ++ show k |
150 | output <- atomically newTChan | ||
151 | forkIO $ do | ||
152 | fix $ \loop -> do | ||
153 | what <- atomically $ foldr1 orElse | ||
154 | [readTChan output >>= \stanza -> return $ do | ||
155 | let xchan = stanzaChan stanza | ||
156 | fix $ \inner -> do | ||
157 | what <- atomically $ orElse | ||
158 | (readTChan xchan >>= \mxml -> return $ do | ||
159 | case mxml of | ||
160 | Just xml -> do | ||
161 | atomically $ Slotted.push slots Nothing xml | ||
162 | inner | ||
163 | Nothing -> return ()) | ||
164 | (readTMVar rdone >> return (return ())) | ||
165 | what | ||
166 | loop | ||
167 | ,do pingflag >>= check | ||
168 | return $ do | ||
169 | wlog $ "TODO: send ping" | ||
170 | loop | ||
171 | ,readTMVar rdone >> return (return ()) | ||
172 | ] | ||
173 | what | ||
174 | wlog $ "end pre-queue fork: " ++ show k | ||
149 | return output | 175 | return output |
150 | 176 | ||
151 | monitor sv params = do | 177 | monitor sv params = do |
@@ -168,7 +194,8 @@ monitor sv params = do | |||
168 | _ -> return () | 194 | _ -> return () |
169 | , readTChan stanzas >>= \stanza -> return $ do | 195 | , readTChan stanzas >>= \stanza -> return $ do |
170 | xs <- chanContents (stanzaChan stanza) | 196 | xs <- chanContents (stanzaChan stanza) |
171 | prettyPrint "STANZA: " (catMaybes xs) ---- wlog $ "STANZA: "++ show (catMaybes xs) | 197 | wlog "" |
198 | prettyPrint "STANZA: " (catMaybes xs) | ||
172 | ] | 199 | ] |
173 | action | 200 | action |
174 | loop | 201 | loop |
@@ -195,8 +222,8 @@ main = runResourceT $ do | |||
195 | sv <- server | 222 | sv <- server |
196 | lift $ do | 223 | lift $ do |
197 | peer_params <- return (connectionDefaults peerKey) | 224 | peer_params <- return (connectionDefaults peerKey) |
198 | { pingInterval = 0 | 225 | { pingInterval = 5000 |
199 | , timeout = 0 | 226 | , timeout = 5000 |
200 | , duplex = False } | 227 | , duplex = False } |
201 | client_params <- return $ connectionDefaults clientKey | 228 | client_params <- return $ connectionDefaults clientKey |
202 | forkIO $ monitor sv peer_params | 229 | forkIO $ monitor sv peer_params |