diff options
Diffstat (limited to 'src/Data/PacketBuffer.hs')
-rw-r--r-- | src/Data/PacketBuffer.hs | 102 |
1 files changed, 102 insertions, 0 deletions
diff --git a/src/Data/PacketBuffer.hs b/src/Data/PacketBuffer.hs new file mode 100644 index 00000000..c5ede50d --- /dev/null +++ b/src/Data/PacketBuffer.hs | |||
@@ -0,0 +1,102 @@ | |||
1 | {-# LANGUAGE TupleSections #-} | ||
2 | module Data.PacketBuffer | ||
3 | ( PacketBuffer | ||
4 | , newPacketBuffer | ||
5 | , PacketEvent(..) | ||
6 | , grokPacket | ||
7 | , awaitReadyPacket | ||
8 | , packetNumbersToRequest | ||
9 | , expectingSequenceNumber | ||
10 | , nextToSendSequenceNumber | ||
11 | , retrieveForResend | ||
12 | , decompressSequenceNumbers | ||
13 | ) where | ||
14 | |||
15 | import Data.PacketQueue as Q | ||
16 | |||
17 | import Control.Concurrent.STM | ||
18 | import Control.Monad | ||
19 | import Data.Maybe | ||
20 | import Data.Word | ||
21 | |||
22 | data PacketBuffer a b = PacketBuffer | ||
23 | { inQueue :: PacketQueue a | ||
24 | , outBuffer :: PacketQueue b | ||
25 | } | ||
26 | |||
27 | -- | Initialize the packet buffers. Note, the capacity of the inbound packet | ||
28 | -- queue is currently hardcoded to 200 packets and the capacity of the outbound | ||
29 | -- buffer is hardcoded to 400 packets. | ||
30 | newPacketBuffer :: STM (PacketBuffer a b) | ||
31 | newPacketBuffer = PacketBuffer <$> Q.new 200 0 | ||
32 | <*> Q.new 400 0 | ||
33 | |||
34 | -- | Input for 'grokPacket'. | ||
35 | data PacketEvent a b | ||
36 | = PacketSent { peSeqNum :: Word32 -- ^ Sequence number for payload. | ||
37 | , peSentPayload :: b -- ^ Payload packet we sent to them. | ||
38 | } | ||
39 | | PacketReceived { peSeqNum :: Word32 -- ^ Sequence number for payload. | ||
40 | , peReceivedPayload :: a -- ^ Payload packet they sent to us. | ||
41 | , peAcknowledgedNum :: Word32 -- ^ Earliest sequence number they've seen from us. | ||
42 | } | ||
43 | | PacketReceivedLossy { peSeqNum :: Word32 -- ^ Sequence number for lossy packet. | ||
44 | , peAcknowledgedNum :: Word32 -- ^ Earliest sequence number they've seen from us. | ||
45 | } | ||
46 | |||
47 | -- | Whenever a packet is received or sent (but not resent) from the network, | ||
48 | -- this function should be called to update the relevant buffers. | ||
49 | -- | ||
50 | -- On outgoing packets, if the outbound buffer is full, this will block | ||
51 | -- indefinitely until it is called in another thread with an inbound | ||
52 | -- acknowledgement. | ||
53 | grokPacket :: PacketBuffer a b -> PacketEvent a b -> STM () | ||
54 | grokPacket (PacketBuffer _ outb) (PacketSent seqno a) | ||
55 | = do (n,_) <- Q.enqueue outb seqno a | ||
56 | when (n/=0) retry | ||
57 | grokPacket (PacketBuffer inb outb) (PacketReceived seqno a ack) | ||
58 | = do Q.enqueue inb seqno a | ||
59 | Q.dropPacketsBefore outb ack | ||
60 | grokPacket (PacketBuffer inb outb) (PacketReceivedLossy seqno ack) | ||
61 | = do Q.observeOutOfBand inb seqno | ||
62 | Q.dropPacketsBefore outb ack | ||
63 | |||
64 | -- | Wait until an inbound packet is ready to process. Any necessary | ||
65 | -- re-ordering will have been done. | ||
66 | awaitReadyPacket :: PacketBuffer a b -> STM a | ||
67 | awaitReadyPacket (PacketBuffer inb _) = Q.dequeue inb | ||
68 | |||
69 | -- | Obtain a list of sequence numbers that may have been dropped. This would | ||
70 | -- be any number not yet received that is prior to the maxium sequence number | ||
71 | -- we have received. For convenience, a lowerbound for the missing squence numbers | ||
72 | -- is also returned as the second item of the pair. | ||
73 | packetNumbersToRequest :: PacketBuffer a b -> STM ([Word32],Word32) | ||
74 | packetNumbersToRequest (PacketBuffer inb _) = do | ||
75 | ns <- Q.getMissing inb | ||
76 | lb <- Q.getLastDequeuedPlus1 inb | ||
77 | return (ns,lb) | ||
78 | |||
79 | expectingSequenceNumber :: PacketBuffer a b -> STM Word32 | ||
80 | expectingSequenceNumber (PacketBuffer inb _ ) = Q.getLastDequeuedPlus1 inb | ||
81 | |||
82 | nextToSendSequenceNumber :: PacketBuffer a b -> STM Word32 | ||
83 | nextToSendSequenceNumber (PacketBuffer _ outb) = Q.getLastEnqueuedPlus1 outb | ||
84 | |||
85 | -- | Retrieve already-sent packets by their sequence numbers. See | ||
86 | -- 'decompressSequenceNumbers' to obtain the sequence number list from a | ||
87 | -- compressed encoding. There is no need to call 'grokPacket' when sending the | ||
88 | -- packets returned from this call. | ||
89 | retrieveForResend :: PacketBuffer a b -> [Word32] -> STM [(Word32,b)] | ||
90 | retrieveForResend (PacketBuffer _ outb) seqnos = | ||
91 | catMaybes <$> forM seqnos (\no -> fmap (no,) <$> Q.lookup outb no) | ||
92 | |||
93 | -- | Expand a compressed set of sequence numbers. The first squence number is | ||
94 | -- given directly and the rest are computed using 8-bit offsets. This is | ||
95 | -- normally used to obtain input for 'retrieveForResend'. | ||
96 | decompressSequenceNumbers :: Word32 -> [Word8] -> [Word32] | ||
97 | decompressSequenceNumbers baseno ns = foldr doOne (const []) ns (baseno-1) | ||
98 | where | ||
99 | doOne :: Word8 -> (Word32 -> [Word32]) -> Word32 -> [Word32] | ||
100 | doOne 0 f addend = f (addend+255) | ||
101 | doOne x f addend = let y = fromIntegral x + addend | ||
102 | in y : f y | ||