summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Control/Concurrent/STM/StatusCache.hs142
-rw-r--r--Control/Concurrent/STM/UpdateStream.hs165
-rw-r--r--Presence/NestingXML.hs10
-rw-r--r--xmppServer.hs160
4 files changed, 457 insertions, 20 deletions
diff --git a/Control/Concurrent/STM/StatusCache.hs b/Control/Concurrent/STM/StatusCache.hs
new file mode 100644
index 00000000..601de14c
--- /dev/null
+++ b/Control/Concurrent/STM/StatusCache.hs
@@ -0,0 +1,142 @@
1---------------------------------------------------------------------------
2-- |
3-- Module : Control.Concurrent.STM.StatusCache
4--
5--
6-- Maintainer : joe@jerkface.net
7-- Stability : experimental
8--
9-- Motivation: Suppose you must communicate state changes over a stream. If
10-- the stream is consumed slowly, then it may occur that backlogged state
11-- changes are obsoleted by newer changes in state. In this case, it is
12-- desirable to discard the obsolete messages from the stream.
13--
14-- A streamed status message might be very large, and it would be wasteful in
15-- both time and space, to treat it as a monolithic blob that must be built
16-- completely before it can be sent. Therefore, we require that each message
17-- consist of a stream of smaller chunks of type @x@ and we require predicates
18-- that indicate when a chunk starts a new message and when it ends a message.
19--
20-- In the folowing example, our chunk type is Char and complete messages are
21-- delimited by the characters '(' and ')'. We process the input stream
22-- \"(aaaa)(bb)(ccccc)\" first with a delayed processor and then again with an
23-- efficient dedicated thread. The result follows:
24--
25-- > Backlogged consumer: (ccccc)
26-- > Fast consumer: (aaaa)(bb)(ccccc)
27--
28-- The complete source code:
29--
30-- > import Control.Monad (when, forever, (>=>))
31-- > import Control.Monad.STM (atomically)
32-- > import Control.Concurrent (forkIO, threadDelay)
33-- > import System.IO (hFlush, stdout)
34-- > import qualified Control.Concurrent.STM.StatusCache as Cache
35-- >
36-- > while pred body =
37-- > pred >>= flip when (body >> while pred body)
38-- >
39-- > main = do
40-- > q <- atomically $ Cache.new (== '(') (==')')
41-- >
42-- > putStr $ "Backlogged consumer: "
43-- > mapM_ (atomically . Cache.push q) "(aaaa)(bb)(ccccc)"
44-- > while (atomically $ fmap not $ Cache.isEmpty q) $ do
45-- > c <- atomically $ Cache.pull q
46-- > putChar c
47-- > putStrLn ""
48-- > hFlush stdout
49-- >
50-- > putStr "Fast consumer: "
51-- > forkIO $ forever $ do
52-- > c <- atomically $ Cache.pull q
53-- > putChar c
54-- > hFlush stdout
55-- > mapM_ (atomically . Cache.push q >=> const (threadDelay 10000))
56-- > "(aaaa)(bb)(ccccc)"
57-- > putStrLn ""
58--
59-- As shown above, it is intended that this module be imported qualified.
60--
61module Control.Concurrent.STM.StatusCache
62 ( StatusCache
63 , new
64 , push
65 , pull
66 , isStopper
67 , isStarter
68 , isEmpty
69 ) where
70import Control.Monad
71import Control.Monad.STM
72import Control.Concurrent.STM.TVar
73import Control.Concurrent.STM.TChan
74
75data StatusCache x =
76 StatusCache { feed :: TVar (TChan x)
77 , cache :: TVar (TChan x)
78 , feedFlag :: TVar Bool
79 , isStarter :: x -> Bool -- ^ True if the given chunk begins a message.
80 , isStopper :: x -> Bool -- ^ True if the given chunk ends a message.
81 }
82
83-- | @new isStart isStop@
84--
85-- The @isStart@ and @isStop@ predicates indicate when an element
86-- begins or ends a message.
87new :: (x -> Bool) -> (x -> Bool) -> STM (StatusCache x)
88new isStart isStop = do
89 feed <- newTChan >>= newTVar
90 cache <- newTChan >>= newTVar
91 flag <- newTVar True
92 return StatusCache { feed = feed
93 , cache = cache
94 , feedFlag = flag
95 , isStarter = isStart
96 , isStopper = isStop
97 }
98
99
100-- | Pull off a chunk from the 'StatusCache' for processing.
101--
102-- If chunks are not pulled off quickly, they may be obsoleted
103-- and discarded when new messages are 'push'ed.
104pull :: StatusCache x -> STM x
105pull q = do
106 hasCache <- readTVar (feedFlag q)
107 exhausted <- readTVar (feed q) >>= isEmptyTChan
108 when (hasCache && exhausted) $ do
109 next <- newTChan >>= swapTVar (cache q)
110 writeTVar (feedFlag q) False
111 writeTVar (feed q) next
112 chan <- readTVar $ feed q
113 exhausted <- isEmptyTChan chan
114 if exhausted then retry
115 else do
116 v <- readTChan chan
117 when (isStarter q v)
118 $ writeTVar (feedFlag q) False
119 return v
120
121-- | Enqueue a chunk into the 'StatusCache'.
122push :: StatusCache a -> a -> STM ()
123push q v = do
124 shouldCache <- readTVar (feedFlag q)
125 chan <-
126 if shouldCache then do
127 when (isStarter q v)
128 $ newTChan
129 >>= writeTVar (cache q)
130 readTVar $ cache q
131 else do
132 when (isStopper q v)
133 $ writeTVar (feedFlag q) True
134 readTVar $ feed q
135 writeTChan chan v
136
137-- | True when the 'StatusCache' is completely exhuasted.
138isEmpty :: StatusCache x -> STM Bool
139isEmpty q = do
140 empty_feed <- readTVar (feed q) >>= isEmptyTChan
141 empty_cache <- readTVar (cache q) >>= isEmptyTChan
142 return $ empty_feed && empty_cache
diff --git a/Control/Concurrent/STM/UpdateStream.hs b/Control/Concurrent/STM/UpdateStream.hs
new file mode 100644
index 00000000..a92168c0
--- /dev/null
+++ b/Control/Concurrent/STM/UpdateStream.hs
@@ -0,0 +1,165 @@
1---------------------------------------------------------------------------
2-- |
3-- Module : Control.Concurrent.STM.UpdateStream
4--
5--
6-- Maintainer : joe@jerkface.net
7-- Stability : experimental
8--
9-- An UpdateSream consists of pass-through messages that are queued up and
10-- slotted messages which might be obsoleted and discarded if they are not
11-- consumed before newer slotted messages with the same slot value occur.
12--
13-- Slots are implemented with "Control.Concurrent.STM.StatusCache" and it is
14-- recommended that you read that documentation first.
15--
16-- For example, the output
17--
18-- > Backlogged consumer: (set x = 5)(set y = 100)(Hello)(Joe)(was)(here)
19-- > Fast consumer: (set x = 2)(Hello)(set y = 23)(Joe)(set y = 100)(was)(set x = 5)(here)
20--
21-- is produced by the following code:
22--
23-- > import Control.Monad (when, forever, (>=>))
24-- > import Control.Monad.STM (atomically)
25-- > import Control.Concurrent (forkIO, threadDelay)
26-- > import System.IO (hFlush, stdout)
27-- > import qualified Control.Concurrent.STM.UpdateStream as Cache
28-- >
29-- > messages :: [(Maybe String, Char)]
30-- > messages = concat
31-- > [ slot "x" "(set x = 2)"
32-- > , message "(Hello)"
33-- > , slot "y" "(set y = 23)"
34-- > , message "(Joe)"
35-- > , slot "y" "(set y = 100)"
36-- > , message "(was)"
37-- > , slot "x" "(set x = 5)"
38-- > , message "(here)"
39-- > ]
40-- > where
41-- > slot v cs = map ((,) (Just v)) cs
42-- > message cs = map ((,) Nothing) cs
43-- >
44-- > main = do
45-- > q <- atomically $ Cache.new (== '(') (==')')
46-- > let go = mapM_ (atomically . (uncurry $ Cache.push q)
47-- > >=> const (threadDelay 10000))
48-- > messages
49-- > slowly = do
50-- > while (atomically $ fmap not $ Cache.isEmpty q) $ do
51-- > c <- atomically $ Cache.pull q
52-- > putChar c
53-- > putStrLn ""
54-- > hFlush stdout
55-- > where while pred body =
56-- > pred >>= flip when (body >> while pred body)
57-- > quickly = forkIO . forever $ do
58-- > c <- atomically $ Cache.pull q
59-- > putChar c
60-- > hFlush stdout
61-- > putStr $ "Backlogged consumer: "
62-- > go >> slowly
63-- > putStr "Fast consumer: "
64-- > quickly >> go
65-- > putStrLn ""
66--
67module Control.Concurrent.STM.UpdateStream
68 ( UpdateStream
69 , new
70 , push
71 , pull
72 , isStopper
73 , isStarter
74 , isEmpty
75 ) where
76
77import Control.Monad
78import Control.Monad.STM
79import Control.Concurrent.STM.TVar
80import Control.Concurrent.STM.TChan
81import Control.Concurrent.STM.StatusCache (StatusCache)
82import qualified Control.Concurrent.STM.StatusCache as Status
83import Data.Map (Map)
84import qualified Data.Map as Map
85import Data.Maybe
86import Data.Foldable (foldlM)
87
88data UpdateStream slot x =
89 UpdateStream { cache :: TVar (Map slot (StatusCache x))
90 , events :: TChan x
91 , inMessage :: TVar (Maybe slot)
92 , isStarter :: x -> Bool -- ^ True if the given chunk begins a message.
93 , isStopper :: x -> Bool -- ^ True if the given chunk ends a message.
94 }
95
96-- | @new isStart isStop@
97--
98-- The @isStart@ and @isStop@ predicates indicate when an element
99-- begins or ends a message.
100new :: Ord slot => (x->Bool) -> (x->Bool) -> STM (UpdateStream slot x)
101new isStart isStop = do
102 cache <- newTVar Map.empty
103 events <- newTChan
104 inMessage <- newTVar Nothing
105 return UpdateStream { cache = cache
106 , events = events
107 , inMessage = inMessage
108 , isStarter = isStart
109 , isStopper = isStop
110 }
111
112-- | Enqueue a chunk into the 'UpdateStream'
113--
114-- If a slot is provided, then the message may be obsoleted when a new message
115-- starts with the same slot value. Otherwise, the chunk is preserved. Note
116-- that the same slot value must be passed again for every chunk of the message
117-- as it is not remembered.
118push :: Ord slot => UpdateStream slot x -> Maybe slot -> x -> STM ()
119push u Nothing x = writeTChan (events u) x
120push u (Just n) x = do
121 smap <- readTVar (cache u)
122 scache <-
123 case Map.lookup n smap of
124 Just scache -> return scache
125 Nothing -> do
126 scache <- Status.new (isStarter u) (isStopper u)
127 modifyTVar (cache u) (Map.insert n scache)
128 return scache
129
130 Status.push scache x
131
132-- | Pull off a chunk from the 'UpdateStream' for processing.
133--
134-- If chunks are not pulled off quickly, they may be obsoleted
135-- and discarded when new messages are 'push'ed.
136pull :: Ord slot => UpdateStream slot x -> STM x
137pull u = do
138 (inm, mbscache) <- do
139 mn <- readTVar $ inMessage u
140 map <- readTVar (cache u)
141 return $ (isJust mn, mn >>= flip Map.lookup map)
142 let action =
143 case (inm,mbscache) of
144 (True,Just scache) -> Status.pull scache
145 (True,Nothing) -> readTChan (events u)
146 (False,_) -> do
147 cs <- fmap (Map.toList) $ readTVar (cache u)
148 cs <- filterM (return . not <=< Status.isEmpty . snd)
149 cs
150 maybe (readTChan $ events u)
151 (\(n,scache) -> do
152 writeTVar (inMessage u) (Just n)
153 Status.pull scache)
154 (listToMaybe cs)
155 x <- action
156 when (isStopper u x) $ writeTVar (inMessage u) Nothing
157 return x
158
159-- | True when the 'UpdateStream' is completely exhuasted.
160isEmpty :: UpdateStream slot x -> STM Bool
161isEmpty q = do
162 e <- isEmptyTChan (events q)
163 qs <- readTVar (cache q)
164 d <- foldlM (\b s -> fmap (b &&) $ Status.isEmpty s) True qs
165 return $ e && d
diff --git a/Presence/NestingXML.hs b/Presence/NestingXML.hs
index bf12c9ae..c26e3d5c 100644
--- a/Presence/NestingXML.hs
+++ b/Presence/NestingXML.hs
@@ -79,6 +79,16 @@ awaitCloser lvl = do
79 withXML $ \xml -> do 79 withXML $ \xml -> do
80 loop 80 loop
81 81
82doUntilCloser :: Monad m
83 => Int -> (Event -> NestingXML o m ()) -> NestingXML o m ()
84doUntilCloser lvl thunk = do
85 fix $ \loop -> do
86 lvl' <- nesting
87 when (lvl' >= lvl) $ do
88 withXML $ \xml -> do
89 thunk xml
90 loop
91
82nextElement :: Monad m => NestingXML o m (Maybe Event) 92nextElement :: Monad m => NestingXML o m (Maybe Event)
83nextElement = do 93nextElement = do
84 lvl <- nesting 94 lvl <- nesting
diff --git a/xmppServer.hs b/xmppServer.hs
index c720ea5f..459d8f8d 100644
--- a/xmppServer.hs
+++ b/xmppServer.hs
@@ -1,6 +1,9 @@
1{-# LANGUAGE OverloadedStrings #-}
1import Control.Monad.Trans.Resource (runResourceT) 2import Control.Monad.Trans.Resource (runResourceT)
2import Control.Monad.Trans (lift) 3import Control.Monad.Trans (lift)
4import Control.Monad.IO.Class (liftIO)
3import Control.Monad.Fix (fix) 5import Control.Monad.Fix (fix)
6import Control.Monad
4import Control.Concurrent (forkIO) 7import Control.Concurrent (forkIO)
5import Control.Concurrent.STM 8import Control.Concurrent.STM
6-- import Control.Concurrent.STM.TChan 9-- import Control.Concurrent.STM.TChan
@@ -8,18 +11,42 @@ import Network.Socket
8import XMPPTypes (withPort) 11import XMPPTypes (withPort)
9import Text.Printf 12import Text.Printf
10import System.Posix.Signals 13import System.Posix.Signals
14import Data.ByteString (ByteString)
15import qualified Data.ByteString.Char8 as Strict8
16-- import qualified Data.ByteString.Lazy.Char8 as Lazy8
11 17
12import Data.Conduit 18import Data.Conduit
19import qualified Data.Conduit.List as CL
20import qualified Data.Conduit.Binary as CB
13 21
14import qualified Text.XML.Stream.Render as XML 22import qualified Text.XML.Stream.Render as XML
15import qualified Text.XML.Stream.Parse as XML 23import qualified Text.XML.Stream.Parse as XML
24import Data.XML.Types as XML
25import Data.Maybe (catMaybes)
26import Data.Monoid ( (<>) )
16 27
28import qualified Control.Concurrent.STM.UpdateStream as Slotted
29import ControlMaybe
30import NestingXML
17import Server 31import Server
18 32
33
19wlog s = putStrLn s 34wlog s = putStrLn s
35 where _ = s :: String
36wlogb s = Strict8.putStrLn s
20 37
21control sv = atomically . putTMVar (serverCommand sv) 38control sv = atomically . putTMVar (serverCommand sv)
22 39
40-- Note: This function ignores name space qualification
41elementAttrs expected (EventBeginElement name attrs)
42 | nameLocalName name==expected
43 = return attrs
44elementAttrs _ _ = mzero
45
46getStreamName (EventBeginElement name _) = name
47
48xmlStream :: ReadCommand -> WriteCommand -> ( Source IO XML.Event
49 , Sink XML.Event IO () )
23xmlStream conread conwrite = (xsrc,xsnk) 50xmlStream conread conwrite = (xsrc,xsnk)
24 where 51 where
25 xsrc = src $= XML.parseBytes XML.def 52 xsrc = src $= XML.parseBytes XML.def
@@ -30,34 +57,125 @@ xmlStream conread conwrite = (xsrc,xsnk)
30 maybe (return ()) -- lift . wlog $ "conread: Nothing") 57 maybe (return ()) -- lift . wlog $ "conread: Nothing")
31 (\v -> yield v >> src) 58 (\v -> yield v >> src)
32 v 59 v
33 snk = awaitForever $ lift . conwrite 60 snk = awaitForever $ liftIO . conwrite
34 61
35 62
36forkConnection k pingflag src snk = do 63type FlagCommand = IO Bool
64type ReadCommand = IO (Maybe ByteString)
65type WriteCommand = ByteString -> IO Bool
66
67data Stanza
68 = UnrecognizedStanza { stanzaChan :: TChan (Maybe XML.Event) }
69
70prettyPrint prefix xs =
71 liftIO $
72 CL.sourceList xs
73 $= XML.renderBytes (XML.def { XML.rsPretty=True })
74 =$= CB.lines
75 $$ CL.mapM_ (wlogb . (prefix <>))
76
77xmppInbound :: ConnectionKey -> FlagCommand
78 -> Source IO XML.Event
79 -> TChan Stanza
80 -> Sink XML.Event IO ()
81xmppInbound k pingflag src stanzas = doNestingXML $ do
82 withXML $ \begindoc -> do
83 when (begindoc==EventBeginDocument) $ do
84 -- liftIO . wlog $ "begin-doc"
85 withXML $ \xml -> do
86 withJust (elementAttrs "stream" xml) $ \stream_attrs -> do
87 -- liftIO . wlog $ "stream: " ++ show (getStreamName xml)
88 -- liftIO . wlog $ "stream atributes: " ++ show stream_attrs
89 fix $ \loop -> do
90 -- liftIO . wlog $ "waiting for stanza."
91 chan <- liftIO $ atomically newTChan
92 whenJust nextElement $ \stanza -> do
93 stanza_lvl <- nesting
94 liftIO . atomically $ writeTChan chan (Just stanza)
95 -- liftIO . wlog $ "stanza: "++show stanza
96
97 liftIO . atomically $ writeTChan stanzas $
98 UnrecognizedStanza chan
99 doUntilCloser stanza_lvl $ \xml -> do
100 liftIO . atomically $ writeTChan chan (Just xml)
101 -- liftIO . wlog $ "-stanza: " ++ show xml
102 liftIO . atomically $ writeTChan chan Nothing
103 loop
104
105
106chanContents :: TChan x -> IO [x]
107chanContents ch = do
108 x <- atomically $ do
109 bempty <- isEmptyTChan ch
110 if bempty
111 then return Nothing
112 else fmap Just $ readTChan ch
113 maybe (return [])
114 (\x -> do
115 xs <- chanContents ch
116 return (x:xs))
117 x
118
119
120isEventBeginElement (EventBeginElement {}) = True
121isEventBeginElement _ = False
122
123isEventEndElement (EventEndElement {}) = True
124isEventEndElement _ = False
125
126forkConnection :: ConnectionKey
127 -> FlagCommand
128 -> Source IO XML.Event
129 -> Sink XML.Event IO ()
130 -> TChan Stanza
131 -> IO (Slotted.UpdateStream () XML.Event)
132forkConnection k pingflag src snk stanzas = do
133 rdone <- atomically newEmptyTMVar
37 forkIO $ do 134 forkIO $ do
38 src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show) 135 -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show)
39 wlog $ "end fork: " ++ show k 136 src $$ xmppInbound k pingflag src stanzas
40 return () 137 atomically $ putTMVar rdone ()
138 wlog $ "end reader fork: " ++ show k
139 output <- atomically $ Slotted.new isEventBeginElement isEventEndElement
140 let slot_src = do
141 what <- lift . atomically $ orElse
142 (Slotted.pull output >>= \x -> return $ do
143 yield x
144 slot_src)
145 (takeTMVar rdone >> return (return ()))
146 what
147 forkIO $ do slot_src $$ snk
148 wlog $ "end writer fork: " ++ show k
149 return output
41 150
42monitor sv params = do 151monitor sv params = do
43 chan <- return $ serverEvent sv 152 chan <- return $ serverEvent sv
153 stanzas <- atomically newTChan
44 fix $ \loop -> do 154 fix $ \loop -> do
45 (k,e) <- atomically $ readTChan chan 155 action <- atomically $ foldr1 orElse
46 case e of 156 [ readTChan chan >>= \(k,e) -> return $ do
47 Connection pingflag conread conwrite -> do 157 case e of
48 let (xsrc,xsnk) = xmlStream conread conwrite 158 Connection pingflag conread conwrite -> do
49 forkConnection k pingflag xsrc xsnk 159 let (xsrc,xsnk) = xmlStream conread conwrite
50 wlog $ tomsg k "Connection" 160 forkConnection k pingflag xsrc xsnk stanzas
51 EOF -> wlog $ tomsg k "EOF" 161 wlog $ tomsg k "Connection"
52 HalfConnection In -> do 162 EOF -> wlog $ tomsg k "EOF"
53 wlog $ tomsg k "ReadOnly" 163 HalfConnection In -> do
54 control sv (Connect (callBackAddress k) params) 164 wlog $ tomsg k "ReadOnly"
55 HalfConnection Out -> wlog $ tomsg k "WriteOnly" 165 control sv (Connect (callBackAddress k) params)
56 RequiresPing -> wlog $ tomsg k "RequiresPing" 166 HalfConnection Out -> wlog $ tomsg k "WriteOnly"
57 _ -> return () 167 RequiresPing -> wlog $ tomsg k "RequiresPing"
168 _ -> return ()
169 , readTChan stanzas >>= \stanza -> return $ do
170 xs <- chanContents (stanzaChan stanza)
171 prettyPrint "STANZA: " (catMaybes xs) ---- wlog $ "STANZA: "++ show (catMaybes xs)
172 ]
173 action
58 loop 174 loop
59 where 175 where
60 tomsg k str = printf "%12s %s" str (show k) 176 tomsg k str = printf "%12s %s" str (show k)
177 where
178 _ = str :: String
61 179
62data ConnectionKey 180data ConnectionKey
63 = PeerKey { callBackAddress :: SockAddr } 181 = PeerKey { callBackAddress :: SockAddr }
@@ -77,11 +195,13 @@ main = runResourceT $ do
77 sv <- server 195 sv <- server
78 lift $ do 196 lift $ do
79 peer_params <- return (connectionDefaults peerKey) 197 peer_params <- return (connectionDefaults peerKey)
80 { pingInterval = 2000, duplex = False } 198 { pingInterval = 0
199 , timeout = 0
200 , duplex = False }
81 client_params <- return $ connectionDefaults clientKey 201 client_params <- return $ connectionDefaults clientKey
82 forkIO $ monitor sv peer_params 202 forkIO $ monitor sv peer_params
83 control sv (Listen peerport peer_params) 203 control sv (Listen peerport peer_params)
84 control sv (Listen clientport client_params) 204 -- control sv (Listen clientport client_params)
85 205
86 -- atomically $ newEmptyTMVar >>= readTMVar -- Wait for control-c 206 -- atomically $ newEmptyTMVar >>= readTMVar -- Wait for control-c
87 quitVar <- newEmptyTMVarIO 207 quitVar <- newEmptyTMVarIO