summaryrefslogtreecommitdiff
path: root/src/Data
diff options
context:
space:
mode:
Diffstat (limited to 'src/Data')
-rw-r--r--src/Data/CyclicBuffer.hs24
1 files changed, 20 insertions, 4 deletions
diff --git a/src/Data/CyclicBuffer.hs b/src/Data/CyclicBuffer.hs
index ab022e3f..0cc87459 100644
--- a/src/Data/CyclicBuffer.hs
+++ b/src/Data/CyclicBuffer.hs
@@ -1,6 +1,7 @@
1{-# LANGUAGE NamedFieldPuns #-} 1{-# LANGUAGE NamedFieldPuns #-}
2{-# LANGUAGE FlexibleContexts #-} 2{-# LANGUAGE FlexibleContexts #-}
3module Data.CyclicBuffer where 3module Data.CyclicBuffer {- TODO: export list -} where
4
4 5
5import Control.Concurrent.STM 6import Control.Concurrent.STM
6import Control.Concurrent.STM.TArray 7import Control.Concurrent.STM.TArray
@@ -19,6 +20,7 @@ data CyclicBuffer a = CyclicBuffer
19 , qsize :: Word32 20 , qsize :: Word32
20 , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1 21 , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1
21 , dropCnt :: TVar Word32 22 , dropCnt :: TVar Word32
23 , totalCnt :: TVar Word32
22 } 24 }
23 25
24cyclicBufferViewList :: CyclicBuffer a -> STM [(Word32,a)] 26cyclicBufferViewList :: CyclicBuffer a -> STM [(Word32,a)]
@@ -30,6 +32,15 @@ cyclicBufferViewList p = do
30getCapacity :: Applicative m => CyclicBuffer t -> m Word32 32getCapacity :: Applicative m => CyclicBuffer t -> m Word32
31getCapacity (CyclicBuffer { qsize }) = pure qsize 33getCapacity (CyclicBuffer { qsize }) = pure qsize
32 34
35getTotal :: CyclicBuffer t -> STM Word32
36getTotal (CyclicBuffer { totalCnt }) = readTVar totalCnt
37
38getDropped :: CyclicBuffer t -> STM Word32
39getDropped (CyclicBuffer { dropCnt }) = readTVar dropCnt
40
41getNextSequenceNum :: CyclicBuffer t -> STM Word32
42getNextSequenceNum (CyclicBuffer { seqno }) = readTVar seqno
43
33-- | Create a new CyclicBuffer with Overwrite on Wrap. 44-- | Create a new CyclicBuffer with Overwrite on Wrap.
34new :: Word32 -- ^ Capacity of queue. 45new :: Word32 -- ^ Capacity of queue.
35 -> Word32 -- ^ Initial sequence number. 46 -> Word32 -- ^ Initial sequence number.
@@ -41,6 +52,7 @@ new capacity seqstart = do
41 seqv <- newTVar seqstart 52 seqv <- newTVar seqstart
42 bufe <- newTVar 0 53 bufe <- newTVar 0
43 dropped <- newTVar 0 54 dropped <- newTVar 0
55 total <- newTVar 0
44 return CyclicBuffer 56 return CyclicBuffer
45 { vwflgs = flgs 57 { vwflgs = flgs
46 , pktq = q 58 , pktq = q
@@ -48,6 +60,7 @@ new capacity seqstart = do
48 , qsize = cap 60 , qsize = cap
49 , buffend = bufe 61 , buffend = bufe
50 , dropCnt = dropped 62 , dropCnt = dropped
63 , totalCnt = total
51 } 64 }
52 65
53observeOutOfBand :: CyclicBuffer a -> Word32-> STM () 66observeOutOfBand :: CyclicBuffer a -> Word32-> STM ()
@@ -79,14 +92,15 @@ markButNotDequeue CyclicBuffer { vwflgs, pktq, seqno, qsize } = do
79 modifyTVar' seqno succ 92 modifyTVar' seqno succ
80 return x 93 return x
81 94
82-- | Enqueue a packet. Packets need not be enqueued in order as long as there 95-- | Enqueue a packet. If the capacity is exceeded, packets are
83-- is spare capacity in the queue. If the capacity is exceeded, packets are
84-- dropped and the drop count increased accordingly. 96-- dropped and the drop count increased accordingly.
97-- TODO: We no longer really support "out of order"
98-- So perhaps drop the num parameter
85enqueue :: CyclicBuffer a -- ^ The cyclic buffer(queue) 99enqueue :: CyclicBuffer a -- ^ The cyclic buffer(queue)
86 -> Word32 -- ^ Sequence number of the packet. 100 -> Word32 -- ^ Sequence number of the packet.
87 -> a -- ^ The packet. 101 -> a -- ^ The packet.
88 -> STM () 102 -> STM ()
89enqueue CyclicBuffer{vwflgs, pktq, seqno, qsize, buffend, dropCnt} no x = do 103enqueue CyclicBuffer{vwflgs, pktq, seqno, qsize, buffend, dropCnt, totalCnt} no x = do
90 low <- readTVar seqno 104 low <- readTVar seqno
91 let proj = no - low 105 let proj = no - low
92 let i = no `mod` qsize 106 let i = no `mod` qsize
@@ -96,5 +110,7 @@ enqueue CyclicBuffer{vwflgs, pktq, seqno, qsize, buffend, dropCnt} no x = do
96 modifyTVar' dropCnt (+1) 110 modifyTVar' dropCnt (+1)
97 writeArray pktq i (Just x) 111 writeArray pktq i (Just x)
98 writeArray vwflgs i False -- mark as not viewed 112 writeArray vwflgs i False -- mark as not viewed
113 modifyTVar' totalCnt (+1)
114 writeTVar seqno (no+1)
99 modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) 115 modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be)
100 return () 116 return ()