summaryrefslogtreecommitdiff
path: root/src/Data
diff options
context:
space:
mode:
authorJames Crayne <jim.crayne@gmail.com>2018-05-30 06:38:54 +0000
committerJames Crayne <jim.crayne@gmail.com>2018-05-30 06:54:35 +0000
commit451a6eeec18b80f06378acba82e2b2dc56275188 (patch)
tree36bf101723dee46d5883ffe60778c63d00f6f1cf /src/Data
parentf315180cdc6c8a4f9790839b726b26b575295cf0 (diff)
Data.CyclicBuffer, wip
Diffstat (limited to 'src/Data')
-rw-r--r--src/Data/CyclicBuffer.hs100
-rw-r--r--src/Data/PacketQueue.hs4
2 files changed, 102 insertions, 2 deletions
diff --git a/src/Data/CyclicBuffer.hs b/src/Data/CyclicBuffer.hs
new file mode 100644
index 00000000..ab022e3f
--- /dev/null
+++ b/src/Data/CyclicBuffer.hs
@@ -0,0 +1,100 @@
1{-# LANGUAGE NamedFieldPuns #-}
2{-# LANGUAGE FlexibleContexts #-}
3module Data.CyclicBuffer where
4
5import Control.Concurrent.STM
6import Control.Concurrent.STM.TArray
7import Control.Monad
8import Control.Applicative
9import Data.Word
10import Data.Array.MArray
11import Data.Maybe
12
13data CyclicBuffer a = CyclicBuffer
14 { vwflgs :: TArray Word32 Bool -- TODO: Use TVar. TArray Word32 (TVar Bool)
15 -- This would allow updating by external code.
16 -- The TVar could be returned from dequeue
17 , pktq :: TArray Word32 (Maybe a)
18 , seqno :: TVar Word32
19 , qsize :: Word32
20 , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1
21 , dropCnt :: TVar Word32
22 }
23
24cyclicBufferViewList :: CyclicBuffer a -> STM [(Word32,a)]
25cyclicBufferViewList p = do
26 let f (n,Nothing) = Nothing
27 f (n,Just x) = Just (n,x)
28 catMaybes . map f <$> getAssocs (pktq p)
29
30getCapacity :: Applicative m => CyclicBuffer t -> m Word32
31getCapacity (CyclicBuffer { qsize }) = pure qsize
32
33-- | Create a new CyclicBuffer with Overwrite on Wrap.
34new :: Word32 -- ^ Capacity of queue.
35 -> Word32 -- ^ Initial sequence number.
36 -> STM (CyclicBuffer a)
37new capacity seqstart = do
38 let cap = if capacity `mod` 2 == 0 then capacity else capacity + 1
39 q <- newArray (0,cap - 1) Nothing
40 flgs <- newArray (0,cap - 1) False
41 seqv <- newTVar seqstart
42 bufe <- newTVar 0
43 dropped <- newTVar 0
44 return CyclicBuffer
45 { vwflgs = flgs
46 , pktq = q
47 , seqno = seqv
48 , qsize = cap
49 , buffend = bufe
50 , dropCnt = dropped
51 }
52
53observeOutOfBand :: CyclicBuffer a -> Word32-> STM ()
54observeOutOfBand CyclicBuffer { seqno, qsize, buffend } no = do
55 low <- readTVar seqno
56 let proj = no - low
57 -- Ignore packet if out of range.
58 when ( proj < qsize) $ do
59 modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be)
60
61
62-- | Retry until the next expected packet is enqueued. Then return it.
63dequeue :: CyclicBuffer a -> STM a
64dequeue CyclicBuffer { pktq, seqno, qsize } = do
65 i0 <- readTVar seqno
66 let i = i0 `mod` qsize
67 x <- maybe retry return =<< readArray pktq i
68 writeArray pktq i Nothing
69 modifyTVar' seqno succ
70 return x
71
72-- | Like dequeue, but just marks as viewed rather than removing
73markButNotDequeue :: CyclicBuffer a -> STM a
74markButNotDequeue CyclicBuffer { vwflgs, pktq, seqno, qsize } = do
75 i0 <- readTVar seqno
76 let i = i0 `mod` qsize
77 x <- maybe retry return =<< readArray pktq i
78 writeArray vwflgs i True
79 modifyTVar' seqno succ
80 return x
81
82-- | Enqueue a packet. Packets need not be enqueued in order as long as there
83-- is spare capacity in the queue. If the capacity is exceeded, packets are
84-- dropped and the drop count increased accordingly.
85enqueue :: CyclicBuffer a -- ^ The cyclic buffer(queue)
86 -> Word32 -- ^ Sequence number of the packet.
87 -> a -- ^ The packet.
88 -> STM ()
89enqueue CyclicBuffer{vwflgs, pktq, seqno, qsize, buffend, dropCnt} no x = do
90 low <- readTVar seqno
91 let proj = no - low
92 let i = no `mod` qsize
93 when (proj >= qsize) $ do
94 viewed <- readArray vwflgs i
95 when (not viewed) $
96 modifyTVar' dropCnt (+1)
97 writeArray pktq i (Just x)
98 writeArray vwflgs i False -- mark as not viewed
99 modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be)
100 return ()
diff --git a/src/Data/PacketQueue.hs b/src/Data/PacketQueue.hs
index 4f0f04e3..144c8de0 100644
--- a/src/Data/PacketQueue.hs
+++ b/src/Data/PacketQueue.hs
@@ -37,7 +37,7 @@ data PacketQueue a = PacketQueue
37 , seqno :: TVar Word32 37 , seqno :: TVar Word32
38 , qsize :: Word32 38 , qsize :: Word32
39 , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1 39 , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1
40 , qOverWriteMode :: Bool 40 , qOverWriteMode :: Bool -- TODO: Remove me, use Data.CyclicBuffer instead.
41 } 41 }
42 42
43packetQueueViewList :: PacketQueue a -> STM [(Word32,a)] 43packetQueueViewList :: PacketQueue a -> STM [(Word32,a)]
@@ -102,7 +102,7 @@ dequeue PacketQueue { pktq, seqno, qsize } = do
102 modifyTVar' seqno succ 102 modifyTVar' seqno succ
103 return x 103 return x
104 104
105-- | Like dequeue, but marks as handled rather than removing 105-- | Like dequeue, but marks as viewed rather than removing
106markButNotDequeue :: PacketQueue (Bool,a) -> STM a 106markButNotDequeue :: PacketQueue (Bool,a) -> STM a
107markButNotDequeue PacketQueue { pktq, seqno, qsize } = do 107markButNotDequeue PacketQueue { pktq, seqno, qsize } = do
108 i0 <- readTVar seqno 108 i0 <- readTVar seqno