summaryrefslogtreecommitdiff
path: root/src/Data/PacketBuffer.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Data/PacketBuffer.hs')
-rw-r--r--src/Data/PacketBuffer.hs102
1 files changed, 102 insertions, 0 deletions
diff --git a/src/Data/PacketBuffer.hs b/src/Data/PacketBuffer.hs
new file mode 100644
index 00000000..c5ede50d
--- /dev/null
+++ b/src/Data/PacketBuffer.hs
@@ -0,0 +1,102 @@
1{-# LANGUAGE TupleSections #-}
2module Data.PacketBuffer
3 ( PacketBuffer
4 , newPacketBuffer
5 , PacketEvent(..)
6 , grokPacket
7 , awaitReadyPacket
8 , packetNumbersToRequest
9 , expectingSequenceNumber
10 , nextToSendSequenceNumber
11 , retrieveForResend
12 , decompressSequenceNumbers
13 ) where
14
15import Data.PacketQueue as Q
16
17import Control.Concurrent.STM
18import Control.Monad
19import Data.Maybe
20import Data.Word
21
22data PacketBuffer a b = PacketBuffer
23 { inQueue :: PacketQueue a
24 , outBuffer :: PacketQueue b
25 }
26
27-- | Initialize the packet buffers. Note, the capacity of the inbound packet
28-- queue is currently hardcoded to 200 packets and the capacity of the outbound
29-- buffer is hardcoded to 400 packets.
30newPacketBuffer :: STM (PacketBuffer a b)
31newPacketBuffer = PacketBuffer <$> Q.new 200 0
32 <*> Q.new 400 0
33
34-- | Input for 'grokPacket'.
35data PacketEvent a b
36 = PacketSent { peSeqNum :: Word32 -- ^ Sequence number for payload.
37 , peSentPayload :: b -- ^ Payload packet we sent to them.
38 }
39 | PacketReceived { peSeqNum :: Word32 -- ^ Sequence number for payload.
40 , peReceivedPayload :: a -- ^ Payload packet they sent to us.
41 , peAcknowledgedNum :: Word32 -- ^ Earliest sequence number they've seen from us.
42 }
43 | PacketReceivedLossy { peSeqNum :: Word32 -- ^ Sequence number for lossy packet.
44 , peAcknowledgedNum :: Word32 -- ^ Earliest sequence number they've seen from us.
45 }
46
47-- | Whenever a packet is received or sent (but not resent) from the network,
48-- this function should be called to update the relevant buffers.
49--
50-- On outgoing packets, if the outbound buffer is full, this will block
51-- indefinitely until it is called in another thread with an inbound
52-- acknowledgement.
53grokPacket :: PacketBuffer a b -> PacketEvent a b -> STM ()
54grokPacket (PacketBuffer _ outb) (PacketSent seqno a)
55 = do (n,_) <- Q.enqueue outb seqno a
56 when (n/=0) retry
57grokPacket (PacketBuffer inb outb) (PacketReceived seqno a ack)
58 = do Q.enqueue inb seqno a
59 Q.dropPacketsBefore outb ack
60grokPacket (PacketBuffer inb outb) (PacketReceivedLossy seqno ack)
61 = do Q.observeOutOfBand inb seqno
62 Q.dropPacketsBefore outb ack
63
64-- | Wait until an inbound packet is ready to process. Any necessary
65-- re-ordering will have been done.
66awaitReadyPacket :: PacketBuffer a b -> STM a
67awaitReadyPacket (PacketBuffer inb _) = Q.dequeue inb
68
69-- | Obtain a list of sequence numbers that may have been dropped. This would
70-- be any number not yet received that is prior to the maxium sequence number
71-- we have received. For convenience, a lowerbound for the missing squence numbers
72-- is also returned as the second item of the pair.
73packetNumbersToRequest :: PacketBuffer a b -> STM ([Word32],Word32)
74packetNumbersToRequest (PacketBuffer inb _) = do
75 ns <- Q.getMissing inb
76 lb <- Q.getLastDequeuedPlus1 inb
77 return (ns,lb)
78
79expectingSequenceNumber :: PacketBuffer a b -> STM Word32
80expectingSequenceNumber (PacketBuffer inb _ ) = Q.getLastDequeuedPlus1 inb
81
82nextToSendSequenceNumber :: PacketBuffer a b -> STM Word32
83nextToSendSequenceNumber (PacketBuffer _ outb) = Q.getLastEnqueuedPlus1 outb
84
85-- | Retrieve already-sent packets by their sequence numbers. See
86-- 'decompressSequenceNumbers' to obtain the sequence number list from a
87-- compressed encoding. There is no need to call 'grokPacket' when sending the
88-- packets returned from this call.
89retrieveForResend :: PacketBuffer a b -> [Word32] -> STM [(Word32,b)]
90retrieveForResend (PacketBuffer _ outb) seqnos =
91 catMaybes <$> forM seqnos (\no -> fmap (no,) <$> Q.lookup outb no)
92
93-- | Expand a compressed set of sequence numbers. The first squence number is
94-- given directly and the rest are computed using 8-bit offsets. This is
95-- normally used to obtain input for 'retrieveForResend'.
96decompressSequenceNumbers :: Word32 -> [Word8] -> [Word32]
97decompressSequenceNumbers baseno ns = foldr doOne (const []) ns (baseno-1)
98 where
99 doOne :: Word8 -> (Word32 -> [Word32]) -> Word32 -> [Word32]
100 doOne 0 f addend = f (addend+255)
101 doOne x f addend = let y = fromIntegral x + addend
102 in y : f y