From 451a6eeec18b80f06378acba82e2b2dc56275188 Mon Sep 17 00:00:00 2001 From: James Crayne Date: Wed, 30 May 2018 06:38:54 +0000 Subject: Data.CyclicBuffer, wip --- src/Data/CyclicBuffer.hs | 100 +++++++++++++++++++++++++++++++++++++++++++++++ src/Data/PacketQueue.hs | 4 +- 2 files changed, 102 insertions(+), 2 deletions(-) create mode 100644 src/Data/CyclicBuffer.hs (limited to 'src/Data') diff --git a/src/Data/CyclicBuffer.hs b/src/Data/CyclicBuffer.hs new file mode 100644 index 00000000..ab022e3f --- /dev/null +++ b/src/Data/CyclicBuffer.hs @@ -0,0 +1,100 @@ +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE FlexibleContexts #-} +module Data.CyclicBuffer where + +import Control.Concurrent.STM +import Control.Concurrent.STM.TArray +import Control.Monad +import Control.Applicative +import Data.Word +import Data.Array.MArray +import Data.Maybe + +data CyclicBuffer a = CyclicBuffer + { vwflgs :: TArray Word32 Bool -- TODO: Use TVar. TArray Word32 (TVar Bool) + -- This would allow updating by external code. + -- The TVar could be returned from dequeue + , pktq :: TArray Word32 (Maybe a) + , seqno :: TVar Word32 + , qsize :: Word32 + , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1 + , dropCnt :: TVar Word32 + } + +cyclicBufferViewList :: CyclicBuffer a -> STM [(Word32,a)] +cyclicBufferViewList p = do + let f (n,Nothing) = Nothing + f (n,Just x) = Just (n,x) + catMaybes . map f <$> getAssocs (pktq p) + +getCapacity :: Applicative m => CyclicBuffer t -> m Word32 +getCapacity (CyclicBuffer { qsize }) = pure qsize + +-- | Create a new CyclicBuffer with Overwrite on Wrap. +new :: Word32 -- ^ Capacity of queue. + -> Word32 -- ^ Initial sequence number. + -> STM (CyclicBuffer a) +new capacity seqstart = do + let cap = if capacity `mod` 2 == 0 then capacity else capacity + 1 + q <- newArray (0,cap - 1) Nothing + flgs <- newArray (0,cap - 1) False + seqv <- newTVar seqstart + bufe <- newTVar 0 + dropped <- newTVar 0 + return CyclicBuffer + { vwflgs = flgs + , pktq = q + , seqno = seqv + , qsize = cap + , buffend = bufe + , dropCnt = dropped + } + +observeOutOfBand :: CyclicBuffer a -> Word32-> STM () +observeOutOfBand CyclicBuffer { seqno, qsize, buffend } no = do + low <- readTVar seqno + let proj = no - low + -- Ignore packet if out of range. + when ( proj < qsize) $ do + modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) + + +-- | Retry until the next expected packet is enqueued. Then return it. +dequeue :: CyclicBuffer a -> STM a +dequeue CyclicBuffer { pktq, seqno, qsize } = do + i0 <- readTVar seqno + let i = i0 `mod` qsize + x <- maybe retry return =<< readArray pktq i + writeArray pktq i Nothing + modifyTVar' seqno succ + return x + +-- | Like dequeue, but just marks as viewed rather than removing +markButNotDequeue :: CyclicBuffer a -> STM a +markButNotDequeue CyclicBuffer { vwflgs, pktq, seqno, qsize } = do + i0 <- readTVar seqno + let i = i0 `mod` qsize + x <- maybe retry return =<< readArray pktq i + writeArray vwflgs i True + modifyTVar' seqno succ + return x + +-- | Enqueue a packet. Packets need not be enqueued in order as long as there +-- is spare capacity in the queue. If the capacity is exceeded, packets are +-- dropped and the drop count increased accordingly. +enqueue :: CyclicBuffer a -- ^ The cyclic buffer(queue) + -> Word32 -- ^ Sequence number of the packet. + -> a -- ^ The packet. + -> STM () +enqueue CyclicBuffer{vwflgs, pktq, seqno, qsize, buffend, dropCnt} no x = do + low <- readTVar seqno + let proj = no - low + let i = no `mod` qsize + when (proj >= qsize) $ do + viewed <- readArray vwflgs i + when (not viewed) $ + modifyTVar' dropCnt (+1) + writeArray pktq i (Just x) + writeArray vwflgs i False -- mark as not viewed + modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) + return () diff --git a/src/Data/PacketQueue.hs b/src/Data/PacketQueue.hs index 4f0f04e3..144c8de0 100644 --- a/src/Data/PacketQueue.hs +++ b/src/Data/PacketQueue.hs @@ -37,7 +37,7 @@ data PacketQueue a = PacketQueue , seqno :: TVar Word32 , qsize :: Word32 , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1 - , qOverWriteMode :: Bool + , qOverWriteMode :: Bool -- TODO: Remove me, use Data.CyclicBuffer instead. } packetQueueViewList :: PacketQueue a -> STM [(Word32,a)] @@ -102,7 +102,7 @@ dequeue PacketQueue { pktq, seqno, qsize } = do modifyTVar' seqno succ return x --- | Like dequeue, but marks as handled rather than removing +-- | Like dequeue, but marks as viewed rather than removing markButNotDequeue :: PacketQueue (Bool,a) -> STM a markButNotDequeue PacketQueue { pktq, seqno, qsize } = do i0 <- readTVar seqno -- cgit v1.2.3