summaryrefslogtreecommitdiff
path: root/src/Data/PacketQueue.hs
diff options
context:
space:
mode:
authorjim@bo <jim@bo>2018-06-21 04:26:17 -0400
committerjim@bo <jim@bo>2018-06-21 04:26:17 -0400
commitd0cad3e46bc798dfc9e5ef2f4483d9e637bf8a67 (patch)
treeb2758e3bf8d4a64d3bde2398f7c8c99e9f362ff9 /src/Data/PacketQueue.hs
parentb220a9c00c8b5a287cb255de62faa2994bb6a56e (diff)
update PacketQueue in preparation for request thread
Diffstat (limited to 'src/Data/PacketQueue.hs')
-rw-r--r--src/Data/PacketQueue.hs26
1 files changed, 15 insertions, 11 deletions
diff --git a/src/Data/PacketQueue.hs b/src/Data/PacketQueue.hs
index 1e79b851..57845ae5 100644
--- a/src/Data/PacketQueue.hs
+++ b/src/Data/PacketQueue.hs
@@ -10,6 +10,7 @@ module Data.PacketQueue
10 , new 10 , new
11 , newOverwrite 11 , newOverwrite
12 , dequeue 12 , dequeue
13 , dequeueOrGetMissing
13 , markButNotDequeue 14 , markButNotDequeue
14 , enqueue 15 , enqueue
15 , observeOutOfBand 16 , observeOutOfBand
@@ -36,7 +37,7 @@ import Debug.Trace
36 37
37data PacketQueue a = PacketQueue 38data PacketQueue a = PacketQueue
38 { pktq :: TArray Word32 (Maybe a) 39 { pktq :: TArray Word32 (Maybe a)
39 , seqno :: TVar Word32 40 , seqno :: TVar Word32 -- (buffer_start)
40 , qsize :: Word32 41 , qsize :: Word32
41 , buffend :: TVar Word32 -- on incoming, next packet they'll send + 1 42 , buffend :: TVar Word32 -- on incoming, next packet they'll send + 1
42 , qOverWriteMode :: Bool -- TODO: Remove me, use Data.CyclicBuffer instead. 43 , qOverWriteMode :: Bool -- TODO: Remove me, use Data.CyclicBuffer instead.
@@ -85,28 +86,31 @@ newOverwrite capacity seqstart = do
85 , qOverWriteMode = True 86 , qOverWriteMode = True
86 } 87 }
87 88
89-- | Update the packet queue given:
90-- * packet queue
91-- * the number of next lossless packet they intend to send you
88observeOutOfBand :: PacketQueue a -> Word32-> STM () 92observeOutOfBand :: PacketQueue a -> Word32-> STM ()
89observeOutOfBand PacketQueue { seqno, qsize, buffend } no = do 93observeOutOfBand PacketQueue { seqno, qsize, buffend } numberOfNextLosslessPacketThatTheyWillSend = do
90 low <- readTVar seqno 94 low <- readTVar seqno
91 let proj = no - low 95 let proj = numberOfNextLosslessPacketThatTheyWillSend - low
92 -- Ignore packet if out of range. 96 -- Ignore packet if out of range.
93 when ( proj < qsize) $ do 97 when ( proj < qsize) $ do
94 modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) 98 modifyTVar' buffend (\be -> if be - low <= proj then numberOfNextLosslessPacketThatTheyWillSend + 1 else be)
95 99
96-- | If buffend < seqno then return expected packet numbers for all 100-- | If seqno < buffend then return expected packet numbers for all
97-- the Nothings in the array between them. 101-- the Nothings in the array between them.
98-- Otherwise, behave as 'dequeue' would. 102-- Otherwise, behave as 'dequeue' would.
99dequeueOrGetMissing :: PacketQueue a -> STM (Either [Word32] a) 103dequeueOrGetMissing :: PacketQueue a -> STM (Either [Word32] a)
100dequeueOrGetMissing PacketQueue { pktq, seqno, qsize, buffend } = do 104dequeueOrGetMissing PacketQueue { pktq, seqno, qsize, buffend } = do
101 i0 <- readTVar seqno 105 seqno0 <- readTVar seqno
102 be <- readTVar buffend 106 buffend0 <- readTVar buffend
103 if i0 < be 107 if seqno0 < buffend0
104 then do 108 then do
105 maybes <- mapM (readArray pktq) (take (fromIntegral qsize) $ map (`mod` qsize) [ be .. i0 ]) 109 maybes <- mapM (readArray pktq) (take (fromIntegral qsize) $ map (`mod` qsize) [ buffend0 .. seqno0 ])
106 let nums = map fst . filter (isNothing . snd) $ zip [be ..] maybes 110 let nums = map fst . filter (isNothing . snd) $ zip [buffend0 ..] maybes
107 return (Left nums) 111 return (Left nums)
108 else do 112 else do
109 let i = i0 `mod` qsize 113 let i = seqno0 `mod` qsize
110 x <- maybe retry return =<< readArray pktq i 114 x <- maybe retry return =<< readArray pktq i
111 writeArray pktq i Nothing 115 writeArray pktq i Nothing
112 modifyTVar' seqno succ 116 modifyTVar' seqno succ