summaryrefslogtreecommitdiff
path: root/src/Data/PacketQueue.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Data/PacketQueue.hs')
-rw-r--r--src/Data/PacketQueue.hs217
1 files changed, 0 insertions, 217 deletions
diff --git a/src/Data/PacketQueue.hs b/src/Data/PacketQueue.hs
deleted file mode 100644
index 15a3b436..00000000
--- a/src/Data/PacketQueue.hs
+++ /dev/null
@@ -1,217 +0,0 @@
1-- | This module is useful for implementing a lossess protocol on top of a
2-- lossy datagram style protocol. It implements a buffer in which packets may
3-- be stored out of order, but from which they are extracted in the proper
4-- sequence.
5{-# LANGUAGE NamedFieldPuns #-}
6module Data.PacketQueue
7 ( PacketQueue
8 , getCapacity
9 , getLastDequeuedPlus1
10 , getLastEnqueuedPlus1
11 , new
12 , dequeue
13 , dropPacketsLogic
14 , dropPacketsBefore
15 , getMissing
16 -- , dequeueOrGetMissing
17 -- , markButNotDequeue
18 , enqueue
19 , observeOutOfBand
20 , packetQueueViewList
21 -- , mapQ
22 , Data.PacketQueue.lookup
23 ) where
24
25import Control.Concurrent.STM
26import Control.Monad
27import Data.Word
28import Data.Array.MArray
29import Data.Maybe
30
31data PacketQueue a = PacketQueue
32 { pktq :: TArray Word32 (Maybe a)
33 , seqno :: TVar Word32 -- (buffer_start)
34 , qsize :: Word32
35 , buffend :: TVar Word32 -- on incoming, next packet they'll send + 1
36 -- i.e. one more than the largest seen sequence number.
37 -- Written by:
38 -- observeOutOfBand
39 -- dropPacketsBefore
40 -- enqueue
41 }
42
43-- | Obtain a list of non-empty slots in the 'PacketQueue'. The numeric value
44-- is an index into the underlying array, not a sequence number.
45packetQueueViewList :: PacketQueue a -> STM [(Word32,a)]
46packetQueueViewList p = do
47 let f (n,Nothing) = Nothing
48 f (n,Just x) = Just (n,x)
49 catMaybes . map f <$> getAssocs (pktq p)
50
51-- | This returns the earliest sequence number with a slot in the queue.
52getLastDequeuedPlus1 :: PacketQueue a -> STM Word32
53getLastDequeuedPlus1 PacketQueue {seqno} = readTVar seqno
54
55-- | This returns the least upper bound of sequence numbers that have been
56-- enqueued.
57getLastEnqueuedPlus1 :: PacketQueue a -> STM Word32
58getLastEnqueuedPlus1 PacketQueue {buffend} = readTVar buffend
59
60
61-- | This is the number of consequetive sequence numbers, starting at
62-- 'getLastDequeuedPlus1' that can be stored in the queue
63getCapacity :: Applicative m => PacketQueue t -> m Word32
64getCapacity (PacketQueue { qsize }) = pure qsize
65
66-- | Create a new PacketQueue.
67new :: Word32 -- ^ Capacity of queue.
68 -> Word32 -- ^ Initial sequence number.
69 -> STM (PacketQueue a)
70new capacity seqstart = do
71 let cap = if capacity `mod` 2 == 0 then capacity else capacity + 1
72 q <- newArray (0,cap - 1) Nothing
73 seqv <- newTVar seqstart
74 bufe <- newTVar seqstart
75 return PacketQueue
76 { pktq = q
77 , seqno = seqv
78 , qsize = cap
79 , buffend = bufe
80 }
81
82-- | Update the packet queue given:
83--
84-- * packet queue
85--
86-- * the number of next lossless packet they intend to send you
87--
88-- This behaves exactly like 'enqueue' except that no packet data is written to
89-- the queue.
90observeOutOfBand :: PacketQueue a -> Word32-> STM ()
91observeOutOfBand PacketQueue { seqno, qsize, buffend } numberOfNextLosslessPacketThatTheyWillSend = do
92 low <- readTVar seqno
93 let proj = numberOfNextLosslessPacketThatTheyWillSend - low
94 -- Ignore packet if out of range.
95 when ( proj < qsize) $ do
96 modifyTVar' buffend (\be -> if be - low <= proj then numberOfNextLosslessPacketThatTheyWillSend + 1 else be)
97
98-- | If seqno < buffend then return expected packet numbers for all
99-- the Nothings in the array between them.
100-- Otherwise, return empty list.
101getMissing :: PacketQueue a -> STM [Word32]
102getMissing PacketQueue { pktq, seqno, qsize, buffend } = do
103 seqno0 <- readTVar seqno
104 buffend0 <- readTVar buffend
105 -- note relying on fact that [ b .. a ] is null when a < b
106 let indices = take (fromIntegral qsize) $ [ seqno0 .. buffend0 - 1]
107 maybes <- forM indices $ \i -> do
108 x <- readArray pktq $ i `mod` qsize
109 return (i,x)
110 let nums = map fst . filter (isNothing . snd) $ maybes
111 return nums
112
113-- -- | If seqno < buffend then return expected packet numbers for all
114-- -- the Nothings in the array between them.
115-- -- Otherwise, behave as 'dequeue' would.
116-- -- TODO: Do we need this function? Delete it if not.
117-- dequeueOrGetMissing :: PacketQueue a -> STM (Either [Word32] a)
118-- dequeueOrGetMissing PacketQueue { pktq, seqno, qsize, buffend } = do
119-- seqno0 <- readTVar seqno
120-- buffend0 <- readTVar buffend
121-- if seqno0 < buffend0
122-- then do
123-- maybes <- mapM (readArray pktq) (take (fromIntegral qsize) $ map (`mod` qsize) [ seqno0 .. buffend0 ])
124-- let nums = map fst . filter (isNothing . snd) $ zip [buffend0 ..] maybes
125-- return (Left nums)
126-- else do
127-- let i = seqno0 `mod` qsize
128-- x <- maybe retry return =<< readArray pktq i
129-- writeArray pktq i Nothing
130-- modifyTVar' seqno succ
131-- return (Right x)
132
133-- | Retry until the next expected packet is enqueued. Then return it.
134dequeue :: PacketQueue a -> STM a
135dequeue PacketQueue { pktq, seqno, qsize } = do
136 i0 <- readTVar seqno
137 let i = i0 `mod` qsize
138 x <- maybe retry return =<< readArray pktq i
139 writeArray pktq i Nothing
140 modifyTVar' seqno succ
141 return x
142
143-- | Helper to 'dropPacketsBefore'.
144dropPacketsLogic :: Word32 -> Word32 -> Word32 -> (Maybe Word32, Word32, [(Word32,Word32)])
145dropPacketsLogic qsize low no0 =
146 let no = no0 - 1 -- Unsigned: could overflow
147 proj = no - low -- Unsigned: could overflow
148 in if proj < qsize
149 then
150 let ilow = low `mod` qsize
151 i = no `mod` qsize
152 ranges = if ilow <= i then [(ilow, i)]
153 else [(0,i),(ilow,qsize-1)]
154 in (Nothing,no0,ranges) -- Clear some, but not all, slots.
155 else (Nothing,low,[]) -- out of bounds, do nothing -- (Just no0, no0, [(0,qsize - 1)]) -- Reset to empty queue.
156
157
158-- | Drop all packets preceding the given packet number.
159dropPacketsBefore :: PacketQueue a -> Word32 -> STM ()
160dropPacketsBefore PacketQueue{ pktq, seqno, qsize, buffend } no0 = do
161 low <- readTVar seqno
162 let (mbuffend, no, ranges) = dropPacketsLogic qsize low no0
163 mapM_ (writeTVar buffend) mbuffend
164 writeTVar seqno no
165 forM_ ranges $ \(lo,hi) -> forM_ [lo .. hi] $ \i -> writeArray pktq i Nothing
166
167
168-- -- | Like dequeue, but marks as viewed rather than removing
169-- markButNotDequeue :: PacketQueue (Bool,a) -> STM a
170-- markButNotDequeue PacketQueue { pktq, seqno, qsize } = do
171-- i0 <- readTVar seqno
172-- let i = i0 `mod` qsize
173-- (b,x) <- maybe retry return =<< readArray pktq i
174-- writeArray pktq i (Just (True,x))
175-- modifyTVar' seqno succ
176-- return x
177
178-- | Enqueue a packet. Packets need not be enqueued in order as long as there
179-- is spare capacity in the queue. If there is not, the packet will be
180-- silently discarded without blocking. (Unless this is an Overwrite-queue, in
181-- which case, the packets will simply wrap around overwriting the old ones.)
182--
183-- If the packet was enqueued, (0,i) will be retuned where /i/ is the index at
184-- which the new packet was stored in the buffer. If the queue was full, the
185-- first of the returned pair will be non-zero.
186enqueue :: PacketQueue a -- ^ The packet queue.
187 -> Word32 -- ^ Sequence number of the packet.
188 -> a -- ^ The packet.
189 -> STM (Word32,Word32)
190enqueue PacketQueue{ pktq, seqno, qsize, buffend} no x = do
191 low <- readTVar seqno
192 let proj = no - low
193 -- Ignore packet if out of range.
194 when ( proj < qsize) $ do
195 let i = no `mod` qsize
196 writeArray pktq i (Just x)
197 modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be)
198 return (proj `divMod` qsize)
199
200-- | Obtain the packet with the given sequence number if it is stored in the
201-- queue, otherwise /Nothing/ is returned without blocking.
202lookup :: PacketQueue a -> Word32 -> STM (Maybe a)
203lookup PacketQueue{ pktq, seqno, qsize } no = do
204 low <- readTVar seqno
205 let proj = no - low
206 if proj < qsize
207 then let i = no `mod` qsize
208 in readArray pktq i
209 else return Nothing
210
211-- -- | For each item in the queue, modify or delete it.
212-- mapQ :: (a -> Maybe a) -> PacketQueue a -> STM ()
213-- mapQ f PacketQueue{pktq} = do
214-- (z,n) <- getBounds pktq
215-- forM_ [z .. n] $ \i -> do
216-- e <- readArray pktq i
217-- writeArray pktq i (e>>=f)