summaryrefslogtreecommitdiff
path: root/Presence
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2014-02-14 00:02:25 -0500
committerjoe <joe@jerkface.net>2014-02-14 00:02:25 -0500
commitb4a27723cc386e3a397e6f8c8e32564ac57c4b0e (patch)
tree8446dabcf3697ab473d495571b5abf573f62c2b9 /Presence
parent237d226408a7788a30cb49f959619e05338054a0 (diff)
Refactored most of xmppServer into a new module XMPPServer
Diffstat (limited to 'Presence')
-rw-r--r--Presence/Server.hs2
-rw-r--r--Presence/XMPPServer.hs368
2 files changed, 370 insertions, 0 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs
index 312634cf..14eab06c 100644
--- a/Presence/Server.hs
+++ b/Presence/Server.hs
@@ -193,6 +193,8 @@ data Server a
193 , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay)) 193 , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay))
194 } 194 }
195 195
196control sv = atomically . putTMVar (serverCommand sv)
197
196-- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT' 198-- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT'
197-- to ensure proper cleanup. For example, 199-- to ensure proper cleanup. For example,
198-- 200--
diff --git a/Presence/XMPPServer.hs b/Presence/XMPPServer.hs
new file mode 100644
index 00000000..95110301
--- /dev/null
+++ b/Presence/XMPPServer.hs
@@ -0,0 +1,368 @@
1{-# LANGUAGE OverloadedStrings #-}
2module XMPPServer
3 ( xmppServer
4 ) where
5import Control.Monad.Trans.Resource (runResourceT)
6import Control.Monad.Trans (lift)
7import Control.Monad.IO.Class (MonadIO, liftIO)
8import Control.Monad.Fix (fix)
9import Control.Monad
10import Control.Concurrent (forkIO)
11import Control.Concurrent.STM
12-- import Control.Concurrent.STM.TChan
13import Network.Socket
14import XMPPTypes (withPort)
15import Text.Printf
16import System.Posix.Signals
17import Data.ByteString (ByteString)
18import qualified Data.ByteString.Char8 as Strict8
19-- import qualified Data.ByteString.Lazy.Char8 as Lazy8
20
21import Data.Conduit
22import qualified Data.Conduit.List as CL
23import qualified Data.Conduit.Binary as CB
24import Data.Conduit.Blaze (builderToByteStringFlush)
25
26import qualified Text.XML.Stream.Render as XML
27import qualified Text.XML.Stream.Parse as XML
28import Data.XML.Types as XML
29import Data.Maybe (catMaybes,fromJust)
30import Data.Monoid ( (<>) )
31import Data.Text (Text)
32import qualified Data.Text as Text (pack)
33
34import qualified Control.Concurrent.STM.UpdateStream as Slotted
35import ControlMaybe
36import Nesting
37import EventUtil
38import Server
39
40peerport = 5269
41clientport = 5222
42
43
44-- TODO: http://xmpp.org/rfcs/rfc6120.html#rules-remote-error
45
46addrToText :: SockAddr -> Text
47addrToText (addr@(SockAddrInet _ _)) = Text.pack $ stripColon (show addr)
48 where stripColon s = pre where (pre,port) = break (==':') s
49addrToText (addr@(SockAddrInet6 _ _ _ _)) = Text.pack $ stripColon (show addr)
50 where stripColon s = if null bracket then pre else pre ++ "]"
51 where
52 (pre,bracket) = break (==']') s
53
54wlog s = putStrLn s
55 where _ = s :: String
56wlogb s = Strict8.putStrLn s
57
58xmlStream :: ReadCommand -> WriteCommand -> ( Source IO XML.Event
59 , Sink (Flush XML.Event) IO () )
60xmlStream conread conwrite = (xsrc,xsnk)
61 where
62 xsrc = src $= XML.parseBytes XML.def
63 xsnk = -- XML.renderBytes XML.def =$ snk
64 XML.renderBuilderFlush XML.def
65 =$= builderToByteStringFlush
66 =$= discardFlush
67 =$ snk
68 where
69 discardFlush :: Monad m => ConduitM (Flush a) a m ()
70 discardFlush = awaitForever $ \x -> do
71 let unchunk (Chunk a) = a
72 ischunk (Chunk _) = True
73 ischunk _ = False
74 when (ischunk x) $ yield (unchunk x)
75
76 src = do
77 v <- lift conread
78 maybe (return ()) -- lift . wlog $ "conread: Nothing")
79 (\v -> yield v >> src)
80 v
81 snk = awaitForever $ liftIO . conwrite
82
83
84type FlagCommand = STM Bool
85type ReadCommand = IO (Maybe ByteString)
86type WriteCommand = ByteString -> IO Bool
87
88data Stanza
89 = UnrecognizedStanza { stanzaChan :: TChan (Maybe XML.Event) }
90 | PingStanza { stanzaId :: Maybe Text
91 , stanzaChan :: TChan (Maybe XML.Event) }
92 | PongStanza { -- stanzaId :: Maybe Text
93 stanzaChan :: TChan (Maybe XML.Event) }
94
95copyToChannel f chan = awaitForever copy
96 where
97 copy x = do
98 liftIO . atomically $ writeTChan chan (f x)
99 yield x
100
101
102prettyPrint prefix xs =
103 liftIO $
104 CL.sourceList xs
105 $= XML.renderBytes (XML.def { XML.rsPretty=True })
106 =$= CB.lines
107 $$ CL.mapM_ (wlogb . (prefix <>))
108
109grockStanzaIQGet :: Monad m => XML.Event -> NestingXML o m (Maybe (TChan (Maybe Event) -> Stanza))
110grockStanzaIQGet stanza = do
111 let mid = lookupAttrib "id" (tagAttrs stanza)
112 -- mfrom = lookupAttrib "from" (tagAttrs stanza)
113 mtag <- nextElement
114 flip (maybe $ return Nothing) mtag $ \tag -> do
115 case tagName tag of
116 "{urn:xmpp:ping}ping" -> do
117 return $ Just (PingStanza mid)
118 _ -> return Nothing
119
120ioWriteChan c v = liftIO . atomically $ writeTChan c v
121
122xmppInbound :: ConnectionKey -> FlagCommand
123 -> Source IO XML.Event
124 -> TChan Stanza
125 -> TChan Stanza
126 -> Sink XML.Event IO ()
127xmppInbound k pingflag src stanzas output = doNestingXML $ do
128 withXML $ \begindoc -> do
129 when (begindoc==EventBeginDocument) $ do
130 whenJust nextElement $ \xml -> do
131 withJust (elementAttrs "stream" xml) $ \stream_attrs -> do
132 fix $ \loop -> do
133 -- liftIO . wlog $ "waiting for stanza."
134 chan <- liftIO $ atomically newTChan
135 whenJust nextElement $ \stanza -> do
136 stanza_lvl <- nesting
137 ioWriteChan chan (Just stanza)
138 copyToChannel Just chan =$= do
139 dispatch <-
140 case () of
141 _ | stanza `isServerIQOf` "get" -> grockStanzaIQGet stanza
142 _ -> return $ Just UnrecognizedStanza
143 flip (maybe $ return ()) dispatch $ \dispatch ->
144 case dispatch chan of
145 d@(PingStanza {}) -> do
146 let to = "todo"
147 from = "todo"
148 let pong = peerPong (stanzaId d) to from
149 pongChan <- liftIO $ atomically newTChan
150 ioWriteChan output (PongStanza pongChan)
151 mapM_ (ioWriteChan pongChan . Just) pong
152 ioWriteChan pongChan Nothing
153 disp -> ioWriteChan stanzas disp
154 awaitCloser stanza_lvl
155 ioWriteChan chan Nothing
156 loop
157
158
159chanContents :: TChan x -> IO [x]
160chanContents ch = do
161 x <- atomically $ do
162 bempty <- isEmptyTChan ch
163 if bempty
164 then return Nothing
165 else fmap Just $ readTChan ch
166 maybe (return [])
167 (\x -> do
168 xs <- chanContents ch
169 return (x:xs))
170 x
171
172readUntilNothing :: TChan (Maybe x) -> IO [x]
173readUntilNothing ch = do
174 x <- atomically $ readTChan ch
175 maybe (return [])
176 (\x -> do
177 xs <- readUntilNothing ch
178 return (x:xs))
179 x
180
181
182greetPeer =
183 [ EventBeginDocument
184 , EventBeginElement (streamP "stream")
185 [ attr "xmlns" "jabber:server"
186 , attr "version" "1.0"
187 ]
188 ]
189
190goodbyePeer =
191 [ EventEndElement (streamP "stream")
192 , EventEndDocument
193 ]
194
195data XMPPState
196 = PingSlot
197 deriving (Eq,Ord)
198
199
200peerPing :: Maybe Text -> Text -> Text -> [XML.Event]
201peerPing mid to from =
202 [ EventBeginElement "{jabber:server}iq"
203 $ (case mid of
204 Just c -> (("id",[ContentText c]):)
205 _ -> id )
206 [ ("type",[ContentText "get"])
207 , attr "to" to
208 , attr "from" from
209 ]
210 , EventBeginElement "{urn:xmpp:ping}ping" []
211 , EventEndElement "{urn:xmpp:ping}ping"
212 , EventEndElement "{jabber:server}iq" ]
213
214peerPong mid to from =
215 [ EventBeginElement "{jabber:server}iq"
216 $(case mid of
217 Just c -> (("id",[ContentText c]):)
218 _ -> id)
219 [ attr "type" "result"
220 , attr "to" to
221 , attr "from" from
222 ]
223 , EventEndElement "{jabber:server}iq"
224 ]
225
226
227forkConnection :: ConnectionKey
228 -> FlagCommand
229 -> Source IO XML.Event
230 -> Sink (Flush XML.Event) IO ()
231 -> TChan Stanza
232 -> IO (TChan Stanza)
233forkConnection k pingflag src snk stanzas = do
234 rdone <- atomically newEmptyTMVar
235 slots <- atomically $ Slotted.new isEventBeginElement isEventEndElement
236 needsFlush <- atomically $ newTVar False
237 let _ = slots :: Slotted.UpdateStream XMPPState XML.Event
238 let greet_src = do
239 CL.sourceList greetPeer =$= CL.map Chunk
240 yield Flush
241 slot_src = do
242 what <- lift . atomically $ foldr1 orElse
243 [Slotted.pull slots >>= \x -> do
244 writeTVar needsFlush True
245 return $ do
246 yield (Chunk x)
247 slot_src
248 ,do Slotted.isEmpty slots >>= check
249 readTVar needsFlush >>= check
250 writeTVar needsFlush False
251 return $ do
252 yield Flush
253 slot_src
254 ,readTMVar rdone >> return (return ())
255 ]
256 what
257 forkIO $ do (greet_src >> slot_src) $$ snk
258 wlog $ "end post-queue fork: " ++ show k
259 output <- atomically newTChan
260 forkIO $ do
261 -- mapM_ (atomically . Slotted.push slots Nothing) greetPeer
262 fix $ \loop -> do
263 what <- atomically $ foldr1 orElse
264 [readTChan output >>= \stanza -> return $ do
265 let xchan = stanzaChan stanza
266 fix $ \inner -> do
267 what <- atomically $ orElse
268 (readTChan xchan >>= \mxml -> return $ do
269 case mxml of
270 Just xml -> do
271 atomically $ Slotted.push slots Nothing xml
272 inner
273 Nothing -> loop)
274 (readTMVar rdone >> return (return ()))
275 what
276 ,do pingflag >>= check
277 return $ do
278 let to = addrToText (callBackAddress k)
279 from = "todo" -- Look it up from Server object
280 -- or pass it with Connection event.
281 mid = Just "ping"
282 mapM_ (atomically . Slotted.push slots (Just $ PingSlot))
283 (peerPing mid to from)
284 loop
285 ,readTMVar rdone >> return (return ())
286 ]
287 what
288 wlog $ "end pre-queue fork: " ++ show k
289 forkIO $ do
290 -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show)
291 src $$ xmppInbound k pingflag src stanzas output
292 atomically $ putTMVar rdone ()
293 wlog $ "end reader fork: " ++ show k
294 return output
295
296data ConnectionKey
297 = PeerKey { callBackAddress :: SockAddr }
298 | ClientKey { localAddress :: SockAddr }
299 deriving (Show, Ord, Eq)
300
301{-
302data Peer = Peer
303 { peerWanted :: TVar Bool -- ^ False when this peer is on a you-call-me basis
304 , peerState :: TVar PeerState
305 }
306data PeerState
307 = PeerPendingConnect UTCTime
308 | PeerPendingAccept UTCTime
309 | PeerConnected (TChan Stanza)
310-}
311
312peerKey (sock,addr) = do
313 peer <-
314 sIsConnected sock >>= \c ->
315 if c then getPeerName sock -- addr is normally socketName
316 else return addr -- Weird hack: addr is would-be peer name
317 return $ PeerKey (peer `withPort` fromIntegral peerport)
318
319clientKey (sock,addr) = return $ ClientKey addr
320
321monitor sv params = do
322 chan <- return $ serverEvent sv
323 stanzas <- atomically newTChan
324 fix $ \loop -> do
325 action <- atomically $ foldr1 orElse
326 [ readTChan chan >>= \(k,e) -> return $ do
327 case e of
328 Connection pingflag conread conwrite -> do
329 wlog $ tomsg k "Connection"
330 let (xsrc,xsnk) = xmlStream conread conwrite
331 forkConnection k pingflag xsrc xsnk stanzas
332 return ()
333 ConnectFailure addr -> do
334 wlog $ tomsg k "ConnectFailure"
335 EOF -> wlog $ tomsg k "EOF"
336 HalfConnection In -> do
337 wlog $ tomsg k "ReadOnly"
338 control sv (Connect (callBackAddress k) params)
339 HalfConnection Out -> wlog $ tomsg k "WriteOnly"
340 RequiresPing -> wlog $ tomsg k "RequiresPing"
341 _ -> return ()
342 , readTChan stanzas >>= \stanza -> return $ do
343 xs <- readUntilNothing (stanzaChan stanza)
344 wlog ""
345 prettyPrint "STANZA: " xs
346 ]
347 action
348 loop
349 where
350 tomsg k str = printf "%12s %s" str (show k)
351 where
352 _ = str :: String
353
354
355xmppServer :: (MonadResource m, MonadIO m) => m (Server ConnectionKey,ConnectionParameters ConnectionKey)
356xmppServer = do
357 sv <- server
358 let peer_params = (connectionDefaults peerKey)
359 { pingInterval = 10000
360 , timeout = 10000
361 , duplex = False }
362 client_params = connectionDefaults clientKey
363 liftIO $ do
364 forkIO $ monitor sv peer_params
365 control sv (Listen peerport peer_params)
366 -- todo
367 -- control sv (Listen clientport client_params)
368 return (sv,peer_params)