summaryrefslogtreecommitdiff
path: root/src/Data/PacketQueue.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Data/PacketQueue.hs')
-rw-r--r--src/Data/PacketQueue.hs312
1 files changed, 99 insertions, 213 deletions
diff --git a/src/Data/PacketQueue.hs b/src/Data/PacketQueue.hs
index 6f997ac0..c5d5ac09 100644
--- a/src/Data/PacketQueue.hs
+++ b/src/Data/PacketQueue.hs
@@ -3,30 +3,22 @@
3-- be stored out of order, but from which they are extracted in the proper 3-- be stored out of order, but from which they are extracted in the proper
4-- sequence. 4-- sequence.
5{-# LANGUAGE NamedFieldPuns #-} 5{-# LANGUAGE NamedFieldPuns #-}
6{-# LANGUAGE FlexibleContexts #-}
7module Data.PacketQueue 6module Data.PacketQueue
8 ( PacketQueue 7 ( PacketQueue
9 , getCapacity 8 , getCapacity
10 , getLastDequeuedPlus1 9 , getLastDequeuedPlus1
10 , getLastEnqueuedPlus1
11 , new 11 , new
12 , dequeue 12 , dequeue
13 , dropPacketsBefore
13 , getMissing 14 , getMissing
14 , dequeueOrGetMissing 15 -- , dequeueOrGetMissing
15 , markButNotDequeue 16 -- , markButNotDequeue
16 , enqueue 17 , enqueue
17 , observeOutOfBand 18 , observeOutOfBand
18 , PacketOutQueue
19 , packetQueueViewList 19 , packetQueueViewList
20 , newOutGoing 20 -- , mapQ
21 , readyOutGoing 21 , Data.PacketQueue.lookup
22 , toPNums
23 , getRequested
24 , peekPacket
25 , tryAppendQueueOutgoing
26 , dequeueOutgoing
27 , getHighestHandledPacketPlus1
28 , mapOutGoing
29 , OutGoingResult(..)
30 ) where 22 ) where
31 23
32import Control.Concurrent.STM 24import Control.Concurrent.STM
@@ -34,24 +26,35 @@ import Control.Monad
34import Data.Word 26import Data.Word
35import Data.Array.MArray 27import Data.Array.MArray
36import Data.Maybe 28import Data.Maybe
37import DPut
38 29
39data PacketQueue a = PacketQueue 30data PacketQueue a = PacketQueue
40 { pktq :: TArray Word32 (Maybe a) 31 { pktq :: TArray Word32 (Maybe a)
41 , seqno :: TVar Word32 -- (buffer_start) 32 , seqno :: TVar Word32 -- (buffer_start)
42 , qsize :: Word32 33 , qsize :: Word32
43 , buffend :: TVar Word32 -- on incoming, next packet they'll send + 1 34 , buffend :: TVar Word32 -- on incoming, next packet they'll send + 1
35 -- i.e. one more than the largest seen sequence number.
44 } 36 }
45 37
38-- | Obtain a list of non-empty slots in the 'PacketQueue'. The numeric value
39-- is an index into the underlying array, not a sequence number.
46packetQueueViewList :: PacketQueue a -> STM [(Word32,a)] 40packetQueueViewList :: PacketQueue a -> STM [(Word32,a)]
47packetQueueViewList p = do 41packetQueueViewList p = do
48 let f (n,Nothing) = Nothing 42 let f (n,Nothing) = Nothing
49 f (n,Just x) = Just (n,x) 43 f (n,Just x) = Just (n,x)
50 catMaybes . map f <$> getAssocs (pktq p) 44 catMaybes . map f <$> getAssocs (pktq p)
51 45
46-- | This returns the earliest sequence number with a slot in the queue.
52getLastDequeuedPlus1 :: PacketQueue a -> STM Word32 47getLastDequeuedPlus1 :: PacketQueue a -> STM Word32
53getLastDequeuedPlus1 PacketQueue {seqno} = readTVar seqno 48getLastDequeuedPlus1 PacketQueue {seqno} = readTVar seqno
54 49
50-- | This returns the least upper bound of sequence numbers that have been
51-- enqueued.
52getLastEnqueuedPlus1 :: PacketQueue a -> STM Word32
53getLastEnqueuedPlus1 PacketQueue {buffend} = readTVar buffend
54
55
56-- | This is the number of consequetive sequence numbers, starting at
57-- 'getLastDequeuedPlus1' that can be stored in the queue
55getCapacity :: Applicative m => PacketQueue t -> m Word32 58getCapacity :: Applicative m => PacketQueue t -> m Word32
56getCapacity (PacketQueue { qsize }) = pure qsize 59getCapacity (PacketQueue { qsize }) = pure qsize
57 60
@@ -63,7 +66,7 @@ new capacity seqstart = do
63 let cap = if capacity `mod` 2 == 0 then capacity else capacity + 1 66 let cap = if capacity `mod` 2 == 0 then capacity else capacity + 1
64 q <- newArray (0,cap - 1) Nothing 67 q <- newArray (0,cap - 1) Nothing
65 seqv <- newTVar seqstart 68 seqv <- newTVar seqstart
66 bufe <- newTVar 0 69 bufe <- newTVar seqstart
67 return PacketQueue 70 return PacketQueue
68 { pktq = q 71 { pktq = q
69 , seqno = seqv 72 , seqno = seqv
@@ -72,8 +75,13 @@ new capacity seqstart = do
72 } 75 }
73 76
74-- | Update the packet queue given: 77-- | Update the packet queue given:
78--
75-- * packet queue 79-- * packet queue
80--
76-- * the number of next lossless packet they intend to send you 81-- * the number of next lossless packet they intend to send you
82--
83-- This behaves exactly like 'enqueue' except that no packet data is written to
84-- the queue.
77observeOutOfBand :: PacketQueue a -> Word32-> STM () 85observeOutOfBand :: PacketQueue a -> Word32-> STM ()
78observeOutOfBand PacketQueue { seqno, qsize, buffend } numberOfNextLosslessPacketThatTheyWillSend = do 86observeOutOfBand PacketQueue { seqno, qsize, buffend } numberOfNextLosslessPacketThatTheyWillSend = do
79 low <- readTVar seqno 87 low <- readTVar seqno
@@ -85,37 +93,35 @@ observeOutOfBand PacketQueue { seqno, qsize, buffend } numberOfNextLosslessPacke
85-- | If seqno < buffend then return expected packet numbers for all 93-- | If seqno < buffend then return expected packet numbers for all
86-- the Nothings in the array between them. 94-- the Nothings in the array between them.
87-- Otherwise, return empty list. 95-- Otherwise, return empty list.
88getMissing :: Show a => PacketQueue a -> STM [Word32] 96getMissing :: PacketQueue a -> STM [Word32]
89getMissing PacketQueue { pktq, seqno, qsize, buffend } = do 97getMissing PacketQueue { pktq, seqno, qsize, buffend } = do
90 seqno0 <- readTVar seqno 98 seqno0 <- readTVar seqno
91 buffend0 <- readTVar buffend 99 buffend0 <- readTVar buffend
92 -- note relying on fact that [ b .. a ] is null when a < b 100 -- note relying on fact that [ b .. a ] is null when a < b
93 let indices = take (fromIntegral qsize) $ map (`mod` qsize) [ seqno0 .. buffend0 - 1] 101 let indices = take (fromIntegral qsize) $ map (`mod` qsize) [ seqno0 .. buffend0 - 1]
94 -- tput XNetCrypto $ "(netCRYPTO getMissing indices: " ++ show indices
95 maybes <- mapM (\i -> do {x <- readArray pktq i; return (i,x)}) indices 102 maybes <- mapM (\i -> do {x <- readArray pktq i; return (i,x)}) indices
96 tput XNetCrypto $ "(netCRYPTO) getMissing: (" ++ show seqno0 ++ " " ++ show buffend0 ++") => " ++ show maybes
97 let nums = map fst . filter (isNothing . snd) $ maybes 103 let nums = map fst . filter (isNothing . snd) $ maybes
98 return nums 104 return nums
99 105
100-- | If seqno < buffend then return expected packet numbers for all 106-- -- | If seqno < buffend then return expected packet numbers for all
101-- the Nothings in the array between them. 107-- -- the Nothings in the array between them.
102-- Otherwise, behave as 'dequeue' would. 108-- -- Otherwise, behave as 'dequeue' would.
103-- TODO: Do we need this function? Delete it if not. 109-- -- TODO: Do we need this function? Delete it if not.
104dequeueOrGetMissing :: PacketQueue a -> STM (Either [Word32] a) 110-- dequeueOrGetMissing :: PacketQueue a -> STM (Either [Word32] a)
105dequeueOrGetMissing PacketQueue { pktq, seqno, qsize, buffend } = do 111-- dequeueOrGetMissing PacketQueue { pktq, seqno, qsize, buffend } = do
106 seqno0 <- readTVar seqno 112-- seqno0 <- readTVar seqno
107 buffend0 <- readTVar buffend 113-- buffend0 <- readTVar buffend
108 if seqno0 < buffend0 114-- if seqno0 < buffend0
109 then do 115-- then do
110 maybes <- mapM (readArray pktq) (take (fromIntegral qsize) $ map (`mod` qsize) [ seqno0 .. buffend0 ]) 116-- maybes <- mapM (readArray pktq) (take (fromIntegral qsize) $ map (`mod` qsize) [ seqno0 .. buffend0 ])
111 let nums = map fst . filter (isNothing . snd) $ zip [buffend0 ..] maybes 117-- let nums = map fst . filter (isNothing . snd) $ zip [buffend0 ..] maybes
112 return (Left nums) 118-- return (Left nums)
113 else do 119-- else do
114 let i = seqno0 `mod` qsize 120-- let i = seqno0 `mod` qsize
115 x <- maybe retry return =<< readArray pktq i 121-- x <- maybe retry return =<< readArray pktq i
116 writeArray pktq i Nothing 122-- writeArray pktq i Nothing
117 modifyTVar' seqno succ 123-- modifyTVar' seqno succ
118 return (Right x) 124-- return (Right x)
119 125
120-- | Retry until the next expected packet is enqueued. Then return it. 126-- | Retry until the next expected packet is enqueued. Then return it.
121dequeue :: PacketQueue a -> STM a 127dequeue :: PacketQueue a -> STM a
@@ -127,20 +133,46 @@ dequeue PacketQueue { pktq, seqno, qsize } = do
127 modifyTVar' seqno succ 133 modifyTVar' seqno succ
128 return x 134 return x
129 135
130-- | Like dequeue, but marks as viewed rather than removing 136-- | Drop all packets preceding the given packet number.
131markButNotDequeue :: PacketQueue (Bool,a) -> STM a 137dropPacketsBefore :: PacketQueue a -> Word32 -> STM ()
132markButNotDequeue PacketQueue { pktq, seqno, qsize } = do 138dropPacketsBefore PacketQueue{ pktq, seqno, qsize, buffend } no0 = do
133 i0 <- readTVar seqno 139 low <- readTVar seqno
134 let i = i0 `mod` qsize 140 let no = no0 - 1 -- possibly negative
135 (b,x) <- maybe retry return =<< readArray pktq i 141 proj = no - low -- possibly negative
136 writeArray pktq i (Just (True,x)) 142 if (proj < qsize)
137 modifyTVar' seqno succ 143 then do
138 return x 144 -- Clear some, but not all, slots.
145 let ilow = low `mod` qsize
146 i = no `mod` qsize
147 ranges = if ilow <= i then [[ilow .. i]]
148 else [[0 .. i],[ilow .. qsize-1]]
149 writeTVar seqno no
150 forM_ ranges $ mapM_ $ \i -> writeArray pktq i Nothing
151 else do
152 -- Reset to empty queue.
153 writeTVar seqno no
154 writeTVar buffend no
155 (z,n) <- getBounds pktq
156 forM_ [z .. n] $ \i -> writeArray pktq i Nothing
157
158-- -- | Like dequeue, but marks as viewed rather than removing
159-- markButNotDequeue :: PacketQueue (Bool,a) -> STM a
160-- markButNotDequeue PacketQueue { pktq, seqno, qsize } = do
161-- i0 <- readTVar seqno
162-- let i = i0 `mod` qsize
163-- (b,x) <- maybe retry return =<< readArray pktq i
164-- writeArray pktq i (Just (True,x))
165-- modifyTVar' seqno succ
166-- return x
139 167
140-- | Enqueue a packet. Packets need not be enqueued in order as long as there 168-- | Enqueue a packet. Packets need not be enqueued in order as long as there
141-- is spare capacity in the queue. If there is not, the packet will be 169-- is spare capacity in the queue. If there is not, the packet will be
142-- silently discarded without blocking. (Unless this is an Overwrite-queue, 170-- silently discarded without blocking. (Unless this is an Overwrite-queue, in
143-- in which case, the packets will simply wrap around overwriting the old ones.) 171-- which case, the packets will simply wrap around overwriting the old ones.)
172--
173-- If the packet was enqueued, (0,i) will be retuned where /i/ is the index at
174-- which the new packet was stored in the buffer. If the queue was full, the
175-- first of the returned pair will be non-zero.
144enqueue :: PacketQueue a -- ^ The packet queue. 176enqueue :: PacketQueue a -- ^ The packet queue.
145 -> Word32 -- ^ Sequence number of the packet. 177 -> Word32 -- ^ Sequence number of the packet.
146 -> a -- ^ The packet. 178 -> a -- ^ The packet.
@@ -155,167 +187,21 @@ enqueue PacketQueue{ pktq, seqno, qsize, buffend} no x = do
155 modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) 187 modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be)
156 return (proj `divMod` qsize) 188 return (proj `divMod` qsize)
157 189
158-- lookup :: PacketQueue a -> Word32 -> STM (Maybe a) 190-- | Obtain the packet with the given sequence number if it is stored in the
159-- lookup PacketQueue{ pktq, seqno, qsize } no = _todo 191-- queue, otherwise /Nothing/ is returned without blocking.
160 192lookup :: PacketQueue a -> Word32 -> STM (Maybe a)
161----------------------------------------------------- 193lookup PacketQueue{ pktq, seqno, qsize } no = do
162-- * PacketOutQueue 194 low <- readTVar seqno
163-- 195 let proj = no - low
164 196 if proj < qsize
165data PacketOutQueue extra msg toWire fromWire = PacketOutQueue 197 then let i = no `mod` qsize
166 { pktoInPQ :: PacketQueue fromWire -- ^ reference to the incoming 'PacketQueue' 198 in readArray pktq i
167 , pktoOutPQ :: PacketQueue (Word32,toWire) 199 else return Nothing
168 , pktoPacketNo :: TVar Word32 200
169 , pktoToWireIO :: IO (STM extra) 201-- -- | For each item in the queue, modify or delete it.
170 , pktoToWire :: STM extra 202-- mapQ :: (a -> Maybe a) -> PacketQueue a -> STM ()
171 -> Word32{-packet number we expect to recieve-} 203-- mapQ f PacketQueue{pktq} = do
172 -> Word32{- buffer_end -} 204-- (z,n) <- getBounds pktq
173 -> Word32{- packet number -} 205-- forM_ [z .. n] $ \i -> do
174 -> msg 206-- e <- readArray pktq i
175 -> STM (Maybe (toWire,Word32{-next packet no-})) 207-- writeArray pktq i (e>>=f)
176 }
177
178mapOutGoing :: ((Word32,towire) -> Maybe (Word32,towire)) -> PacketOutQueue extra msg towire fromwire -> STM ()
179mapOutGoing f q@(PacketOutQueue { pktoOutPQ=PacketQueue{ pktq } }) = do
180 (z,n) <- getBounds pktq
181 let ff i = do
182 e <- readArray pktq i
183 writeArray pktq i (e>>=f)
184 mapM_ ff [z .. n]
185
186newOutGoing :: PacketQueue fromwire
187 -- ^ Incoming queue
188 -> (STM io -> Word32 {-packet number we expect to recieve-} -> Word32{-buffer_end-} -> Word32{-packet number-} -> msg -> STM (Maybe (wire,Word32{-next packet no-})))
189 -- ^ toWire callback
190 -> IO (STM io)
191 -- ^ io action to get extra parameter
192 -> Word32 -- ^ packet number of first outgoing packet
193 -> Word32 -- ^ Capacity of queue.
194 -> Word32 -- ^ Initial sequence number.
195 -> STM (PacketOutQueue io msg wire fromwire)
196newOutGoing inq towire toWireIO num capacity seqstart = do
197 outq <- new capacity seqstart
198 numVar <- newTVar num
199 return $ PacketOutQueue
200 { pktoInPQ = inq
201 , pktoOutPQ = outq
202 , pktoPacketNo = numVar
203 , pktoToWireIO = toWireIO
204 , pktoToWire = towire
205 }
206
207data OutGoingResult a = OGSuccess a | OGFull | OGEncodeFail
208 deriving (Show)
209
210instance Eq (OutGoingResult a) where
211 OGSuccess _ == OGSuccess _ = True
212 OGFull == OGFull = True
213 OGEncodeFail == OGEncodeFail = True
214 _ == _ = False
215
216
217-- | do something in IO before appending to the queue
218readyOutGoing :: PacketOutQueue extra msg wire fromwire -> IO (STM extra)
219readyOutGoing (PacketOutQueue {pktoToWireIO }) = pktoToWireIO
220
221getRequested :: Show wire => STM extra -> PacketOutQueue extra msg wire fromwire -> Word32 -> [Word8] -> STM [Maybe (Word32,wire)]
222getRequested _ _ _ [] = return []
223getRequested getExtra pktoq snum ns = do
224 let pnums = toPNums snum ns
225 indices = map toIndex pnums
226 tput XNetCrypto $ "getRequested: snum = " ++ show snum
227 tput XNetCrypto $ "getRequested: pnums = " ++ show pnums ++ " indices = " ++ show indices
228 xs <- packetQueueViewList (pktoOutPQ pktoq)
229 tput XNetCrypto $ "getRequested viewList -> "
230 mapM_ (tput XNetCrypto . show) xs
231 forM indices $ \i -> readArray (pktq $ pktoOutPQ pktoq) i
232 where
233 toIndex :: Word32 -> Word32
234 toIndex = (`mod` qsize (pktoOutPQ pktoq))
235
236toPNums :: Word32 -> [Word8] -> [Word32]
237toPNums snum ns = reverse . snd $ foldl doOne (snum-1,[]) ns
238 where
239 doOne :: (Word32,[Word32]) -> Word8 -> (Word32,[Word32])
240 doOne (addend,as) 0 = (addend+255,as)
241 doOne (addend,as) x = let y = fromIntegral x + addend
242 in (y,y:as)
243
244-- | Makes no modifications to the PacketOutQueue state, useful for lossy packets
245peekPacket :: STM extra -> PacketOutQueue extra msg wire fromwire -> msg -> STM (Maybe (wire,Word32))
246peekPacket getExtra q@(PacketOutQueue { pktoInPQ, pktoOutPQ, pktoPacketNo, pktoToWireIO, pktoToWire }) msg
247 = do
248 be <- readTVar (buffend pktoOutPQ)
249 let i = be `mod` (qsize pktoOutPQ)
250 let arrayEmpty :: MArray a e m => a Word32 e -> m Bool
251 arrayEmpty ar = do (lowB,highB) <- getBounds ar
252 let result= lowB > highB
253 tput XNetCrypto
254 ("arrayEmpty result=" ++ show result
255 ++ " lowB=" ++ show lowB
256 ++ " highB = " ++ show highB
257 ++ " i = " ++ show i)
258 return result
259 mbPkt <- do emp <- arrayEmpty (pktq pktoOutPQ)
260 if emp then tput XNetCrypto "(peekPacket empty)" >> return Nothing
261 else do tput XNetCrypto "(peekPacket nonempty)"
262 result <- readArray (pktq pktoOutPQ) i
263 tput XNetCrypto ("readArray (isJust result)==" ++ show (isJust result))
264 return result
265 pktno <- readTVar pktoPacketNo
266 nextno <- readTVar (seqno pktoInPQ)
267 pktoToWire getExtra nextno be pktno msg
268
269-- | Convert a message to packet format and append it to the front of a queue
270-- used for outgoing messages. (Note that ‘front‛ usually means the higher
271-- index in this implementation.)
272tryAppendQueueOutgoing :: STM extra -> PacketOutQueue extra msg wire fromwire -> msg -> STM (OutGoingResult wire)
273tryAppendQueueOutgoing getExtra q@(PacketOutQueue { pktoInPQ, pktoOutPQ, pktoPacketNo, pktoToWireIO, pktoToWire }) msg
274 = do
275 be <- readTVar (buffend pktoOutPQ)
276 let i = be `mod` (qsize pktoOutPQ)
277 let arrayEmpty :: MArray a e m => a Word32 e -> m Bool
278 arrayEmpty ar = do (lowB,highB) <- getBounds ar
279 return $ lowB > highB
280 mbPkt <- do emp <- arrayEmpty (pktq pktoOutPQ)
281 if emp then return Nothing
282 else do readArray (pktq pktoOutPQ) i
283 pktno <- readTVar pktoPacketNo
284 nextno <- readTVar (seqno pktoInPQ)
285 mbWire <- pktoToWire getExtra nextno be pktno msg
286 -- TODO all the above lines ^^ can be replaced with call to peekPacket
287 case mbWire of
288 Just (pkt,pktno')
289 -> case mbPkt of
290 -- slot is free, insert element
291 Nothing -> do
292 modifyTVar' (buffend pktoOutPQ) (+1)
293 writeTVar pktoPacketNo $! pktno'
294 writeArray (pktq pktoOutPQ) i (Just (pktno,pkt))
295 return (OGSuccess pkt)
296 -- queue is full
297 Just (n,_) -> do
298 nn <- getHighestHandledPacketPlus1 q
299 if (n < nn)
300 -- but we can overwrite an old packet
301 then do
302 modifyTVar' (buffend pktoOutPQ) (+1)
303 writeTVar pktoPacketNo $! pktno'
304 writeArray (pktq pktoOutPQ) i (Just (pktno,pkt))
305 return (OGSuccess pkt)
306 -- uh oh this packet is still needed...
307 else return OGFull
308 -- don't know how to send this message
309 Nothing -> return OGEncodeFail
310
311dequeueOutgoing :: PacketOutQueue extra msg wire fromwire -> STM (Word32,wire)
312dequeueOutgoing (PacketOutQueue {pktoOutPQ=PacketQueue { pktq, seqno, qsize }}) = do
313 i0 <- readTVar seqno
314 let i = i0 `mod` qsize
315 x <- maybe retry return =<< readArray pktq i
316 -- writeArray pktq i Nothing -- not cleaning
317 modifyTVar' seqno succ
318 return x
319
320getHighestHandledPacketPlus1 :: PacketOutQueue extra msg wire fromwire -> STM Word32
321getHighestHandledPacketPlus1 (PacketOutQueue { pktoInPQ }) = readTVar (buffend pktoInPQ)