{-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE FlexibleContexts #-} module Data.CyclicBuffer {- TODO: export list -} where import Control.Concurrent.STM import Control.Concurrent.STM.TArray import Control.Monad import Control.Applicative import Data.Word import Data.Array.MArray import Data.Maybe data CyclicBuffer a = CyclicBuffer { vwflgs :: TArray Word32 Bool -- TODO: Use TVar. TArray Word32 (TVar Bool) -- This would allow updating by external code. -- The TVar could be returned from dequeue , pktq :: TArray Word32 (Maybe a) , seqno :: TVar Word32 , qsize :: Word32 , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1 , dropCnt :: TVar Word32 , totalCnt :: TVar Word32 } cyclicBufferViewList :: CyclicBuffer a -> STM [(Word32,a)] cyclicBufferViewList p = do let f (n,Nothing) = Nothing f (n,Just x) = Just (n,x) catMaybes . map f <$> getAssocs (pktq p) getCapacity :: Applicative m => CyclicBuffer t -> m Word32 getCapacity (CyclicBuffer { qsize }) = pure qsize getTotal :: CyclicBuffer t -> STM Word32 getTotal (CyclicBuffer { totalCnt }) = readTVar totalCnt getDropped :: CyclicBuffer t -> STM Word32 getDropped (CyclicBuffer { dropCnt }) = readTVar dropCnt getNextSequenceNum :: CyclicBuffer t -> STM Word32 getNextSequenceNum (CyclicBuffer { seqno }) = readTVar seqno -- | Create a new CyclicBuffer with Overwrite on Wrap. new :: Word32 -- ^ Capacity of queue. -> Word32 -- ^ Initial sequence number. -> STM (CyclicBuffer a) new capacity seqstart = do let cap = if capacity `mod` 2 == 0 then capacity else capacity + 1 q <- newArray (0,cap - 1) Nothing flgs <- newArray (0,cap - 1) False seqv <- newTVar seqstart bufe <- newTVar 0 dropped <- newTVar 0 total <- newTVar 0 return CyclicBuffer { vwflgs = flgs , pktq = q , seqno = seqv , qsize = cap , buffend = bufe , dropCnt = dropped , totalCnt = total } observeOutOfBand :: CyclicBuffer a -> Word32-> STM () observeOutOfBand CyclicBuffer { seqno, qsize, buffend } no = do low <- readTVar seqno let proj = no - low -- Ignore packet if out of range. when ( proj < qsize) $ do modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) -- | Retry until the next expected packet is enqueued. Then return it. dequeue :: CyclicBuffer a -> STM a dequeue CyclicBuffer { pktq, seqno, qsize } = do i0 <- readTVar seqno let i = i0 `mod` qsize x <- maybe retry return =<< readArray pktq i writeArray pktq i Nothing modifyTVar' seqno succ return x -- | Like dequeue, but just marks as viewed rather than removing markButNotDequeue :: CyclicBuffer a -> STM a markButNotDequeue CyclicBuffer { vwflgs, pktq, seqno, qsize } = do i0 <- readTVar seqno let i = i0 `mod` qsize x <- maybe retry return =<< readArray pktq i writeArray vwflgs i True modifyTVar' seqno succ return x -- | Enqueue a packet. If the capacity is exceeded, packets are -- dropped and the drop count increased accordingly. -- TODO: We no longer really support "out of order" -- So perhaps drop the num parameter enqueue :: CyclicBuffer a -- ^ The cyclic buffer(queue) -> Word32 -- ^ Sequence number of the packet. -> a -- ^ The packet. -> STM () enqueue CyclicBuffer{vwflgs, pktq, seqno, qsize, buffend, dropCnt, totalCnt} no x = do low <- readTVar seqno let proj = no - low let i = no `mod` qsize when (proj >= qsize) $ do viewed <- readArray vwflgs i when (not viewed) $ modifyTVar' dropCnt (+1) writeArray pktq i (Just x) writeArray vwflgs i False -- mark as not viewed modifyTVar' totalCnt (+1) writeTVar seqno (no+1) modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) return ()