From edbc10169d7cf363928bdcae2385b64cfdf54675 Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Sat, 18 Aug 2018 16:46:30 -0400 Subject: Split grokPacket into inbound/outbound versions. --- src/Data/PacketBuffer.hs | 29 +++++++++++++++++++---------- src/Network/Tox/Crypto/Handlers.hs | 8 +++++--- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/src/Data/PacketBuffer.hs b/src/Data/PacketBuffer.hs index c5ede50d..f01abb28 100644 --- a/src/Data/PacketBuffer.hs +++ b/src/Data/PacketBuffer.hs @@ -1,9 +1,12 @@ {-# LANGUAGE TupleSections #-} +{-# LANGUAGE DeriveFunctor #-} module Data.PacketBuffer ( PacketBuffer , newPacketBuffer - , PacketEvent(..) - , grokPacket + , PacketOutboundEvent(..) + , PacketInboundEvent(..) + , grokOutboundPacket + , grokInboundPacket , awaitReadyPacket , packetNumbersToRequest , expectingSequenceNumber @@ -32,17 +35,21 @@ newPacketBuffer = PacketBuffer <$> Q.new 200 0 <*> Q.new 400 0 -- | Input for 'grokPacket'. -data PacketEvent a b - = PacketSent { peSeqNum :: Word32 -- ^ Sequence number for payload. - , peSentPayload :: b -- ^ Payload packet we sent to them. +data PacketOutboundEvent b + = PacketSent { poSeqNum :: Word32 -- ^ Sequence number for payload. + , poSentPayload :: b -- ^ Payload packet we sent to them. } - | PacketReceived { peSeqNum :: Word32 -- ^ Sequence number for payload. + deriving Functor + +data PacketInboundEvent a + = PacketReceived { peSeqNum :: Word32 -- ^ Sequence number for payload. , peReceivedPayload :: a -- ^ Payload packet they sent to us. , peAcknowledgedNum :: Word32 -- ^ Earliest sequence number they've seen from us. } | PacketReceivedLossy { peSeqNum :: Word32 -- ^ Sequence number for lossy packet. , peAcknowledgedNum :: Word32 -- ^ Earliest sequence number they've seen from us. } + deriving Functor -- | Whenever a packet is received or sent (but not resent) from the network, -- this function should be called to update the relevant buffers. @@ -50,14 +57,16 @@ data PacketEvent a b -- On outgoing packets, if the outbound buffer is full, this will block -- indefinitely until it is called in another thread with an inbound -- acknowledgement. -grokPacket :: PacketBuffer a b -> PacketEvent a b -> STM () -grokPacket (PacketBuffer _ outb) (PacketSent seqno a) +grokOutboundPacket :: PacketBuffer a b -> PacketOutboundEvent b -> STM () +grokOutboundPacket (PacketBuffer _ outb) (PacketSent seqno a) = do (n,_) <- Q.enqueue outb seqno a when (n/=0) retry -grokPacket (PacketBuffer inb outb) (PacketReceived seqno a ack) + +grokInboundPacket :: PacketBuffer a b -> PacketInboundEvent a -> STM () +grokInboundPacket (PacketBuffer inb outb) (PacketReceived seqno a ack) = do Q.enqueue inb seqno a Q.dropPacketsBefore outb ack -grokPacket (PacketBuffer inb outb) (PacketReceivedLossy seqno ack) +grokInboundPacket (PacketBuffer inb outb) (PacketReceivedLossy seqno ack) = do Q.observeOutOfBand inb seqno Q.dropPacketsBefore outb ack diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs index db7543f7..57df4734 100644 --- a/src/Network/Tox/Crypto/Handlers.hs +++ b/src/Network/Tox/Crypto/Handlers.hs @@ -828,7 +828,7 @@ tryAppendQueueOutgoing getExtra outq msg = do mbWire <- nqToWire outq getExtra nextno be pktno msg case mbWire of Just (payload,seqno) -> do - PB.grokPacket (nqPacketBuffer outq) (PacketSent seqno payload) + PB.grokOutboundPacket (nqPacketBuffer outq) (PacketSent seqno payload) return $ OGSuccess payload Nothing -> return OGEncodeFail @@ -1181,7 +1181,8 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do ack = bufferStart -- Earliest sequence number they've seen from us. if isLossy msgTypMapped then do dput XNetCrypto $ "enqueue ncPacketQueue Lossy " ++ show cm - atomically $ PB.grokPacket ncPacketBuffer (PacketReceivedLossy bufferEnd ack) + atomically $ PB.grokInboundPacket ncPacketBuffer + (PacketReceivedLossy bufferEnd ack) runCryptoHook session (bufferData cd) else do dput XNetCrypto $ "enqueue ncPacketQueue Lossless " ++ show cm when (msgID cm == PING) $ @@ -1192,7 +1193,8 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do -- num <- CB.getNextSequenceNum ncStoredRequests -- CB.enqueue ncStoredRequests num cd handlePacketRequest session cd - atomically $ PB.grokPacket ncPacketBuffer (PacketReceived bufferEnd cd ack) + atomically $ PB.grokInboundPacket ncPacketBuffer + (PacketReceived bufferEnd cd ack) return Nothing where last2Bytes :: Nonce24 -> Word16 -- cgit v1.2.3