diff options
-rw-r--r-- | src/Data/PacketBuffer.hs | 29 | ||||
-rw-r--r-- | src/Network/Tox/Crypto/Handlers.hs | 8 |
2 files changed, 24 insertions, 13 deletions
diff --git a/src/Data/PacketBuffer.hs b/src/Data/PacketBuffer.hs index c5ede50d..f01abb28 100644 --- a/src/Data/PacketBuffer.hs +++ b/src/Data/PacketBuffer.hs | |||
@@ -1,9 +1,12 @@ | |||
1 | {-# LANGUAGE TupleSections #-} | 1 | {-# LANGUAGE TupleSections #-} |
2 | {-# LANGUAGE DeriveFunctor #-} | ||
2 | module Data.PacketBuffer | 3 | module Data.PacketBuffer |
3 | ( PacketBuffer | 4 | ( PacketBuffer |
4 | , newPacketBuffer | 5 | , newPacketBuffer |
5 | , PacketEvent(..) | 6 | , PacketOutboundEvent(..) |
6 | , grokPacket | 7 | , PacketInboundEvent(..) |
8 | , grokOutboundPacket | ||
9 | , grokInboundPacket | ||
7 | , awaitReadyPacket | 10 | , awaitReadyPacket |
8 | , packetNumbersToRequest | 11 | , packetNumbersToRequest |
9 | , expectingSequenceNumber | 12 | , expectingSequenceNumber |
@@ -32,17 +35,21 @@ newPacketBuffer = PacketBuffer <$> Q.new 200 0 | |||
32 | <*> Q.new 400 0 | 35 | <*> Q.new 400 0 |
33 | 36 | ||
34 | -- | Input for 'grokPacket'. | 37 | -- | Input for 'grokPacket'. |
35 | data PacketEvent a b | 38 | data PacketOutboundEvent b |
36 | = PacketSent { peSeqNum :: Word32 -- ^ Sequence number for payload. | 39 | = PacketSent { poSeqNum :: Word32 -- ^ Sequence number for payload. |
37 | , peSentPayload :: b -- ^ Payload packet we sent to them. | 40 | , poSentPayload :: b -- ^ Payload packet we sent to them. |
38 | } | 41 | } |
39 | | PacketReceived { peSeqNum :: Word32 -- ^ Sequence number for payload. | 42 | deriving Functor |
43 | |||
44 | data PacketInboundEvent a | ||
45 | = PacketReceived { peSeqNum :: Word32 -- ^ Sequence number for payload. | ||
40 | , peReceivedPayload :: a -- ^ Payload packet they sent to us. | 46 | , peReceivedPayload :: a -- ^ Payload packet they sent to us. |
41 | , peAcknowledgedNum :: Word32 -- ^ Earliest sequence number they've seen from us. | 47 | , peAcknowledgedNum :: Word32 -- ^ Earliest sequence number they've seen from us. |
42 | } | 48 | } |
43 | | PacketReceivedLossy { peSeqNum :: Word32 -- ^ Sequence number for lossy packet. | 49 | | PacketReceivedLossy { peSeqNum :: Word32 -- ^ Sequence number for lossy packet. |
44 | , peAcknowledgedNum :: Word32 -- ^ Earliest sequence number they've seen from us. | 50 | , peAcknowledgedNum :: Word32 -- ^ Earliest sequence number they've seen from us. |
45 | } | 51 | } |
52 | deriving Functor | ||
46 | 53 | ||
47 | -- | Whenever a packet is received or sent (but not resent) from the network, | 54 | -- | Whenever a packet is received or sent (but not resent) from the network, |
48 | -- this function should be called to update the relevant buffers. | 55 | -- this function should be called to update the relevant buffers. |
@@ -50,14 +57,16 @@ data PacketEvent a b | |||
50 | -- On outgoing packets, if the outbound buffer is full, this will block | 57 | -- On outgoing packets, if the outbound buffer is full, this will block |
51 | -- indefinitely until it is called in another thread with an inbound | 58 | -- indefinitely until it is called in another thread with an inbound |
52 | -- acknowledgement. | 59 | -- acknowledgement. |
53 | grokPacket :: PacketBuffer a b -> PacketEvent a b -> STM () | 60 | grokOutboundPacket :: PacketBuffer a b -> PacketOutboundEvent b -> STM () |
54 | grokPacket (PacketBuffer _ outb) (PacketSent seqno a) | 61 | grokOutboundPacket (PacketBuffer _ outb) (PacketSent seqno a) |
55 | = do (n,_) <- Q.enqueue outb seqno a | 62 | = do (n,_) <- Q.enqueue outb seqno a |
56 | when (n/=0) retry | 63 | when (n/=0) retry |
57 | grokPacket (PacketBuffer inb outb) (PacketReceived seqno a ack) | 64 | |
65 | grokInboundPacket :: PacketBuffer a b -> PacketInboundEvent a -> STM () | ||
66 | grokInboundPacket (PacketBuffer inb outb) (PacketReceived seqno a ack) | ||
58 | = do Q.enqueue inb seqno a | 67 | = do Q.enqueue inb seqno a |
59 | Q.dropPacketsBefore outb ack | 68 | Q.dropPacketsBefore outb ack |
60 | grokPacket (PacketBuffer inb outb) (PacketReceivedLossy seqno ack) | 69 | grokInboundPacket (PacketBuffer inb outb) (PacketReceivedLossy seqno ack) |
61 | = do Q.observeOutOfBand inb seqno | 70 | = do Q.observeOutOfBand inb seqno |
62 | Q.dropPacketsBefore outb ack | 71 | Q.dropPacketsBefore outb ack |
63 | 72 | ||
diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs index db7543f7..57df4734 100644 --- a/src/Network/Tox/Crypto/Handlers.hs +++ b/src/Network/Tox/Crypto/Handlers.hs | |||
@@ -828,7 +828,7 @@ tryAppendQueueOutgoing getExtra outq msg = do | |||
828 | mbWire <- nqToWire outq getExtra nextno be pktno msg | 828 | mbWire <- nqToWire outq getExtra nextno be pktno msg |
829 | case mbWire of | 829 | case mbWire of |
830 | Just (payload,seqno) -> do | 830 | Just (payload,seqno) -> do |
831 | PB.grokPacket (nqPacketBuffer outq) (PacketSent seqno payload) | 831 | PB.grokOutboundPacket (nqPacketBuffer outq) (PacketSent seqno payload) |
832 | return $ OGSuccess payload | 832 | return $ OGSuccess payload |
833 | Nothing -> return OGEncodeFail | 833 | Nothing -> return OGEncodeFail |
834 | 834 | ||
@@ -1181,7 +1181,8 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do | |||
1181 | ack = bufferStart -- Earliest sequence number they've seen from us. | 1181 | ack = bufferStart -- Earliest sequence number they've seen from us. |
1182 | if isLossy msgTypMapped | 1182 | if isLossy msgTypMapped |
1183 | then do dput XNetCrypto $ "enqueue ncPacketQueue Lossy " ++ show cm | 1183 | then do dput XNetCrypto $ "enqueue ncPacketQueue Lossy " ++ show cm |
1184 | atomically $ PB.grokPacket ncPacketBuffer (PacketReceivedLossy bufferEnd ack) | 1184 | atomically $ PB.grokInboundPacket ncPacketBuffer |
1185 | (PacketReceivedLossy bufferEnd ack) | ||
1185 | runCryptoHook session (bufferData cd) | 1186 | runCryptoHook session (bufferData cd) |
1186 | else do dput XNetCrypto $ "enqueue ncPacketQueue Lossless " ++ show cm | 1187 | else do dput XNetCrypto $ "enqueue ncPacketQueue Lossless " ++ show cm |
1187 | when (msgID cm == PING) $ | 1188 | when (msgID cm == PING) $ |
@@ -1192,7 +1193,8 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do | |||
1192 | -- num <- CB.getNextSequenceNum ncStoredRequests | 1193 | -- num <- CB.getNextSequenceNum ncStoredRequests |
1193 | -- CB.enqueue ncStoredRequests num cd | 1194 | -- CB.enqueue ncStoredRequests num cd |
1194 | handlePacketRequest session cd | 1195 | handlePacketRequest session cd |
1195 | atomically $ PB.grokPacket ncPacketBuffer (PacketReceived bufferEnd cd ack) | 1196 | atomically $ PB.grokInboundPacket ncPacketBuffer |
1197 | (PacketReceived bufferEnd cd ack) | ||
1196 | return Nothing | 1198 | return Nothing |
1197 | where | 1199 | where |
1198 | last2Bytes :: Nonce24 -> Word16 | 1200 | last2Bytes :: Nonce24 -> Word16 |