diff options
author | joe <joe@jerkface.net> | 2014-02-14 00:02:25 -0500 |
---|---|---|
committer | joe <joe@jerkface.net> | 2014-02-14 00:02:25 -0500 |
commit | b4a27723cc386e3a397e6f8c8e32564ac57c4b0e (patch) | |
tree | 8446dabcf3697ab473d495571b5abf573f62c2b9 /Presence | |
parent | 237d226408a7788a30cb49f959619e05338054a0 (diff) |
Refactored most of xmppServer into a new module XMPPServer
Diffstat (limited to 'Presence')
-rw-r--r-- | Presence/Server.hs | 2 | ||||
-rw-r--r-- | Presence/XMPPServer.hs | 368 |
2 files changed, 370 insertions, 0 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs index 312634cf..14eab06c 100644 --- a/Presence/Server.hs +++ b/Presence/Server.hs | |||
@@ -193,6 +193,8 @@ data Server a | |||
193 | , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay)) | 193 | , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay)) |
194 | } | 194 | } |
195 | 195 | ||
196 | control sv = atomically . putTMVar (serverCommand sv) | ||
197 | |||
196 | -- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT' | 198 | -- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT' |
197 | -- to ensure proper cleanup. For example, | 199 | -- to ensure proper cleanup. For example, |
198 | -- | 200 | -- |
diff --git a/Presence/XMPPServer.hs b/Presence/XMPPServer.hs new file mode 100644 index 00000000..95110301 --- /dev/null +++ b/Presence/XMPPServer.hs | |||
@@ -0,0 +1,368 @@ | |||
1 | {-# LANGUAGE OverloadedStrings #-} | ||
2 | module XMPPServer | ||
3 | ( xmppServer | ||
4 | ) where | ||
5 | import Control.Monad.Trans.Resource (runResourceT) | ||
6 | import Control.Monad.Trans (lift) | ||
7 | import Control.Monad.IO.Class (MonadIO, liftIO) | ||
8 | import Control.Monad.Fix (fix) | ||
9 | import Control.Monad | ||
10 | import Control.Concurrent (forkIO) | ||
11 | import Control.Concurrent.STM | ||
12 | -- import Control.Concurrent.STM.TChan | ||
13 | import Network.Socket | ||
14 | import XMPPTypes (withPort) | ||
15 | import Text.Printf | ||
16 | import System.Posix.Signals | ||
17 | import Data.ByteString (ByteString) | ||
18 | import qualified Data.ByteString.Char8 as Strict8 | ||
19 | -- import qualified Data.ByteString.Lazy.Char8 as Lazy8 | ||
20 | |||
21 | import Data.Conduit | ||
22 | import qualified Data.Conduit.List as CL | ||
23 | import qualified Data.Conduit.Binary as CB | ||
24 | import Data.Conduit.Blaze (builderToByteStringFlush) | ||
25 | |||
26 | import qualified Text.XML.Stream.Render as XML | ||
27 | import qualified Text.XML.Stream.Parse as XML | ||
28 | import Data.XML.Types as XML | ||
29 | import Data.Maybe (catMaybes,fromJust) | ||
30 | import Data.Monoid ( (<>) ) | ||
31 | import Data.Text (Text) | ||
32 | import qualified Data.Text as Text (pack) | ||
33 | |||
34 | import qualified Control.Concurrent.STM.UpdateStream as Slotted | ||
35 | import ControlMaybe | ||
36 | import Nesting | ||
37 | import EventUtil | ||
38 | import Server | ||
39 | |||
40 | peerport = 5269 | ||
41 | clientport = 5222 | ||
42 | |||
43 | |||
44 | -- TODO: http://xmpp.org/rfcs/rfc6120.html#rules-remote-error | ||
45 | |||
46 | addrToText :: SockAddr -> Text | ||
47 | addrToText (addr@(SockAddrInet _ _)) = Text.pack $ stripColon (show addr) | ||
48 | where stripColon s = pre where (pre,port) = break (==':') s | ||
49 | addrToText (addr@(SockAddrInet6 _ _ _ _)) = Text.pack $ stripColon (show addr) | ||
50 | where stripColon s = if null bracket then pre else pre ++ "]" | ||
51 | where | ||
52 | (pre,bracket) = break (==']') s | ||
53 | |||
54 | wlog s = putStrLn s | ||
55 | where _ = s :: String | ||
56 | wlogb s = Strict8.putStrLn s | ||
57 | |||
58 | xmlStream :: ReadCommand -> WriteCommand -> ( Source IO XML.Event | ||
59 | , Sink (Flush XML.Event) IO () ) | ||
60 | xmlStream conread conwrite = (xsrc,xsnk) | ||
61 | where | ||
62 | xsrc = src $= XML.parseBytes XML.def | ||
63 | xsnk = -- XML.renderBytes XML.def =$ snk | ||
64 | XML.renderBuilderFlush XML.def | ||
65 | =$= builderToByteStringFlush | ||
66 | =$= discardFlush | ||
67 | =$ snk | ||
68 | where | ||
69 | discardFlush :: Monad m => ConduitM (Flush a) a m () | ||
70 | discardFlush = awaitForever $ \x -> do | ||
71 | let unchunk (Chunk a) = a | ||
72 | ischunk (Chunk _) = True | ||
73 | ischunk _ = False | ||
74 | when (ischunk x) $ yield (unchunk x) | ||
75 | |||
76 | src = do | ||
77 | v <- lift conread | ||
78 | maybe (return ()) -- lift . wlog $ "conread: Nothing") | ||
79 | (\v -> yield v >> src) | ||
80 | v | ||
81 | snk = awaitForever $ liftIO . conwrite | ||
82 | |||
83 | |||
84 | type FlagCommand = STM Bool | ||
85 | type ReadCommand = IO (Maybe ByteString) | ||
86 | type WriteCommand = ByteString -> IO Bool | ||
87 | |||
88 | data Stanza | ||
89 | = UnrecognizedStanza { stanzaChan :: TChan (Maybe XML.Event) } | ||
90 | | PingStanza { stanzaId :: Maybe Text | ||
91 | , stanzaChan :: TChan (Maybe XML.Event) } | ||
92 | | PongStanza { -- stanzaId :: Maybe Text | ||
93 | stanzaChan :: TChan (Maybe XML.Event) } | ||
94 | |||
95 | copyToChannel f chan = awaitForever copy | ||
96 | where | ||
97 | copy x = do | ||
98 | liftIO . atomically $ writeTChan chan (f x) | ||
99 | yield x | ||
100 | |||
101 | |||
102 | prettyPrint prefix xs = | ||
103 | liftIO $ | ||
104 | CL.sourceList xs | ||
105 | $= XML.renderBytes (XML.def { XML.rsPretty=True }) | ||
106 | =$= CB.lines | ||
107 | $$ CL.mapM_ (wlogb . (prefix <>)) | ||
108 | |||
109 | grockStanzaIQGet :: Monad m => XML.Event -> NestingXML o m (Maybe (TChan (Maybe Event) -> Stanza)) | ||
110 | grockStanzaIQGet stanza = do | ||
111 | let mid = lookupAttrib "id" (tagAttrs stanza) | ||
112 | -- mfrom = lookupAttrib "from" (tagAttrs stanza) | ||
113 | mtag <- nextElement | ||
114 | flip (maybe $ return Nothing) mtag $ \tag -> do | ||
115 | case tagName tag of | ||
116 | "{urn:xmpp:ping}ping" -> do | ||
117 | return $ Just (PingStanza mid) | ||
118 | _ -> return Nothing | ||
119 | |||
120 | ioWriteChan c v = liftIO . atomically $ writeTChan c v | ||
121 | |||
122 | xmppInbound :: ConnectionKey -> FlagCommand | ||
123 | -> Source IO XML.Event | ||
124 | -> TChan Stanza | ||
125 | -> TChan Stanza | ||
126 | -> Sink XML.Event IO () | ||
127 | xmppInbound k pingflag src stanzas output = doNestingXML $ do | ||
128 | withXML $ \begindoc -> do | ||
129 | when (begindoc==EventBeginDocument) $ do | ||
130 | whenJust nextElement $ \xml -> do | ||
131 | withJust (elementAttrs "stream" xml) $ \stream_attrs -> do | ||
132 | fix $ \loop -> do | ||
133 | -- liftIO . wlog $ "waiting for stanza." | ||
134 | chan <- liftIO $ atomically newTChan | ||
135 | whenJust nextElement $ \stanza -> do | ||
136 | stanza_lvl <- nesting | ||
137 | ioWriteChan chan (Just stanza) | ||
138 | copyToChannel Just chan =$= do | ||
139 | dispatch <- | ||
140 | case () of | ||
141 | _ | stanza `isServerIQOf` "get" -> grockStanzaIQGet stanza | ||
142 | _ -> return $ Just UnrecognizedStanza | ||
143 | flip (maybe $ return ()) dispatch $ \dispatch -> | ||
144 | case dispatch chan of | ||
145 | d@(PingStanza {}) -> do | ||
146 | let to = "todo" | ||
147 | from = "todo" | ||
148 | let pong = peerPong (stanzaId d) to from | ||
149 | pongChan <- liftIO $ atomically newTChan | ||
150 | ioWriteChan output (PongStanza pongChan) | ||
151 | mapM_ (ioWriteChan pongChan . Just) pong | ||
152 | ioWriteChan pongChan Nothing | ||
153 | disp -> ioWriteChan stanzas disp | ||
154 | awaitCloser stanza_lvl | ||
155 | ioWriteChan chan Nothing | ||
156 | loop | ||
157 | |||
158 | |||
159 | chanContents :: TChan x -> IO [x] | ||
160 | chanContents ch = do | ||
161 | x <- atomically $ do | ||
162 | bempty <- isEmptyTChan ch | ||
163 | if bempty | ||
164 | then return Nothing | ||
165 | else fmap Just $ readTChan ch | ||
166 | maybe (return []) | ||
167 | (\x -> do | ||
168 | xs <- chanContents ch | ||
169 | return (x:xs)) | ||
170 | x | ||
171 | |||
172 | readUntilNothing :: TChan (Maybe x) -> IO [x] | ||
173 | readUntilNothing ch = do | ||
174 | x <- atomically $ readTChan ch | ||
175 | maybe (return []) | ||
176 | (\x -> do | ||
177 | xs <- readUntilNothing ch | ||
178 | return (x:xs)) | ||
179 | x | ||
180 | |||
181 | |||
182 | greetPeer = | ||
183 | [ EventBeginDocument | ||
184 | , EventBeginElement (streamP "stream") | ||
185 | [ attr "xmlns" "jabber:server" | ||
186 | , attr "version" "1.0" | ||
187 | ] | ||
188 | ] | ||
189 | |||
190 | goodbyePeer = | ||
191 | [ EventEndElement (streamP "stream") | ||
192 | , EventEndDocument | ||
193 | ] | ||
194 | |||
195 | data XMPPState | ||
196 | = PingSlot | ||
197 | deriving (Eq,Ord) | ||
198 | |||
199 | |||
200 | peerPing :: Maybe Text -> Text -> Text -> [XML.Event] | ||
201 | peerPing mid to from = | ||
202 | [ EventBeginElement "{jabber:server}iq" | ||
203 | $ (case mid of | ||
204 | Just c -> (("id",[ContentText c]):) | ||
205 | _ -> id ) | ||
206 | [ ("type",[ContentText "get"]) | ||
207 | , attr "to" to | ||
208 | , attr "from" from | ||
209 | ] | ||
210 | , EventBeginElement "{urn:xmpp:ping}ping" [] | ||
211 | , EventEndElement "{urn:xmpp:ping}ping" | ||
212 | , EventEndElement "{jabber:server}iq" ] | ||
213 | |||
214 | peerPong mid to from = | ||
215 | [ EventBeginElement "{jabber:server}iq" | ||
216 | $(case mid of | ||
217 | Just c -> (("id",[ContentText c]):) | ||
218 | _ -> id) | ||
219 | [ attr "type" "result" | ||
220 | , attr "to" to | ||
221 | , attr "from" from | ||
222 | ] | ||
223 | , EventEndElement "{jabber:server}iq" | ||
224 | ] | ||
225 | |||
226 | |||
227 | forkConnection :: ConnectionKey | ||
228 | -> FlagCommand | ||
229 | -> Source IO XML.Event | ||
230 | -> Sink (Flush XML.Event) IO () | ||
231 | -> TChan Stanza | ||
232 | -> IO (TChan Stanza) | ||
233 | forkConnection k pingflag src snk stanzas = do | ||
234 | rdone <- atomically newEmptyTMVar | ||
235 | slots <- atomically $ Slotted.new isEventBeginElement isEventEndElement | ||
236 | needsFlush <- atomically $ newTVar False | ||
237 | let _ = slots :: Slotted.UpdateStream XMPPState XML.Event | ||
238 | let greet_src = do | ||
239 | CL.sourceList greetPeer =$= CL.map Chunk | ||
240 | yield Flush | ||
241 | slot_src = do | ||
242 | what <- lift . atomically $ foldr1 orElse | ||
243 | [Slotted.pull slots >>= \x -> do | ||
244 | writeTVar needsFlush True | ||
245 | return $ do | ||
246 | yield (Chunk x) | ||
247 | slot_src | ||
248 | ,do Slotted.isEmpty slots >>= check | ||
249 | readTVar needsFlush >>= check | ||
250 | writeTVar needsFlush False | ||
251 | return $ do | ||
252 | yield Flush | ||
253 | slot_src | ||
254 | ,readTMVar rdone >> return (return ()) | ||
255 | ] | ||
256 | what | ||
257 | forkIO $ do (greet_src >> slot_src) $$ snk | ||
258 | wlog $ "end post-queue fork: " ++ show k | ||
259 | output <- atomically newTChan | ||
260 | forkIO $ do | ||
261 | -- mapM_ (atomically . Slotted.push slots Nothing) greetPeer | ||
262 | fix $ \loop -> do | ||
263 | what <- atomically $ foldr1 orElse | ||
264 | [readTChan output >>= \stanza -> return $ do | ||
265 | let xchan = stanzaChan stanza | ||
266 | fix $ \inner -> do | ||
267 | what <- atomically $ orElse | ||
268 | (readTChan xchan >>= \mxml -> return $ do | ||
269 | case mxml of | ||
270 | Just xml -> do | ||
271 | atomically $ Slotted.push slots Nothing xml | ||
272 | inner | ||
273 | Nothing -> loop) | ||
274 | (readTMVar rdone >> return (return ())) | ||
275 | what | ||
276 | ,do pingflag >>= check | ||
277 | return $ do | ||
278 | let to = addrToText (callBackAddress k) | ||
279 | from = "todo" -- Look it up from Server object | ||
280 | -- or pass it with Connection event. | ||
281 | mid = Just "ping" | ||
282 | mapM_ (atomically . Slotted.push slots (Just $ PingSlot)) | ||
283 | (peerPing mid to from) | ||
284 | loop | ||
285 | ,readTMVar rdone >> return (return ()) | ||
286 | ] | ||
287 | what | ||
288 | wlog $ "end pre-queue fork: " ++ show k | ||
289 | forkIO $ do | ||
290 | -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show) | ||
291 | src $$ xmppInbound k pingflag src stanzas output | ||
292 | atomically $ putTMVar rdone () | ||
293 | wlog $ "end reader fork: " ++ show k | ||
294 | return output | ||
295 | |||
296 | data ConnectionKey | ||
297 | = PeerKey { callBackAddress :: SockAddr } | ||
298 | | ClientKey { localAddress :: SockAddr } | ||
299 | deriving (Show, Ord, Eq) | ||
300 | |||
301 | {- | ||
302 | data Peer = Peer | ||
303 | { peerWanted :: TVar Bool -- ^ False when this peer is on a you-call-me basis | ||
304 | , peerState :: TVar PeerState | ||
305 | } | ||
306 | data PeerState | ||
307 | = PeerPendingConnect UTCTime | ||
308 | | PeerPendingAccept UTCTime | ||
309 | | PeerConnected (TChan Stanza) | ||
310 | -} | ||
311 | |||
312 | peerKey (sock,addr) = do | ||
313 | peer <- | ||
314 | sIsConnected sock >>= \c -> | ||
315 | if c then getPeerName sock -- addr is normally socketName | ||
316 | else return addr -- Weird hack: addr is would-be peer name | ||
317 | return $ PeerKey (peer `withPort` fromIntegral peerport) | ||
318 | |||
319 | clientKey (sock,addr) = return $ ClientKey addr | ||
320 | |||
321 | monitor sv params = do | ||
322 | chan <- return $ serverEvent sv | ||
323 | stanzas <- atomically newTChan | ||
324 | fix $ \loop -> do | ||
325 | action <- atomically $ foldr1 orElse | ||
326 | [ readTChan chan >>= \(k,e) -> return $ do | ||
327 | case e of | ||
328 | Connection pingflag conread conwrite -> do | ||
329 | wlog $ tomsg k "Connection" | ||
330 | let (xsrc,xsnk) = xmlStream conread conwrite | ||
331 | forkConnection k pingflag xsrc xsnk stanzas | ||
332 | return () | ||
333 | ConnectFailure addr -> do | ||
334 | wlog $ tomsg k "ConnectFailure" | ||
335 | EOF -> wlog $ tomsg k "EOF" | ||
336 | HalfConnection In -> do | ||
337 | wlog $ tomsg k "ReadOnly" | ||
338 | control sv (Connect (callBackAddress k) params) | ||
339 | HalfConnection Out -> wlog $ tomsg k "WriteOnly" | ||
340 | RequiresPing -> wlog $ tomsg k "RequiresPing" | ||
341 | _ -> return () | ||
342 | , readTChan stanzas >>= \stanza -> return $ do | ||
343 | xs <- readUntilNothing (stanzaChan stanza) | ||
344 | wlog "" | ||
345 | prettyPrint "STANZA: " xs | ||
346 | ] | ||
347 | action | ||
348 | loop | ||
349 | where | ||
350 | tomsg k str = printf "%12s %s" str (show k) | ||
351 | where | ||
352 | _ = str :: String | ||
353 | |||
354 | |||
355 | xmppServer :: (MonadResource m, MonadIO m) => m (Server ConnectionKey,ConnectionParameters ConnectionKey) | ||
356 | xmppServer = do | ||
357 | sv <- server | ||
358 | let peer_params = (connectionDefaults peerKey) | ||
359 | { pingInterval = 10000 | ||
360 | , timeout = 10000 | ||
361 | , duplex = False } | ||
362 | client_params = connectionDefaults clientKey | ||
363 | liftIO $ do | ||
364 | forkIO $ monitor sv peer_params | ||
365 | control sv (Listen peerport peer_params) | ||
366 | -- todo | ||
367 | -- control sv (Listen clientport client_params) | ||
368 | return (sv,peer_params) | ||