diff options
-rw-r--r-- | Presence/Server.hs | 2 | ||||
-rw-r--r-- | Presence/XMPPServer.hs | 368 | ||||
-rw-r--r-- | xmppServer.hs | 369 |
3 files changed, 383 insertions, 356 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs index 312634cf..14eab06c 100644 --- a/Presence/Server.hs +++ b/Presence/Server.hs | |||
@@ -193,6 +193,8 @@ data Server a | |||
193 | , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay)) | 193 | , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay)) |
194 | } | 194 | } |
195 | 195 | ||
196 | control sv = atomically . putTMVar (serverCommand sv) | ||
197 | |||
196 | -- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT' | 198 | -- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT' |
197 | -- to ensure proper cleanup. For example, | 199 | -- to ensure proper cleanup. For example, |
198 | -- | 200 | -- |
diff --git a/Presence/XMPPServer.hs b/Presence/XMPPServer.hs new file mode 100644 index 00000000..95110301 --- /dev/null +++ b/Presence/XMPPServer.hs | |||
@@ -0,0 +1,368 @@ | |||
1 | {-# LANGUAGE OverloadedStrings #-} | ||
2 | module XMPPServer | ||
3 | ( xmppServer | ||
4 | ) where | ||
5 | import Control.Monad.Trans.Resource (runResourceT) | ||
6 | import Control.Monad.Trans (lift) | ||
7 | import Control.Monad.IO.Class (MonadIO, liftIO) | ||
8 | import Control.Monad.Fix (fix) | ||
9 | import Control.Monad | ||
10 | import Control.Concurrent (forkIO) | ||
11 | import Control.Concurrent.STM | ||
12 | -- import Control.Concurrent.STM.TChan | ||
13 | import Network.Socket | ||
14 | import XMPPTypes (withPort) | ||
15 | import Text.Printf | ||
16 | import System.Posix.Signals | ||
17 | import Data.ByteString (ByteString) | ||
18 | import qualified Data.ByteString.Char8 as Strict8 | ||
19 | -- import qualified Data.ByteString.Lazy.Char8 as Lazy8 | ||
20 | |||
21 | import Data.Conduit | ||
22 | import qualified Data.Conduit.List as CL | ||
23 | import qualified Data.Conduit.Binary as CB | ||
24 | import Data.Conduit.Blaze (builderToByteStringFlush) | ||
25 | |||
26 | import qualified Text.XML.Stream.Render as XML | ||
27 | import qualified Text.XML.Stream.Parse as XML | ||
28 | import Data.XML.Types as XML | ||
29 | import Data.Maybe (catMaybes,fromJust) | ||
30 | import Data.Monoid ( (<>) ) | ||
31 | import Data.Text (Text) | ||
32 | import qualified Data.Text as Text (pack) | ||
33 | |||
34 | import qualified Control.Concurrent.STM.UpdateStream as Slotted | ||
35 | import ControlMaybe | ||
36 | import Nesting | ||
37 | import EventUtil | ||
38 | import Server | ||
39 | |||
40 | peerport = 5269 | ||
41 | clientport = 5222 | ||
42 | |||
43 | |||
44 | -- TODO: http://xmpp.org/rfcs/rfc6120.html#rules-remote-error | ||
45 | |||
46 | addrToText :: SockAddr -> Text | ||
47 | addrToText (addr@(SockAddrInet _ _)) = Text.pack $ stripColon (show addr) | ||
48 | where stripColon s = pre where (pre,port) = break (==':') s | ||
49 | addrToText (addr@(SockAddrInet6 _ _ _ _)) = Text.pack $ stripColon (show addr) | ||
50 | where stripColon s = if null bracket then pre else pre ++ "]" | ||
51 | where | ||
52 | (pre,bracket) = break (==']') s | ||
53 | |||
54 | wlog s = putStrLn s | ||
55 | where _ = s :: String | ||
56 | wlogb s = Strict8.putStrLn s | ||
57 | |||
58 | xmlStream :: ReadCommand -> WriteCommand -> ( Source IO XML.Event | ||
59 | , Sink (Flush XML.Event) IO () ) | ||
60 | xmlStream conread conwrite = (xsrc,xsnk) | ||
61 | where | ||
62 | xsrc = src $= XML.parseBytes XML.def | ||
63 | xsnk = -- XML.renderBytes XML.def =$ snk | ||
64 | XML.renderBuilderFlush XML.def | ||
65 | =$= builderToByteStringFlush | ||
66 | =$= discardFlush | ||
67 | =$ snk | ||
68 | where | ||
69 | discardFlush :: Monad m => ConduitM (Flush a) a m () | ||
70 | discardFlush = awaitForever $ \x -> do | ||
71 | let unchunk (Chunk a) = a | ||
72 | ischunk (Chunk _) = True | ||
73 | ischunk _ = False | ||
74 | when (ischunk x) $ yield (unchunk x) | ||
75 | |||
76 | src = do | ||
77 | v <- lift conread | ||
78 | maybe (return ()) -- lift . wlog $ "conread: Nothing") | ||
79 | (\v -> yield v >> src) | ||
80 | v | ||
81 | snk = awaitForever $ liftIO . conwrite | ||
82 | |||
83 | |||
84 | type FlagCommand = STM Bool | ||
85 | type ReadCommand = IO (Maybe ByteString) | ||
86 | type WriteCommand = ByteString -> IO Bool | ||
87 | |||
88 | data Stanza | ||
89 | = UnrecognizedStanza { stanzaChan :: TChan (Maybe XML.Event) } | ||
90 | | PingStanza { stanzaId :: Maybe Text | ||
91 | , stanzaChan :: TChan (Maybe XML.Event) } | ||
92 | | PongStanza { -- stanzaId :: Maybe Text | ||
93 | stanzaChan :: TChan (Maybe XML.Event) } | ||
94 | |||
95 | copyToChannel f chan = awaitForever copy | ||
96 | where | ||
97 | copy x = do | ||
98 | liftIO . atomically $ writeTChan chan (f x) | ||
99 | yield x | ||
100 | |||
101 | |||
102 | prettyPrint prefix xs = | ||
103 | liftIO $ | ||
104 | CL.sourceList xs | ||
105 | $= XML.renderBytes (XML.def { XML.rsPretty=True }) | ||
106 | =$= CB.lines | ||
107 | $$ CL.mapM_ (wlogb . (prefix <>)) | ||
108 | |||
109 | grockStanzaIQGet :: Monad m => XML.Event -> NestingXML o m (Maybe (TChan (Maybe Event) -> Stanza)) | ||
110 | grockStanzaIQGet stanza = do | ||
111 | let mid = lookupAttrib "id" (tagAttrs stanza) | ||
112 | -- mfrom = lookupAttrib "from" (tagAttrs stanza) | ||
113 | mtag <- nextElement | ||
114 | flip (maybe $ return Nothing) mtag $ \tag -> do | ||
115 | case tagName tag of | ||
116 | "{urn:xmpp:ping}ping" -> do | ||
117 | return $ Just (PingStanza mid) | ||
118 | _ -> return Nothing | ||
119 | |||
120 | ioWriteChan c v = liftIO . atomically $ writeTChan c v | ||
121 | |||
122 | xmppInbound :: ConnectionKey -> FlagCommand | ||
123 | -> Source IO XML.Event | ||
124 | -> TChan Stanza | ||
125 | -> TChan Stanza | ||
126 | -> Sink XML.Event IO () | ||
127 | xmppInbound k pingflag src stanzas output = doNestingXML $ do | ||
128 | withXML $ \begindoc -> do | ||
129 | when (begindoc==EventBeginDocument) $ do | ||
130 | whenJust nextElement $ \xml -> do | ||
131 | withJust (elementAttrs "stream" xml) $ \stream_attrs -> do | ||
132 | fix $ \loop -> do | ||
133 | -- liftIO . wlog $ "waiting for stanza." | ||
134 | chan <- liftIO $ atomically newTChan | ||
135 | whenJust nextElement $ \stanza -> do | ||
136 | stanza_lvl <- nesting | ||
137 | ioWriteChan chan (Just stanza) | ||
138 | copyToChannel Just chan =$= do | ||
139 | dispatch <- | ||
140 | case () of | ||
141 | _ | stanza `isServerIQOf` "get" -> grockStanzaIQGet stanza | ||
142 | _ -> return $ Just UnrecognizedStanza | ||
143 | flip (maybe $ return ()) dispatch $ \dispatch -> | ||
144 | case dispatch chan of | ||
145 | d@(PingStanza {}) -> do | ||
146 | let to = "todo" | ||
147 | from = "todo" | ||
148 | let pong = peerPong (stanzaId d) to from | ||
149 | pongChan <- liftIO $ atomically newTChan | ||
150 | ioWriteChan output (PongStanza pongChan) | ||
151 | mapM_ (ioWriteChan pongChan . Just) pong | ||
152 | ioWriteChan pongChan Nothing | ||
153 | disp -> ioWriteChan stanzas disp | ||
154 | awaitCloser stanza_lvl | ||
155 | ioWriteChan chan Nothing | ||
156 | loop | ||
157 | |||
158 | |||
159 | chanContents :: TChan x -> IO [x] | ||
160 | chanContents ch = do | ||
161 | x <- atomically $ do | ||
162 | bempty <- isEmptyTChan ch | ||
163 | if bempty | ||
164 | then return Nothing | ||
165 | else fmap Just $ readTChan ch | ||
166 | maybe (return []) | ||
167 | (\x -> do | ||
168 | xs <- chanContents ch | ||
169 | return (x:xs)) | ||
170 | x | ||
171 | |||
172 | readUntilNothing :: TChan (Maybe x) -> IO [x] | ||
173 | readUntilNothing ch = do | ||
174 | x <- atomically $ readTChan ch | ||
175 | maybe (return []) | ||
176 | (\x -> do | ||
177 | xs <- readUntilNothing ch | ||
178 | return (x:xs)) | ||
179 | x | ||
180 | |||
181 | |||
182 | greetPeer = | ||
183 | [ EventBeginDocument | ||
184 | , EventBeginElement (streamP "stream") | ||
185 | [ attr "xmlns" "jabber:server" | ||
186 | , attr "version" "1.0" | ||
187 | ] | ||
188 | ] | ||
189 | |||
190 | goodbyePeer = | ||
191 | [ EventEndElement (streamP "stream") | ||
192 | , EventEndDocument | ||
193 | ] | ||
194 | |||
195 | data XMPPState | ||
196 | = PingSlot | ||
197 | deriving (Eq,Ord) | ||
198 | |||
199 | |||
200 | peerPing :: Maybe Text -> Text -> Text -> [XML.Event] | ||
201 | peerPing mid to from = | ||
202 | [ EventBeginElement "{jabber:server}iq" | ||
203 | $ (case mid of | ||
204 | Just c -> (("id",[ContentText c]):) | ||
205 | _ -> id ) | ||
206 | [ ("type",[ContentText "get"]) | ||
207 | , attr "to" to | ||
208 | , attr "from" from | ||
209 | ] | ||
210 | , EventBeginElement "{urn:xmpp:ping}ping" [] | ||
211 | , EventEndElement "{urn:xmpp:ping}ping" | ||
212 | , EventEndElement "{jabber:server}iq" ] | ||
213 | |||
214 | peerPong mid to from = | ||
215 | [ EventBeginElement "{jabber:server}iq" | ||
216 | $(case mid of | ||
217 | Just c -> (("id",[ContentText c]):) | ||
218 | _ -> id) | ||
219 | [ attr "type" "result" | ||
220 | , attr "to" to | ||
221 | , attr "from" from | ||
222 | ] | ||
223 | , EventEndElement "{jabber:server}iq" | ||
224 | ] | ||
225 | |||
226 | |||
227 | forkConnection :: ConnectionKey | ||
228 | -> FlagCommand | ||
229 | -> Source IO XML.Event | ||
230 | -> Sink (Flush XML.Event) IO () | ||
231 | -> TChan Stanza | ||
232 | -> IO (TChan Stanza) | ||
233 | forkConnection k pingflag src snk stanzas = do | ||
234 | rdone <- atomically newEmptyTMVar | ||
235 | slots <- atomically $ Slotted.new isEventBeginElement isEventEndElement | ||
236 | needsFlush <- atomically $ newTVar False | ||
237 | let _ = slots :: Slotted.UpdateStream XMPPState XML.Event | ||
238 | let greet_src = do | ||
239 | CL.sourceList greetPeer =$= CL.map Chunk | ||
240 | yield Flush | ||
241 | slot_src = do | ||
242 | what <- lift . atomically $ foldr1 orElse | ||
243 | [Slotted.pull slots >>= \x -> do | ||
244 | writeTVar needsFlush True | ||
245 | return $ do | ||
246 | yield (Chunk x) | ||
247 | slot_src | ||
248 | ,do Slotted.isEmpty slots >>= check | ||
249 | readTVar needsFlush >>= check | ||
250 | writeTVar needsFlush False | ||
251 | return $ do | ||
252 | yield Flush | ||
253 | slot_src | ||
254 | ,readTMVar rdone >> return (return ()) | ||
255 | ] | ||
256 | what | ||
257 | forkIO $ do (greet_src >> slot_src) $$ snk | ||
258 | wlog $ "end post-queue fork: " ++ show k | ||
259 | output <- atomically newTChan | ||
260 | forkIO $ do | ||
261 | -- mapM_ (atomically . Slotted.push slots Nothing) greetPeer | ||
262 | fix $ \loop -> do | ||
263 | what <- atomically $ foldr1 orElse | ||
264 | [readTChan output >>= \stanza -> return $ do | ||
265 | let xchan = stanzaChan stanza | ||
266 | fix $ \inner -> do | ||
267 | what <- atomically $ orElse | ||
268 | (readTChan xchan >>= \mxml -> return $ do | ||
269 | case mxml of | ||
270 | Just xml -> do | ||
271 | atomically $ Slotted.push slots Nothing xml | ||
272 | inner | ||
273 | Nothing -> loop) | ||
274 | (readTMVar rdone >> return (return ())) | ||
275 | what | ||
276 | ,do pingflag >>= check | ||
277 | return $ do | ||
278 | let to = addrToText (callBackAddress k) | ||
279 | from = "todo" -- Look it up from Server object | ||
280 | -- or pass it with Connection event. | ||
281 | mid = Just "ping" | ||
282 | mapM_ (atomically . Slotted.push slots (Just $ PingSlot)) | ||
283 | (peerPing mid to from) | ||
284 | loop | ||
285 | ,readTMVar rdone >> return (return ()) | ||
286 | ] | ||
287 | what | ||
288 | wlog $ "end pre-queue fork: " ++ show k | ||
289 | forkIO $ do | ||
290 | -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show) | ||
291 | src $$ xmppInbound k pingflag src stanzas output | ||
292 | atomically $ putTMVar rdone () | ||
293 | wlog $ "end reader fork: " ++ show k | ||
294 | return output | ||
295 | |||
296 | data ConnectionKey | ||
297 | = PeerKey { callBackAddress :: SockAddr } | ||
298 | | ClientKey { localAddress :: SockAddr } | ||
299 | deriving (Show, Ord, Eq) | ||
300 | |||
301 | {- | ||
302 | data Peer = Peer | ||
303 | { peerWanted :: TVar Bool -- ^ False when this peer is on a you-call-me basis | ||
304 | , peerState :: TVar PeerState | ||
305 | } | ||
306 | data PeerState | ||
307 | = PeerPendingConnect UTCTime | ||
308 | | PeerPendingAccept UTCTime | ||
309 | | PeerConnected (TChan Stanza) | ||
310 | -} | ||
311 | |||
312 | peerKey (sock,addr) = do | ||
313 | peer <- | ||
314 | sIsConnected sock >>= \c -> | ||
315 | if c then getPeerName sock -- addr is normally socketName | ||
316 | else return addr -- Weird hack: addr is would-be peer name | ||
317 | return $ PeerKey (peer `withPort` fromIntegral peerport) | ||
318 | |||
319 | clientKey (sock,addr) = return $ ClientKey addr | ||
320 | |||
321 | monitor sv params = do | ||
322 | chan <- return $ serverEvent sv | ||
323 | stanzas <- atomically newTChan | ||
324 | fix $ \loop -> do | ||
325 | action <- atomically $ foldr1 orElse | ||
326 | [ readTChan chan >>= \(k,e) -> return $ do | ||
327 | case e of | ||
328 | Connection pingflag conread conwrite -> do | ||
329 | wlog $ tomsg k "Connection" | ||
330 | let (xsrc,xsnk) = xmlStream conread conwrite | ||
331 | forkConnection k pingflag xsrc xsnk stanzas | ||
332 | return () | ||
333 | ConnectFailure addr -> do | ||
334 | wlog $ tomsg k "ConnectFailure" | ||
335 | EOF -> wlog $ tomsg k "EOF" | ||
336 | HalfConnection In -> do | ||
337 | wlog $ tomsg k "ReadOnly" | ||
338 | control sv (Connect (callBackAddress k) params) | ||
339 | HalfConnection Out -> wlog $ tomsg k "WriteOnly" | ||
340 | RequiresPing -> wlog $ tomsg k "RequiresPing" | ||
341 | _ -> return () | ||
342 | , readTChan stanzas >>= \stanza -> return $ do | ||
343 | xs <- readUntilNothing (stanzaChan stanza) | ||
344 | wlog "" | ||
345 | prettyPrint "STANZA: " xs | ||
346 | ] | ||
347 | action | ||
348 | loop | ||
349 | where | ||
350 | tomsg k str = printf "%12s %s" str (show k) | ||
351 | where | ||
352 | _ = str :: String | ||
353 | |||
354 | |||
355 | xmppServer :: (MonadResource m, MonadIO m) => m (Server ConnectionKey,ConnectionParameters ConnectionKey) | ||
356 | xmppServer = do | ||
357 | sv <- server | ||
358 | let peer_params = (connectionDefaults peerKey) | ||
359 | { pingInterval = 10000 | ||
360 | , timeout = 10000 | ||
361 | , duplex = False } | ||
362 | client_params = connectionDefaults clientKey | ||
363 | liftIO $ do | ||
364 | forkIO $ monitor sv peer_params | ||
365 | control sv (Listen peerport peer_params) | ||
366 | -- todo | ||
367 | -- control sv (Listen clientport client_params) | ||
368 | return (sv,peer_params) | ||
diff --git a/xmppServer.hs b/xmppServer.hs index 7a61b00d..ce681cb5 100644 --- a/xmppServer.hs +++ b/xmppServer.hs | |||
@@ -1,361 +1,22 @@ | |||
1 | {-# LANGUAGE OverloadedStrings #-} | 1 | import System.Posix.Signals |
2 | import Control.Monad.Trans.Resource (runResourceT) | ||
3 | import Control.Monad.Trans (lift) | ||
4 | import Control.Monad.IO.Class (liftIO) | ||
5 | import Control.Monad.Fix (fix) | ||
6 | import Control.Monad | ||
7 | import Control.Concurrent (forkIO) | ||
8 | import Control.Concurrent.STM | 2 | import Control.Concurrent.STM |
9 | -- import Control.Concurrent.STM.TChan | 3 | import Control.Monad.Trans.Resource (runResourceT) |
4 | import Control.Monad.IO.Class (MonadIO, liftIO) | ||
10 | import Network.Socket | 5 | import Network.Socket |
11 | import XMPPTypes (withPort) | 6 | ( addrAddress |
12 | import Text.Printf | 7 | , getAddrInfo |
13 | import System.Posix.Signals | 8 | , defaultHints |
14 | import Data.ByteString (ByteString) | 9 | , addrFlags |
15 | import qualified Data.ByteString.Char8 as Strict8 | 10 | , AddrInfoFlag(AI_CANONNAME) |
16 | -- import qualified Data.ByteString.Lazy.Char8 as Lazy8 | 11 | ) |
17 | |||
18 | import Data.Conduit | ||
19 | import qualified Data.Conduit.List as CL | ||
20 | import qualified Data.Conduit.Binary as CB | ||
21 | import Data.Conduit.Blaze (builderToByteStringFlush) | ||
22 | 12 | ||
23 | import qualified Text.XML.Stream.Render as XML | ||
24 | import qualified Text.XML.Stream.Parse as XML | ||
25 | import Data.XML.Types as XML | ||
26 | import Data.Maybe (catMaybes,fromJust) | ||
27 | import Data.Monoid ( (<>) ) | ||
28 | import Data.Text (Text) | ||
29 | import qualified Data.Text as Text (pack) | ||
30 | 13 | ||
31 | import qualified Control.Concurrent.STM.UpdateStream as Slotted | 14 | import XMPPServer |
32 | import ControlMaybe | ||
33 | import Nesting | ||
34 | import EventUtil | ||
35 | import Server | 15 | import Server |
36 | 16 | ||
37 | addrToText :: SockAddr -> Text | ||
38 | addrToText (addr@(SockAddrInet _ _)) = Text.pack $ stripColon (show addr) | ||
39 | where stripColon s = pre where (pre,port) = break (==':') s | ||
40 | addrToText (addr@(SockAddrInet6 _ _ _ _)) = Text.pack $ stripColon (show addr) | ||
41 | where stripColon s = if null bracket then pre else pre ++ "]" | ||
42 | where | ||
43 | (pre,bracket) = break (==']') s | ||
44 | |||
45 | wlog s = putStrLn s | ||
46 | where _ = s :: String | ||
47 | wlogb s = Strict8.putStrLn s | ||
48 | |||
49 | control sv = atomically . putTMVar (serverCommand sv) | ||
50 | |||
51 | xmlStream :: ReadCommand -> WriteCommand -> ( Source IO XML.Event | ||
52 | , Sink (Flush XML.Event) IO () ) | ||
53 | xmlStream conread conwrite = (xsrc,xsnk) | ||
54 | where | ||
55 | xsrc = src $= XML.parseBytes XML.def | ||
56 | xsnk = -- XML.renderBytes XML.def =$ snk | ||
57 | XML.renderBuilderFlush XML.def | ||
58 | =$= builderToByteStringFlush | ||
59 | =$= discardFlush | ||
60 | =$ snk | ||
61 | where | ||
62 | discardFlush :: Monad m => ConduitM (Flush a) a m () | ||
63 | discardFlush = awaitForever $ \x -> do | ||
64 | let unchunk (Chunk a) = a | ||
65 | ischunk (Chunk _) = True | ||
66 | ischunk _ = False | ||
67 | when (ischunk x) $ yield (unchunk x) | ||
68 | |||
69 | src = do | ||
70 | v <- lift conread | ||
71 | maybe (return ()) -- lift . wlog $ "conread: Nothing") | ||
72 | (\v -> yield v >> src) | ||
73 | v | ||
74 | snk = awaitForever $ liftIO . conwrite | ||
75 | |||
76 | |||
77 | type FlagCommand = STM Bool | ||
78 | type ReadCommand = IO (Maybe ByteString) | ||
79 | type WriteCommand = ByteString -> IO Bool | ||
80 | |||
81 | data Stanza | ||
82 | = UnrecognizedStanza { stanzaChan :: TChan (Maybe XML.Event) } | ||
83 | | PingStanza { stanzaId :: Maybe Text | ||
84 | , stanzaChan :: TChan (Maybe XML.Event) } | ||
85 | | PongStanza { -- stanzaId :: Maybe Text | ||
86 | stanzaChan :: TChan (Maybe XML.Event) } | ||
87 | |||
88 | copyToChannel f chan = awaitForever copy | ||
89 | where | ||
90 | copy x = do | ||
91 | liftIO . atomically $ writeTChan chan (f x) | ||
92 | yield x | ||
93 | |||
94 | |||
95 | prettyPrint prefix xs = | ||
96 | liftIO $ | ||
97 | CL.sourceList xs | ||
98 | $= XML.renderBytes (XML.def { XML.rsPretty=True }) | ||
99 | =$= CB.lines | ||
100 | $$ CL.mapM_ (wlogb . (prefix <>)) | ||
101 | |||
102 | grockStanzaIQGet :: Monad m => XML.Event -> NestingXML o m (Maybe (TChan (Maybe Event) -> Stanza)) | ||
103 | grockStanzaIQGet stanza = do | ||
104 | let mid = lookupAttrib "id" (tagAttrs stanza) | ||
105 | -- mfrom = lookupAttrib "from" (tagAttrs stanza) | ||
106 | mtag <- nextElement | ||
107 | flip (maybe $ return Nothing) mtag $ \tag -> do | ||
108 | case tagName tag of | ||
109 | "{urn:xmpp:ping}ping" -> do | ||
110 | return $ Just (PingStanza mid) | ||
111 | _ -> return Nothing | ||
112 | |||
113 | ioWriteChan c v = liftIO . atomically $ writeTChan c v | ||
114 | |||
115 | xmppInbound :: ConnectionKey -> FlagCommand | ||
116 | -> Source IO XML.Event | ||
117 | -> TChan Stanza | ||
118 | -> TChan Stanza | ||
119 | -> Sink XML.Event IO () | ||
120 | xmppInbound k pingflag src stanzas output = doNestingXML $ do | ||
121 | withXML $ \begindoc -> do | ||
122 | when (begindoc==EventBeginDocument) $ do | ||
123 | whenJust nextElement $ \xml -> do | ||
124 | withJust (elementAttrs "stream" xml) $ \stream_attrs -> do | ||
125 | fix $ \loop -> do | ||
126 | -- liftIO . wlog $ "waiting for stanza." | ||
127 | chan <- liftIO $ atomically newTChan | ||
128 | whenJust nextElement $ \stanza -> do | ||
129 | stanza_lvl <- nesting | ||
130 | ioWriteChan chan (Just stanza) | ||
131 | copyToChannel Just chan =$= do | ||
132 | dispatch <- | ||
133 | case () of | ||
134 | _ | stanza `isServerIQOf` "get" -> grockStanzaIQGet stanza | ||
135 | _ -> return $ Just UnrecognizedStanza | ||
136 | flip (maybe $ return ()) dispatch $ \dispatch -> | ||
137 | case dispatch chan of | ||
138 | d@(PingStanza {}) -> do | ||
139 | let to = "todo" | ||
140 | from = "todo" | ||
141 | let pong = peerPong (stanzaId d) to from | ||
142 | pongChan <- liftIO $ atomically newTChan | ||
143 | ioWriteChan output (PongStanza pongChan) | ||
144 | mapM_ (ioWriteChan pongChan . Just) pong | ||
145 | ioWriteChan pongChan Nothing | ||
146 | disp -> ioWriteChan stanzas disp | ||
147 | awaitCloser stanza_lvl | ||
148 | ioWriteChan chan Nothing | ||
149 | loop | ||
150 | |||
151 | |||
152 | chanContents :: TChan x -> IO [x] | ||
153 | chanContents ch = do | ||
154 | x <- atomically $ do | ||
155 | bempty <- isEmptyTChan ch | ||
156 | if bempty | ||
157 | then return Nothing | ||
158 | else fmap Just $ readTChan ch | ||
159 | maybe (return []) | ||
160 | (\x -> do | ||
161 | xs <- chanContents ch | ||
162 | return (x:xs)) | ||
163 | x | ||
164 | |||
165 | readUntilNothing :: TChan (Maybe x) -> IO [x] | ||
166 | readUntilNothing ch = do | ||
167 | x <- atomically $ readTChan ch | ||
168 | maybe (return []) | ||
169 | (\x -> do | ||
170 | xs <- readUntilNothing ch | ||
171 | return (x:xs)) | ||
172 | x | ||
173 | |||
174 | |||
175 | greetPeer = | ||
176 | [ EventBeginDocument | ||
177 | , EventBeginElement (streamP "stream") | ||
178 | [ attr "xmlns" "jabber:server" | ||
179 | , attr "version" "1.0" | ||
180 | ] | ||
181 | ] | ||
182 | |||
183 | goodbyePeer = | ||
184 | [ EventEndElement (streamP "stream") | ||
185 | , EventEndDocument | ||
186 | ] | ||
187 | |||
188 | data XMPPState | ||
189 | = PingSlot | ||
190 | deriving (Eq,Ord) | ||
191 | |||
192 | |||
193 | peerPing :: Maybe Text -> Text -> Text -> [XML.Event] | ||
194 | peerPing mid to from = | ||
195 | [ EventBeginElement "{jabber:server}iq" | ||
196 | $ (case mid of | ||
197 | Just c -> (("id",[ContentText c]):) | ||
198 | _ -> id ) | ||
199 | [ ("type",[ContentText "get"]) | ||
200 | , attr "to" to | ||
201 | , attr "from" from | ||
202 | ] | ||
203 | , EventBeginElement "{urn:xmpp:ping}ping" [] | ||
204 | , EventEndElement "{urn:xmpp:ping}ping" | ||
205 | , EventEndElement "{jabber:server}iq" ] | ||
206 | |||
207 | peerPong mid to from = | ||
208 | [ EventBeginElement "{jabber:server}iq" | ||
209 | $(case mid of | ||
210 | Just c -> (("id",[ContentText c]):) | ||
211 | _ -> id) | ||
212 | [ attr "type" "result" | ||
213 | , attr "to" to | ||
214 | , attr "from" from | ||
215 | ] | ||
216 | , EventEndElement "{jabber:server}iq" | ||
217 | ] | ||
218 | |||
219 | |||
220 | forkConnection :: ConnectionKey | ||
221 | -> FlagCommand | ||
222 | -> Source IO XML.Event | ||
223 | -> Sink (Flush XML.Event) IO () | ||
224 | -> TChan Stanza | ||
225 | -> IO (TChan Stanza) | ||
226 | forkConnection k pingflag src snk stanzas = do | ||
227 | rdone <- atomically newEmptyTMVar | ||
228 | slots <- atomically $ Slotted.new isEventBeginElement isEventEndElement | ||
229 | needsFlush <- atomically $ newTVar False | ||
230 | let _ = slots :: Slotted.UpdateStream XMPPState XML.Event | ||
231 | let greet_src = do | ||
232 | CL.sourceList greetPeer =$= CL.map Chunk | ||
233 | yield Flush | ||
234 | slot_src = do | ||
235 | what <- lift . atomically $ foldr1 orElse | ||
236 | [Slotted.pull slots >>= \x -> do | ||
237 | writeTVar needsFlush True | ||
238 | return $ do | ||
239 | yield (Chunk x) | ||
240 | slot_src | ||
241 | ,do Slotted.isEmpty slots >>= check | ||
242 | readTVar needsFlush >>= check | ||
243 | writeTVar needsFlush False | ||
244 | return $ do | ||
245 | yield Flush | ||
246 | slot_src | ||
247 | ,readTMVar rdone >> return (return ()) | ||
248 | ] | ||
249 | what | ||
250 | forkIO $ do (greet_src >> slot_src) $$ snk | ||
251 | wlog $ "end post-queue fork: " ++ show k | ||
252 | output <- atomically newTChan | ||
253 | forkIO $ do | ||
254 | -- mapM_ (atomically . Slotted.push slots Nothing) greetPeer | ||
255 | fix $ \loop -> do | ||
256 | what <- atomically $ foldr1 orElse | ||
257 | [readTChan output >>= \stanza -> return $ do | ||
258 | let xchan = stanzaChan stanza | ||
259 | fix $ \inner -> do | ||
260 | what <- atomically $ orElse | ||
261 | (readTChan xchan >>= \mxml -> return $ do | ||
262 | case mxml of | ||
263 | Just xml -> do | ||
264 | atomically $ Slotted.push slots Nothing xml | ||
265 | inner | ||
266 | Nothing -> loop) | ||
267 | (readTMVar rdone >> return (return ())) | ||
268 | what | ||
269 | ,do pingflag >>= check | ||
270 | return $ do | ||
271 | let to = addrToText (callBackAddress k) | ||
272 | from = "todo" -- Look it up from Server object | ||
273 | -- or pass it with Connection event. | ||
274 | mid = Just "ping" | ||
275 | mapM_ (atomically . Slotted.push slots (Just $ PingSlot)) | ||
276 | (peerPing mid to from) | ||
277 | loop | ||
278 | ,readTMVar rdone >> return (return ()) | ||
279 | ] | ||
280 | what | ||
281 | wlog $ "end pre-queue fork: " ++ show k | ||
282 | forkIO $ do | ||
283 | -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show) | ||
284 | src $$ xmppInbound k pingflag src stanzas output | ||
285 | atomically $ putTMVar rdone () | ||
286 | wlog $ "end reader fork: " ++ show k | ||
287 | return output | ||
288 | |||
289 | data ConnectionKey | ||
290 | = PeerKey { callBackAddress :: SockAddr } | ||
291 | | ClientKey { localAddress :: SockAddr } | ||
292 | deriving (Show, Ord, Eq) | ||
293 | |||
294 | {- | ||
295 | data Peer = Peer | ||
296 | { peerWanted :: TVar Bool -- ^ False when this peer is on a you-call-me basis | ||
297 | , peerState :: TVar PeerState | ||
298 | } | ||
299 | data PeerState | ||
300 | = PeerPendingConnect UTCTime | ||
301 | | PeerPendingAccept UTCTime | ||
302 | | PeerConnected (TChan Stanza) | ||
303 | -} | ||
304 | |||
305 | peerKey (sock,addr) = do | ||
306 | peer <- | ||
307 | sIsConnected sock >>= \c -> | ||
308 | if c then getPeerName sock -- addr is normally socketName | ||
309 | else return addr -- Weird hack: addr is would-be peer name | ||
310 | return $ PeerKey (peer `withPort` fromIntegral peerport) | ||
311 | |||
312 | clientKey (sock,addr) = return $ ClientKey addr | ||
313 | |||
314 | monitor sv params = do | ||
315 | chan <- return $ serverEvent sv | ||
316 | stanzas <- atomically newTChan | ||
317 | fix $ \loop -> do | ||
318 | action <- atomically $ foldr1 orElse | ||
319 | [ readTChan chan >>= \(k,e) -> return $ do | ||
320 | case e of | ||
321 | Connection pingflag conread conwrite -> do | ||
322 | wlog $ tomsg k "Connection" | ||
323 | let (xsrc,xsnk) = xmlStream conread conwrite | ||
324 | forkConnection k pingflag xsrc xsnk stanzas | ||
325 | return () | ||
326 | ConnectFailure addr -> do | ||
327 | wlog $ tomsg k "ConnectFailure" | ||
328 | EOF -> wlog $ tomsg k "EOF" | ||
329 | HalfConnection In -> do | ||
330 | wlog $ tomsg k "ReadOnly" | ||
331 | control sv (Connect (callBackAddress k) params) | ||
332 | HalfConnection Out -> wlog $ tomsg k "WriteOnly" | ||
333 | RequiresPing -> wlog $ tomsg k "RequiresPing" | ||
334 | _ -> return () | ||
335 | , readTChan stanzas >>= \stanza -> return $ do | ||
336 | xs <- readUntilNothing (stanzaChan stanza) | ||
337 | wlog "" | ||
338 | prettyPrint "STANZA: " xs | ||
339 | ] | ||
340 | action | ||
341 | loop | ||
342 | where | ||
343 | tomsg k str = printf "%12s %s" str (show k) | ||
344 | where | ||
345 | _ = str :: String | ||
346 | |||
347 | |||
348 | peerport = 5269 | ||
349 | clientport = 5222 | ||
350 | |||
351 | main = runResourceT $ do | 17 | main = runResourceT $ do |
352 | sv <- server | 18 | (sv,peer_params) <- xmppServer |
353 | lift $ do | 19 | liftIO $ do |
354 | peer_params <- return (connectionDefaults peerKey) | ||
355 | { pingInterval = 10000 | ||
356 | , timeout = 10000 | ||
357 | , duplex = False } | ||
358 | client_params <- return $ connectionDefaults clientKey | ||
359 | let testaddr0 = "fd97:ca88:fa7c:b94b:c8b8:fad4:1021:a54d" | 20 | let testaddr0 = "fd97:ca88:fa7c:b94b:c8b8:fad4:1021:a54d" |
360 | -- testaddr0 = "fdef:9e0b:b502:52c3:c074:28d3:fcd7:bfb7" | 21 | -- testaddr0 = "fdef:9e0b:b502:52c3:c074:28d3:fcd7:bfb7" |
361 | testaddr<- fmap (addrAddress . head) $ | 22 | testaddr<- fmap (addrAddress . head) $ |
@@ -364,15 +25,11 @@ main = runResourceT $ do | |||
364 | (Just "5269") | 25 | (Just "5269") |
365 | putStrLn $ "Connecting to "++show testaddr | 26 | putStrLn $ "Connecting to "++show testaddr |
366 | control sv (ConnectWithEndlessRetry testaddr peer_params 10000) | 27 | control sv (ConnectWithEndlessRetry testaddr peer_params 10000) |
367 | forkIO $ monitor sv peer_params | ||
368 | control sv (Listen peerport peer_params) | ||
369 | -- control sv (Listen clientport client_params) | ||
370 | 28 | ||
371 | -- atomically $ newEmptyTMVar >>= readTMVar -- Wait for control-c | ||
372 | quitVar <- newEmptyTMVarIO | 29 | quitVar <- newEmptyTMVarIO |
373 | installHandler sigTERM (CatchOnce (atomically $ putTMVar quitVar True)) Nothing | 30 | installHandler sigTERM (CatchOnce (atomically $ putTMVar quitVar True)) Nothing |
374 | installHandler sigINT (CatchOnce (atomically $ putTMVar quitVar True)) Nothing | 31 | installHandler sigINT (CatchOnce (atomically $ putTMVar quitVar True)) Nothing |
375 | quitMessage <- atomically $ takeTMVar quitVar | 32 | quitMessage <- atomically $ takeTMVar quitVar |
376 | 33 | ||
377 | wlog "goodbye." | 34 | putStrLn "goodbye." |
378 | return () | 35 | return () |