summaryrefslogtreecommitdiff
path: root/Presence/XMPPServer.hs
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2013-06-21 01:34:36 -0400
committerjoe <joe@jerkface.net>2013-06-21 01:34:36 -0400
commitca280b85f19c16e0ae890a79b9b2e7db4a98e2c6 (patch)
tree3de465c50a4c43f1a6a02f5804e1d6417828ccca /Presence/XMPPServer.hs
parentfa6a523704984bd98762a4e639b739e73320068f (diff)
Parse/Lex fixes. Presence now received by remote peer.
Diffstat (limited to 'Presence/XMPPServer.hs')
-rw-r--r--Presence/XMPPServer.hs177
1 files changed, 149 insertions, 28 deletions
diff --git a/Presence/XMPPServer.hs b/Presence/XMPPServer.hs
index 062fcacb..76c14f52 100644
--- a/Presence/XMPPServer.hs
+++ b/Presence/XMPPServer.hs
@@ -10,12 +10,19 @@ import Todo
10import Data.HList.TypeEqGeneric1() 10import Data.HList.TypeEqGeneric1()
11import Data.HList.TypeCastGeneric1() 11import Data.HList.TypeCastGeneric1()
12import ByteStringOperators 12import ByteStringOperators
13import System.IO
14 ( IOMode(..)
15 , BufferMode(..)
16 , hSetBuffering
17 )
13 18
14import Server 19import Server
15import Data.ByteString.Lazy.Char8 as L 20import Data.ByteString.Lazy.Char8 as L
16 ( hPutStrLn 21 ( hPutStrLn
17 , unlines 22 , unlines
23 , lines
18 , splitWith 24 , splitWith
25 , drop
19 , ByteString 26 , ByteString
20 , pack 27 , pack
21 , unpack ) 28 , unpack )
@@ -26,11 +33,11 @@ import System.IO
26 ) 33 )
27import Data.HList 34import Data.HList
28import AdaptServer 35import AdaptServer
29import Text.XML.HaXml.Lex (xmlLex) 36import Text.XML.HaXml.Lex (xmlLex,TokenT(..))
30import Text.XML.HaXml.Parse (XParser,xmlParseWith,element,{-doctypedecl,-}processinginstruction,elemOpenTag,elemCloseTag) 37import Text.XML.HaXml.Parse (XParser,xmlParseWith,element,{-doctypedecl,-}processinginstruction,elemOpenTag,elemCloseTag)
31import Text.XML.HaXml.Types as Hax hiding (Element) -- (ElemTag,QName(..),Namespace(..),Element(..),Content(..),AttValue(..)) 38import Text.XML.HaXml.Types as Hax hiding (Element) -- (ElemTag,QName(..),Namespace(..),Element(..),Content(..),AttValue(..))
32import qualified Text.XML.HaXml.Types as Hax (Element(..)) 39import qualified Text.XML.HaXml.Types as Hax (Element(..))
33import Text.XML.HaXml.Posn (Posn) 40import Text.XML.HaXml.Posn (Posn, posnLine, posnColumn)
34import Text.XML.HaXml.Lex (TokenT) 41import Text.XML.HaXml.Lex (TokenT)
35import qualified Text.XML.HaXml.Pretty as PP 42import qualified Text.XML.HaXml.Pretty as PP
36import Text.PrettyPrint 43import Text.PrettyPrint
@@ -45,12 +52,14 @@ import Control.Monad.IO.Class
45import Control.DeepSeq 52import Control.DeepSeq
46import Control.Concurrent.STM 53import Control.Concurrent.STM
47import Control.Concurrent 54import Control.Concurrent
48import Control.Exception 55import Control.Exception as Exception
49import Text.Show.ByteString as L 56import Text.Show.ByteString as L
50import Data.Binary.Builder as B 57import Data.Binary.Builder as B
51import Data.Binary.Put 58import Data.Binary.Put
52import qualified Data.Map as Map 59import qualified Data.Map as Map
53import GHC.Conc 60import GHC.Conc
61import Network.BSD
62import Control.Concurrent.Async
54 63
55-- | Jabber ID (JID) datatype 64-- | Jabber ID (JID) datatype
56data JID = JID { name :: Maybe ByteString 65data JID = JID { name :: Maybe ByteString
@@ -332,7 +341,8 @@ xmppParse ls = runTryParse $ do
332listenForXmppClients session_factory port st = do 341listenForXmppClients session_factory port st = do
333 -- standard port: 5222 342 -- standard port: 5222
334 let (start,dopkt) = 343 let (start,dopkt) =
335 adaptServer ( xmlLex "local-client" . unpack 344 adaptServer ( dropTill
345 , xmlLexPartial "local-client" . unpack
336 , xmppParse) 346 , xmppParse)
337 (startCon session_factory,doCon) 347 (startCon session_factory,doCon)
338 doServer (port .*. st) 348 doServer (port .*. st)
@@ -343,39 +353,49 @@ listenForXmppClients session_factory port st = do
343startPeer session_factory sock st = do 353startPeer session_factory sock st = do
344 let h = hOccursFst st :: Handle 354 let h = hOccursFst st :: Handle
345 name <- fmap bshow $ getPeerName sock 355 name <- fmap bshow $ getPeerName sock
346 L.putStrLn $ "REMOTE: connected " <++> name 356 L.putStrLn $ "REMOTE-IN: connected " <++> name
347 let quit = L.putStrLn $ "REMOTE: disconnected " <++> name 357 let quit = L.putStrLn $ "REMOTE-IN: disconnected " <++> name
348 return ( ConnectionFinalizer quit .*. st ) 358 return ( ConnectionFinalizer quit .*. st )
349 359
350doPeer st elem cont = do 360doPeer st elem cont = do
361 L.putStrLn $ "REMOTE-IN: received " <++> bshow elem
351 cont () 362 cont ()
352 363
364xmlLexPartial name cs =
365 let ls = xmlLex name cs
366 isTokError (_,TokError _) = True
367 isTokError _ = False
368 (gs,bs) = break isTokError ls
369 in if any (not . isTokError) bs
370 then ls
371 else gs
372
373
353listenForRemotePeers session_factory port st = do 374listenForRemotePeers session_factory port st = do
354 -- standard port: 5269 375 -- standard port: 5269
355 let (start,dopkt) = 376 let (start,dopkt) =
356 adaptServer ( xmlLex "remote-peer" . unpack 377 adaptServer ( dropTill
378 , xmlLexPartial "remote-inbound" . unpack
357 , xmppParse) 379 , xmppParse)
358 (startPeer session_factory,doPeer) 380 (startPeer session_factory,doPeer)
359 doServer (port .*. st) 381 doServer (port .*. st)
360 dopkt 382 dopkt
361 start 383 start
362 384
363newServerConnections = atomically $ newTVar Map.empty 385dropTill bs ((fst->posn):_) =
364{- 386 let ls = zip [1..] (L.lines bs)
365sendMessage cons msg peer = do 387 ln = posnLine posn
366 (is_new,entry) <- atomically $ do 388 col = posnColumn posn
367 consmap <- readTVar cons 389 ls' = map snd $ dropWhile ((<ln).fst) ls
368 let found = Map.lookup peer consmap 390 in case ls' of
369 newEntry = () 391 [] -> ""
370 entry = maybe newEntry id found 392 fstLine:ls'' -> foldr1 (<++>) (L.drop (fromIntegral (col-1)) fstLine : ls'')
371 is_new = isNothing found
372 when is_new
373 $ writeTVar cons (Map.insert peer entry consmap)
374 return (is_new,entry)
375 L.putStrLn $ "sendMessage ->"<++>peer<++>": "<++>bshow msg
376 when is_new $ connect_to_server entry peer
377 393
378-} 394
395data OutBoundMessage = OutBoundPresence Presence
396 deriving Prelude.Show
397
398newServerConnections = atomically $ newTVar Map.empty
379 399
380sendMessage cons msg peer = do 400sendMessage cons msg peer = do
381 found <- atomically $ do 401 found <- atomically $ do
@@ -384,18 +404,87 @@ sendMessage cons msg peer = do
384 let newEntry = do 404 let newEntry = do
385 chan <- atomically newTChan 405 chan <- atomically newTChan
386 t <- forkIO $ connect_to_server chan peer 406 t <- forkIO $ connect_to_server chan peer
387 return (chan,t) 407 L.putStrLn $ "remote-map new: " <++> peer
388 entry <- maybe newEntry 408 return (True,(chan,t))
409 (is_new,entry) <- maybe newEntry
389 ( \(chan,t) -> do 410 ( \(chan,t) -> do
390 st <- threadStatus t 411 st <- threadStatus t
412 let running = do
413 L.putStrLn $ "remote-map, thread running: " <++> peer
414 return (False,(chan,t))
415 died = do
416 L.putStrLn $ "remote-map, thread died("<++>bshow st<++>"): " <++> peer
417 newEntry
391 case st of 418 case st of
392 ThreadRunning -> return (chan,t) 419 ThreadRunning -> running
393 _ -> newEntry 420 ThreadBlocked _ -> running
421 ThreadDied -> died
422 ThreadFinished -> died
394 ) 423 )
395 found 424 found
396 L.putStrLn $ "sendMessage ->"<++>peer<++>": "<++>bshow msg 425 L.putStrLn $ "sendMessage ->"<++>peer<++>": "<++>bshow msg
426 atomically $ writeTChan (fst entry) msg
427 when is_new . atomically $
428 readTVar cons >>= writeTVar cons . Map.insert peer entry
429
430connect_to_server chan peer = (>> return ()) . runMaybeT $ do
431 let port = "5269"
432
433 connected <- liftIO . async $ connect' (unpack peer) port
434
435 sock <- MaybeT . fix $ \loop -> do
436 e <- atomically $ orElse
437 (fmap Right $ waitSTM connected)
438 (fmap Left $ readTChan chan)
439 case e of
440 Left event -> do
441 L.putStrLn $ "REMOTE-OUT NOT READY: " <++> bshow event
442 loop
443 Right sock -> return sock
444
445 liftIO $ do
446 h <- socketToHandle sock ReadWriteMode
447 hSetBuffering h NoBuffering
448 hPutStrLn h "<stream>"
449 L.putStrLn $ "REMOTE-OUT: <stream>"
450 fix $ \loop -> do
451 event <- atomically $ readTChan chan
452 case event of
453 OutBoundPresence p -> do
454 let r = xmlifyPresence p
455 hPutStrLn h r
456 L.putStrLn $ "REMOTE-OUT:\n" <++> r <++> "\n"
457 loop
458 hPutStrLn h "</stream>"
459 L.putStrLn $ "REMOTE-OUT: </stream>"
397 460
398connect_to_server chan peer = return () 461
462 {-
463 pending <- newTVarIO True
464 thread <- forkIO $
465 runMaybeT $ do
466 let port = "5269"
467 sock <- MaybeT $ connect' (unpack peer) port
468 liftIO $ do
469 h <- socketToHandle sock ReadWriteMode
470 hSetBuffering h NoBuffering
471 hPutStrLn h "<stream>"
472 atomically $ writeTVar pending False
473 fix $ \loop -> do
474 event <- atomically $ readTChan chan
475 case event of
476 OutBoundPresence p -> do
477 let r = xmlifyPresence p
478 hPutStrLn h r
479 L.putStrLn $ "REMOTE:\n" <++> r <++> "\n"
480 loop
481 hPutStrLn h "</stream>"
482 fix $ \loop -> do
483 event <- atomically $ readTChan chan
484 when (readTVarIO pending) loop
485 joinThread thread
486 return ()
487 -}
399 488
400parseJID :: ByteString -> JID 489parseJID :: ByteString -> JID
401parseJID bjid = 490parseJID bjid =
@@ -410,6 +499,38 @@ parseJID bjid =
410 _ -> Nothing 499 _ -> Nothing
411 in JID name server rsrc 500 in JID name server rsrc
412 501
502connect' :: HostName -> ServiceName -> IO (Maybe Socket)
503connect' host serv = do
504 proto <- getProtocolNumber "tcp"
505 let hints = defaultHints { addrFlags = [AI_ADDRCONFIG]
506 , addrProtocol = proto
507 , addrSocketType = Stream }
508 addrs <- getAddrInfo (Just hints) (Just host) (Just serv)
509 firstSuccessful $ map tryToConnect addrs
510 where
511 tryToConnect addr =
512 bracketOnError
513 (socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr))
514 (sClose ) -- only done if there's an error
515 (\sock -> do
516 connect sock (addrAddress addr)
517 return (Just sock) -- socketToHandle sock ReadWriteMode
518 )
519
520catchIO :: IO a -> (IOException -> IO a) -> IO a
521catchIO a h = Exception.catch a h
522
523-- Returns the first action from a list which does not throw an exception.
524-- If all the actions throw exceptions (and the list of actions is not empty),
525-- the last exception is thrown.
526firstSuccessful :: [IO a] -> IO a
527firstSuccessful [] = error "firstSuccessful: empty list"
528firstSuccessful (p:ps) = catchIO p $ \e ->
529 case ps of
530 [] -> Exception.throwIO e
531 _ -> firstSuccessful ps
532
533
413seekRemotePeers :: XMPPConfig config => 534seekRemotePeers :: XMPPConfig config =>
414 (ByteString -> Bool) -> config -> TChan Presence -> IO b0 535 (ByteString -> Bool) -> config -> TChan Presence -> IO b0
415seekRemotePeers is_peer config chan = do 536seekRemotePeers is_peer config chan = do
@@ -427,5 +548,5 @@ seekRemotePeers is_peer config chan = do
427 let jid = parseJID bjid 548 let jid = parseJID bjid
428 peer = server jid 549 peer = server jid
429 when (is_peer peer) $ 550 when (is_peer peer) $
430 liftIO $ sendMessage server_connections p peer 551 liftIO $ sendMessage server_connections (OutBoundPresence p) peer
431 loop 552 loop