summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Data/PacketQueue.hs51
1 files changed, 44 insertions, 7 deletions
diff --git a/src/Data/PacketQueue.hs b/src/Data/PacketQueue.hs
index a4c99cab..4f0f04e3 100644
--- a/src/Data/PacketQueue.hs
+++ b/src/Data/PacketQueue.hs
@@ -6,8 +6,11 @@
6{-# LANGUAGE FlexibleContexts #-} 6{-# LANGUAGE FlexibleContexts #-}
7module Data.PacketQueue 7module Data.PacketQueue
8 ( PacketQueue 8 ( PacketQueue
9 , getCapacity
9 , new 10 , new
11 , newOverwrite
10 , dequeue 12 , dequeue
13 , markButNotDequeue
11 , enqueue 14 , enqueue
12 , observeOutOfBand 15 , observeOutOfBand
13 , PacketOutQueue 16 , PacketOutQueue
@@ -24,6 +27,7 @@ module Data.PacketQueue
24import Control.Concurrent.STM 27import Control.Concurrent.STM
25import Control.Concurrent.STM.TArray 28import Control.Concurrent.STM.TArray
26import Control.Monad 29import Control.Monad
30import Control.Applicative
27import Data.Word 31import Data.Word
28import Data.Array.MArray 32import Data.Array.MArray
29import Data.Maybe 33import Data.Maybe
@@ -33,6 +37,7 @@ data PacketQueue a = PacketQueue
33 , seqno :: TVar Word32 37 , seqno :: TVar Word32
34 , qsize :: Word32 38 , qsize :: Word32
35 , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1 39 , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1
40 , qOverWriteMode :: Bool
36 } 41 }
37 42
38packetQueueViewList :: PacketQueue a -> STM [(Word32,a)] 43packetQueueViewList :: PacketQueue a -> STM [(Word32,a)]
@@ -41,6 +46,8 @@ packetQueueViewList p = do
41 f (n,Just x) = Just (n,x) 46 f (n,Just x) = Just (n,x)
42 catMaybes . map f <$> getAssocs (pktq p) 47 catMaybes . map f <$> getAssocs (pktq p)
43 48
49getCapacity :: Applicative m => PacketQueue t -> m Word32
50getCapacity (PacketQueue { qsize }) = pure qsize
44 51
45-- | Create a new PacketQueue. 52-- | Create a new PacketQueue.
46new :: Word32 -- ^ Capacity of queue. 53new :: Word32 -- ^ Capacity of queue.
@@ -56,6 +63,24 @@ new capacity seqstart = do
56 , seqno = seqv 63 , seqno = seqv
57 , qsize = cap 64 , qsize = cap
58 , buffend = bufe 65 , buffend = bufe
66 , qOverWriteMode = False
67 }
68
69-- | Create a new PacketQueue with Overwrite on Wrap.
70newOverwrite :: Word32 -- ^ Capacity of queue.
71 -> Word32 -- ^ Initial sequence number.
72 -> STM (PacketQueue a)
73newOverwrite capacity seqstart = do
74 let cap = if capacity `mod` 2 == 0 then capacity else capacity + 1
75 q <- newArray (0,cap - 1) Nothing
76 seqv <- newTVar seqstart
77 bufe <- newTVar 0
78 return PacketQueue
79 { pktq = q
80 , seqno = seqv
81 , qsize = cap
82 , buffend = bufe
83 , qOverWriteMode = True
59 } 84 }
60 85
61observeOutOfBand :: PacketQueue a -> Word32-> STM () 86observeOutOfBand :: PacketQueue a -> Word32-> STM ()
@@ -77,21 +102,33 @@ dequeue PacketQueue { pktq, seqno, qsize } = do
77 modifyTVar' seqno succ 102 modifyTVar' seqno succ
78 return x 103 return x
79 104
105-- | Like dequeue, but marks as handled rather than removing
106markButNotDequeue :: PacketQueue (Bool,a) -> STM a
107markButNotDequeue PacketQueue { pktq, seqno, qsize } = do
108 i0 <- readTVar seqno
109 let i = i0 `mod` qsize
110 (b,x) <- maybe retry return =<< readArray pktq i
111 writeArray pktq i (Just (True,x))
112 modifyTVar' seqno succ
113 return x
114
80-- | Enqueue a packet. Packets need not be enqueued in order as long as there 115-- | Enqueue a packet. Packets need not be enqueued in order as long as there
81-- is spare capacity in the queue. If there is not, the packet will be 116-- is spare capacity in the queue. If there is not, the packet will be
82-- silently discarded without blocking. 117-- silently discarded without blocking. (Unless this is an Overwrite-queue,
118-- in which case, the packets will simply wrap around overwriting the old ones.)
83enqueue :: PacketQueue a -- ^ The packet queue. 119enqueue :: PacketQueue a -- ^ The packet queue.
84 -> Word32 -- ^ Sequence number of the packet. 120 -> Word32 -- ^ Sequence number of the packet.
85 -> a -- ^ The packet. 121 -> a -- ^ The packet.
86 -> STM () 122 -> STM (Word32,Word32)
87enqueue PacketQueue{ pktq, seqno, qsize, buffend } no x = do 123enqueue PacketQueue{ pktq, seqno, qsize, buffend,qOverWriteMode } no x = do
88 low <- readTVar seqno 124 low <- readTVar seqno
89 let proj = no - low 125 let proj = no - low
90 -- Ignore packet if out of range. 126 -- Ignore packet if out of range.
91 when ( proj < qsize) $ do 127 when ( proj < qsize || qOverWriteMode) $ do
92 let i = no `mod` qsize 128 let i = no `mod` qsize
93 writeArray pktq i (Just x) 129 writeArray pktq i (Just x)
94 modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) 130 modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be)
131 return (proj `divMod` qsize)
95 132
96-- lookup :: PacketQueue a -> Word32 -> STM (Maybe a) 133-- lookup :: PacketQueue a -> Word32 -> STM (Maybe a)
97-- lookup PacketQueue{ pktq, seqno, qsize } no = _todo 134-- lookup PacketQueue{ pktq, seqno, qsize } no = _todo