diff options
author | joe <joe@jerkface.net> | 2013-06-21 01:34:36 -0400 |
---|---|---|
committer | joe <joe@jerkface.net> | 2013-06-21 01:34:36 -0400 |
commit | ca280b85f19c16e0ae890a79b9b2e7db4a98e2c6 (patch) | |
tree | 3de465c50a4c43f1a6a02f5804e1d6417828ccca /Presence/XMPPServer.hs | |
parent | fa6a523704984bd98762a4e639b739e73320068f (diff) |
Parse/Lex fixes. Presence now received by remote peer.
Diffstat (limited to 'Presence/XMPPServer.hs')
-rw-r--r-- | Presence/XMPPServer.hs | 177 |
1 files changed, 149 insertions, 28 deletions
diff --git a/Presence/XMPPServer.hs b/Presence/XMPPServer.hs index 062fcacb..76c14f52 100644 --- a/Presence/XMPPServer.hs +++ b/Presence/XMPPServer.hs | |||
@@ -10,12 +10,19 @@ import Todo | |||
10 | import Data.HList.TypeEqGeneric1() | 10 | import Data.HList.TypeEqGeneric1() |
11 | import Data.HList.TypeCastGeneric1() | 11 | import Data.HList.TypeCastGeneric1() |
12 | import ByteStringOperators | 12 | import ByteStringOperators |
13 | import System.IO | ||
14 | ( IOMode(..) | ||
15 | , BufferMode(..) | ||
16 | , hSetBuffering | ||
17 | ) | ||
13 | 18 | ||
14 | import Server | 19 | import Server |
15 | import Data.ByteString.Lazy.Char8 as L | 20 | import Data.ByteString.Lazy.Char8 as L |
16 | ( hPutStrLn | 21 | ( hPutStrLn |
17 | , unlines | 22 | , unlines |
23 | , lines | ||
18 | , splitWith | 24 | , splitWith |
25 | , drop | ||
19 | , ByteString | 26 | , ByteString |
20 | , pack | 27 | , pack |
21 | , unpack ) | 28 | , unpack ) |
@@ -26,11 +33,11 @@ import System.IO | |||
26 | ) | 33 | ) |
27 | import Data.HList | 34 | import Data.HList |
28 | import AdaptServer | 35 | import AdaptServer |
29 | import Text.XML.HaXml.Lex (xmlLex) | 36 | import Text.XML.HaXml.Lex (xmlLex,TokenT(..)) |
30 | import Text.XML.HaXml.Parse (XParser,xmlParseWith,element,{-doctypedecl,-}processinginstruction,elemOpenTag,elemCloseTag) | 37 | import Text.XML.HaXml.Parse (XParser,xmlParseWith,element,{-doctypedecl,-}processinginstruction,elemOpenTag,elemCloseTag) |
31 | import Text.XML.HaXml.Types as Hax hiding (Element) -- (ElemTag,QName(..),Namespace(..),Element(..),Content(..),AttValue(..)) | 38 | import Text.XML.HaXml.Types as Hax hiding (Element) -- (ElemTag,QName(..),Namespace(..),Element(..),Content(..),AttValue(..)) |
32 | import qualified Text.XML.HaXml.Types as Hax (Element(..)) | 39 | import qualified Text.XML.HaXml.Types as Hax (Element(..)) |
33 | import Text.XML.HaXml.Posn (Posn) | 40 | import Text.XML.HaXml.Posn (Posn, posnLine, posnColumn) |
34 | import Text.XML.HaXml.Lex (TokenT) | 41 | import Text.XML.HaXml.Lex (TokenT) |
35 | import qualified Text.XML.HaXml.Pretty as PP | 42 | import qualified Text.XML.HaXml.Pretty as PP |
36 | import Text.PrettyPrint | 43 | import Text.PrettyPrint |
@@ -45,12 +52,14 @@ import Control.Monad.IO.Class | |||
45 | import Control.DeepSeq | 52 | import Control.DeepSeq |
46 | import Control.Concurrent.STM | 53 | import Control.Concurrent.STM |
47 | import Control.Concurrent | 54 | import Control.Concurrent |
48 | import Control.Exception | 55 | import Control.Exception as Exception |
49 | import Text.Show.ByteString as L | 56 | import Text.Show.ByteString as L |
50 | import Data.Binary.Builder as B | 57 | import Data.Binary.Builder as B |
51 | import Data.Binary.Put | 58 | import Data.Binary.Put |
52 | import qualified Data.Map as Map | 59 | import qualified Data.Map as Map |
53 | import GHC.Conc | 60 | import GHC.Conc |
61 | import Network.BSD | ||
62 | import Control.Concurrent.Async | ||
54 | 63 | ||
55 | -- | Jabber ID (JID) datatype | 64 | -- | Jabber ID (JID) datatype |
56 | data JID = JID { name :: Maybe ByteString | 65 | data JID = JID { name :: Maybe ByteString |
@@ -332,7 +341,8 @@ xmppParse ls = runTryParse $ do | |||
332 | listenForXmppClients session_factory port st = do | 341 | listenForXmppClients session_factory port st = do |
333 | -- standard port: 5222 | 342 | -- standard port: 5222 |
334 | let (start,dopkt) = | 343 | let (start,dopkt) = |
335 | adaptServer ( xmlLex "local-client" . unpack | 344 | adaptServer ( dropTill |
345 | , xmlLexPartial "local-client" . unpack | ||
336 | , xmppParse) | 346 | , xmppParse) |
337 | (startCon session_factory,doCon) | 347 | (startCon session_factory,doCon) |
338 | doServer (port .*. st) | 348 | doServer (port .*. st) |
@@ -343,39 +353,49 @@ listenForXmppClients session_factory port st = do | |||
343 | startPeer session_factory sock st = do | 353 | startPeer session_factory sock st = do |
344 | let h = hOccursFst st :: Handle | 354 | let h = hOccursFst st :: Handle |
345 | name <- fmap bshow $ getPeerName sock | 355 | name <- fmap bshow $ getPeerName sock |
346 | L.putStrLn $ "REMOTE: connected " <++> name | 356 | L.putStrLn $ "REMOTE-IN: connected " <++> name |
347 | let quit = L.putStrLn $ "REMOTE: disconnected " <++> name | 357 | let quit = L.putStrLn $ "REMOTE-IN: disconnected " <++> name |
348 | return ( ConnectionFinalizer quit .*. st ) | 358 | return ( ConnectionFinalizer quit .*. st ) |
349 | 359 | ||
350 | doPeer st elem cont = do | 360 | doPeer st elem cont = do |
361 | L.putStrLn $ "REMOTE-IN: received " <++> bshow elem | ||
351 | cont () | 362 | cont () |
352 | 363 | ||
364 | xmlLexPartial name cs = | ||
365 | let ls = xmlLex name cs | ||
366 | isTokError (_,TokError _) = True | ||
367 | isTokError _ = False | ||
368 | (gs,bs) = break isTokError ls | ||
369 | in if any (not . isTokError) bs | ||
370 | then ls | ||
371 | else gs | ||
372 | |||
373 | |||
353 | listenForRemotePeers session_factory port st = do | 374 | listenForRemotePeers session_factory port st = do |
354 | -- standard port: 5269 | 375 | -- standard port: 5269 |
355 | let (start,dopkt) = | 376 | let (start,dopkt) = |
356 | adaptServer ( xmlLex "remote-peer" . unpack | 377 | adaptServer ( dropTill |
378 | , xmlLexPartial "remote-inbound" . unpack | ||
357 | , xmppParse) | 379 | , xmppParse) |
358 | (startPeer session_factory,doPeer) | 380 | (startPeer session_factory,doPeer) |
359 | doServer (port .*. st) | 381 | doServer (port .*. st) |
360 | dopkt | 382 | dopkt |
361 | start | 383 | start |
362 | 384 | ||
363 | newServerConnections = atomically $ newTVar Map.empty | 385 | dropTill bs ((fst->posn):_) = |
364 | {- | 386 | let ls = zip [1..] (L.lines bs) |
365 | sendMessage cons msg peer = do | 387 | ln = posnLine posn |
366 | (is_new,entry) <- atomically $ do | 388 | col = posnColumn posn |
367 | consmap <- readTVar cons | 389 | ls' = map snd $ dropWhile ((<ln).fst) ls |
368 | let found = Map.lookup peer consmap | 390 | in case ls' of |
369 | newEntry = () | 391 | [] -> "" |
370 | entry = maybe newEntry id found | 392 | fstLine:ls'' -> foldr1 (<++>) (L.drop (fromIntegral (col-1)) fstLine : ls'') |
371 | is_new = isNothing found | ||
372 | when is_new | ||
373 | $ writeTVar cons (Map.insert peer entry consmap) | ||
374 | return (is_new,entry) | ||
375 | L.putStrLn $ "sendMessage ->"<++>peer<++>": "<++>bshow msg | ||
376 | when is_new $ connect_to_server entry peer | ||
377 | 393 | ||
378 | -} | 394 | |
395 | data OutBoundMessage = OutBoundPresence Presence | ||
396 | deriving Prelude.Show | ||
397 | |||
398 | newServerConnections = atomically $ newTVar Map.empty | ||
379 | 399 | ||
380 | sendMessage cons msg peer = do | 400 | sendMessage cons msg peer = do |
381 | found <- atomically $ do | 401 | found <- atomically $ do |
@@ -384,18 +404,87 @@ sendMessage cons msg peer = do | |||
384 | let newEntry = do | 404 | let newEntry = do |
385 | chan <- atomically newTChan | 405 | chan <- atomically newTChan |
386 | t <- forkIO $ connect_to_server chan peer | 406 | t <- forkIO $ connect_to_server chan peer |
387 | return (chan,t) | 407 | L.putStrLn $ "remote-map new: " <++> peer |
388 | entry <- maybe newEntry | 408 | return (True,(chan,t)) |
409 | (is_new,entry) <- maybe newEntry | ||
389 | ( \(chan,t) -> do | 410 | ( \(chan,t) -> do |
390 | st <- threadStatus t | 411 | st <- threadStatus t |
412 | let running = do | ||
413 | L.putStrLn $ "remote-map, thread running: " <++> peer | ||
414 | return (False,(chan,t)) | ||
415 | died = do | ||
416 | L.putStrLn $ "remote-map, thread died("<++>bshow st<++>"): " <++> peer | ||
417 | newEntry | ||
391 | case st of | 418 | case st of |
392 | ThreadRunning -> return (chan,t) | 419 | ThreadRunning -> running |
393 | _ -> newEntry | 420 | ThreadBlocked _ -> running |
421 | ThreadDied -> died | ||
422 | ThreadFinished -> died | ||
394 | ) | 423 | ) |
395 | found | 424 | found |
396 | L.putStrLn $ "sendMessage ->"<++>peer<++>": "<++>bshow msg | 425 | L.putStrLn $ "sendMessage ->"<++>peer<++>": "<++>bshow msg |
426 | atomically $ writeTChan (fst entry) msg | ||
427 | when is_new . atomically $ | ||
428 | readTVar cons >>= writeTVar cons . Map.insert peer entry | ||
429 | |||
430 | connect_to_server chan peer = (>> return ()) . runMaybeT $ do | ||
431 | let port = "5269" | ||
432 | |||
433 | connected <- liftIO . async $ connect' (unpack peer) port | ||
434 | |||
435 | sock <- MaybeT . fix $ \loop -> do | ||
436 | e <- atomically $ orElse | ||
437 | (fmap Right $ waitSTM connected) | ||
438 | (fmap Left $ readTChan chan) | ||
439 | case e of | ||
440 | Left event -> do | ||
441 | L.putStrLn $ "REMOTE-OUT NOT READY: " <++> bshow event | ||
442 | loop | ||
443 | Right sock -> return sock | ||
444 | |||
445 | liftIO $ do | ||
446 | h <- socketToHandle sock ReadWriteMode | ||
447 | hSetBuffering h NoBuffering | ||
448 | hPutStrLn h "<stream>" | ||
449 | L.putStrLn $ "REMOTE-OUT: <stream>" | ||
450 | fix $ \loop -> do | ||
451 | event <- atomically $ readTChan chan | ||
452 | case event of | ||
453 | OutBoundPresence p -> do | ||
454 | let r = xmlifyPresence p | ||
455 | hPutStrLn h r | ||
456 | L.putStrLn $ "REMOTE-OUT:\n" <++> r <++> "\n" | ||
457 | loop | ||
458 | hPutStrLn h "</stream>" | ||
459 | L.putStrLn $ "REMOTE-OUT: </stream>" | ||
397 | 460 | ||
398 | connect_to_server chan peer = return () | 461 | |
462 | {- | ||
463 | pending <- newTVarIO True | ||
464 | thread <- forkIO $ | ||
465 | runMaybeT $ do | ||
466 | let port = "5269" | ||
467 | sock <- MaybeT $ connect' (unpack peer) port | ||
468 | liftIO $ do | ||
469 | h <- socketToHandle sock ReadWriteMode | ||
470 | hSetBuffering h NoBuffering | ||
471 | hPutStrLn h "<stream>" | ||
472 | atomically $ writeTVar pending False | ||
473 | fix $ \loop -> do | ||
474 | event <- atomically $ readTChan chan | ||
475 | case event of | ||
476 | OutBoundPresence p -> do | ||
477 | let r = xmlifyPresence p | ||
478 | hPutStrLn h r | ||
479 | L.putStrLn $ "REMOTE:\n" <++> r <++> "\n" | ||
480 | loop | ||
481 | hPutStrLn h "</stream>" | ||
482 | fix $ \loop -> do | ||
483 | event <- atomically $ readTChan chan | ||
484 | when (readTVarIO pending) loop | ||
485 | joinThread thread | ||
486 | return () | ||
487 | -} | ||
399 | 488 | ||
400 | parseJID :: ByteString -> JID | 489 | parseJID :: ByteString -> JID |
401 | parseJID bjid = | 490 | parseJID bjid = |
@@ -410,6 +499,38 @@ parseJID bjid = | |||
410 | _ -> Nothing | 499 | _ -> Nothing |
411 | in JID name server rsrc | 500 | in JID name server rsrc |
412 | 501 | ||
502 | connect' :: HostName -> ServiceName -> IO (Maybe Socket) | ||
503 | connect' host serv = do | ||
504 | proto <- getProtocolNumber "tcp" | ||
505 | let hints = defaultHints { addrFlags = [AI_ADDRCONFIG] | ||
506 | , addrProtocol = proto | ||
507 | , addrSocketType = Stream } | ||
508 | addrs <- getAddrInfo (Just hints) (Just host) (Just serv) | ||
509 | firstSuccessful $ map tryToConnect addrs | ||
510 | where | ||
511 | tryToConnect addr = | ||
512 | bracketOnError | ||
513 | (socket (addrFamily addr) (addrSocketType addr) (addrProtocol addr)) | ||
514 | (sClose ) -- only done if there's an error | ||
515 | (\sock -> do | ||
516 | connect sock (addrAddress addr) | ||
517 | return (Just sock) -- socketToHandle sock ReadWriteMode | ||
518 | ) | ||
519 | |||
520 | catchIO :: IO a -> (IOException -> IO a) -> IO a | ||
521 | catchIO a h = Exception.catch a h | ||
522 | |||
523 | -- Returns the first action from a list which does not throw an exception. | ||
524 | -- If all the actions throw exceptions (and the list of actions is not empty), | ||
525 | -- the last exception is thrown. | ||
526 | firstSuccessful :: [IO a] -> IO a | ||
527 | firstSuccessful [] = error "firstSuccessful: empty list" | ||
528 | firstSuccessful (p:ps) = catchIO p $ \e -> | ||
529 | case ps of | ||
530 | [] -> Exception.throwIO e | ||
531 | _ -> firstSuccessful ps | ||
532 | |||
533 | |||
413 | seekRemotePeers :: XMPPConfig config => | 534 | seekRemotePeers :: XMPPConfig config => |
414 | (ByteString -> Bool) -> config -> TChan Presence -> IO b0 | 535 | (ByteString -> Bool) -> config -> TChan Presence -> IO b0 |
415 | seekRemotePeers is_peer config chan = do | 536 | seekRemotePeers is_peer config chan = do |
@@ -427,5 +548,5 @@ seekRemotePeers is_peer config chan = do | |||
427 | let jid = parseJID bjid | 548 | let jid = parseJID bjid |
428 | peer = server jid | 549 | peer = server jid |
429 | when (is_peer peer) $ | 550 | when (is_peer peer) $ |
430 | liftIO $ sendMessage server_connections p peer | 551 | liftIO $ sendMessage server_connections (OutBoundPresence p) peer |
431 | loop | 552 | loop |