diff options
author | joe <joe@jerkface.net> | 2013-06-28 21:19:13 -0400 |
---|---|---|
committer | joe <joe@jerkface.net> | 2013-06-28 21:19:13 -0400 |
commit | 31cca9914be082552119e0be863f7a16629c079c (patch) | |
tree | 79fdab3f628adcc6624f2a179370de166d1a8598 | |
parent | b7e6f3164af9149c432451e7ffc344f8d7a2f55a (diff) |
Attempt to control buffer sends with renderBytes...
-rw-r--r-- | Presence/ServerC.hs | 9 | ||||
-rw-r--r-- | Presence/XMPP.hs | 22 |
2 files changed, 22 insertions, 9 deletions
diff --git a/Presence/ServerC.hs b/Presence/ServerC.hs index 36e2d7bf..ae0bf154 100644 --- a/Presence/ServerC.hs +++ b/Presence/ServerC.hs | |||
@@ -18,6 +18,7 @@ import Data.ByteString.Char8 | |||
18 | ) | 18 | ) |
19 | import qualified Data.ByteString.Char8 as S | 19 | import qualified Data.ByteString.Char8 as S |
20 | ( hPutStr | 20 | ( hPutStr |
21 | , hPutStrLn | ||
21 | ) | 22 | ) |
22 | import System.IO | 23 | import System.IO |
23 | ( IOMode(..) | 24 | ( IOMode(..) |
@@ -115,18 +116,20 @@ doServer (HCons family port) g = runServer port (runConn g) | |||
115 | 116 | ||
116 | packets :: MonadIO m => Handle -> Source m S.ByteString | 117 | packets :: MonadIO m => Handle -> Source m S.ByteString |
117 | packets h = do | 118 | packets h = do |
118 | packet <- lift $ liftIO $ getPacket h | 119 | packet <- liftIO $ getPacket h |
119 | yield packet | 120 | yield packet |
120 | isEof <- lift $ liftIO $ hIsEOF h | 121 | isEof <- liftIO $ hIsEOF h |
121 | when (not isEof) (packets h) | 122 | when (not isEof) (packets h) |
122 | where | 123 | where |
123 | getPacket h = do { hWaitForInput h (-1) ; hGetNonBlocking h 1024 } | 124 | getPacket h = do { hWaitForInput h (-1) ; hGetNonBlocking h 1024 } |
124 | 125 | ||
125 | outgoing :: MonadIO m => Handle -> Sink S.ByteString m () | 126 | outgoing :: MonadIO m => Handle -> Sink S.ByteString m () |
126 | outgoing h = do | 127 | outgoing h = do |
128 | liftIO . L.putStrLn $ "outgoing: waiting" | ||
127 | mpacket <- await | 129 | mpacket <- await |
130 | liftIO . L.putStrLn $ "outgoing: got packet " <++> bshow mpacket | ||
128 | maybe (return ()) | 131 | maybe (return ()) |
129 | (\r -> (lift . liftIO . S.hPutStr h $ r) >> outgoing h) | 132 | (\r -> (liftIO . S.hPutStrLn h $ r) >> outgoing h) |
130 | mpacket | 133 | mpacket |
131 | 134 | ||
132 | 135 | ||
diff --git a/Presence/XMPP.hs b/Presence/XMPP.hs index 8508c3b4..f81af7c0 100644 --- a/Presence/XMPP.hs +++ b/Presence/XMPP.hs | |||
@@ -29,6 +29,7 @@ import Control.Concurrent (forkIO,killThread) | |||
29 | import Control.Concurrent.Async | 29 | import Control.Concurrent.Async |
30 | import Control.Exception (handle,SomeException(..),finally) | 30 | import Control.Exception (handle,SomeException(..),finally) |
31 | import Control.Monad.IO.Class | 31 | import Control.Monad.IO.Class |
32 | import Control.Monad.Trans.Class | ||
32 | import Control.Monad.Trans.Maybe | 33 | import Control.Monad.Trans.Maybe |
33 | import Todo | 34 | import Todo |
34 | import Control.Monad as Monad | 35 | import Control.Monad as Monad |
@@ -136,9 +137,12 @@ gatherElement opentag empty = gatherElement' (empty `mplus` return opentag) 1 | |||
136 | if (cnt>0) then gatherElement' ts' cnt' | 137 | if (cnt>0) then gatherElement' ts' cnt' |
137 | else return ts' | 138 | else return ts' |
138 | 139 | ||
140 | voidMaybeT body = (>> return ()) . runMaybeT $ body | ||
141 | fixMaybeT f = (>> return ()) . runMaybeT . fix $ f | ||
142 | |||
139 | fromClient :: (MonadIO m, XMPPSession session) => | 143 | fromClient :: (MonadIO m, XMPPSession session) => |
140 | session -> TChan Commands -> Sink XML.Event m () | 144 | session -> TChan Commands -> Sink XML.Event m () |
141 | fromClient session cmdChan = (>>return ()) . runMaybeT $ do | 145 | fromClient session cmdChan = voidMaybeT $ do |
142 | let log = liftIO . L.putStrLn . ("client-in: " <++>) | 146 | let log = liftIO . L.putStrLn . ("client-in: " <++>) |
143 | send = liftIO . atomically . writeTChan cmdChan . Send | 147 | send = liftIO . atomically . writeTChan cmdChan . Send |
144 | mawait >>= guard . (==EventBeginDocument) | 148 | mawait >>= guard . (==EventBeginDocument) |
@@ -172,17 +176,17 @@ prettyPrint prefix xs = | |||
172 | S.putStrLn prefix | 176 | S.putStrLn prefix |
173 | CL.sourceList xs $= renderBytes (def { rsPretty=True }) $$ CL.mapM_ S.putStr | 177 | CL.sourceList xs $= renderBytes (def { rsPretty=True }) $$ CL.mapM_ S.putStr |
174 | 178 | ||
175 | toClient :: MonadIO m => TChan Presence -> TChan Commands -> Source m XML.Event | 179 | toClient :: MonadIO m => TChan Presence -> TChan Commands -> Source m [XML.Event] |
176 | toClient pchan cmdChan = fix $ \loop -> do | 180 | toClient pchan cmdChan = fix $ \loop -> do |
177 | event <- liftIO . atomically $ | 181 | event <- liftIO . atomically $ |
178 | orElse (fmap Left $ readTChan pchan) | 182 | orElse (fmap Left $ readTChan pchan) |
179 | (fmap Right $ readTChan cmdChan) | 183 | (fmap Right $ readTChan cmdChan) |
180 | case event of | 184 | case event of |
181 | Right QuitThread -> return () | 185 | Right QuitThread -> return () |
182 | Right (Send xs) -> mapM_ yield xs >> prettyPrint "client-out: " xs >> loop | 186 | Right (Send xs) -> yield xs >> loop -- prettyPrint "client-out: " xs >> loop |
183 | Left presence -> do | 187 | Left presence -> do |
184 | xs <- liftIO $ xmlifyPresenceForClient presence | 188 | xs <- liftIO $ xmlifyPresenceForClient presence |
185 | Monad.mapM_ yield xs | 189 | yield xs |
186 | loop | 190 | loop |
187 | 191 | ||
188 | handleClient | 192 | handleClient |
@@ -198,7 +202,7 @@ handleClient st src snk = do | |||
198 | pchan <- subscribe session Nothing | 202 | pchan <- subscribe session Nothing |
199 | cmdChan <- atomically newTChan | 203 | cmdChan <- atomically newTChan |
200 | 204 | ||
201 | writer <- async ( toClient pchan cmdChan $$ renderBytes def =$ snk ) | 205 | writer <- async ( toClient pchan cmdChan $$ renderChunks =$ snk ) |
202 | finally ( src $= parseBytes def $$ fromClient session cmdChan ) | 206 | finally ( src $= parseBytes def $$ fromClient session cmdChan ) |
203 | $ do | 207 | $ do |
204 | atomically $ writeTChan cmdChan QuitThread | 208 | atomically $ writeTChan cmdChan QuitThread |
@@ -226,3 +230,9 @@ seekRemotePeers config chan = do | |||
226 | putStrLn "unimplemented: seekRemotePeers" | 230 | putStrLn "unimplemented: seekRemotePeers" |
227 | -- TODO | 231 | -- TODO |
228 | return () | 232 | return () |
233 | |||
234 | renderChunks :: (MonadUnsafeIO m, MonadIO m) => ConduitM [Event] ByteString m () | ||
235 | renderChunks = fixMaybeT $ \loop -> do | ||
236 | xs <- mawait | ||
237 | lift . when (not . null $ xs) $ ( CL.sourceList xs $= renderBytes def $$ CL.mapM_ yield ) | ||
238 | loop | ||