summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Presence/Server.hs2
-rw-r--r--Presence/XMPPServer.hs368
-rw-r--r--xmppServer.hs369
3 files changed, 383 insertions, 356 deletions
diff --git a/Presence/Server.hs b/Presence/Server.hs
index 312634cf..14eab06c 100644
--- a/Presence/Server.hs
+++ b/Presence/Server.hs
@@ -193,6 +193,8 @@ data Server a
193 , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay)) 193 , retrymap :: TVar (Map SockAddr (TVar Bool,InterruptableDelay))
194 } 194 }
195 195
196control sv = atomically . putTMVar (serverCommand sv)
197
196-- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT' 198-- | Construct a 'Server' object. Use 'Control.Monad.Trans.Resource.ResourceT'
197-- to ensure proper cleanup. For example, 199-- to ensure proper cleanup. For example,
198-- 200--
diff --git a/Presence/XMPPServer.hs b/Presence/XMPPServer.hs
new file mode 100644
index 00000000..95110301
--- /dev/null
+++ b/Presence/XMPPServer.hs
@@ -0,0 +1,368 @@
1{-# LANGUAGE OverloadedStrings #-}
2module XMPPServer
3 ( xmppServer
4 ) where
5import Control.Monad.Trans.Resource (runResourceT)
6import Control.Monad.Trans (lift)
7import Control.Monad.IO.Class (MonadIO, liftIO)
8import Control.Monad.Fix (fix)
9import Control.Monad
10import Control.Concurrent (forkIO)
11import Control.Concurrent.STM
12-- import Control.Concurrent.STM.TChan
13import Network.Socket
14import XMPPTypes (withPort)
15import Text.Printf
16import System.Posix.Signals
17import Data.ByteString (ByteString)
18import qualified Data.ByteString.Char8 as Strict8
19-- import qualified Data.ByteString.Lazy.Char8 as Lazy8
20
21import Data.Conduit
22import qualified Data.Conduit.List as CL
23import qualified Data.Conduit.Binary as CB
24import Data.Conduit.Blaze (builderToByteStringFlush)
25
26import qualified Text.XML.Stream.Render as XML
27import qualified Text.XML.Stream.Parse as XML
28import Data.XML.Types as XML
29import Data.Maybe (catMaybes,fromJust)
30import Data.Monoid ( (<>) )
31import Data.Text (Text)
32import qualified Data.Text as Text (pack)
33
34import qualified Control.Concurrent.STM.UpdateStream as Slotted
35import ControlMaybe
36import Nesting
37import EventUtil
38import Server
39
40peerport = 5269
41clientport = 5222
42
43
44-- TODO: http://xmpp.org/rfcs/rfc6120.html#rules-remote-error
45
46addrToText :: SockAddr -> Text
47addrToText (addr@(SockAddrInet _ _)) = Text.pack $ stripColon (show addr)
48 where stripColon s = pre where (pre,port) = break (==':') s
49addrToText (addr@(SockAddrInet6 _ _ _ _)) = Text.pack $ stripColon (show addr)
50 where stripColon s = if null bracket then pre else pre ++ "]"
51 where
52 (pre,bracket) = break (==']') s
53
54wlog s = putStrLn s
55 where _ = s :: String
56wlogb s = Strict8.putStrLn s
57
58xmlStream :: ReadCommand -> WriteCommand -> ( Source IO XML.Event
59 , Sink (Flush XML.Event) IO () )
60xmlStream conread conwrite = (xsrc,xsnk)
61 where
62 xsrc = src $= XML.parseBytes XML.def
63 xsnk = -- XML.renderBytes XML.def =$ snk
64 XML.renderBuilderFlush XML.def
65 =$= builderToByteStringFlush
66 =$= discardFlush
67 =$ snk
68 where
69 discardFlush :: Monad m => ConduitM (Flush a) a m ()
70 discardFlush = awaitForever $ \x -> do
71 let unchunk (Chunk a) = a
72 ischunk (Chunk _) = True
73 ischunk _ = False
74 when (ischunk x) $ yield (unchunk x)
75
76 src = do
77 v <- lift conread
78 maybe (return ()) -- lift . wlog $ "conread: Nothing")
79 (\v -> yield v >> src)
80 v
81 snk = awaitForever $ liftIO . conwrite
82
83
84type FlagCommand = STM Bool
85type ReadCommand = IO (Maybe ByteString)
86type WriteCommand = ByteString -> IO Bool
87
88data Stanza
89 = UnrecognizedStanza { stanzaChan :: TChan (Maybe XML.Event) }
90 | PingStanza { stanzaId :: Maybe Text
91 , stanzaChan :: TChan (Maybe XML.Event) }
92 | PongStanza { -- stanzaId :: Maybe Text
93 stanzaChan :: TChan (Maybe XML.Event) }
94
95copyToChannel f chan = awaitForever copy
96 where
97 copy x = do
98 liftIO . atomically $ writeTChan chan (f x)
99 yield x
100
101
102prettyPrint prefix xs =
103 liftIO $
104 CL.sourceList xs
105 $= XML.renderBytes (XML.def { XML.rsPretty=True })
106 =$= CB.lines
107 $$ CL.mapM_ (wlogb . (prefix <>))
108
109grockStanzaIQGet :: Monad m => XML.Event -> NestingXML o m (Maybe (TChan (Maybe Event) -> Stanza))
110grockStanzaIQGet stanza = do
111 let mid = lookupAttrib "id" (tagAttrs stanza)
112 -- mfrom = lookupAttrib "from" (tagAttrs stanza)
113 mtag <- nextElement
114 flip (maybe $ return Nothing) mtag $ \tag -> do
115 case tagName tag of
116 "{urn:xmpp:ping}ping" -> do
117 return $ Just (PingStanza mid)
118 _ -> return Nothing
119
120ioWriteChan c v = liftIO . atomically $ writeTChan c v
121
122xmppInbound :: ConnectionKey -> FlagCommand
123 -> Source IO XML.Event
124 -> TChan Stanza
125 -> TChan Stanza
126 -> Sink XML.Event IO ()
127xmppInbound k pingflag src stanzas output = doNestingXML $ do
128 withXML $ \begindoc -> do
129 when (begindoc==EventBeginDocument) $ do
130 whenJust nextElement $ \xml -> do
131 withJust (elementAttrs "stream" xml) $ \stream_attrs -> do
132 fix $ \loop -> do
133 -- liftIO . wlog $ "waiting for stanza."
134 chan <- liftIO $ atomically newTChan
135 whenJust nextElement $ \stanza -> do
136 stanza_lvl <- nesting
137 ioWriteChan chan (Just stanza)
138 copyToChannel Just chan =$= do
139 dispatch <-
140 case () of
141 _ | stanza `isServerIQOf` "get" -> grockStanzaIQGet stanza
142 _ -> return $ Just UnrecognizedStanza
143 flip (maybe $ return ()) dispatch $ \dispatch ->
144 case dispatch chan of
145 d@(PingStanza {}) -> do
146 let to = "todo"
147 from = "todo"
148 let pong = peerPong (stanzaId d) to from
149 pongChan <- liftIO $ atomically newTChan
150 ioWriteChan output (PongStanza pongChan)
151 mapM_ (ioWriteChan pongChan . Just) pong
152 ioWriteChan pongChan Nothing
153 disp -> ioWriteChan stanzas disp
154 awaitCloser stanza_lvl
155 ioWriteChan chan Nothing
156 loop
157
158
159chanContents :: TChan x -> IO [x]
160chanContents ch = do
161 x <- atomically $ do
162 bempty <- isEmptyTChan ch
163 if bempty
164 then return Nothing
165 else fmap Just $ readTChan ch
166 maybe (return [])
167 (\x -> do
168 xs <- chanContents ch
169 return (x:xs))
170 x
171
172readUntilNothing :: TChan (Maybe x) -> IO [x]
173readUntilNothing ch = do
174 x <- atomically $ readTChan ch
175 maybe (return [])
176 (\x -> do
177 xs <- readUntilNothing ch
178 return (x:xs))
179 x
180
181
182greetPeer =
183 [ EventBeginDocument
184 , EventBeginElement (streamP "stream")
185 [ attr "xmlns" "jabber:server"
186 , attr "version" "1.0"
187 ]
188 ]
189
190goodbyePeer =
191 [ EventEndElement (streamP "stream")
192 , EventEndDocument
193 ]
194
195data XMPPState
196 = PingSlot
197 deriving (Eq,Ord)
198
199
200peerPing :: Maybe Text -> Text -> Text -> [XML.Event]
201peerPing mid to from =
202 [ EventBeginElement "{jabber:server}iq"
203 $ (case mid of
204 Just c -> (("id",[ContentText c]):)
205 _ -> id )
206 [ ("type",[ContentText "get"])
207 , attr "to" to
208 , attr "from" from
209 ]
210 , EventBeginElement "{urn:xmpp:ping}ping" []
211 , EventEndElement "{urn:xmpp:ping}ping"
212 , EventEndElement "{jabber:server}iq" ]
213
214peerPong mid to from =
215 [ EventBeginElement "{jabber:server}iq"
216 $(case mid of
217 Just c -> (("id",[ContentText c]):)
218 _ -> id)
219 [ attr "type" "result"
220 , attr "to" to
221 , attr "from" from
222 ]
223 , EventEndElement "{jabber:server}iq"
224 ]
225
226
227forkConnection :: ConnectionKey
228 -> FlagCommand
229 -> Source IO XML.Event
230 -> Sink (Flush XML.Event) IO ()
231 -> TChan Stanza
232 -> IO (TChan Stanza)
233forkConnection k pingflag src snk stanzas = do
234 rdone <- atomically newEmptyTMVar
235 slots <- atomically $ Slotted.new isEventBeginElement isEventEndElement
236 needsFlush <- atomically $ newTVar False
237 let _ = slots :: Slotted.UpdateStream XMPPState XML.Event
238 let greet_src = do
239 CL.sourceList greetPeer =$= CL.map Chunk
240 yield Flush
241 slot_src = do
242 what <- lift . atomically $ foldr1 orElse
243 [Slotted.pull slots >>= \x -> do
244 writeTVar needsFlush True
245 return $ do
246 yield (Chunk x)
247 slot_src
248 ,do Slotted.isEmpty slots >>= check
249 readTVar needsFlush >>= check
250 writeTVar needsFlush False
251 return $ do
252 yield Flush
253 slot_src
254 ,readTMVar rdone >> return (return ())
255 ]
256 what
257 forkIO $ do (greet_src >> slot_src) $$ snk
258 wlog $ "end post-queue fork: " ++ show k
259 output <- atomically newTChan
260 forkIO $ do
261 -- mapM_ (atomically . Slotted.push slots Nothing) greetPeer
262 fix $ \loop -> do
263 what <- atomically $ foldr1 orElse
264 [readTChan output >>= \stanza -> return $ do
265 let xchan = stanzaChan stanza
266 fix $ \inner -> do
267 what <- atomically $ orElse
268 (readTChan xchan >>= \mxml -> return $ do
269 case mxml of
270 Just xml -> do
271 atomically $ Slotted.push slots Nothing xml
272 inner
273 Nothing -> loop)
274 (readTMVar rdone >> return (return ()))
275 what
276 ,do pingflag >>= check
277 return $ do
278 let to = addrToText (callBackAddress k)
279 from = "todo" -- Look it up from Server object
280 -- or pass it with Connection event.
281 mid = Just "ping"
282 mapM_ (atomically . Slotted.push slots (Just $ PingSlot))
283 (peerPing mid to from)
284 loop
285 ,readTMVar rdone >> return (return ())
286 ]
287 what
288 wlog $ "end pre-queue fork: " ++ show k
289 forkIO $ do
290 -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show)
291 src $$ xmppInbound k pingflag src stanzas output
292 atomically $ putTMVar rdone ()
293 wlog $ "end reader fork: " ++ show k
294 return output
295
296data ConnectionKey
297 = PeerKey { callBackAddress :: SockAddr }
298 | ClientKey { localAddress :: SockAddr }
299 deriving (Show, Ord, Eq)
300
301{-
302data Peer = Peer
303 { peerWanted :: TVar Bool -- ^ False when this peer is on a you-call-me basis
304 , peerState :: TVar PeerState
305 }
306data PeerState
307 = PeerPendingConnect UTCTime
308 | PeerPendingAccept UTCTime
309 | PeerConnected (TChan Stanza)
310-}
311
312peerKey (sock,addr) = do
313 peer <-
314 sIsConnected sock >>= \c ->
315 if c then getPeerName sock -- addr is normally socketName
316 else return addr -- Weird hack: addr is would-be peer name
317 return $ PeerKey (peer `withPort` fromIntegral peerport)
318
319clientKey (sock,addr) = return $ ClientKey addr
320
321monitor sv params = do
322 chan <- return $ serverEvent sv
323 stanzas <- atomically newTChan
324 fix $ \loop -> do
325 action <- atomically $ foldr1 orElse
326 [ readTChan chan >>= \(k,e) -> return $ do
327 case e of
328 Connection pingflag conread conwrite -> do
329 wlog $ tomsg k "Connection"
330 let (xsrc,xsnk) = xmlStream conread conwrite
331 forkConnection k pingflag xsrc xsnk stanzas
332 return ()
333 ConnectFailure addr -> do
334 wlog $ tomsg k "ConnectFailure"
335 EOF -> wlog $ tomsg k "EOF"
336 HalfConnection In -> do
337 wlog $ tomsg k "ReadOnly"
338 control sv (Connect (callBackAddress k) params)
339 HalfConnection Out -> wlog $ tomsg k "WriteOnly"
340 RequiresPing -> wlog $ tomsg k "RequiresPing"
341 _ -> return ()
342 , readTChan stanzas >>= \stanza -> return $ do
343 xs <- readUntilNothing (stanzaChan stanza)
344 wlog ""
345 prettyPrint "STANZA: " xs
346 ]
347 action
348 loop
349 where
350 tomsg k str = printf "%12s %s" str (show k)
351 where
352 _ = str :: String
353
354
355xmppServer :: (MonadResource m, MonadIO m) => m (Server ConnectionKey,ConnectionParameters ConnectionKey)
356xmppServer = do
357 sv <- server
358 let peer_params = (connectionDefaults peerKey)
359 { pingInterval = 10000
360 , timeout = 10000
361 , duplex = False }
362 client_params = connectionDefaults clientKey
363 liftIO $ do
364 forkIO $ monitor sv peer_params
365 control sv (Listen peerport peer_params)
366 -- todo
367 -- control sv (Listen clientport client_params)
368 return (sv,peer_params)
diff --git a/xmppServer.hs b/xmppServer.hs
index 7a61b00d..ce681cb5 100644
--- a/xmppServer.hs
+++ b/xmppServer.hs
@@ -1,361 +1,22 @@
1{-# LANGUAGE OverloadedStrings #-} 1import System.Posix.Signals
2import Control.Monad.Trans.Resource (runResourceT)
3import Control.Monad.Trans (lift)
4import Control.Monad.IO.Class (liftIO)
5import Control.Monad.Fix (fix)
6import Control.Monad
7import Control.Concurrent (forkIO)
8import Control.Concurrent.STM 2import Control.Concurrent.STM
9-- import Control.Concurrent.STM.TChan 3import Control.Monad.Trans.Resource (runResourceT)
4import Control.Monad.IO.Class (MonadIO, liftIO)
10import Network.Socket 5import Network.Socket
11import XMPPTypes (withPort) 6 ( addrAddress
12import Text.Printf 7 , getAddrInfo
13import System.Posix.Signals 8 , defaultHints
14import Data.ByteString (ByteString) 9 , addrFlags
15import qualified Data.ByteString.Char8 as Strict8 10 , AddrInfoFlag(AI_CANONNAME)
16-- import qualified Data.ByteString.Lazy.Char8 as Lazy8 11 )
17
18import Data.Conduit
19import qualified Data.Conduit.List as CL
20import qualified Data.Conduit.Binary as CB
21import Data.Conduit.Blaze (builderToByteStringFlush)
22 12
23import qualified Text.XML.Stream.Render as XML
24import qualified Text.XML.Stream.Parse as XML
25import Data.XML.Types as XML
26import Data.Maybe (catMaybes,fromJust)
27import Data.Monoid ( (<>) )
28import Data.Text (Text)
29import qualified Data.Text as Text (pack)
30 13
31import qualified Control.Concurrent.STM.UpdateStream as Slotted 14import XMPPServer
32import ControlMaybe
33import Nesting
34import EventUtil
35import Server 15import Server
36 16
37addrToText :: SockAddr -> Text
38addrToText (addr@(SockAddrInet _ _)) = Text.pack $ stripColon (show addr)
39 where stripColon s = pre where (pre,port) = break (==':') s
40addrToText (addr@(SockAddrInet6 _ _ _ _)) = Text.pack $ stripColon (show addr)
41 where stripColon s = if null bracket then pre else pre ++ "]"
42 where
43 (pre,bracket) = break (==']') s
44
45wlog s = putStrLn s
46 where _ = s :: String
47wlogb s = Strict8.putStrLn s
48
49control sv = atomically . putTMVar (serverCommand sv)
50
51xmlStream :: ReadCommand -> WriteCommand -> ( Source IO XML.Event
52 , Sink (Flush XML.Event) IO () )
53xmlStream conread conwrite = (xsrc,xsnk)
54 where
55 xsrc = src $= XML.parseBytes XML.def
56 xsnk = -- XML.renderBytes XML.def =$ snk
57 XML.renderBuilderFlush XML.def
58 =$= builderToByteStringFlush
59 =$= discardFlush
60 =$ snk
61 where
62 discardFlush :: Monad m => ConduitM (Flush a) a m ()
63 discardFlush = awaitForever $ \x -> do
64 let unchunk (Chunk a) = a
65 ischunk (Chunk _) = True
66 ischunk _ = False
67 when (ischunk x) $ yield (unchunk x)
68
69 src = do
70 v <- lift conread
71 maybe (return ()) -- lift . wlog $ "conread: Nothing")
72 (\v -> yield v >> src)
73 v
74 snk = awaitForever $ liftIO . conwrite
75
76
77type FlagCommand = STM Bool
78type ReadCommand = IO (Maybe ByteString)
79type WriteCommand = ByteString -> IO Bool
80
81data Stanza
82 = UnrecognizedStanza { stanzaChan :: TChan (Maybe XML.Event) }
83 | PingStanza { stanzaId :: Maybe Text
84 , stanzaChan :: TChan (Maybe XML.Event) }
85 | PongStanza { -- stanzaId :: Maybe Text
86 stanzaChan :: TChan (Maybe XML.Event) }
87
88copyToChannel f chan = awaitForever copy
89 where
90 copy x = do
91 liftIO . atomically $ writeTChan chan (f x)
92 yield x
93
94
95prettyPrint prefix xs =
96 liftIO $
97 CL.sourceList xs
98 $= XML.renderBytes (XML.def { XML.rsPretty=True })
99 =$= CB.lines
100 $$ CL.mapM_ (wlogb . (prefix <>))
101
102grockStanzaIQGet :: Monad m => XML.Event -> NestingXML o m (Maybe (TChan (Maybe Event) -> Stanza))
103grockStanzaIQGet stanza = do
104 let mid = lookupAttrib "id" (tagAttrs stanza)
105 -- mfrom = lookupAttrib "from" (tagAttrs stanza)
106 mtag <- nextElement
107 flip (maybe $ return Nothing) mtag $ \tag -> do
108 case tagName tag of
109 "{urn:xmpp:ping}ping" -> do
110 return $ Just (PingStanza mid)
111 _ -> return Nothing
112
113ioWriteChan c v = liftIO . atomically $ writeTChan c v
114
115xmppInbound :: ConnectionKey -> FlagCommand
116 -> Source IO XML.Event
117 -> TChan Stanza
118 -> TChan Stanza
119 -> Sink XML.Event IO ()
120xmppInbound k pingflag src stanzas output = doNestingXML $ do
121 withXML $ \begindoc -> do
122 when (begindoc==EventBeginDocument) $ do
123 whenJust nextElement $ \xml -> do
124 withJust (elementAttrs "stream" xml) $ \stream_attrs -> do
125 fix $ \loop -> do
126 -- liftIO . wlog $ "waiting for stanza."
127 chan <- liftIO $ atomically newTChan
128 whenJust nextElement $ \stanza -> do
129 stanza_lvl <- nesting
130 ioWriteChan chan (Just stanza)
131 copyToChannel Just chan =$= do
132 dispatch <-
133 case () of
134 _ | stanza `isServerIQOf` "get" -> grockStanzaIQGet stanza
135 _ -> return $ Just UnrecognizedStanza
136 flip (maybe $ return ()) dispatch $ \dispatch ->
137 case dispatch chan of
138 d@(PingStanza {}) -> do
139 let to = "todo"
140 from = "todo"
141 let pong = peerPong (stanzaId d) to from
142 pongChan <- liftIO $ atomically newTChan
143 ioWriteChan output (PongStanza pongChan)
144 mapM_ (ioWriteChan pongChan . Just) pong
145 ioWriteChan pongChan Nothing
146 disp -> ioWriteChan stanzas disp
147 awaitCloser stanza_lvl
148 ioWriteChan chan Nothing
149 loop
150
151
152chanContents :: TChan x -> IO [x]
153chanContents ch = do
154 x <- atomically $ do
155 bempty <- isEmptyTChan ch
156 if bempty
157 then return Nothing
158 else fmap Just $ readTChan ch
159 maybe (return [])
160 (\x -> do
161 xs <- chanContents ch
162 return (x:xs))
163 x
164
165readUntilNothing :: TChan (Maybe x) -> IO [x]
166readUntilNothing ch = do
167 x <- atomically $ readTChan ch
168 maybe (return [])
169 (\x -> do
170 xs <- readUntilNothing ch
171 return (x:xs))
172 x
173
174
175greetPeer =
176 [ EventBeginDocument
177 , EventBeginElement (streamP "stream")
178 [ attr "xmlns" "jabber:server"
179 , attr "version" "1.0"
180 ]
181 ]
182
183goodbyePeer =
184 [ EventEndElement (streamP "stream")
185 , EventEndDocument
186 ]
187
188data XMPPState
189 = PingSlot
190 deriving (Eq,Ord)
191
192
193peerPing :: Maybe Text -> Text -> Text -> [XML.Event]
194peerPing mid to from =
195 [ EventBeginElement "{jabber:server}iq"
196 $ (case mid of
197 Just c -> (("id",[ContentText c]):)
198 _ -> id )
199 [ ("type",[ContentText "get"])
200 , attr "to" to
201 , attr "from" from
202 ]
203 , EventBeginElement "{urn:xmpp:ping}ping" []
204 , EventEndElement "{urn:xmpp:ping}ping"
205 , EventEndElement "{jabber:server}iq" ]
206
207peerPong mid to from =
208 [ EventBeginElement "{jabber:server}iq"
209 $(case mid of
210 Just c -> (("id",[ContentText c]):)
211 _ -> id)
212 [ attr "type" "result"
213 , attr "to" to
214 , attr "from" from
215 ]
216 , EventEndElement "{jabber:server}iq"
217 ]
218
219
220forkConnection :: ConnectionKey
221 -> FlagCommand
222 -> Source IO XML.Event
223 -> Sink (Flush XML.Event) IO ()
224 -> TChan Stanza
225 -> IO (TChan Stanza)
226forkConnection k pingflag src snk stanzas = do
227 rdone <- atomically newEmptyTMVar
228 slots <- atomically $ Slotted.new isEventBeginElement isEventEndElement
229 needsFlush <- atomically $ newTVar False
230 let _ = slots :: Slotted.UpdateStream XMPPState XML.Event
231 let greet_src = do
232 CL.sourceList greetPeer =$= CL.map Chunk
233 yield Flush
234 slot_src = do
235 what <- lift . atomically $ foldr1 orElse
236 [Slotted.pull slots >>= \x -> do
237 writeTVar needsFlush True
238 return $ do
239 yield (Chunk x)
240 slot_src
241 ,do Slotted.isEmpty slots >>= check
242 readTVar needsFlush >>= check
243 writeTVar needsFlush False
244 return $ do
245 yield Flush
246 slot_src
247 ,readTMVar rdone >> return (return ())
248 ]
249 what
250 forkIO $ do (greet_src >> slot_src) $$ snk
251 wlog $ "end post-queue fork: " ++ show k
252 output <- atomically newTChan
253 forkIO $ do
254 -- mapM_ (atomically . Slotted.push slots Nothing) greetPeer
255 fix $ \loop -> do
256 what <- atomically $ foldr1 orElse
257 [readTChan output >>= \stanza -> return $ do
258 let xchan = stanzaChan stanza
259 fix $ \inner -> do
260 what <- atomically $ orElse
261 (readTChan xchan >>= \mxml -> return $ do
262 case mxml of
263 Just xml -> do
264 atomically $ Slotted.push slots Nothing xml
265 inner
266 Nothing -> loop)
267 (readTMVar rdone >> return (return ()))
268 what
269 ,do pingflag >>= check
270 return $ do
271 let to = addrToText (callBackAddress k)
272 from = "todo" -- Look it up from Server object
273 -- or pass it with Connection event.
274 mid = Just "ping"
275 mapM_ (atomically . Slotted.push slots (Just $ PingSlot))
276 (peerPing mid to from)
277 loop
278 ,readTMVar rdone >> return (return ())
279 ]
280 what
281 wlog $ "end pre-queue fork: " ++ show k
282 forkIO $ do
283 -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show)
284 src $$ xmppInbound k pingflag src stanzas output
285 atomically $ putTMVar rdone ()
286 wlog $ "end reader fork: " ++ show k
287 return output
288
289data ConnectionKey
290 = PeerKey { callBackAddress :: SockAddr }
291 | ClientKey { localAddress :: SockAddr }
292 deriving (Show, Ord, Eq)
293
294{-
295data Peer = Peer
296 { peerWanted :: TVar Bool -- ^ False when this peer is on a you-call-me basis
297 , peerState :: TVar PeerState
298 }
299data PeerState
300 = PeerPendingConnect UTCTime
301 | PeerPendingAccept UTCTime
302 | PeerConnected (TChan Stanza)
303-}
304
305peerKey (sock,addr) = do
306 peer <-
307 sIsConnected sock >>= \c ->
308 if c then getPeerName sock -- addr is normally socketName
309 else return addr -- Weird hack: addr is would-be peer name
310 return $ PeerKey (peer `withPort` fromIntegral peerport)
311
312clientKey (sock,addr) = return $ ClientKey addr
313
314monitor sv params = do
315 chan <- return $ serverEvent sv
316 stanzas <- atomically newTChan
317 fix $ \loop -> do
318 action <- atomically $ foldr1 orElse
319 [ readTChan chan >>= \(k,e) -> return $ do
320 case e of
321 Connection pingflag conread conwrite -> do
322 wlog $ tomsg k "Connection"
323 let (xsrc,xsnk) = xmlStream conread conwrite
324 forkConnection k pingflag xsrc xsnk stanzas
325 return ()
326 ConnectFailure addr -> do
327 wlog $ tomsg k "ConnectFailure"
328 EOF -> wlog $ tomsg k "EOF"
329 HalfConnection In -> do
330 wlog $ tomsg k "ReadOnly"
331 control sv (Connect (callBackAddress k) params)
332 HalfConnection Out -> wlog $ tomsg k "WriteOnly"
333 RequiresPing -> wlog $ tomsg k "RequiresPing"
334 _ -> return ()
335 , readTChan stanzas >>= \stanza -> return $ do
336 xs <- readUntilNothing (stanzaChan stanza)
337 wlog ""
338 prettyPrint "STANZA: " xs
339 ]
340 action
341 loop
342 where
343 tomsg k str = printf "%12s %s" str (show k)
344 where
345 _ = str :: String
346
347
348peerport = 5269
349clientport = 5222
350
351main = runResourceT $ do 17main = runResourceT $ do
352 sv <- server 18 (sv,peer_params) <- xmppServer
353 lift $ do 19 liftIO $ do
354 peer_params <- return (connectionDefaults peerKey)
355 { pingInterval = 10000
356 , timeout = 10000
357 , duplex = False }
358 client_params <- return $ connectionDefaults clientKey
359 let testaddr0 = "fd97:ca88:fa7c:b94b:c8b8:fad4:1021:a54d" 20 let testaddr0 = "fd97:ca88:fa7c:b94b:c8b8:fad4:1021:a54d"
360 -- testaddr0 = "fdef:9e0b:b502:52c3:c074:28d3:fcd7:bfb7" 21 -- testaddr0 = "fdef:9e0b:b502:52c3:c074:28d3:fcd7:bfb7"
361 testaddr<- fmap (addrAddress . head) $ 22 testaddr<- fmap (addrAddress . head) $
@@ -364,15 +25,11 @@ main = runResourceT $ do
364 (Just "5269") 25 (Just "5269")
365 putStrLn $ "Connecting to "++show testaddr 26 putStrLn $ "Connecting to "++show testaddr
366 control sv (ConnectWithEndlessRetry testaddr peer_params 10000) 27 control sv (ConnectWithEndlessRetry testaddr peer_params 10000)
367 forkIO $ monitor sv peer_params
368 control sv (Listen peerport peer_params)
369 -- control sv (Listen clientport client_params)
370 28
371 -- atomically $ newEmptyTMVar >>= readTMVar -- Wait for control-c
372 quitVar <- newEmptyTMVarIO 29 quitVar <- newEmptyTMVarIO
373 installHandler sigTERM (CatchOnce (atomically $ putTMVar quitVar True)) Nothing 30 installHandler sigTERM (CatchOnce (atomically $ putTMVar quitVar True)) Nothing
374 installHandler sigINT (CatchOnce (atomically $ putTMVar quitVar True)) Nothing 31 installHandler sigINT (CatchOnce (atomically $ putTMVar quitVar True)) Nothing
375 quitMessage <- atomically $ takeTMVar quitVar 32 quitMessage <- atomically $ takeTMVar quitVar
376 33
377 wlog "goodbye." 34 putStrLn "goodbye."
378 return () 35 return ()