From e8e889cf562954c82aa53c940b21782b16d63b97 Mon Sep 17 00:00:00 2001 From: joe Date: Tue, 18 Feb 2014 19:06:49 -0500 Subject: log delivery failures --- Presence/XMPPServer.hs | 77 ++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 71 insertions(+), 6 deletions(-) (limited to 'Presence/XMPPServer.hs') diff --git a/Presence/XMPPServer.hs b/Presence/XMPPServer.hs index 631f97c3..f29179e5 100644 --- a/Presence/XMPPServer.hs +++ b/Presence/XMPPServer.hs @@ -6,6 +6,7 @@ module XMPPServer , XMPPServerParameters(..) , XMPPServer , addPeer + , StanzaWrap(..) , Stanza(..) , StanzaType(..) , StanzaOrigin(..) @@ -43,7 +44,7 @@ import Data.Conduit.Blaze (builderToByteStringFlush) import qualified Text.XML.Stream.Render as XML import qualified Text.XML.Stream.Parse as XML import Data.XML.Types as XML -import Data.Maybe (catMaybes,fromJust,isJust,isNothing,listToMaybe) +import Data.Maybe import Data.Monoid ( (<>) ) import Data.Text (Text) import qualified Data.Text as Text (pack,unpack) @@ -130,17 +131,19 @@ data StanzaType data StanzaOrigin = LocalPeer | NetworkOrigin ConnectionKey (TChan Stanza) -data Stanza = Stanza +data StanzaWrap a = Stanza { stanzaType :: StanzaType , stanzaId :: Maybe Text , stanzaTo :: Maybe Text , stanzaFrom :: Maybe Text - , stanzaChan :: TChan XML.Event + , stanzaChan :: a , stanzaClosers :: TVar (Maybe [XML.Event]) , stanzaInterrupt :: TMVar () , stanzaOrigin :: StanzaOrigin } +type Stanza = StanzaWrap (TChan XML.Event) + data XMPPServerParameters = XMPPServerParameters { xmppChooseResourceName :: ConnectionKey -> SockAddr -> Maybe Text -> IO Text @@ -586,6 +589,14 @@ chanContents ch = do return (x:xs)) x +while :: IO Bool -> IO a -> IO [a] +while cond body = do + b <- cond + if b then do x <- body + xs <- while cond body + return (x:xs) + else return [] + readUntilNothing :: TChan (Maybe x) -> IO [x] readUntilNothing ch = do x <- atomically $ readTChan ch @@ -698,6 +709,29 @@ iq_service_unavailable mid host {- mjid -} req = ] +wrapStanzaList :: [XML.Event] -> STM [Either (StanzaWrap XML.Event) XML.Event] +wrapStanzaList xs = do + wrap <- do + clsrs <- newTVar Nothing + donev <- newTMVar () + return $ \ x -> + Stanza { stanzaType = Unrecognized + , stanzaId = mid + , stanzaTo = mto + , stanzaFrom = mfrom + , stanzaClosers = clsrs + , stanzaInterrupt = donev + , stanzaOrigin = LocalPeer + , stanzaChan = x + } + return $ map (Left . wrap) (take 1 xs) ++ map Right (drop 1 xs) + where + m = listToMaybe xs + mto = m >>= lookupAttrib "to" . tagAttrs + mfrom = m >>= lookupAttrib "from" . tagAttrs + mid = m >>= lookupAttrib "id" . tagAttrs + + {- greet namespace = [ EventBeginDocument @@ -728,15 +762,36 @@ forkConnection sv xmpp k laddr pingflag src snk stanzas = do PeerKey {} -> ("jabber:server",xmppTellMyNameToPeer xmpp laddr) me <- tellmyname rdone <- atomically newEmptyTMVar - slots <- atomically $ Slotted.new isEventBeginElement isEventEndElement + let isStarter (Left _) = True + isStarter (Right e) | isEventBeginElement e = True + isStarter _ = False + isStopper (Left _) = False + isStopper (Right e) | isEventEndElement e = True + isStopper _ = False + slots <- atomically $ Slotted.new isStarter isStopper needsFlush <- atomically $ newTVar False - let _ = slots :: Slotted.UpdateStream XMPPState XML.Event + lastStanza <- atomically $ newTVar Nothing + nesting <- atomically $ newTVar 0 + let _ = slots :: Slotted.UpdateStream XMPPState (Either (StanzaWrap XML.Event) XML.Event) let greet_src = do CL.sourceList (greet' namespace me) =$= CL.map Chunk yield Flush slot_src = do what <- lift . atomically $ foldr1 orElse [Slotted.pull slots >>= \x -> do + x <- case x of + Left wrapped -> do + writeTVar nesting 1 + writeTVar lastStanza (Just wrapped) + return $ stanzaChan wrapped + Right x -> do + when (isEventBeginElement x) + $ modifyTVar' nesting (+1) + when (isEventEndElement x) $ do + n <- readTVar nesting + when (n==1) $ writeTVar lastStanza Nothing + modifyTVar' nesting (subtract 1) + return x writeTVar needsFlush True return $ do -- liftIO $ wlog $ "yielding Chunk: " ++ show x @@ -753,6 +808,15 @@ forkConnection sv xmpp k laddr pingflag src snk stanzas = do ] what forkIO $ do (greet_src >> slot_src) $$ snk + last <- atomically $ readTVar lastStanza + es <- while (atomically . fmap not $ Slotted.isEmpty slots) + (atomically . Slotted.pull $ slots) + let es' = mapMaybe metadata es + metadata (Left s) = Just s + metadata _ = Nothing + let fail s = wlog $ "failed delivery: " ++ show (stanzaId s) + maybe (return ()) fail last + mapM_ fail es' -- TODO: queue or save these for re-connect? wlog $ "end post-queue fork: " ++ show k output <- atomically newTChan forkIO $ do @@ -776,7 +840,7 @@ forkConnection sv xmpp k laddr pingflag src snk stanzas = do stanzaToConduit dup $$ prettyPrint typ stanzaToConduit stanza $$ awaitForever - $ liftIO . atomically . Slotted.push slots Nothing + $ liftIO . atomically . Slotted.push slots Nothing . Right loop ,do pingflag >>= check return $ do @@ -785,6 +849,7 @@ forkConnection sv xmpp k laddr pingflag src snk stanzas = do -- or pass it with Connection event. mid = Just "ping" ping = makePing namespace mid to from + ping <- atomically $ wrapStanzaList ping mapM_ (atomically . Slotted.push slots (Just $ PingSlot)) ping #ifdef PINGNOISE -- cgit v1.2.3