From 8fe574a8f1f2f71b68c89aeacccd05d084a4003d Mon Sep 17 00:00:00 2001 From: James Crayne Date: Mon, 28 May 2018 19:54:59 +0000 Subject: ncLastNMsgs is now CyclicBuffer type --- src/Data/CyclicBuffer.hs | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) (limited to 'src/Data/CyclicBuffer.hs') diff --git a/src/Data/CyclicBuffer.hs b/src/Data/CyclicBuffer.hs index ab022e3f..0cc87459 100644 --- a/src/Data/CyclicBuffer.hs +++ b/src/Data/CyclicBuffer.hs @@ -1,6 +1,7 @@ {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE FlexibleContexts #-} -module Data.CyclicBuffer where +module Data.CyclicBuffer {- TODO: export list -} where + import Control.Concurrent.STM import Control.Concurrent.STM.TArray @@ -19,6 +20,7 @@ data CyclicBuffer a = CyclicBuffer , qsize :: Word32 , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1 , dropCnt :: TVar Word32 + , totalCnt :: TVar Word32 } cyclicBufferViewList :: CyclicBuffer a -> STM [(Word32,a)] @@ -30,6 +32,15 @@ cyclicBufferViewList p = do getCapacity :: Applicative m => CyclicBuffer t -> m Word32 getCapacity (CyclicBuffer { qsize }) = pure qsize +getTotal :: CyclicBuffer t -> STM Word32 +getTotal (CyclicBuffer { totalCnt }) = readTVar totalCnt + +getDropped :: CyclicBuffer t -> STM Word32 +getDropped (CyclicBuffer { dropCnt }) = readTVar dropCnt + +getNextSequenceNum :: CyclicBuffer t -> STM Word32 +getNextSequenceNum (CyclicBuffer { seqno }) = readTVar seqno + -- | Create a new CyclicBuffer with Overwrite on Wrap. new :: Word32 -- ^ Capacity of queue. -> Word32 -- ^ Initial sequence number. @@ -41,6 +52,7 @@ new capacity seqstart = do seqv <- newTVar seqstart bufe <- newTVar 0 dropped <- newTVar 0 + total <- newTVar 0 return CyclicBuffer { vwflgs = flgs , pktq = q @@ -48,6 +60,7 @@ new capacity seqstart = do , qsize = cap , buffend = bufe , dropCnt = dropped + , totalCnt = total } observeOutOfBand :: CyclicBuffer a -> Word32-> STM () @@ -79,14 +92,15 @@ markButNotDequeue CyclicBuffer { vwflgs, pktq, seqno, qsize } = do modifyTVar' seqno succ return x --- | Enqueue a packet. Packets need not be enqueued in order as long as there --- is spare capacity in the queue. If the capacity is exceeded, packets are +-- | Enqueue a packet. If the capacity is exceeded, packets are -- dropped and the drop count increased accordingly. +-- TODO: We no longer really support "out of order" +-- So perhaps drop the num parameter enqueue :: CyclicBuffer a -- ^ The cyclic buffer(queue) -> Word32 -- ^ Sequence number of the packet. -> a -- ^ The packet. -> STM () -enqueue CyclicBuffer{vwflgs, pktq, seqno, qsize, buffend, dropCnt} no x = do +enqueue CyclicBuffer{vwflgs, pktq, seqno, qsize, buffend, dropCnt, totalCnt} no x = do low <- readTVar seqno let proj = no - low let i = no `mod` qsize @@ -96,5 +110,7 @@ enqueue CyclicBuffer{vwflgs, pktq, seqno, qsize, buffend, dropCnt} no x = do modifyTVar' dropCnt (+1) writeArray pktq i (Just x) writeArray vwflgs i False -- mark as not viewed + modifyTVar' totalCnt (+1) + writeTVar seqno (no+1) modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) return () -- cgit v1.2.3