diff options
Diffstat (limited to 'src/Data')
-rw-r--r-- | src/Data/CyclicBuffer.hs | 24 |
1 files changed, 20 insertions, 4 deletions
diff --git a/src/Data/CyclicBuffer.hs b/src/Data/CyclicBuffer.hs index ab022e3f..0cc87459 100644 --- a/src/Data/CyclicBuffer.hs +++ b/src/Data/CyclicBuffer.hs | |||
@@ -1,6 +1,7 @@ | |||
1 | {-# LANGUAGE NamedFieldPuns #-} | 1 | {-# LANGUAGE NamedFieldPuns #-} |
2 | {-# LANGUAGE FlexibleContexts #-} | 2 | {-# LANGUAGE FlexibleContexts #-} |
3 | module Data.CyclicBuffer where | 3 | module Data.CyclicBuffer {- TODO: export list -} where |
4 | |||
4 | 5 | ||
5 | import Control.Concurrent.STM | 6 | import Control.Concurrent.STM |
6 | import Control.Concurrent.STM.TArray | 7 | import Control.Concurrent.STM.TArray |
@@ -19,6 +20,7 @@ data CyclicBuffer a = CyclicBuffer | |||
19 | , qsize :: Word32 | 20 | , qsize :: Word32 |
20 | , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1 | 21 | , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1 |
21 | , dropCnt :: TVar Word32 | 22 | , dropCnt :: TVar Word32 |
23 | , totalCnt :: TVar Word32 | ||
22 | } | 24 | } |
23 | 25 | ||
24 | cyclicBufferViewList :: CyclicBuffer a -> STM [(Word32,a)] | 26 | cyclicBufferViewList :: CyclicBuffer a -> STM [(Word32,a)] |
@@ -30,6 +32,15 @@ cyclicBufferViewList p = do | |||
30 | getCapacity :: Applicative m => CyclicBuffer t -> m Word32 | 32 | getCapacity :: Applicative m => CyclicBuffer t -> m Word32 |
31 | getCapacity (CyclicBuffer { qsize }) = pure qsize | 33 | getCapacity (CyclicBuffer { qsize }) = pure qsize |
32 | 34 | ||
35 | getTotal :: CyclicBuffer t -> STM Word32 | ||
36 | getTotal (CyclicBuffer { totalCnt }) = readTVar totalCnt | ||
37 | |||
38 | getDropped :: CyclicBuffer t -> STM Word32 | ||
39 | getDropped (CyclicBuffer { dropCnt }) = readTVar dropCnt | ||
40 | |||
41 | getNextSequenceNum :: CyclicBuffer t -> STM Word32 | ||
42 | getNextSequenceNum (CyclicBuffer { seqno }) = readTVar seqno | ||
43 | |||
33 | -- | Create a new CyclicBuffer with Overwrite on Wrap. | 44 | -- | Create a new CyclicBuffer with Overwrite on Wrap. |
34 | new :: Word32 -- ^ Capacity of queue. | 45 | new :: Word32 -- ^ Capacity of queue. |
35 | -> Word32 -- ^ Initial sequence number. | 46 | -> Word32 -- ^ Initial sequence number. |
@@ -41,6 +52,7 @@ new capacity seqstart = do | |||
41 | seqv <- newTVar seqstart | 52 | seqv <- newTVar seqstart |
42 | bufe <- newTVar 0 | 53 | bufe <- newTVar 0 |
43 | dropped <- newTVar 0 | 54 | dropped <- newTVar 0 |
55 | total <- newTVar 0 | ||
44 | return CyclicBuffer | 56 | return CyclicBuffer |
45 | { vwflgs = flgs | 57 | { vwflgs = flgs |
46 | , pktq = q | 58 | , pktq = q |
@@ -48,6 +60,7 @@ new capacity seqstart = do | |||
48 | , qsize = cap | 60 | , qsize = cap |
49 | , buffend = bufe | 61 | , buffend = bufe |
50 | , dropCnt = dropped | 62 | , dropCnt = dropped |
63 | , totalCnt = total | ||
51 | } | 64 | } |
52 | 65 | ||
53 | observeOutOfBand :: CyclicBuffer a -> Word32-> STM () | 66 | observeOutOfBand :: CyclicBuffer a -> Word32-> STM () |
@@ -79,14 +92,15 @@ markButNotDequeue CyclicBuffer { vwflgs, pktq, seqno, qsize } = do | |||
79 | modifyTVar' seqno succ | 92 | modifyTVar' seqno succ |
80 | return x | 93 | return x |
81 | 94 | ||
82 | -- | Enqueue a packet. Packets need not be enqueued in order as long as there | 95 | -- | Enqueue a packet. If the capacity is exceeded, packets are |
83 | -- is spare capacity in the queue. If the capacity is exceeded, packets are | ||
84 | -- dropped and the drop count increased accordingly. | 96 | -- dropped and the drop count increased accordingly. |
97 | -- TODO: We no longer really support "out of order" | ||
98 | -- So perhaps drop the num parameter | ||
85 | enqueue :: CyclicBuffer a -- ^ The cyclic buffer(queue) | 99 | enqueue :: CyclicBuffer a -- ^ The cyclic buffer(queue) |
86 | -> Word32 -- ^ Sequence number of the packet. | 100 | -> Word32 -- ^ Sequence number of the packet. |
87 | -> a -- ^ The packet. | 101 | -> a -- ^ The packet. |
88 | -> STM () | 102 | -> STM () |
89 | enqueue CyclicBuffer{vwflgs, pktq, seqno, qsize, buffend, dropCnt} no x = do | 103 | enqueue CyclicBuffer{vwflgs, pktq, seqno, qsize, buffend, dropCnt, totalCnt} no x = do |
90 | low <- readTVar seqno | 104 | low <- readTVar seqno |
91 | let proj = no - low | 105 | let proj = no - low |
92 | let i = no `mod` qsize | 106 | let i = no `mod` qsize |
@@ -96,5 +110,7 @@ enqueue CyclicBuffer{vwflgs, pktq, seqno, qsize, buffend, dropCnt} no x = do | |||
96 | modifyTVar' dropCnt (+1) | 110 | modifyTVar' dropCnt (+1) |
97 | writeArray pktq i (Just x) | 111 | writeArray pktq i (Just x) |
98 | writeArray vwflgs i False -- mark as not viewed | 112 | writeArray vwflgs i False -- mark as not viewed |
113 | modifyTVar' totalCnt (+1) | ||
114 | writeTVar seqno (no+1) | ||
99 | modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) | 115 | modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) |
100 | return () | 116 | return () |