diff options
Diffstat (limited to 'src/Data/PacketBuffer.hs')
-rw-r--r-- | src/Data/PacketBuffer.hs | 148 |
1 files changed, 0 insertions, 148 deletions
diff --git a/src/Data/PacketBuffer.hs b/src/Data/PacketBuffer.hs deleted file mode 100644 index 17745664..00000000 --- a/src/Data/PacketBuffer.hs +++ /dev/null | |||
@@ -1,148 +0,0 @@ | |||
1 | {-# LANGUAGE TupleSections #-} | ||
2 | {-# LANGUAGE DeriveFunctor #-} | ||
3 | module Data.PacketBuffer | ||
4 | ( PacketBuffer | ||
5 | , newPacketBuffer | ||
6 | , PacketOutboundEvent(..) | ||
7 | , PacketInboundEvent(..) | ||
8 | , grokOutboundPacket | ||
9 | , grokInboundPacket | ||
10 | , awaitReadyPacket | ||
11 | , packetNumbersToRequest | ||
12 | , expectingSequenceNumber | ||
13 | , nextToSendSequenceNumber | ||
14 | , retrieveForResend | ||
15 | , decompressSequenceNumbers | ||
16 | , compressSequenceNumbers | ||
17 | , pbReport | ||
18 | ) where | ||
19 | |||
20 | import Data.PacketQueue as Q | ||
21 | import DPut | ||
22 | import DebugTag | ||
23 | |||
24 | import Control.Concurrent.STM | ||
25 | import Control.Monad | ||
26 | import Data.Maybe | ||
27 | import Data.Word | ||
28 | |||
29 | data PacketBuffer a b = PacketBuffer | ||
30 | { inQueue :: PacketQueue a | ||
31 | , outBuffer :: PacketQueue b } | ||
32 | |||
33 | -- | Initialize the packet buffers. Note, the capacity of the inbound packet | ||
34 | -- queue is currently hardcoded to 200 packets and the capacity of the outbound | ||
35 | -- buffer is hardcoded to 400 packets. | ||
36 | newPacketBuffer :: STM (PacketBuffer a b) | ||
37 | newPacketBuffer = PacketBuffer <$> Q.new 200 0 | ||
38 | <*> Q.new 400 0 | ||
39 | |||
40 | -- | Input for 'grokPacket'. | ||
41 | data PacketOutboundEvent b | ||
42 | = PacketSent { poSeqNum :: Word32 -- ^ Sequence number for payload. | ||
43 | , poSentPayload :: b -- ^ Payload packet we sent to them. | ||
44 | } | ||
45 | deriving Functor | ||
46 | |||
47 | data PacketInboundEvent a | ||
48 | = PacketReceived { peSeqNum :: Word32 -- ^ Sequence number for payload. | ||
49 | , peReceivedPayload :: a -- ^ Payload packet they sent to us. | ||
50 | , peAcknowledgedNum :: Word32 -- ^ Earliest sequence number they've seen from us. | ||
51 | } | ||
52 | | PacketReceivedLossy { peSeqNum :: Word32 -- ^ Sequence number for lossy packet. | ||
53 | , peReceivedPayload :: a -- ^ Payload packet they sent to us (ignored). | ||
54 | , peAcknowledgedNum :: Word32 -- ^ Earliest sequence number they've seen from us. | ||
55 | } | ||
56 | deriving Functor | ||
57 | |||
58 | -- | Whenever a packet is received or sent (but not resent) from the network, | ||
59 | -- this function should be called to update the relevant buffers. | ||
60 | -- | ||
61 | -- On outgoing packets, if the outbound buffer is full, this will return | ||
62 | -- True. In this case, the caller may retry to enable blocking until | ||
63 | -- 'grokInboundPacket' is called in another thread. | ||
64 | grokOutboundPacket :: PacketBuffer a b -> PacketOutboundEvent b -> STM (Bool,(Word32,Word32)) | ||
65 | grokOutboundPacket (PacketBuffer _ outb) (PacketSent seqno a) | ||
66 | = do (n,r) <- Q.enqueue outb seqno a | ||
67 | return (n/=0,(n,r)) | ||
68 | |||
69 | grokInboundPacket :: PacketBuffer a b -> PacketInboundEvent a -> STM () | ||
70 | grokInboundPacket (PacketBuffer inb outb) (PacketReceived seqno a ack) | ||
71 | = do Q.enqueue inb seqno a | ||
72 | Q.dropPacketsBefore outb ack | ||
73 | grokInboundPacket (PacketBuffer inb outb) (PacketReceivedLossy seqno _ ack) | ||
74 | = do Q.observeOutOfBand inb seqno | ||
75 | Q.dropPacketsBefore outb ack | ||
76 | |||
77 | -- | Wait until an inbound packet is ready to process. Any necessary | ||
78 | -- re-ordering will have been done. | ||
79 | awaitReadyPacket :: PacketBuffer a b -> STM a | ||
80 | awaitReadyPacket (PacketBuffer inb _) = Q.dequeue inb | ||
81 | |||
82 | -- | Obtain a list of sequence numbers that may have been dropped. This would | ||
83 | -- be any number not yet received that is prior to the maxium sequence number | ||
84 | -- we have received. For convenience, a lowerbound for the missing squence numbers | ||
85 | -- is also returned as the second item of the pair. | ||
86 | packetNumbersToRequest :: PacketBuffer a b -> STM ([Word32],Word32) | ||
87 | packetNumbersToRequest (PacketBuffer inb _) = do | ||
88 | ns <- Q.getMissing inb | ||
89 | lb <- Q.getLastDequeuedPlus1 inb | ||
90 | return (ns,lb) | ||
91 | |||
92 | expectingSequenceNumber :: PacketBuffer a b -> STM Word32 | ||
93 | expectingSequenceNumber (PacketBuffer inb _ ) = Q.getLastDequeuedPlus1 inb | ||
94 | |||
95 | nextToSendSequenceNumber :: PacketBuffer a b -> STM Word32 | ||
96 | nextToSendSequenceNumber (PacketBuffer _ outb) = Q.getLastEnqueuedPlus1 outb | ||
97 | |||
98 | -- | Retrieve already-sent packets by their sequence numbers. See | ||
99 | -- 'decompressSequenceNumbers' to obtain the sequence number list from a | ||
100 | -- compressed encoding. There is no need to call 'grokPacket' when sending the | ||
101 | -- packets returned from this call. | ||
102 | retrieveForResend :: PacketBuffer a b -> [Word32] -> STM [(Word32,b)] | ||
103 | retrieveForResend (PacketBuffer _ outb) seqnos = | ||
104 | catMaybes <$> forM seqnos (\no -> fmap (no,) <$> Q.lookup outb no) | ||
105 | |||
106 | -- | Expand a compressed set of sequence numbers. The first sequence number is | ||
107 | -- given directly and the rest are computed using 8-bit offsets. This is | ||
108 | -- normally used to obtain input for 'retrieveForResend'. | ||
109 | decompressSequenceNumbers :: Word32 -> [Word8] -> [Word32] | ||
110 | decompressSequenceNumbers baseno ns = foldr doOne (const []) ns (baseno-1) | ||
111 | where | ||
112 | doOne :: Word8 -> (Word32 -> [Word32]) -> Word32 -> [Word32] | ||
113 | doOne 0 f addend = f (addend + 255) | ||
114 | doOne x f addend = let y = fromIntegral x + addend | ||
115 | in y : f y | ||
116 | |||
117 | compressSequenceNumbers :: Word32 -> [Word32] -> [Word8] | ||
118 | compressSequenceNumbers baseno xs = foldr doOne (const []) xs (baseno-1) | ||
119 | where | ||
120 | doOne :: Word32 -> (Word32 -> [Word8]) -> Word32 -> [Word8] | ||
121 | doOne y f addend = case y - addend of | ||
122 | x | x < 255 -> fromIntegral x : f y | ||
123 | | otherwise -> 0 : doOne y f (addend + 255) | ||
124 | |||
125 | {- | ||
126 | compressSequenceNumbers :: Word32 -> [Word32] -> [Word8] | ||
127 | compressSequenceNumbers seqno xs = let r = map fromIntegral (reduceToSums ys >>= makeZeroes) | ||
128 | in dtrace XNetCrypto ("compressSequenceNumbers " ++ show seqno ++ show xs ++ " --> "++show r) r | ||
129 | where | ||
130 | ys = Prelude.map (subtract (seqno - 1)) xs | ||
131 | reduceToSums [] = [] | ||
132 | reduceToSums (x:xs) = x:(reduceToSums $ Prelude.map (subtract x) xs) | ||
133 | makeZeroes :: Word32 -> [Word32] | ||
134 | -- makeZeroes 0 = [] | ||
135 | makeZeroes x | ||
136 | = let (d,m)= x `divMod` 255 | ||
137 | zeros= Prelude.replicate (fromIntegral d) 0 | ||
138 | in zeros ++ [m] | ||
139 | -} | ||
140 | |||
141 | pbReport :: String -> PacketBuffer a b -> STM String | ||
142 | pbReport what (PacketBuffer inb outb) = do | ||
143 | inb_seqno <- getLastDequeuedPlus1 inb | ||
144 | inb_buffend <- getLastEnqueuedPlus1 inb | ||
145 | outb_seqno <- getLastDequeuedPlus1 outb | ||
146 | outb_bufend <- getLastEnqueuedPlus1 outb | ||
147 | return $ "PacketBuffer<"++what++"> Inbound" ++ show (inb_seqno, inb_buffend) | ||
148 | ++" Outbound" ++ show (outb_seqno, outb_bufend) | ||