diff options
author | joe <joe@jerkface.net> | 2014-02-10 22:38:46 -0500 |
---|---|---|
committer | joe <joe@jerkface.net> | 2014-02-10 22:38:46 -0500 |
commit | 2d6dae13be15b61778ed35b3501df94a8e9dd78f (patch) | |
tree | 2ba13a1cc31886776614770a43ed18c60bfb39e3 | |
parent | afff51ac877ce8807801334745f1679dbf6440d0 (diff) |
more xmppServer work
-rw-r--r-- | Control/Concurrent/STM/StatusCache.hs | 142 | ||||
-rw-r--r-- | Control/Concurrent/STM/UpdateStream.hs | 165 | ||||
-rw-r--r-- | Presence/NestingXML.hs | 10 | ||||
-rw-r--r-- | xmppServer.hs | 160 |
4 files changed, 457 insertions, 20 deletions
diff --git a/Control/Concurrent/STM/StatusCache.hs b/Control/Concurrent/STM/StatusCache.hs new file mode 100644 index 00000000..601de14c --- /dev/null +++ b/Control/Concurrent/STM/StatusCache.hs | |||
@@ -0,0 +1,142 @@ | |||
1 | --------------------------------------------------------------------------- | ||
2 | -- | | ||
3 | -- Module : Control.Concurrent.STM.StatusCache | ||
4 | -- | ||
5 | -- | ||
6 | -- Maintainer : joe@jerkface.net | ||
7 | -- Stability : experimental | ||
8 | -- | ||
9 | -- Motivation: Suppose you must communicate state changes over a stream. If | ||
10 | -- the stream is consumed slowly, then it may occur that backlogged state | ||
11 | -- changes are obsoleted by newer changes in state. In this case, it is | ||
12 | -- desirable to discard the obsolete messages from the stream. | ||
13 | -- | ||
14 | -- A streamed status message might be very large, and it would be wasteful in | ||
15 | -- both time and space, to treat it as a monolithic blob that must be built | ||
16 | -- completely before it can be sent. Therefore, we require that each message | ||
17 | -- consist of a stream of smaller chunks of type @x@ and we require predicates | ||
18 | -- that indicate when a chunk starts a new message and when it ends a message. | ||
19 | -- | ||
20 | -- In the folowing example, our chunk type is Char and complete messages are | ||
21 | -- delimited by the characters '(' and ')'. We process the input stream | ||
22 | -- \"(aaaa)(bb)(ccccc)\" first with a delayed processor and then again with an | ||
23 | -- efficient dedicated thread. The result follows: | ||
24 | -- | ||
25 | -- > Backlogged consumer: (ccccc) | ||
26 | -- > Fast consumer: (aaaa)(bb)(ccccc) | ||
27 | -- | ||
28 | -- The complete source code: | ||
29 | -- | ||
30 | -- > import Control.Monad (when, forever, (>=>)) | ||
31 | -- > import Control.Monad.STM (atomically) | ||
32 | -- > import Control.Concurrent (forkIO, threadDelay) | ||
33 | -- > import System.IO (hFlush, stdout) | ||
34 | -- > import qualified Control.Concurrent.STM.StatusCache as Cache | ||
35 | -- > | ||
36 | -- > while pred body = | ||
37 | -- > pred >>= flip when (body >> while pred body) | ||
38 | -- > | ||
39 | -- > main = do | ||
40 | -- > q <- atomically $ Cache.new (== '(') (==')') | ||
41 | -- > | ||
42 | -- > putStr $ "Backlogged consumer: " | ||
43 | -- > mapM_ (atomically . Cache.push q) "(aaaa)(bb)(ccccc)" | ||
44 | -- > while (atomically $ fmap not $ Cache.isEmpty q) $ do | ||
45 | -- > c <- atomically $ Cache.pull q | ||
46 | -- > putChar c | ||
47 | -- > putStrLn "" | ||
48 | -- > hFlush stdout | ||
49 | -- > | ||
50 | -- > putStr "Fast consumer: " | ||
51 | -- > forkIO $ forever $ do | ||
52 | -- > c <- atomically $ Cache.pull q | ||
53 | -- > putChar c | ||
54 | -- > hFlush stdout | ||
55 | -- > mapM_ (atomically . Cache.push q >=> const (threadDelay 10000)) | ||
56 | -- > "(aaaa)(bb)(ccccc)" | ||
57 | -- > putStrLn "" | ||
58 | -- | ||
59 | -- As shown above, it is intended that this module be imported qualified. | ||
60 | -- | ||
61 | module Control.Concurrent.STM.StatusCache | ||
62 | ( StatusCache | ||
63 | , new | ||
64 | , push | ||
65 | , pull | ||
66 | , isStopper | ||
67 | , isStarter | ||
68 | , isEmpty | ||
69 | ) where | ||
70 | import Control.Monad | ||
71 | import Control.Monad.STM | ||
72 | import Control.Concurrent.STM.TVar | ||
73 | import Control.Concurrent.STM.TChan | ||
74 | |||
75 | data StatusCache x = | ||
76 | StatusCache { feed :: TVar (TChan x) | ||
77 | , cache :: TVar (TChan x) | ||
78 | , feedFlag :: TVar Bool | ||
79 | , isStarter :: x -> Bool -- ^ True if the given chunk begins a message. | ||
80 | , isStopper :: x -> Bool -- ^ True if the given chunk ends a message. | ||
81 | } | ||
82 | |||
83 | -- | @new isStart isStop@ | ||
84 | -- | ||
85 | -- The @isStart@ and @isStop@ predicates indicate when an element | ||
86 | -- begins or ends a message. | ||
87 | new :: (x -> Bool) -> (x -> Bool) -> STM (StatusCache x) | ||
88 | new isStart isStop = do | ||
89 | feed <- newTChan >>= newTVar | ||
90 | cache <- newTChan >>= newTVar | ||
91 | flag <- newTVar True | ||
92 | return StatusCache { feed = feed | ||
93 | , cache = cache | ||
94 | , feedFlag = flag | ||
95 | , isStarter = isStart | ||
96 | , isStopper = isStop | ||
97 | } | ||
98 | |||
99 | |||
100 | -- | Pull off a chunk from the 'StatusCache' for processing. | ||
101 | -- | ||
102 | -- If chunks are not pulled off quickly, they may be obsoleted | ||
103 | -- and discarded when new messages are 'push'ed. | ||
104 | pull :: StatusCache x -> STM x | ||
105 | pull q = do | ||
106 | hasCache <- readTVar (feedFlag q) | ||
107 | exhausted <- readTVar (feed q) >>= isEmptyTChan | ||
108 | when (hasCache && exhausted) $ do | ||
109 | next <- newTChan >>= swapTVar (cache q) | ||
110 | writeTVar (feedFlag q) False | ||
111 | writeTVar (feed q) next | ||
112 | chan <- readTVar $ feed q | ||
113 | exhausted <- isEmptyTChan chan | ||
114 | if exhausted then retry | ||
115 | else do | ||
116 | v <- readTChan chan | ||
117 | when (isStarter q v) | ||
118 | $ writeTVar (feedFlag q) False | ||
119 | return v | ||
120 | |||
121 | -- | Enqueue a chunk into the 'StatusCache'. | ||
122 | push :: StatusCache a -> a -> STM () | ||
123 | push q v = do | ||
124 | shouldCache <- readTVar (feedFlag q) | ||
125 | chan <- | ||
126 | if shouldCache then do | ||
127 | when (isStarter q v) | ||
128 | $ newTChan | ||
129 | >>= writeTVar (cache q) | ||
130 | readTVar $ cache q | ||
131 | else do | ||
132 | when (isStopper q v) | ||
133 | $ writeTVar (feedFlag q) True | ||
134 | readTVar $ feed q | ||
135 | writeTChan chan v | ||
136 | |||
137 | -- | True when the 'StatusCache' is completely exhuasted. | ||
138 | isEmpty :: StatusCache x -> STM Bool | ||
139 | isEmpty q = do | ||
140 | empty_feed <- readTVar (feed q) >>= isEmptyTChan | ||
141 | empty_cache <- readTVar (cache q) >>= isEmptyTChan | ||
142 | return $ empty_feed && empty_cache | ||
diff --git a/Control/Concurrent/STM/UpdateStream.hs b/Control/Concurrent/STM/UpdateStream.hs new file mode 100644 index 00000000..a92168c0 --- /dev/null +++ b/Control/Concurrent/STM/UpdateStream.hs | |||
@@ -0,0 +1,165 @@ | |||
1 | --------------------------------------------------------------------------- | ||
2 | -- | | ||
3 | -- Module : Control.Concurrent.STM.UpdateStream | ||
4 | -- | ||
5 | -- | ||
6 | -- Maintainer : joe@jerkface.net | ||
7 | -- Stability : experimental | ||
8 | -- | ||
9 | -- An UpdateSream consists of pass-through messages that are queued up and | ||
10 | -- slotted messages which might be obsoleted and discarded if they are not | ||
11 | -- consumed before newer slotted messages with the same slot value occur. | ||
12 | -- | ||
13 | -- Slots are implemented with "Control.Concurrent.STM.StatusCache" and it is | ||
14 | -- recommended that you read that documentation first. | ||
15 | -- | ||
16 | -- For example, the output | ||
17 | -- | ||
18 | -- > Backlogged consumer: (set x = 5)(set y = 100)(Hello)(Joe)(was)(here) | ||
19 | -- > Fast consumer: (set x = 2)(Hello)(set y = 23)(Joe)(set y = 100)(was)(set x = 5)(here) | ||
20 | -- | ||
21 | -- is produced by the following code: | ||
22 | -- | ||
23 | -- > import Control.Monad (when, forever, (>=>)) | ||
24 | -- > import Control.Monad.STM (atomically) | ||
25 | -- > import Control.Concurrent (forkIO, threadDelay) | ||
26 | -- > import System.IO (hFlush, stdout) | ||
27 | -- > import qualified Control.Concurrent.STM.UpdateStream as Cache | ||
28 | -- > | ||
29 | -- > messages :: [(Maybe String, Char)] | ||
30 | -- > messages = concat | ||
31 | -- > [ slot "x" "(set x = 2)" | ||
32 | -- > , message "(Hello)" | ||
33 | -- > , slot "y" "(set y = 23)" | ||
34 | -- > , message "(Joe)" | ||
35 | -- > , slot "y" "(set y = 100)" | ||
36 | -- > , message "(was)" | ||
37 | -- > , slot "x" "(set x = 5)" | ||
38 | -- > , message "(here)" | ||
39 | -- > ] | ||
40 | -- > where | ||
41 | -- > slot v cs = map ((,) (Just v)) cs | ||
42 | -- > message cs = map ((,) Nothing) cs | ||
43 | -- > | ||
44 | -- > main = do | ||
45 | -- > q <- atomically $ Cache.new (== '(') (==')') | ||
46 | -- > let go = mapM_ (atomically . (uncurry $ Cache.push q) | ||
47 | -- > >=> const (threadDelay 10000)) | ||
48 | -- > messages | ||
49 | -- > slowly = do | ||
50 | -- > while (atomically $ fmap not $ Cache.isEmpty q) $ do | ||
51 | -- > c <- atomically $ Cache.pull q | ||
52 | -- > putChar c | ||
53 | -- > putStrLn "" | ||
54 | -- > hFlush stdout | ||
55 | -- > where while pred body = | ||
56 | -- > pred >>= flip when (body >> while pred body) | ||
57 | -- > quickly = forkIO . forever $ do | ||
58 | -- > c <- atomically $ Cache.pull q | ||
59 | -- > putChar c | ||
60 | -- > hFlush stdout | ||
61 | -- > putStr $ "Backlogged consumer: " | ||
62 | -- > go >> slowly | ||
63 | -- > putStr "Fast consumer: " | ||
64 | -- > quickly >> go | ||
65 | -- > putStrLn "" | ||
66 | -- | ||
67 | module Control.Concurrent.STM.UpdateStream | ||
68 | ( UpdateStream | ||
69 | , new | ||
70 | , push | ||
71 | , pull | ||
72 | , isStopper | ||
73 | , isStarter | ||
74 | , isEmpty | ||
75 | ) where | ||
76 | |||
77 | import Control.Monad | ||
78 | import Control.Monad.STM | ||
79 | import Control.Concurrent.STM.TVar | ||
80 | import Control.Concurrent.STM.TChan | ||
81 | import Control.Concurrent.STM.StatusCache (StatusCache) | ||
82 | import qualified Control.Concurrent.STM.StatusCache as Status | ||
83 | import Data.Map (Map) | ||
84 | import qualified Data.Map as Map | ||
85 | import Data.Maybe | ||
86 | import Data.Foldable (foldlM) | ||
87 | |||
88 | data UpdateStream slot x = | ||
89 | UpdateStream { cache :: TVar (Map slot (StatusCache x)) | ||
90 | , events :: TChan x | ||
91 | , inMessage :: TVar (Maybe slot) | ||
92 | , isStarter :: x -> Bool -- ^ True if the given chunk begins a message. | ||
93 | , isStopper :: x -> Bool -- ^ True if the given chunk ends a message. | ||
94 | } | ||
95 | |||
96 | -- | @new isStart isStop@ | ||
97 | -- | ||
98 | -- The @isStart@ and @isStop@ predicates indicate when an element | ||
99 | -- begins or ends a message. | ||
100 | new :: Ord slot => (x->Bool) -> (x->Bool) -> STM (UpdateStream slot x) | ||
101 | new isStart isStop = do | ||
102 | cache <- newTVar Map.empty | ||
103 | events <- newTChan | ||
104 | inMessage <- newTVar Nothing | ||
105 | return UpdateStream { cache = cache | ||
106 | , events = events | ||
107 | , inMessage = inMessage | ||
108 | , isStarter = isStart | ||
109 | , isStopper = isStop | ||
110 | } | ||
111 | |||
112 | -- | Enqueue a chunk into the 'UpdateStream' | ||
113 | -- | ||
114 | -- If a slot is provided, then the message may be obsoleted when a new message | ||
115 | -- starts with the same slot value. Otherwise, the chunk is preserved. Note | ||
116 | -- that the same slot value must be passed again for every chunk of the message | ||
117 | -- as it is not remembered. | ||
118 | push :: Ord slot => UpdateStream slot x -> Maybe slot -> x -> STM () | ||
119 | push u Nothing x = writeTChan (events u) x | ||
120 | push u (Just n) x = do | ||
121 | smap <- readTVar (cache u) | ||
122 | scache <- | ||
123 | case Map.lookup n smap of | ||
124 | Just scache -> return scache | ||
125 | Nothing -> do | ||
126 | scache <- Status.new (isStarter u) (isStopper u) | ||
127 | modifyTVar (cache u) (Map.insert n scache) | ||
128 | return scache | ||
129 | |||
130 | Status.push scache x | ||
131 | |||
132 | -- | Pull off a chunk from the 'UpdateStream' for processing. | ||
133 | -- | ||
134 | -- If chunks are not pulled off quickly, they may be obsoleted | ||
135 | -- and discarded when new messages are 'push'ed. | ||
136 | pull :: Ord slot => UpdateStream slot x -> STM x | ||
137 | pull u = do | ||
138 | (inm, mbscache) <- do | ||
139 | mn <- readTVar $ inMessage u | ||
140 | map <- readTVar (cache u) | ||
141 | return $ (isJust mn, mn >>= flip Map.lookup map) | ||
142 | let action = | ||
143 | case (inm,mbscache) of | ||
144 | (True,Just scache) -> Status.pull scache | ||
145 | (True,Nothing) -> readTChan (events u) | ||
146 | (False,_) -> do | ||
147 | cs <- fmap (Map.toList) $ readTVar (cache u) | ||
148 | cs <- filterM (return . not <=< Status.isEmpty . snd) | ||
149 | cs | ||
150 | maybe (readTChan $ events u) | ||
151 | (\(n,scache) -> do | ||
152 | writeTVar (inMessage u) (Just n) | ||
153 | Status.pull scache) | ||
154 | (listToMaybe cs) | ||
155 | x <- action | ||
156 | when (isStopper u x) $ writeTVar (inMessage u) Nothing | ||
157 | return x | ||
158 | |||
159 | -- | True when the 'UpdateStream' is completely exhuasted. | ||
160 | isEmpty :: UpdateStream slot x -> STM Bool | ||
161 | isEmpty q = do | ||
162 | e <- isEmptyTChan (events q) | ||
163 | qs <- readTVar (cache q) | ||
164 | d <- foldlM (\b s -> fmap (b &&) $ Status.isEmpty s) True qs | ||
165 | return $ e && d | ||
diff --git a/Presence/NestingXML.hs b/Presence/NestingXML.hs index bf12c9ae..c26e3d5c 100644 --- a/Presence/NestingXML.hs +++ b/Presence/NestingXML.hs | |||
@@ -79,6 +79,16 @@ awaitCloser lvl = do | |||
79 | withXML $ \xml -> do | 79 | withXML $ \xml -> do |
80 | loop | 80 | loop |
81 | 81 | ||
82 | doUntilCloser :: Monad m | ||
83 | => Int -> (Event -> NestingXML o m ()) -> NestingXML o m () | ||
84 | doUntilCloser lvl thunk = do | ||
85 | fix $ \loop -> do | ||
86 | lvl' <- nesting | ||
87 | when (lvl' >= lvl) $ do | ||
88 | withXML $ \xml -> do | ||
89 | thunk xml | ||
90 | loop | ||
91 | |||
82 | nextElement :: Monad m => NestingXML o m (Maybe Event) | 92 | nextElement :: Monad m => NestingXML o m (Maybe Event) |
83 | nextElement = do | 93 | nextElement = do |
84 | lvl <- nesting | 94 | lvl <- nesting |
diff --git a/xmppServer.hs b/xmppServer.hs index c720ea5f..459d8f8d 100644 --- a/xmppServer.hs +++ b/xmppServer.hs | |||
@@ -1,6 +1,9 @@ | |||
1 | {-# LANGUAGE OverloadedStrings #-} | ||
1 | import Control.Monad.Trans.Resource (runResourceT) | 2 | import Control.Monad.Trans.Resource (runResourceT) |
2 | import Control.Monad.Trans (lift) | 3 | import Control.Monad.Trans (lift) |
4 | import Control.Monad.IO.Class (liftIO) | ||
3 | import Control.Monad.Fix (fix) | 5 | import Control.Monad.Fix (fix) |
6 | import Control.Monad | ||
4 | import Control.Concurrent (forkIO) | 7 | import Control.Concurrent (forkIO) |
5 | import Control.Concurrent.STM | 8 | import Control.Concurrent.STM |
6 | -- import Control.Concurrent.STM.TChan | 9 | -- import Control.Concurrent.STM.TChan |
@@ -8,18 +11,42 @@ import Network.Socket | |||
8 | import XMPPTypes (withPort) | 11 | import XMPPTypes (withPort) |
9 | import Text.Printf | 12 | import Text.Printf |
10 | import System.Posix.Signals | 13 | import System.Posix.Signals |
14 | import Data.ByteString (ByteString) | ||
15 | import qualified Data.ByteString.Char8 as Strict8 | ||
16 | -- import qualified Data.ByteString.Lazy.Char8 as Lazy8 | ||
11 | 17 | ||
12 | import Data.Conduit | 18 | import Data.Conduit |
19 | import qualified Data.Conduit.List as CL | ||
20 | import qualified Data.Conduit.Binary as CB | ||
13 | 21 | ||
14 | import qualified Text.XML.Stream.Render as XML | 22 | import qualified Text.XML.Stream.Render as XML |
15 | import qualified Text.XML.Stream.Parse as XML | 23 | import qualified Text.XML.Stream.Parse as XML |
24 | import Data.XML.Types as XML | ||
25 | import Data.Maybe (catMaybes) | ||
26 | import Data.Monoid ( (<>) ) | ||
16 | 27 | ||
28 | import qualified Control.Concurrent.STM.UpdateStream as Slotted | ||
29 | import ControlMaybe | ||
30 | import NestingXML | ||
17 | import Server | 31 | import Server |
18 | 32 | ||
33 | |||
19 | wlog s = putStrLn s | 34 | wlog s = putStrLn s |
35 | where _ = s :: String | ||
36 | wlogb s = Strict8.putStrLn s | ||
20 | 37 | ||
21 | control sv = atomically . putTMVar (serverCommand sv) | 38 | control sv = atomically . putTMVar (serverCommand sv) |
22 | 39 | ||
40 | -- Note: This function ignores name space qualification | ||
41 | elementAttrs expected (EventBeginElement name attrs) | ||
42 | | nameLocalName name==expected | ||
43 | = return attrs | ||
44 | elementAttrs _ _ = mzero | ||
45 | |||
46 | getStreamName (EventBeginElement name _) = name | ||
47 | |||
48 | xmlStream :: ReadCommand -> WriteCommand -> ( Source IO XML.Event | ||
49 | , Sink XML.Event IO () ) | ||
23 | xmlStream conread conwrite = (xsrc,xsnk) | 50 | xmlStream conread conwrite = (xsrc,xsnk) |
24 | where | 51 | where |
25 | xsrc = src $= XML.parseBytes XML.def | 52 | xsrc = src $= XML.parseBytes XML.def |
@@ -30,34 +57,125 @@ xmlStream conread conwrite = (xsrc,xsnk) | |||
30 | maybe (return ()) -- lift . wlog $ "conread: Nothing") | 57 | maybe (return ()) -- lift . wlog $ "conread: Nothing") |
31 | (\v -> yield v >> src) | 58 | (\v -> yield v >> src) |
32 | v | 59 | v |
33 | snk = awaitForever $ lift . conwrite | 60 | snk = awaitForever $ liftIO . conwrite |
34 | 61 | ||
35 | 62 | ||
36 | forkConnection k pingflag src snk = do | 63 | type FlagCommand = IO Bool |
64 | type ReadCommand = IO (Maybe ByteString) | ||
65 | type WriteCommand = ByteString -> IO Bool | ||
66 | |||
67 | data Stanza | ||
68 | = UnrecognizedStanza { stanzaChan :: TChan (Maybe XML.Event) } | ||
69 | |||
70 | prettyPrint prefix xs = | ||
71 | liftIO $ | ||
72 | CL.sourceList xs | ||
73 | $= XML.renderBytes (XML.def { XML.rsPretty=True }) | ||
74 | =$= CB.lines | ||
75 | $$ CL.mapM_ (wlogb . (prefix <>)) | ||
76 | |||
77 | xmppInbound :: ConnectionKey -> FlagCommand | ||
78 | -> Source IO XML.Event | ||
79 | -> TChan Stanza | ||
80 | -> Sink XML.Event IO () | ||
81 | xmppInbound k pingflag src stanzas = doNestingXML $ do | ||
82 | withXML $ \begindoc -> do | ||
83 | when (begindoc==EventBeginDocument) $ do | ||
84 | -- liftIO . wlog $ "begin-doc" | ||
85 | withXML $ \xml -> do | ||
86 | withJust (elementAttrs "stream" xml) $ \stream_attrs -> do | ||
87 | -- liftIO . wlog $ "stream: " ++ show (getStreamName xml) | ||
88 | -- liftIO . wlog $ "stream atributes: " ++ show stream_attrs | ||
89 | fix $ \loop -> do | ||
90 | -- liftIO . wlog $ "waiting for stanza." | ||
91 | chan <- liftIO $ atomically newTChan | ||
92 | whenJust nextElement $ \stanza -> do | ||
93 | stanza_lvl <- nesting | ||
94 | liftIO . atomically $ writeTChan chan (Just stanza) | ||
95 | -- liftIO . wlog $ "stanza: "++show stanza | ||
96 | |||
97 | liftIO . atomically $ writeTChan stanzas $ | ||
98 | UnrecognizedStanza chan | ||
99 | doUntilCloser stanza_lvl $ \xml -> do | ||
100 | liftIO . atomically $ writeTChan chan (Just xml) | ||
101 | -- liftIO . wlog $ "-stanza: " ++ show xml | ||
102 | liftIO . atomically $ writeTChan chan Nothing | ||
103 | loop | ||
104 | |||
105 | |||
106 | chanContents :: TChan x -> IO [x] | ||
107 | chanContents ch = do | ||
108 | x <- atomically $ do | ||
109 | bempty <- isEmptyTChan ch | ||
110 | if bempty | ||
111 | then return Nothing | ||
112 | else fmap Just $ readTChan ch | ||
113 | maybe (return []) | ||
114 | (\x -> do | ||
115 | xs <- chanContents ch | ||
116 | return (x:xs)) | ||
117 | x | ||
118 | |||
119 | |||
120 | isEventBeginElement (EventBeginElement {}) = True | ||
121 | isEventBeginElement _ = False | ||
122 | |||
123 | isEventEndElement (EventEndElement {}) = True | ||
124 | isEventEndElement _ = False | ||
125 | |||
126 | forkConnection :: ConnectionKey | ||
127 | -> FlagCommand | ||
128 | -> Source IO XML.Event | ||
129 | -> Sink XML.Event IO () | ||
130 | -> TChan Stanza | ||
131 | -> IO (Slotted.UpdateStream () XML.Event) | ||
132 | forkConnection k pingflag src snk stanzas = do | ||
133 | rdone <- atomically newEmptyTMVar | ||
37 | forkIO $ do | 134 | forkIO $ do |
38 | src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show) | 135 | -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show) |
39 | wlog $ "end fork: " ++ show k | 136 | src $$ xmppInbound k pingflag src stanzas |
40 | return () | 137 | atomically $ putTMVar rdone () |
138 | wlog $ "end reader fork: " ++ show k | ||
139 | output <- atomically $ Slotted.new isEventBeginElement isEventEndElement | ||
140 | let slot_src = do | ||
141 | what <- lift . atomically $ orElse | ||
142 | (Slotted.pull output >>= \x -> return $ do | ||
143 | yield x | ||
144 | slot_src) | ||
145 | (takeTMVar rdone >> return (return ())) | ||
146 | what | ||
147 | forkIO $ do slot_src $$ snk | ||
148 | wlog $ "end writer fork: " ++ show k | ||
149 | return output | ||
41 | 150 | ||
42 | monitor sv params = do | 151 | monitor sv params = do |
43 | chan <- return $ serverEvent sv | 152 | chan <- return $ serverEvent sv |
153 | stanzas <- atomically newTChan | ||
44 | fix $ \loop -> do | 154 | fix $ \loop -> do |
45 | (k,e) <- atomically $ readTChan chan | 155 | action <- atomically $ foldr1 orElse |
46 | case e of | 156 | [ readTChan chan >>= \(k,e) -> return $ do |
47 | Connection pingflag conread conwrite -> do | 157 | case e of |
48 | let (xsrc,xsnk) = xmlStream conread conwrite | 158 | Connection pingflag conread conwrite -> do |
49 | forkConnection k pingflag xsrc xsnk | 159 | let (xsrc,xsnk) = xmlStream conread conwrite |
50 | wlog $ tomsg k "Connection" | 160 | forkConnection k pingflag xsrc xsnk stanzas |
51 | EOF -> wlog $ tomsg k "EOF" | 161 | wlog $ tomsg k "Connection" |
52 | HalfConnection In -> do | 162 | EOF -> wlog $ tomsg k "EOF" |
53 | wlog $ tomsg k "ReadOnly" | 163 | HalfConnection In -> do |
54 | control sv (Connect (callBackAddress k) params) | 164 | wlog $ tomsg k "ReadOnly" |
55 | HalfConnection Out -> wlog $ tomsg k "WriteOnly" | 165 | control sv (Connect (callBackAddress k) params) |
56 | RequiresPing -> wlog $ tomsg k "RequiresPing" | 166 | HalfConnection Out -> wlog $ tomsg k "WriteOnly" |
57 | _ -> return () | 167 | RequiresPing -> wlog $ tomsg k "RequiresPing" |
168 | _ -> return () | ||
169 | , readTChan stanzas >>= \stanza -> return $ do | ||
170 | xs <- chanContents (stanzaChan stanza) | ||
171 | prettyPrint "STANZA: " (catMaybes xs) ---- wlog $ "STANZA: "++ show (catMaybes xs) | ||
172 | ] | ||
173 | action | ||
58 | loop | 174 | loop |
59 | where | 175 | where |
60 | tomsg k str = printf "%12s %s" str (show k) | 176 | tomsg k str = printf "%12s %s" str (show k) |
177 | where | ||
178 | _ = str :: String | ||
61 | 179 | ||
62 | data ConnectionKey | 180 | data ConnectionKey |
63 | = PeerKey { callBackAddress :: SockAddr } | 181 | = PeerKey { callBackAddress :: SockAddr } |
@@ -77,11 +195,13 @@ main = runResourceT $ do | |||
77 | sv <- server | 195 | sv <- server |
78 | lift $ do | 196 | lift $ do |
79 | peer_params <- return (connectionDefaults peerKey) | 197 | peer_params <- return (connectionDefaults peerKey) |
80 | { pingInterval = 2000, duplex = False } | 198 | { pingInterval = 0 |
199 | , timeout = 0 | ||
200 | , duplex = False } | ||
81 | client_params <- return $ connectionDefaults clientKey | 201 | client_params <- return $ connectionDefaults clientKey |
82 | forkIO $ monitor sv peer_params | 202 | forkIO $ monitor sv peer_params |
83 | control sv (Listen peerport peer_params) | 203 | control sv (Listen peerport peer_params) |
84 | control sv (Listen clientport client_params) | 204 | -- control sv (Listen clientport client_params) |
85 | 205 | ||
86 | -- atomically $ newEmptyTMVar >>= readTMVar -- Wait for control-c | 206 | -- atomically $ newEmptyTMVar >>= readTMVar -- Wait for control-c |
87 | quitVar <- newEmptyTMVarIO | 207 | quitVar <- newEmptyTMVarIO |