diff options
author | Joe Crayne <joe@jerkface.net> | 2018-08-17 05:05:17 -0400 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2018-08-17 16:06:38 -0400 |
commit | fde5005a2d1ef3a0636cff21547d4cda22b7b215 (patch) | |
tree | 1263b8d66cbcc838432afd6cc5cb122d9c4c064b /src/Data/PacketQueue.hs | |
parent | f4dd948176187f5fb46a2cf0dbfbfc4c32badfa5 (diff) |
Simplified PacketQueue/PacketBuffer interface.
Diffstat (limited to 'src/Data/PacketQueue.hs')
-rw-r--r-- | src/Data/PacketQueue.hs | 312 |
1 files changed, 99 insertions, 213 deletions
diff --git a/src/Data/PacketQueue.hs b/src/Data/PacketQueue.hs index 6f997ac0..c5d5ac09 100644 --- a/src/Data/PacketQueue.hs +++ b/src/Data/PacketQueue.hs | |||
@@ -3,30 +3,22 @@ | |||
3 | -- be stored out of order, but from which they are extracted in the proper | 3 | -- be stored out of order, but from which they are extracted in the proper |
4 | -- sequence. | 4 | -- sequence. |
5 | {-# LANGUAGE NamedFieldPuns #-} | 5 | {-# LANGUAGE NamedFieldPuns #-} |
6 | {-# LANGUAGE FlexibleContexts #-} | ||
7 | module Data.PacketQueue | 6 | module Data.PacketQueue |
8 | ( PacketQueue | 7 | ( PacketQueue |
9 | , getCapacity | 8 | , getCapacity |
10 | , getLastDequeuedPlus1 | 9 | , getLastDequeuedPlus1 |
10 | , getLastEnqueuedPlus1 | ||
11 | , new | 11 | , new |
12 | , dequeue | 12 | , dequeue |
13 | , dropPacketsBefore | ||
13 | , getMissing | 14 | , getMissing |
14 | , dequeueOrGetMissing | 15 | -- , dequeueOrGetMissing |
15 | , markButNotDequeue | 16 | -- , markButNotDequeue |
16 | , enqueue | 17 | , enqueue |
17 | , observeOutOfBand | 18 | , observeOutOfBand |
18 | , PacketOutQueue | ||
19 | , packetQueueViewList | 19 | , packetQueueViewList |
20 | , newOutGoing | 20 | -- , mapQ |
21 | , readyOutGoing | 21 | , Data.PacketQueue.lookup |
22 | , toPNums | ||
23 | , getRequested | ||
24 | , peekPacket | ||
25 | , tryAppendQueueOutgoing | ||
26 | , dequeueOutgoing | ||
27 | , getHighestHandledPacketPlus1 | ||
28 | , mapOutGoing | ||
29 | , OutGoingResult(..) | ||
30 | ) where | 22 | ) where |
31 | 23 | ||
32 | import Control.Concurrent.STM | 24 | import Control.Concurrent.STM |
@@ -34,24 +26,35 @@ import Control.Monad | |||
34 | import Data.Word | 26 | import Data.Word |
35 | import Data.Array.MArray | 27 | import Data.Array.MArray |
36 | import Data.Maybe | 28 | import Data.Maybe |
37 | import DPut | ||
38 | 29 | ||
39 | data PacketQueue a = PacketQueue | 30 | data PacketQueue a = PacketQueue |
40 | { pktq :: TArray Word32 (Maybe a) | 31 | { pktq :: TArray Word32 (Maybe a) |
41 | , seqno :: TVar Word32 -- (buffer_start) | 32 | , seqno :: TVar Word32 -- (buffer_start) |
42 | , qsize :: Word32 | 33 | , qsize :: Word32 |
43 | , buffend :: TVar Word32 -- on incoming, next packet they'll send + 1 | 34 | , buffend :: TVar Word32 -- on incoming, next packet they'll send + 1 |
35 | -- i.e. one more than the largest seen sequence number. | ||
44 | } | 36 | } |
45 | 37 | ||
38 | -- | Obtain a list of non-empty slots in the 'PacketQueue'. The numeric value | ||
39 | -- is an index into the underlying array, not a sequence number. | ||
46 | packetQueueViewList :: PacketQueue a -> STM [(Word32,a)] | 40 | packetQueueViewList :: PacketQueue a -> STM [(Word32,a)] |
47 | packetQueueViewList p = do | 41 | packetQueueViewList p = do |
48 | let f (n,Nothing) = Nothing | 42 | let f (n,Nothing) = Nothing |
49 | f (n,Just x) = Just (n,x) | 43 | f (n,Just x) = Just (n,x) |
50 | catMaybes . map f <$> getAssocs (pktq p) | 44 | catMaybes . map f <$> getAssocs (pktq p) |
51 | 45 | ||
46 | -- | This returns the earliest sequence number with a slot in the queue. | ||
52 | getLastDequeuedPlus1 :: PacketQueue a -> STM Word32 | 47 | getLastDequeuedPlus1 :: PacketQueue a -> STM Word32 |
53 | getLastDequeuedPlus1 PacketQueue {seqno} = readTVar seqno | 48 | getLastDequeuedPlus1 PacketQueue {seqno} = readTVar seqno |
54 | 49 | ||
50 | -- | This returns the least upper bound of sequence numbers that have been | ||
51 | -- enqueued. | ||
52 | getLastEnqueuedPlus1 :: PacketQueue a -> STM Word32 | ||
53 | getLastEnqueuedPlus1 PacketQueue {buffend} = readTVar buffend | ||
54 | |||
55 | |||
56 | -- | This is the number of consequetive sequence numbers, starting at | ||
57 | -- 'getLastDequeuedPlus1' that can be stored in the queue | ||
55 | getCapacity :: Applicative m => PacketQueue t -> m Word32 | 58 | getCapacity :: Applicative m => PacketQueue t -> m Word32 |
56 | getCapacity (PacketQueue { qsize }) = pure qsize | 59 | getCapacity (PacketQueue { qsize }) = pure qsize |
57 | 60 | ||
@@ -63,7 +66,7 @@ new capacity seqstart = do | |||
63 | let cap = if capacity `mod` 2 == 0 then capacity else capacity + 1 | 66 | let cap = if capacity `mod` 2 == 0 then capacity else capacity + 1 |
64 | q <- newArray (0,cap - 1) Nothing | 67 | q <- newArray (0,cap - 1) Nothing |
65 | seqv <- newTVar seqstart | 68 | seqv <- newTVar seqstart |
66 | bufe <- newTVar 0 | 69 | bufe <- newTVar seqstart |
67 | return PacketQueue | 70 | return PacketQueue |
68 | { pktq = q | 71 | { pktq = q |
69 | , seqno = seqv | 72 | , seqno = seqv |
@@ -72,8 +75,13 @@ new capacity seqstart = do | |||
72 | } | 75 | } |
73 | 76 | ||
74 | -- | Update the packet queue given: | 77 | -- | Update the packet queue given: |
78 | -- | ||
75 | -- * packet queue | 79 | -- * packet queue |
80 | -- | ||
76 | -- * the number of next lossless packet they intend to send you | 81 | -- * the number of next lossless packet they intend to send you |
82 | -- | ||
83 | -- This behaves exactly like 'enqueue' except that no packet data is written to | ||
84 | -- the queue. | ||
77 | observeOutOfBand :: PacketQueue a -> Word32-> STM () | 85 | observeOutOfBand :: PacketQueue a -> Word32-> STM () |
78 | observeOutOfBand PacketQueue { seqno, qsize, buffend } numberOfNextLosslessPacketThatTheyWillSend = do | 86 | observeOutOfBand PacketQueue { seqno, qsize, buffend } numberOfNextLosslessPacketThatTheyWillSend = do |
79 | low <- readTVar seqno | 87 | low <- readTVar seqno |
@@ -85,37 +93,35 @@ observeOutOfBand PacketQueue { seqno, qsize, buffend } numberOfNextLosslessPacke | |||
85 | -- | If seqno < buffend then return expected packet numbers for all | 93 | -- | If seqno < buffend then return expected packet numbers for all |
86 | -- the Nothings in the array between them. | 94 | -- the Nothings in the array between them. |
87 | -- Otherwise, return empty list. | 95 | -- Otherwise, return empty list. |
88 | getMissing :: Show a => PacketQueue a -> STM [Word32] | 96 | getMissing :: PacketQueue a -> STM [Word32] |
89 | getMissing PacketQueue { pktq, seqno, qsize, buffend } = do | 97 | getMissing PacketQueue { pktq, seqno, qsize, buffend } = do |
90 | seqno0 <- readTVar seqno | 98 | seqno0 <- readTVar seqno |
91 | buffend0 <- readTVar buffend | 99 | buffend0 <- readTVar buffend |
92 | -- note relying on fact that [ b .. a ] is null when a < b | 100 | -- note relying on fact that [ b .. a ] is null when a < b |
93 | let indices = take (fromIntegral qsize) $ map (`mod` qsize) [ seqno0 .. buffend0 - 1] | 101 | let indices = take (fromIntegral qsize) $ map (`mod` qsize) [ seqno0 .. buffend0 - 1] |
94 | -- tput XNetCrypto $ "(netCRYPTO getMissing indices: " ++ show indices | ||
95 | maybes <- mapM (\i -> do {x <- readArray pktq i; return (i,x)}) indices | 102 | maybes <- mapM (\i -> do {x <- readArray pktq i; return (i,x)}) indices |
96 | tput XNetCrypto $ "(netCRYPTO) getMissing: (" ++ show seqno0 ++ " " ++ show buffend0 ++") => " ++ show maybes | ||
97 | let nums = map fst . filter (isNothing . snd) $ maybes | 103 | let nums = map fst . filter (isNothing . snd) $ maybes |
98 | return nums | 104 | return nums |
99 | 105 | ||
100 | -- | If seqno < buffend then return expected packet numbers for all | 106 | -- -- | If seqno < buffend then return expected packet numbers for all |
101 | -- the Nothings in the array between them. | 107 | -- -- the Nothings in the array between them. |
102 | -- Otherwise, behave as 'dequeue' would. | 108 | -- -- Otherwise, behave as 'dequeue' would. |
103 | -- TODO: Do we need this function? Delete it if not. | 109 | -- -- TODO: Do we need this function? Delete it if not. |
104 | dequeueOrGetMissing :: PacketQueue a -> STM (Either [Word32] a) | 110 | -- dequeueOrGetMissing :: PacketQueue a -> STM (Either [Word32] a) |
105 | dequeueOrGetMissing PacketQueue { pktq, seqno, qsize, buffend } = do | 111 | -- dequeueOrGetMissing PacketQueue { pktq, seqno, qsize, buffend } = do |
106 | seqno0 <- readTVar seqno | 112 | -- seqno0 <- readTVar seqno |
107 | buffend0 <- readTVar buffend | 113 | -- buffend0 <- readTVar buffend |
108 | if seqno0 < buffend0 | 114 | -- if seqno0 < buffend0 |
109 | then do | 115 | -- then do |
110 | maybes <- mapM (readArray pktq) (take (fromIntegral qsize) $ map (`mod` qsize) [ seqno0 .. buffend0 ]) | 116 | -- maybes <- mapM (readArray pktq) (take (fromIntegral qsize) $ map (`mod` qsize) [ seqno0 .. buffend0 ]) |
111 | let nums = map fst . filter (isNothing . snd) $ zip [buffend0 ..] maybes | 117 | -- let nums = map fst . filter (isNothing . snd) $ zip [buffend0 ..] maybes |
112 | return (Left nums) | 118 | -- return (Left nums) |
113 | else do | 119 | -- else do |
114 | let i = seqno0 `mod` qsize | 120 | -- let i = seqno0 `mod` qsize |
115 | x <- maybe retry return =<< readArray pktq i | 121 | -- x <- maybe retry return =<< readArray pktq i |
116 | writeArray pktq i Nothing | 122 | -- writeArray pktq i Nothing |
117 | modifyTVar' seqno succ | 123 | -- modifyTVar' seqno succ |
118 | return (Right x) | 124 | -- return (Right x) |
119 | 125 | ||
120 | -- | Retry until the next expected packet is enqueued. Then return it. | 126 | -- | Retry until the next expected packet is enqueued. Then return it. |
121 | dequeue :: PacketQueue a -> STM a | 127 | dequeue :: PacketQueue a -> STM a |
@@ -127,20 +133,46 @@ dequeue PacketQueue { pktq, seqno, qsize } = do | |||
127 | modifyTVar' seqno succ | 133 | modifyTVar' seqno succ |
128 | return x | 134 | return x |
129 | 135 | ||
130 | -- | Like dequeue, but marks as viewed rather than removing | 136 | -- | Drop all packets preceding the given packet number. |
131 | markButNotDequeue :: PacketQueue (Bool,a) -> STM a | 137 | dropPacketsBefore :: PacketQueue a -> Word32 -> STM () |
132 | markButNotDequeue PacketQueue { pktq, seqno, qsize } = do | 138 | dropPacketsBefore PacketQueue{ pktq, seqno, qsize, buffend } no0 = do |
133 | i0 <- readTVar seqno | 139 | low <- readTVar seqno |
134 | let i = i0 `mod` qsize | 140 | let no = no0 - 1 -- possibly negative |
135 | (b,x) <- maybe retry return =<< readArray pktq i | 141 | proj = no - low -- possibly negative |
136 | writeArray pktq i (Just (True,x)) | 142 | if (proj < qsize) |
137 | modifyTVar' seqno succ | 143 | then do |
138 | return x | 144 | -- Clear some, but not all, slots. |
145 | let ilow = low `mod` qsize | ||
146 | i = no `mod` qsize | ||
147 | ranges = if ilow <= i then [[ilow .. i]] | ||
148 | else [[0 .. i],[ilow .. qsize-1]] | ||
149 | writeTVar seqno no | ||
150 | forM_ ranges $ mapM_ $ \i -> writeArray pktq i Nothing | ||
151 | else do | ||
152 | -- Reset to empty queue. | ||
153 | writeTVar seqno no | ||
154 | writeTVar buffend no | ||
155 | (z,n) <- getBounds pktq | ||
156 | forM_ [z .. n] $ \i -> writeArray pktq i Nothing | ||
157 | |||
158 | -- -- | Like dequeue, but marks as viewed rather than removing | ||
159 | -- markButNotDequeue :: PacketQueue (Bool,a) -> STM a | ||
160 | -- markButNotDequeue PacketQueue { pktq, seqno, qsize } = do | ||
161 | -- i0 <- readTVar seqno | ||
162 | -- let i = i0 `mod` qsize | ||
163 | -- (b,x) <- maybe retry return =<< readArray pktq i | ||
164 | -- writeArray pktq i (Just (True,x)) | ||
165 | -- modifyTVar' seqno succ | ||
166 | -- return x | ||
139 | 167 | ||
140 | -- | Enqueue a packet. Packets need not be enqueued in order as long as there | 168 | -- | Enqueue a packet. Packets need not be enqueued in order as long as there |
141 | -- is spare capacity in the queue. If there is not, the packet will be | 169 | -- is spare capacity in the queue. If there is not, the packet will be |
142 | -- silently discarded without blocking. (Unless this is an Overwrite-queue, | 170 | -- silently discarded without blocking. (Unless this is an Overwrite-queue, in |
143 | -- in which case, the packets will simply wrap around overwriting the old ones.) | 171 | -- which case, the packets will simply wrap around overwriting the old ones.) |
172 | -- | ||
173 | -- If the packet was enqueued, (0,i) will be retuned where /i/ is the index at | ||
174 | -- which the new packet was stored in the buffer. If the queue was full, the | ||
175 | -- first of the returned pair will be non-zero. | ||
144 | enqueue :: PacketQueue a -- ^ The packet queue. | 176 | enqueue :: PacketQueue a -- ^ The packet queue. |
145 | -> Word32 -- ^ Sequence number of the packet. | 177 | -> Word32 -- ^ Sequence number of the packet. |
146 | -> a -- ^ The packet. | 178 | -> a -- ^ The packet. |
@@ -155,167 +187,21 @@ enqueue PacketQueue{ pktq, seqno, qsize, buffend} no x = do | |||
155 | modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) | 187 | modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) |
156 | return (proj `divMod` qsize) | 188 | return (proj `divMod` qsize) |
157 | 189 | ||
158 | -- lookup :: PacketQueue a -> Word32 -> STM (Maybe a) | 190 | -- | Obtain the packet with the given sequence number if it is stored in the |
159 | -- lookup PacketQueue{ pktq, seqno, qsize } no = _todo | 191 | -- queue, otherwise /Nothing/ is returned without blocking. |
160 | 192 | lookup :: PacketQueue a -> Word32 -> STM (Maybe a) | |
161 | ----------------------------------------------------- | 193 | lookup PacketQueue{ pktq, seqno, qsize } no = do |
162 | -- * PacketOutQueue | 194 | low <- readTVar seqno |
163 | -- | 195 | let proj = no - low |
164 | 196 | if proj < qsize | |
165 | data PacketOutQueue extra msg toWire fromWire = PacketOutQueue | 197 | then let i = no `mod` qsize |
166 | { pktoInPQ :: PacketQueue fromWire -- ^ reference to the incoming 'PacketQueue' | 198 | in readArray pktq i |
167 | , pktoOutPQ :: PacketQueue (Word32,toWire) | 199 | else return Nothing |
168 | , pktoPacketNo :: TVar Word32 | 200 | |
169 | , pktoToWireIO :: IO (STM extra) | 201 | -- -- | For each item in the queue, modify or delete it. |
170 | , pktoToWire :: STM extra | 202 | -- mapQ :: (a -> Maybe a) -> PacketQueue a -> STM () |
171 | -> Word32{-packet number we expect to recieve-} | 203 | -- mapQ f PacketQueue{pktq} = do |
172 | -> Word32{- buffer_end -} | 204 | -- (z,n) <- getBounds pktq |
173 | -> Word32{- packet number -} | 205 | -- forM_ [z .. n] $ \i -> do |
174 | -> msg | 206 | -- e <- readArray pktq i |
175 | -> STM (Maybe (toWire,Word32{-next packet no-})) | 207 | -- writeArray pktq i (e>>=f) |
176 | } | ||
177 | |||
178 | mapOutGoing :: ((Word32,towire) -> Maybe (Word32,towire)) -> PacketOutQueue extra msg towire fromwire -> STM () | ||
179 | mapOutGoing f q@(PacketOutQueue { pktoOutPQ=PacketQueue{ pktq } }) = do | ||
180 | (z,n) <- getBounds pktq | ||
181 | let ff i = do | ||
182 | e <- readArray pktq i | ||
183 | writeArray pktq i (e>>=f) | ||
184 | mapM_ ff [z .. n] | ||
185 | |||
186 | newOutGoing :: PacketQueue fromwire | ||
187 | -- ^ Incoming queue | ||
188 | -> (STM io -> Word32 {-packet number we expect to recieve-} -> Word32{-buffer_end-} -> Word32{-packet number-} -> msg -> STM (Maybe (wire,Word32{-next packet no-}))) | ||
189 | -- ^ toWire callback | ||
190 | -> IO (STM io) | ||
191 | -- ^ io action to get extra parameter | ||
192 | -> Word32 -- ^ packet number of first outgoing packet | ||
193 | -> Word32 -- ^ Capacity of queue. | ||
194 | -> Word32 -- ^ Initial sequence number. | ||
195 | -> STM (PacketOutQueue io msg wire fromwire) | ||
196 | newOutGoing inq towire toWireIO num capacity seqstart = do | ||
197 | outq <- new capacity seqstart | ||
198 | numVar <- newTVar num | ||
199 | return $ PacketOutQueue | ||
200 | { pktoInPQ = inq | ||
201 | , pktoOutPQ = outq | ||
202 | , pktoPacketNo = numVar | ||
203 | , pktoToWireIO = toWireIO | ||
204 | , pktoToWire = towire | ||
205 | } | ||
206 | |||
207 | data OutGoingResult a = OGSuccess a | OGFull | OGEncodeFail | ||
208 | deriving (Show) | ||
209 | |||
210 | instance Eq (OutGoingResult a) where | ||
211 | OGSuccess _ == OGSuccess _ = True | ||
212 | OGFull == OGFull = True | ||
213 | OGEncodeFail == OGEncodeFail = True | ||
214 | _ == _ = False | ||
215 | |||
216 | |||
217 | -- | do something in IO before appending to the queue | ||
218 | readyOutGoing :: PacketOutQueue extra msg wire fromwire -> IO (STM extra) | ||
219 | readyOutGoing (PacketOutQueue {pktoToWireIO }) = pktoToWireIO | ||
220 | |||
221 | getRequested :: Show wire => STM extra -> PacketOutQueue extra msg wire fromwire -> Word32 -> [Word8] -> STM [Maybe (Word32,wire)] | ||
222 | getRequested _ _ _ [] = return [] | ||
223 | getRequested getExtra pktoq snum ns = do | ||
224 | let pnums = toPNums snum ns | ||
225 | indices = map toIndex pnums | ||
226 | tput XNetCrypto $ "getRequested: snum = " ++ show snum | ||
227 | tput XNetCrypto $ "getRequested: pnums = " ++ show pnums ++ " indices = " ++ show indices | ||
228 | xs <- packetQueueViewList (pktoOutPQ pktoq) | ||
229 | tput XNetCrypto $ "getRequested viewList -> " | ||
230 | mapM_ (tput XNetCrypto . show) xs | ||
231 | forM indices $ \i -> readArray (pktq $ pktoOutPQ pktoq) i | ||
232 | where | ||
233 | toIndex :: Word32 -> Word32 | ||
234 | toIndex = (`mod` qsize (pktoOutPQ pktoq)) | ||
235 | |||
236 | toPNums :: Word32 -> [Word8] -> [Word32] | ||
237 | toPNums snum ns = reverse . snd $ foldl doOne (snum-1,[]) ns | ||
238 | where | ||
239 | doOne :: (Word32,[Word32]) -> Word8 -> (Word32,[Word32]) | ||
240 | doOne (addend,as) 0 = (addend+255,as) | ||
241 | doOne (addend,as) x = let y = fromIntegral x + addend | ||
242 | in (y,y:as) | ||
243 | |||
244 | -- | Makes no modifications to the PacketOutQueue state, useful for lossy packets | ||
245 | peekPacket :: STM extra -> PacketOutQueue extra msg wire fromwire -> msg -> STM (Maybe (wire,Word32)) | ||
246 | peekPacket getExtra q@(PacketOutQueue { pktoInPQ, pktoOutPQ, pktoPacketNo, pktoToWireIO, pktoToWire }) msg | ||
247 | = do | ||
248 | be <- readTVar (buffend pktoOutPQ) | ||
249 | let i = be `mod` (qsize pktoOutPQ) | ||
250 | let arrayEmpty :: MArray a e m => a Word32 e -> m Bool | ||
251 | arrayEmpty ar = do (lowB,highB) <- getBounds ar | ||
252 | let result= lowB > highB | ||
253 | tput XNetCrypto | ||
254 | ("arrayEmpty result=" ++ show result | ||
255 | ++ " lowB=" ++ show lowB | ||
256 | ++ " highB = " ++ show highB | ||
257 | ++ " i = " ++ show i) | ||
258 | return result | ||
259 | mbPkt <- do emp <- arrayEmpty (pktq pktoOutPQ) | ||
260 | if emp then tput XNetCrypto "(peekPacket empty)" >> return Nothing | ||
261 | else do tput XNetCrypto "(peekPacket nonempty)" | ||
262 | result <- readArray (pktq pktoOutPQ) i | ||
263 | tput XNetCrypto ("readArray (isJust result)==" ++ show (isJust result)) | ||
264 | return result | ||
265 | pktno <- readTVar pktoPacketNo | ||
266 | nextno <- readTVar (seqno pktoInPQ) | ||
267 | pktoToWire getExtra nextno be pktno msg | ||
268 | |||
269 | -- | Convert a message to packet format and append it to the front of a queue | ||
270 | -- used for outgoing messages. (Note that ‘front‛ usually means the higher | ||
271 | -- index in this implementation.) | ||
272 | tryAppendQueueOutgoing :: STM extra -> PacketOutQueue extra msg wire fromwire -> msg -> STM (OutGoingResult wire) | ||
273 | tryAppendQueueOutgoing getExtra q@(PacketOutQueue { pktoInPQ, pktoOutPQ, pktoPacketNo, pktoToWireIO, pktoToWire }) msg | ||
274 | = do | ||
275 | be <- readTVar (buffend pktoOutPQ) | ||
276 | let i = be `mod` (qsize pktoOutPQ) | ||
277 | let arrayEmpty :: MArray a e m => a Word32 e -> m Bool | ||
278 | arrayEmpty ar = do (lowB,highB) <- getBounds ar | ||
279 | return $ lowB > highB | ||
280 | mbPkt <- do emp <- arrayEmpty (pktq pktoOutPQ) | ||
281 | if emp then return Nothing | ||
282 | else do readArray (pktq pktoOutPQ) i | ||
283 | pktno <- readTVar pktoPacketNo | ||
284 | nextno <- readTVar (seqno pktoInPQ) | ||
285 | mbWire <- pktoToWire getExtra nextno be pktno msg | ||
286 | -- TODO all the above lines ^^ can be replaced with call to peekPacket | ||
287 | case mbWire of | ||
288 | Just (pkt,pktno') | ||
289 | -> case mbPkt of | ||
290 | -- slot is free, insert element | ||
291 | Nothing -> do | ||
292 | modifyTVar' (buffend pktoOutPQ) (+1) | ||
293 | writeTVar pktoPacketNo $! pktno' | ||
294 | writeArray (pktq pktoOutPQ) i (Just (pktno,pkt)) | ||
295 | return (OGSuccess pkt) | ||
296 | -- queue is full | ||
297 | Just (n,_) -> do | ||
298 | nn <- getHighestHandledPacketPlus1 q | ||
299 | if (n < nn) | ||
300 | -- but we can overwrite an old packet | ||
301 | then do | ||
302 | modifyTVar' (buffend pktoOutPQ) (+1) | ||
303 | writeTVar pktoPacketNo $! pktno' | ||
304 | writeArray (pktq pktoOutPQ) i (Just (pktno,pkt)) | ||
305 | return (OGSuccess pkt) | ||
306 | -- uh oh this packet is still needed... | ||
307 | else return OGFull | ||
308 | -- don't know how to send this message | ||
309 | Nothing -> return OGEncodeFail | ||
310 | |||
311 | dequeueOutgoing :: PacketOutQueue extra msg wire fromwire -> STM (Word32,wire) | ||
312 | dequeueOutgoing (PacketOutQueue {pktoOutPQ=PacketQueue { pktq, seqno, qsize }}) = do | ||
313 | i0 <- readTVar seqno | ||
314 | let i = i0 `mod` qsize | ||
315 | x <- maybe retry return =<< readArray pktq i | ||
316 | -- writeArray pktq i Nothing -- not cleaning | ||
317 | modifyTVar' seqno succ | ||
318 | return x | ||
319 | |||
320 | getHighestHandledPacketPlus1 :: PacketOutQueue extra msg wire fromwire -> STM Word32 | ||
321 | getHighestHandledPacketPlus1 (PacketOutQueue { pktoInPQ }) = readTVar (buffend pktoInPQ) | ||