summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Presence/XMPPServer.hs77
1 files changed, 71 insertions, 6 deletions
diff --git a/Presence/XMPPServer.hs b/Presence/XMPPServer.hs
index 631f97c3..f29179e5 100644
--- a/Presence/XMPPServer.hs
+++ b/Presence/XMPPServer.hs
@@ -6,6 +6,7 @@ module XMPPServer
6 , XMPPServerParameters(..) 6 , XMPPServerParameters(..)
7 , XMPPServer 7 , XMPPServer
8 , addPeer 8 , addPeer
9 , StanzaWrap(..)
9 , Stanza(..) 10 , Stanza(..)
10 , StanzaType(..) 11 , StanzaType(..)
11 , StanzaOrigin(..) 12 , StanzaOrigin(..)
@@ -43,7 +44,7 @@ import Data.Conduit.Blaze (builderToByteStringFlush)
43import qualified Text.XML.Stream.Render as XML 44import qualified Text.XML.Stream.Render as XML
44import qualified Text.XML.Stream.Parse as XML 45import qualified Text.XML.Stream.Parse as XML
45import Data.XML.Types as XML 46import Data.XML.Types as XML
46import Data.Maybe (catMaybes,fromJust,isJust,isNothing,listToMaybe) 47import Data.Maybe
47import Data.Monoid ( (<>) ) 48import Data.Monoid ( (<>) )
48import Data.Text (Text) 49import Data.Text (Text)
49import qualified Data.Text as Text (pack,unpack) 50import qualified Data.Text as Text (pack,unpack)
@@ -130,17 +131,19 @@ data StanzaType
130 131
131data StanzaOrigin = LocalPeer | NetworkOrigin ConnectionKey (TChan Stanza) 132data StanzaOrigin = LocalPeer | NetworkOrigin ConnectionKey (TChan Stanza)
132 133
133data Stanza = Stanza 134data StanzaWrap a = Stanza
134 { stanzaType :: StanzaType 135 { stanzaType :: StanzaType
135 , stanzaId :: Maybe Text 136 , stanzaId :: Maybe Text
136 , stanzaTo :: Maybe Text 137 , stanzaTo :: Maybe Text
137 , stanzaFrom :: Maybe Text 138 , stanzaFrom :: Maybe Text
138 , stanzaChan :: TChan XML.Event 139 , stanzaChan :: a
139 , stanzaClosers :: TVar (Maybe [XML.Event]) 140 , stanzaClosers :: TVar (Maybe [XML.Event])
140 , stanzaInterrupt :: TMVar () 141 , stanzaInterrupt :: TMVar ()
141 , stanzaOrigin :: StanzaOrigin 142 , stanzaOrigin :: StanzaOrigin
142 } 143 }
143 144
145type Stanza = StanzaWrap (TChan XML.Event)
146
144data XMPPServerParameters = 147data XMPPServerParameters =
145 XMPPServerParameters 148 XMPPServerParameters
146 { xmppChooseResourceName :: ConnectionKey -> SockAddr -> Maybe Text -> IO Text 149 { xmppChooseResourceName :: ConnectionKey -> SockAddr -> Maybe Text -> IO Text
@@ -586,6 +589,14 @@ chanContents ch = do
586 return (x:xs)) 589 return (x:xs))
587 x 590 x
588 591
592while :: IO Bool -> IO a -> IO [a]
593while cond body = do
594 b <- cond
595 if b then do x <- body
596 xs <- while cond body
597 return (x:xs)
598 else return []
599
589readUntilNothing :: TChan (Maybe x) -> IO [x] 600readUntilNothing :: TChan (Maybe x) -> IO [x]
590readUntilNothing ch = do 601readUntilNothing ch = do
591 x <- atomically $ readTChan ch 602 x <- atomically $ readTChan ch
@@ -698,6 +709,29 @@ iq_service_unavailable mid host {- mjid -} req =
698 ] 709 ]
699 710
700 711
712wrapStanzaList :: [XML.Event] -> STM [Either (StanzaWrap XML.Event) XML.Event]
713wrapStanzaList xs = do
714 wrap <- do
715 clsrs <- newTVar Nothing
716 donev <- newTMVar ()
717 return $ \ x ->
718 Stanza { stanzaType = Unrecognized
719 , stanzaId = mid
720 , stanzaTo = mto
721 , stanzaFrom = mfrom
722 , stanzaClosers = clsrs
723 , stanzaInterrupt = donev
724 , stanzaOrigin = LocalPeer
725 , stanzaChan = x
726 }
727 return $ map (Left . wrap) (take 1 xs) ++ map Right (drop 1 xs)
728 where
729 m = listToMaybe xs
730 mto = m >>= lookupAttrib "to" . tagAttrs
731 mfrom = m >>= lookupAttrib "from" . tagAttrs
732 mid = m >>= lookupAttrib "id" . tagAttrs
733
734
701{- 735{-
702greet namespace = 736greet namespace =
703 [ EventBeginDocument 737 [ EventBeginDocument
@@ -728,15 +762,36 @@ forkConnection sv xmpp k laddr pingflag src snk stanzas = do
728 PeerKey {} -> ("jabber:server",xmppTellMyNameToPeer xmpp laddr) 762 PeerKey {} -> ("jabber:server",xmppTellMyNameToPeer xmpp laddr)
729 me <- tellmyname 763 me <- tellmyname
730 rdone <- atomically newEmptyTMVar 764 rdone <- atomically newEmptyTMVar
731 slots <- atomically $ Slotted.new isEventBeginElement isEventEndElement 765 let isStarter (Left _) = True
766 isStarter (Right e) | isEventBeginElement e = True
767 isStarter _ = False
768 isStopper (Left _) = False
769 isStopper (Right e) | isEventEndElement e = True
770 isStopper _ = False
771 slots <- atomically $ Slotted.new isStarter isStopper
732 needsFlush <- atomically $ newTVar False 772 needsFlush <- atomically $ newTVar False
733 let _ = slots :: Slotted.UpdateStream XMPPState XML.Event 773 lastStanza <- atomically $ newTVar Nothing
774 nesting <- atomically $ newTVar 0
775 let _ = slots :: Slotted.UpdateStream XMPPState (Either (StanzaWrap XML.Event) XML.Event)
734 let greet_src = do 776 let greet_src = do
735 CL.sourceList (greet' namespace me) =$= CL.map Chunk 777 CL.sourceList (greet' namespace me) =$= CL.map Chunk
736 yield Flush 778 yield Flush
737 slot_src = do 779 slot_src = do
738 what <- lift . atomically $ foldr1 orElse 780 what <- lift . atomically $ foldr1 orElse
739 [Slotted.pull slots >>= \x -> do 781 [Slotted.pull slots >>= \x -> do
782 x <- case x of
783 Left wrapped -> do
784 writeTVar nesting 1
785 writeTVar lastStanza (Just wrapped)
786 return $ stanzaChan wrapped
787 Right x -> do
788 when (isEventBeginElement x)
789 $ modifyTVar' nesting (+1)
790 when (isEventEndElement x) $ do
791 n <- readTVar nesting
792 when (n==1) $ writeTVar lastStanza Nothing
793 modifyTVar' nesting (subtract 1)
794 return x
740 writeTVar needsFlush True 795 writeTVar needsFlush True
741 return $ do 796 return $ do
742 -- liftIO $ wlog $ "yielding Chunk: " ++ show x 797 -- liftIO $ wlog $ "yielding Chunk: " ++ show x
@@ -753,6 +808,15 @@ forkConnection sv xmpp k laddr pingflag src snk stanzas = do
753 ] 808 ]
754 what 809 what
755 forkIO $ do (greet_src >> slot_src) $$ snk 810 forkIO $ do (greet_src >> slot_src) $$ snk
811 last <- atomically $ readTVar lastStanza
812 es <- while (atomically . fmap not $ Slotted.isEmpty slots)
813 (atomically . Slotted.pull $ slots)
814 let es' = mapMaybe metadata es
815 metadata (Left s) = Just s
816 metadata _ = Nothing
817 let fail s = wlog $ "failed delivery: " ++ show (stanzaId s)
818 maybe (return ()) fail last
819 mapM_ fail es' -- TODO: queue or save these for re-connect?
756 wlog $ "end post-queue fork: " ++ show k 820 wlog $ "end post-queue fork: " ++ show k
757 output <- atomically newTChan 821 output <- atomically newTChan
758 forkIO $ do 822 forkIO $ do
@@ -776,7 +840,7 @@ forkConnection sv xmpp k laddr pingflag src snk stanzas = do
776 stanzaToConduit dup $$ prettyPrint typ 840 stanzaToConduit dup $$ prettyPrint typ
777 stanzaToConduit stanza 841 stanzaToConduit stanza
778 $$ awaitForever 842 $$ awaitForever
779 $ liftIO . atomically . Slotted.push slots Nothing 843 $ liftIO . atomically . Slotted.push slots Nothing . Right
780 loop 844 loop
781 ,do pingflag >>= check 845 ,do pingflag >>= check
782 return $ do 846 return $ do
@@ -785,6 +849,7 @@ forkConnection sv xmpp k laddr pingflag src snk stanzas = do
785 -- or pass it with Connection event. 849 -- or pass it with Connection event.
786 mid = Just "ping" 850 mid = Just "ping"
787 ping = makePing namespace mid to from 851 ping = makePing namespace mid to from
852 ping <- atomically $ wrapStanzaList ping
788 mapM_ (atomically . Slotted.push slots (Just $ PingSlot)) 853 mapM_ (atomically . Slotted.push slots (Just $ PingSlot))
789 ping 854 ping
790#ifdef PINGNOISE 855#ifdef PINGNOISE