diff options
-rw-r--r-- | Presence/XMPPServer.hs | 77 |
1 files changed, 71 insertions, 6 deletions
diff --git a/Presence/XMPPServer.hs b/Presence/XMPPServer.hs index 631f97c3..f29179e5 100644 --- a/Presence/XMPPServer.hs +++ b/Presence/XMPPServer.hs | |||
@@ -6,6 +6,7 @@ module XMPPServer | |||
6 | , XMPPServerParameters(..) | 6 | , XMPPServerParameters(..) |
7 | , XMPPServer | 7 | , XMPPServer |
8 | , addPeer | 8 | , addPeer |
9 | , StanzaWrap(..) | ||
9 | , Stanza(..) | 10 | , Stanza(..) |
10 | , StanzaType(..) | 11 | , StanzaType(..) |
11 | , StanzaOrigin(..) | 12 | , StanzaOrigin(..) |
@@ -43,7 +44,7 @@ import Data.Conduit.Blaze (builderToByteStringFlush) | |||
43 | import qualified Text.XML.Stream.Render as XML | 44 | import qualified Text.XML.Stream.Render as XML |
44 | import qualified Text.XML.Stream.Parse as XML | 45 | import qualified Text.XML.Stream.Parse as XML |
45 | import Data.XML.Types as XML | 46 | import Data.XML.Types as XML |
46 | import Data.Maybe (catMaybes,fromJust,isJust,isNothing,listToMaybe) | 47 | import Data.Maybe |
47 | import Data.Monoid ( (<>) ) | 48 | import Data.Monoid ( (<>) ) |
48 | import Data.Text (Text) | 49 | import Data.Text (Text) |
49 | import qualified Data.Text as Text (pack,unpack) | 50 | import qualified Data.Text as Text (pack,unpack) |
@@ -130,17 +131,19 @@ data StanzaType | |||
130 | 131 | ||
131 | data StanzaOrigin = LocalPeer | NetworkOrigin ConnectionKey (TChan Stanza) | 132 | data StanzaOrigin = LocalPeer | NetworkOrigin ConnectionKey (TChan Stanza) |
132 | 133 | ||
133 | data Stanza = Stanza | 134 | data StanzaWrap a = Stanza |
134 | { stanzaType :: StanzaType | 135 | { stanzaType :: StanzaType |
135 | , stanzaId :: Maybe Text | 136 | , stanzaId :: Maybe Text |
136 | , stanzaTo :: Maybe Text | 137 | , stanzaTo :: Maybe Text |
137 | , stanzaFrom :: Maybe Text | 138 | , stanzaFrom :: Maybe Text |
138 | , stanzaChan :: TChan XML.Event | 139 | , stanzaChan :: a |
139 | , stanzaClosers :: TVar (Maybe [XML.Event]) | 140 | , stanzaClosers :: TVar (Maybe [XML.Event]) |
140 | , stanzaInterrupt :: TMVar () | 141 | , stanzaInterrupt :: TMVar () |
141 | , stanzaOrigin :: StanzaOrigin | 142 | , stanzaOrigin :: StanzaOrigin |
142 | } | 143 | } |
143 | 144 | ||
145 | type Stanza = StanzaWrap (TChan XML.Event) | ||
146 | |||
144 | data XMPPServerParameters = | 147 | data XMPPServerParameters = |
145 | XMPPServerParameters | 148 | XMPPServerParameters |
146 | { xmppChooseResourceName :: ConnectionKey -> SockAddr -> Maybe Text -> IO Text | 149 | { xmppChooseResourceName :: ConnectionKey -> SockAddr -> Maybe Text -> IO Text |
@@ -586,6 +589,14 @@ chanContents ch = do | |||
586 | return (x:xs)) | 589 | return (x:xs)) |
587 | x | 590 | x |
588 | 591 | ||
592 | while :: IO Bool -> IO a -> IO [a] | ||
593 | while cond body = do | ||
594 | b <- cond | ||
595 | if b then do x <- body | ||
596 | xs <- while cond body | ||
597 | return (x:xs) | ||
598 | else return [] | ||
599 | |||
589 | readUntilNothing :: TChan (Maybe x) -> IO [x] | 600 | readUntilNothing :: TChan (Maybe x) -> IO [x] |
590 | readUntilNothing ch = do | 601 | readUntilNothing ch = do |
591 | x <- atomically $ readTChan ch | 602 | x <- atomically $ readTChan ch |
@@ -698,6 +709,29 @@ iq_service_unavailable mid host {- mjid -} req = | |||
698 | ] | 709 | ] |
699 | 710 | ||
700 | 711 | ||
712 | wrapStanzaList :: [XML.Event] -> STM [Either (StanzaWrap XML.Event) XML.Event] | ||
713 | wrapStanzaList xs = do | ||
714 | wrap <- do | ||
715 | clsrs <- newTVar Nothing | ||
716 | donev <- newTMVar () | ||
717 | return $ \ x -> | ||
718 | Stanza { stanzaType = Unrecognized | ||
719 | , stanzaId = mid | ||
720 | , stanzaTo = mto | ||
721 | , stanzaFrom = mfrom | ||
722 | , stanzaClosers = clsrs | ||
723 | , stanzaInterrupt = donev | ||
724 | , stanzaOrigin = LocalPeer | ||
725 | , stanzaChan = x | ||
726 | } | ||
727 | return $ map (Left . wrap) (take 1 xs) ++ map Right (drop 1 xs) | ||
728 | where | ||
729 | m = listToMaybe xs | ||
730 | mto = m >>= lookupAttrib "to" . tagAttrs | ||
731 | mfrom = m >>= lookupAttrib "from" . tagAttrs | ||
732 | mid = m >>= lookupAttrib "id" . tagAttrs | ||
733 | |||
734 | |||
701 | {- | 735 | {- |
702 | greet namespace = | 736 | greet namespace = |
703 | [ EventBeginDocument | 737 | [ EventBeginDocument |
@@ -728,15 +762,36 @@ forkConnection sv xmpp k laddr pingflag src snk stanzas = do | |||
728 | PeerKey {} -> ("jabber:server",xmppTellMyNameToPeer xmpp laddr) | 762 | PeerKey {} -> ("jabber:server",xmppTellMyNameToPeer xmpp laddr) |
729 | me <- tellmyname | 763 | me <- tellmyname |
730 | rdone <- atomically newEmptyTMVar | 764 | rdone <- atomically newEmptyTMVar |
731 | slots <- atomically $ Slotted.new isEventBeginElement isEventEndElement | 765 | let isStarter (Left _) = True |
766 | isStarter (Right e) | isEventBeginElement e = True | ||
767 | isStarter _ = False | ||
768 | isStopper (Left _) = False | ||
769 | isStopper (Right e) | isEventEndElement e = True | ||
770 | isStopper _ = False | ||
771 | slots <- atomically $ Slotted.new isStarter isStopper | ||
732 | needsFlush <- atomically $ newTVar False | 772 | needsFlush <- atomically $ newTVar False |
733 | let _ = slots :: Slotted.UpdateStream XMPPState XML.Event | 773 | lastStanza <- atomically $ newTVar Nothing |
774 | nesting <- atomically $ newTVar 0 | ||
775 | let _ = slots :: Slotted.UpdateStream XMPPState (Either (StanzaWrap XML.Event) XML.Event) | ||
734 | let greet_src = do | 776 | let greet_src = do |
735 | CL.sourceList (greet' namespace me) =$= CL.map Chunk | 777 | CL.sourceList (greet' namespace me) =$= CL.map Chunk |
736 | yield Flush | 778 | yield Flush |
737 | slot_src = do | 779 | slot_src = do |
738 | what <- lift . atomically $ foldr1 orElse | 780 | what <- lift . atomically $ foldr1 orElse |
739 | [Slotted.pull slots >>= \x -> do | 781 | [Slotted.pull slots >>= \x -> do |
782 | x <- case x of | ||
783 | Left wrapped -> do | ||
784 | writeTVar nesting 1 | ||
785 | writeTVar lastStanza (Just wrapped) | ||
786 | return $ stanzaChan wrapped | ||
787 | Right x -> do | ||
788 | when (isEventBeginElement x) | ||
789 | $ modifyTVar' nesting (+1) | ||
790 | when (isEventEndElement x) $ do | ||
791 | n <- readTVar nesting | ||
792 | when (n==1) $ writeTVar lastStanza Nothing | ||
793 | modifyTVar' nesting (subtract 1) | ||
794 | return x | ||
740 | writeTVar needsFlush True | 795 | writeTVar needsFlush True |
741 | return $ do | 796 | return $ do |
742 | -- liftIO $ wlog $ "yielding Chunk: " ++ show x | 797 | -- liftIO $ wlog $ "yielding Chunk: " ++ show x |
@@ -753,6 +808,15 @@ forkConnection sv xmpp k laddr pingflag src snk stanzas = do | |||
753 | ] | 808 | ] |
754 | what | 809 | what |
755 | forkIO $ do (greet_src >> slot_src) $$ snk | 810 | forkIO $ do (greet_src >> slot_src) $$ snk |
811 | last <- atomically $ readTVar lastStanza | ||
812 | es <- while (atomically . fmap not $ Slotted.isEmpty slots) | ||
813 | (atomically . Slotted.pull $ slots) | ||
814 | let es' = mapMaybe metadata es | ||
815 | metadata (Left s) = Just s | ||
816 | metadata _ = Nothing | ||
817 | let fail s = wlog $ "failed delivery: " ++ show (stanzaId s) | ||
818 | maybe (return ()) fail last | ||
819 | mapM_ fail es' -- TODO: queue or save these for re-connect? | ||
756 | wlog $ "end post-queue fork: " ++ show k | 820 | wlog $ "end post-queue fork: " ++ show k |
757 | output <- atomically newTChan | 821 | output <- atomically newTChan |
758 | forkIO $ do | 822 | forkIO $ do |
@@ -776,7 +840,7 @@ forkConnection sv xmpp k laddr pingflag src snk stanzas = do | |||
776 | stanzaToConduit dup $$ prettyPrint typ | 840 | stanzaToConduit dup $$ prettyPrint typ |
777 | stanzaToConduit stanza | 841 | stanzaToConduit stanza |
778 | $$ awaitForever | 842 | $$ awaitForever |
779 | $ liftIO . atomically . Slotted.push slots Nothing | 843 | $ liftIO . atomically . Slotted.push slots Nothing . Right |
780 | loop | 844 | loop |
781 | ,do pingflag >>= check | 845 | ,do pingflag >>= check |
782 | return $ do | 846 | return $ do |
@@ -785,6 +849,7 @@ forkConnection sv xmpp k laddr pingflag src snk stanzas = do | |||
785 | -- or pass it with Connection event. | 849 | -- or pass it with Connection event. |
786 | mid = Just "ping" | 850 | mid = Just "ping" |
787 | ping = makePing namespace mid to from | 851 | ping = makePing namespace mid to from |
852 | ping <- atomically $ wrapStanzaList ping | ||
788 | mapM_ (atomically . Slotted.push slots (Just $ PingSlot)) | 853 | mapM_ (atomically . Slotted.push slots (Just $ PingSlot)) |
789 | ping | 854 | ping |
790 | #ifdef PINGNOISE | 855 | #ifdef PINGNOISE |