diff options
Diffstat (limited to 'src/Data')
-rw-r--r-- | src/Data/PacketQueue.hs | 26 |
1 files changed, 15 insertions, 11 deletions
diff --git a/src/Data/PacketQueue.hs b/src/Data/PacketQueue.hs index 1e79b851..57845ae5 100644 --- a/src/Data/PacketQueue.hs +++ b/src/Data/PacketQueue.hs | |||
@@ -10,6 +10,7 @@ module Data.PacketQueue | |||
10 | , new | 10 | , new |
11 | , newOverwrite | 11 | , newOverwrite |
12 | , dequeue | 12 | , dequeue |
13 | , dequeueOrGetMissing | ||
13 | , markButNotDequeue | 14 | , markButNotDequeue |
14 | , enqueue | 15 | , enqueue |
15 | , observeOutOfBand | 16 | , observeOutOfBand |
@@ -36,7 +37,7 @@ import Debug.Trace | |||
36 | 37 | ||
37 | data PacketQueue a = PacketQueue | 38 | data PacketQueue a = PacketQueue |
38 | { pktq :: TArray Word32 (Maybe a) | 39 | { pktq :: TArray Word32 (Maybe a) |
39 | , seqno :: TVar Word32 | 40 | , seqno :: TVar Word32 -- (buffer_start) |
40 | , qsize :: Word32 | 41 | , qsize :: Word32 |
41 | , buffend :: TVar Word32 -- on incoming, next packet they'll send + 1 | 42 | , buffend :: TVar Word32 -- on incoming, next packet they'll send + 1 |
42 | , qOverWriteMode :: Bool -- TODO: Remove me, use Data.CyclicBuffer instead. | 43 | , qOverWriteMode :: Bool -- TODO: Remove me, use Data.CyclicBuffer instead. |
@@ -85,28 +86,31 @@ newOverwrite capacity seqstart = do | |||
85 | , qOverWriteMode = True | 86 | , qOverWriteMode = True |
86 | } | 87 | } |
87 | 88 | ||
89 | -- | Update the packet queue given: | ||
90 | -- * packet queue | ||
91 | -- * the number of next lossless packet they intend to send you | ||
88 | observeOutOfBand :: PacketQueue a -> Word32-> STM () | 92 | observeOutOfBand :: PacketQueue a -> Word32-> STM () |
89 | observeOutOfBand PacketQueue { seqno, qsize, buffend } no = do | 93 | observeOutOfBand PacketQueue { seqno, qsize, buffend } numberOfNextLosslessPacketThatTheyWillSend = do |
90 | low <- readTVar seqno | 94 | low <- readTVar seqno |
91 | let proj = no - low | 95 | let proj = numberOfNextLosslessPacketThatTheyWillSend - low |
92 | -- Ignore packet if out of range. | 96 | -- Ignore packet if out of range. |
93 | when ( proj < qsize) $ do | 97 | when ( proj < qsize) $ do |
94 | modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) | 98 | modifyTVar' buffend (\be -> if be - low <= proj then numberOfNextLosslessPacketThatTheyWillSend + 1 else be) |
95 | 99 | ||
96 | -- | If buffend < seqno then return expected packet numbers for all | 100 | -- | If seqno < buffend then return expected packet numbers for all |
97 | -- the Nothings in the array between them. | 101 | -- the Nothings in the array between them. |
98 | -- Otherwise, behave as 'dequeue' would. | 102 | -- Otherwise, behave as 'dequeue' would. |
99 | dequeueOrGetMissing :: PacketQueue a -> STM (Either [Word32] a) | 103 | dequeueOrGetMissing :: PacketQueue a -> STM (Either [Word32] a) |
100 | dequeueOrGetMissing PacketQueue { pktq, seqno, qsize, buffend } = do | 104 | dequeueOrGetMissing PacketQueue { pktq, seqno, qsize, buffend } = do |
101 | i0 <- readTVar seqno | 105 | seqno0 <- readTVar seqno |
102 | be <- readTVar buffend | 106 | buffend0 <- readTVar buffend |
103 | if i0 < be | 107 | if seqno0 < buffend0 |
104 | then do | 108 | then do |
105 | maybes <- mapM (readArray pktq) (take (fromIntegral qsize) $ map (`mod` qsize) [ be .. i0 ]) | 109 | maybes <- mapM (readArray pktq) (take (fromIntegral qsize) $ map (`mod` qsize) [ buffend0 .. seqno0 ]) |
106 | let nums = map fst . filter (isNothing . snd) $ zip [be ..] maybes | 110 | let nums = map fst . filter (isNothing . snd) $ zip [buffend0 ..] maybes |
107 | return (Left nums) | 111 | return (Left nums) |
108 | else do | 112 | else do |
109 | let i = i0 `mod` qsize | 113 | let i = seqno0 `mod` qsize |
110 | x <- maybe retry return =<< readArray pktq i | 114 | x <- maybe retry return =<< readArray pktq i |
111 | writeArray pktq i Nothing | 115 | writeArray pktq i Nothing |
112 | modifyTVar' seqno succ | 116 | modifyTVar' seqno succ |