diff options
Diffstat (limited to 'src/Data/PacketQueue.hs')
-rw-r--r-- | src/Data/PacketQueue.hs | 217 |
1 files changed, 0 insertions, 217 deletions
diff --git a/src/Data/PacketQueue.hs b/src/Data/PacketQueue.hs deleted file mode 100644 index 15a3b436..00000000 --- a/src/Data/PacketQueue.hs +++ /dev/null | |||
@@ -1,217 +0,0 @@ | |||
1 | -- | This module is useful for implementing a lossess protocol on top of a | ||
2 | -- lossy datagram style protocol. It implements a buffer in which packets may | ||
3 | -- be stored out of order, but from which they are extracted in the proper | ||
4 | -- sequence. | ||
5 | {-# LANGUAGE NamedFieldPuns #-} | ||
6 | module Data.PacketQueue | ||
7 | ( PacketQueue | ||
8 | , getCapacity | ||
9 | , getLastDequeuedPlus1 | ||
10 | , getLastEnqueuedPlus1 | ||
11 | , new | ||
12 | , dequeue | ||
13 | , dropPacketsLogic | ||
14 | , dropPacketsBefore | ||
15 | , getMissing | ||
16 | -- , dequeueOrGetMissing | ||
17 | -- , markButNotDequeue | ||
18 | , enqueue | ||
19 | , observeOutOfBand | ||
20 | , packetQueueViewList | ||
21 | -- , mapQ | ||
22 | , Data.PacketQueue.lookup | ||
23 | ) where | ||
24 | |||
25 | import Control.Concurrent.STM | ||
26 | import Control.Monad | ||
27 | import Data.Word | ||
28 | import Data.Array.MArray | ||
29 | import Data.Maybe | ||
30 | |||
31 | data PacketQueue a = PacketQueue | ||
32 | { pktq :: TArray Word32 (Maybe a) | ||
33 | , seqno :: TVar Word32 -- (buffer_start) | ||
34 | , qsize :: Word32 | ||
35 | , buffend :: TVar Word32 -- on incoming, next packet they'll send + 1 | ||
36 | -- i.e. one more than the largest seen sequence number. | ||
37 | -- Written by: | ||
38 | -- observeOutOfBand | ||
39 | -- dropPacketsBefore | ||
40 | -- enqueue | ||
41 | } | ||
42 | |||
43 | -- | Obtain a list of non-empty slots in the 'PacketQueue'. The numeric value | ||
44 | -- is an index into the underlying array, not a sequence number. | ||
45 | packetQueueViewList :: PacketQueue a -> STM [(Word32,a)] | ||
46 | packetQueueViewList p = do | ||
47 | let f (n,Nothing) = Nothing | ||
48 | f (n,Just x) = Just (n,x) | ||
49 | catMaybes . map f <$> getAssocs (pktq p) | ||
50 | |||
51 | -- | This returns the earliest sequence number with a slot in the queue. | ||
52 | getLastDequeuedPlus1 :: PacketQueue a -> STM Word32 | ||
53 | getLastDequeuedPlus1 PacketQueue {seqno} = readTVar seqno | ||
54 | |||
55 | -- | This returns the least upper bound of sequence numbers that have been | ||
56 | -- enqueued. | ||
57 | getLastEnqueuedPlus1 :: PacketQueue a -> STM Word32 | ||
58 | getLastEnqueuedPlus1 PacketQueue {buffend} = readTVar buffend | ||
59 | |||
60 | |||
61 | -- | This is the number of consequetive sequence numbers, starting at | ||
62 | -- 'getLastDequeuedPlus1' that can be stored in the queue | ||
63 | getCapacity :: Applicative m => PacketQueue t -> m Word32 | ||
64 | getCapacity (PacketQueue { qsize }) = pure qsize | ||
65 | |||
66 | -- | Create a new PacketQueue. | ||
67 | new :: Word32 -- ^ Capacity of queue. | ||
68 | -> Word32 -- ^ Initial sequence number. | ||
69 | -> STM (PacketQueue a) | ||
70 | new capacity seqstart = do | ||
71 | let cap = if capacity `mod` 2 == 0 then capacity else capacity + 1 | ||
72 | q <- newArray (0,cap - 1) Nothing | ||
73 | seqv <- newTVar seqstart | ||
74 | bufe <- newTVar seqstart | ||
75 | return PacketQueue | ||
76 | { pktq = q | ||
77 | , seqno = seqv | ||
78 | , qsize = cap | ||
79 | , buffend = bufe | ||
80 | } | ||
81 | |||
82 | -- | Update the packet queue given: | ||
83 | -- | ||
84 | -- * packet queue | ||
85 | -- | ||
86 | -- * the number of next lossless packet they intend to send you | ||
87 | -- | ||
88 | -- This behaves exactly like 'enqueue' except that no packet data is written to | ||
89 | -- the queue. | ||
90 | observeOutOfBand :: PacketQueue a -> Word32-> STM () | ||
91 | observeOutOfBand PacketQueue { seqno, qsize, buffend } numberOfNextLosslessPacketThatTheyWillSend = do | ||
92 | low <- readTVar seqno | ||
93 | let proj = numberOfNextLosslessPacketThatTheyWillSend - low | ||
94 | -- Ignore packet if out of range. | ||
95 | when ( proj < qsize) $ do | ||
96 | modifyTVar' buffend (\be -> if be - low <= proj then numberOfNextLosslessPacketThatTheyWillSend + 1 else be) | ||
97 | |||
98 | -- | If seqno < buffend then return expected packet numbers for all | ||
99 | -- the Nothings in the array between them. | ||
100 | -- Otherwise, return empty list. | ||
101 | getMissing :: PacketQueue a -> STM [Word32] | ||
102 | getMissing PacketQueue { pktq, seqno, qsize, buffend } = do | ||
103 | seqno0 <- readTVar seqno | ||
104 | buffend0 <- readTVar buffend | ||
105 | -- note relying on fact that [ b .. a ] is null when a < b | ||
106 | let indices = take (fromIntegral qsize) $ [ seqno0 .. buffend0 - 1] | ||
107 | maybes <- forM indices $ \i -> do | ||
108 | x <- readArray pktq $ i `mod` qsize | ||
109 | return (i,x) | ||
110 | let nums = map fst . filter (isNothing . snd) $ maybes | ||
111 | return nums | ||
112 | |||
113 | -- -- | If seqno < buffend then return expected packet numbers for all | ||
114 | -- -- the Nothings in the array between them. | ||
115 | -- -- Otherwise, behave as 'dequeue' would. | ||
116 | -- -- TODO: Do we need this function? Delete it if not. | ||
117 | -- dequeueOrGetMissing :: PacketQueue a -> STM (Either [Word32] a) | ||
118 | -- dequeueOrGetMissing PacketQueue { pktq, seqno, qsize, buffend } = do | ||
119 | -- seqno0 <- readTVar seqno | ||
120 | -- buffend0 <- readTVar buffend | ||
121 | -- if seqno0 < buffend0 | ||
122 | -- then do | ||
123 | -- maybes <- mapM (readArray pktq) (take (fromIntegral qsize) $ map (`mod` qsize) [ seqno0 .. buffend0 ]) | ||
124 | -- let nums = map fst . filter (isNothing . snd) $ zip [buffend0 ..] maybes | ||
125 | -- return (Left nums) | ||
126 | -- else do | ||
127 | -- let i = seqno0 `mod` qsize | ||
128 | -- x <- maybe retry return =<< readArray pktq i | ||
129 | -- writeArray pktq i Nothing | ||
130 | -- modifyTVar' seqno succ | ||
131 | -- return (Right x) | ||
132 | |||
133 | -- | Retry until the next expected packet is enqueued. Then return it. | ||
134 | dequeue :: PacketQueue a -> STM a | ||
135 | dequeue PacketQueue { pktq, seqno, qsize } = do | ||
136 | i0 <- readTVar seqno | ||
137 | let i = i0 `mod` qsize | ||
138 | x <- maybe retry return =<< readArray pktq i | ||
139 | writeArray pktq i Nothing | ||
140 | modifyTVar' seqno succ | ||
141 | return x | ||
142 | |||
143 | -- | Helper to 'dropPacketsBefore'. | ||
144 | dropPacketsLogic :: Word32 -> Word32 -> Word32 -> (Maybe Word32, Word32, [(Word32,Word32)]) | ||
145 | dropPacketsLogic qsize low no0 = | ||
146 | let no = no0 - 1 -- Unsigned: could overflow | ||
147 | proj = no - low -- Unsigned: could overflow | ||
148 | in if proj < qsize | ||
149 | then | ||
150 | let ilow = low `mod` qsize | ||
151 | i = no `mod` qsize | ||
152 | ranges = if ilow <= i then [(ilow, i)] | ||
153 | else [(0,i),(ilow,qsize-1)] | ||
154 | in (Nothing,no0,ranges) -- Clear some, but not all, slots. | ||
155 | else (Nothing,low,[]) -- out of bounds, do nothing -- (Just no0, no0, [(0,qsize - 1)]) -- Reset to empty queue. | ||
156 | |||
157 | |||
158 | -- | Drop all packets preceding the given packet number. | ||
159 | dropPacketsBefore :: PacketQueue a -> Word32 -> STM () | ||
160 | dropPacketsBefore PacketQueue{ pktq, seqno, qsize, buffend } no0 = do | ||
161 | low <- readTVar seqno | ||
162 | let (mbuffend, no, ranges) = dropPacketsLogic qsize low no0 | ||
163 | mapM_ (writeTVar buffend) mbuffend | ||
164 | writeTVar seqno no | ||
165 | forM_ ranges $ \(lo,hi) -> forM_ [lo .. hi] $ \i -> writeArray pktq i Nothing | ||
166 | |||
167 | |||
168 | -- -- | Like dequeue, but marks as viewed rather than removing | ||
169 | -- markButNotDequeue :: PacketQueue (Bool,a) -> STM a | ||
170 | -- markButNotDequeue PacketQueue { pktq, seqno, qsize } = do | ||
171 | -- i0 <- readTVar seqno | ||
172 | -- let i = i0 `mod` qsize | ||
173 | -- (b,x) <- maybe retry return =<< readArray pktq i | ||
174 | -- writeArray pktq i (Just (True,x)) | ||
175 | -- modifyTVar' seqno succ | ||
176 | -- return x | ||
177 | |||
178 | -- | Enqueue a packet. Packets need not be enqueued in order as long as there | ||
179 | -- is spare capacity in the queue. If there is not, the packet will be | ||
180 | -- silently discarded without blocking. (Unless this is an Overwrite-queue, in | ||
181 | -- which case, the packets will simply wrap around overwriting the old ones.) | ||
182 | -- | ||
183 | -- If the packet was enqueued, (0,i) will be retuned where /i/ is the index at | ||
184 | -- which the new packet was stored in the buffer. If the queue was full, the | ||
185 | -- first of the returned pair will be non-zero. | ||
186 | enqueue :: PacketQueue a -- ^ The packet queue. | ||
187 | -> Word32 -- ^ Sequence number of the packet. | ||
188 | -> a -- ^ The packet. | ||
189 | -> STM (Word32,Word32) | ||
190 | enqueue PacketQueue{ pktq, seqno, qsize, buffend} no x = do | ||
191 | low <- readTVar seqno | ||
192 | let proj = no - low | ||
193 | -- Ignore packet if out of range. | ||
194 | when ( proj < qsize) $ do | ||
195 | let i = no `mod` qsize | ||
196 | writeArray pktq i (Just x) | ||
197 | modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) | ||
198 | return (proj `divMod` qsize) | ||
199 | |||
200 | -- | Obtain the packet with the given sequence number if it is stored in the | ||
201 | -- queue, otherwise /Nothing/ is returned without blocking. | ||
202 | lookup :: PacketQueue a -> Word32 -> STM (Maybe a) | ||
203 | lookup PacketQueue{ pktq, seqno, qsize } no = do | ||
204 | low <- readTVar seqno | ||
205 | let proj = no - low | ||
206 | if proj < qsize | ||
207 | then let i = no `mod` qsize | ||
208 | in readArray pktq i | ||
209 | else return Nothing | ||
210 | |||
211 | -- -- | For each item in the queue, modify or delete it. | ||
212 | -- mapQ :: (a -> Maybe a) -> PacketQueue a -> STM () | ||
213 | -- mapQ f PacketQueue{pktq} = do | ||
214 | -- (z,n) <- getBounds pktq | ||
215 | -- forM_ [z .. n] $ \i -> do | ||
216 | -- e <- readArray pktq i | ||
217 | -- writeArray pktq i (e>>=f) | ||