diff options
author | James Crayne <jim.crayne@gmail.com> | 2018-05-30 06:38:54 +0000 |
---|---|---|
committer | James Crayne <jim.crayne@gmail.com> | 2018-05-30 06:54:35 +0000 |
commit | 451a6eeec18b80f06378acba82e2b2dc56275188 (patch) | |
tree | 36bf101723dee46d5883ffe60778c63d00f6f1cf | |
parent | f315180cdc6c8a4f9790839b726b26b575295cf0 (diff) |
Data.CyclicBuffer, wip
-rw-r--r-- | src/Data/CyclicBuffer.hs | 100 | ||||
-rw-r--r-- | src/Data/PacketQueue.hs | 4 | ||||
-rw-r--r-- | src/Network/Tox/Crypto/Handlers.hs | 2 |
3 files changed, 104 insertions, 2 deletions
diff --git a/src/Data/CyclicBuffer.hs b/src/Data/CyclicBuffer.hs new file mode 100644 index 00000000..ab022e3f --- /dev/null +++ b/src/Data/CyclicBuffer.hs | |||
@@ -0,0 +1,100 @@ | |||
1 | {-# LANGUAGE NamedFieldPuns #-} | ||
2 | {-# LANGUAGE FlexibleContexts #-} | ||
3 | module Data.CyclicBuffer where | ||
4 | |||
5 | import Control.Concurrent.STM | ||
6 | import Control.Concurrent.STM.TArray | ||
7 | import Control.Monad | ||
8 | import Control.Applicative | ||
9 | import Data.Word | ||
10 | import Data.Array.MArray | ||
11 | import Data.Maybe | ||
12 | |||
13 | data CyclicBuffer a = CyclicBuffer | ||
14 | { vwflgs :: TArray Word32 Bool -- TODO: Use TVar. TArray Word32 (TVar Bool) | ||
15 | -- This would allow updating by external code. | ||
16 | -- The TVar could be returned from dequeue | ||
17 | , pktq :: TArray Word32 (Maybe a) | ||
18 | , seqno :: TVar Word32 | ||
19 | , qsize :: Word32 | ||
20 | , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1 | ||
21 | , dropCnt :: TVar Word32 | ||
22 | } | ||
23 | |||
24 | cyclicBufferViewList :: CyclicBuffer a -> STM [(Word32,a)] | ||
25 | cyclicBufferViewList p = do | ||
26 | let f (n,Nothing) = Nothing | ||
27 | f (n,Just x) = Just (n,x) | ||
28 | catMaybes . map f <$> getAssocs (pktq p) | ||
29 | |||
30 | getCapacity :: Applicative m => CyclicBuffer t -> m Word32 | ||
31 | getCapacity (CyclicBuffer { qsize }) = pure qsize | ||
32 | |||
33 | -- | Create a new CyclicBuffer with Overwrite on Wrap. | ||
34 | new :: Word32 -- ^ Capacity of queue. | ||
35 | -> Word32 -- ^ Initial sequence number. | ||
36 | -> STM (CyclicBuffer a) | ||
37 | new capacity seqstart = do | ||
38 | let cap = if capacity `mod` 2 == 0 then capacity else capacity + 1 | ||
39 | q <- newArray (0,cap - 1) Nothing | ||
40 | flgs <- newArray (0,cap - 1) False | ||
41 | seqv <- newTVar seqstart | ||
42 | bufe <- newTVar 0 | ||
43 | dropped <- newTVar 0 | ||
44 | return CyclicBuffer | ||
45 | { vwflgs = flgs | ||
46 | , pktq = q | ||
47 | , seqno = seqv | ||
48 | , qsize = cap | ||
49 | , buffend = bufe | ||
50 | , dropCnt = dropped | ||
51 | } | ||
52 | |||
53 | observeOutOfBand :: CyclicBuffer a -> Word32-> STM () | ||
54 | observeOutOfBand CyclicBuffer { seqno, qsize, buffend } no = do | ||
55 | low <- readTVar seqno | ||
56 | let proj = no - low | ||
57 | -- Ignore packet if out of range. | ||
58 | when ( proj < qsize) $ do | ||
59 | modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) | ||
60 | |||
61 | |||
62 | -- | Retry until the next expected packet is enqueued. Then return it. | ||
63 | dequeue :: CyclicBuffer a -> STM a | ||
64 | dequeue CyclicBuffer { pktq, seqno, qsize } = do | ||
65 | i0 <- readTVar seqno | ||
66 | let i = i0 `mod` qsize | ||
67 | x <- maybe retry return =<< readArray pktq i | ||
68 | writeArray pktq i Nothing | ||
69 | modifyTVar' seqno succ | ||
70 | return x | ||
71 | |||
72 | -- | Like dequeue, but just marks as viewed rather than removing | ||
73 | markButNotDequeue :: CyclicBuffer a -> STM a | ||
74 | markButNotDequeue CyclicBuffer { vwflgs, pktq, seqno, qsize } = do | ||
75 | i0 <- readTVar seqno | ||
76 | let i = i0 `mod` qsize | ||
77 | x <- maybe retry return =<< readArray pktq i | ||
78 | writeArray vwflgs i True | ||
79 | modifyTVar' seqno succ | ||
80 | return x | ||
81 | |||
82 | -- | Enqueue a packet. Packets need not be enqueued in order as long as there | ||
83 | -- is spare capacity in the queue. If the capacity is exceeded, packets are | ||
84 | -- dropped and the drop count increased accordingly. | ||
85 | enqueue :: CyclicBuffer a -- ^ The cyclic buffer(queue) | ||
86 | -> Word32 -- ^ Sequence number of the packet. | ||
87 | -> a -- ^ The packet. | ||
88 | -> STM () | ||
89 | enqueue CyclicBuffer{vwflgs, pktq, seqno, qsize, buffend, dropCnt} no x = do | ||
90 | low <- readTVar seqno | ||
91 | let proj = no - low | ||
92 | let i = no `mod` qsize | ||
93 | when (proj >= qsize) $ do | ||
94 | viewed <- readArray vwflgs i | ||
95 | when (not viewed) $ | ||
96 | modifyTVar' dropCnt (+1) | ||
97 | writeArray pktq i (Just x) | ||
98 | writeArray vwflgs i False -- mark as not viewed | ||
99 | modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) | ||
100 | return () | ||
diff --git a/src/Data/PacketQueue.hs b/src/Data/PacketQueue.hs index 4f0f04e3..144c8de0 100644 --- a/src/Data/PacketQueue.hs +++ b/src/Data/PacketQueue.hs | |||
@@ -37,7 +37,7 @@ data PacketQueue a = PacketQueue | |||
37 | , seqno :: TVar Word32 | 37 | , seqno :: TVar Word32 |
38 | , qsize :: Word32 | 38 | , qsize :: Word32 |
39 | , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1 | 39 | , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1 |
40 | , qOverWriteMode :: Bool | 40 | , qOverWriteMode :: Bool -- TODO: Remove me, use Data.CyclicBuffer instead. |
41 | } | 41 | } |
42 | 42 | ||
43 | packetQueueViewList :: PacketQueue a -> STM [(Word32,a)] | 43 | packetQueueViewList :: PacketQueue a -> STM [(Word32,a)] |
@@ -102,7 +102,7 @@ dequeue PacketQueue { pktq, seqno, qsize } = do | |||
102 | modifyTVar' seqno succ | 102 | modifyTVar' seqno succ |
103 | return x | 103 | return x |
104 | 104 | ||
105 | -- | Like dequeue, but marks as handled rather than removing | 105 | -- | Like dequeue, but marks as viewed rather than removing |
106 | markButNotDequeue :: PacketQueue (Bool,a) -> STM a | 106 | markButNotDequeue :: PacketQueue (Bool,a) -> STM a |
107 | markButNotDequeue PacketQueue { pktq, seqno, qsize } = do | 107 | markButNotDequeue PacketQueue { pktq, seqno, qsize } = do |
108 | i0 <- readTVar seqno | 108 | i0 <- readTVar seqno |
diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs index e6bb8371..ee8af399 100644 --- a/src/Network/Tox/Crypto/Handlers.hs +++ b/src/Network/Tox/Crypto/Handlers.hs | |||
@@ -24,6 +24,8 @@ import Control.Lens | |||
24 | import Data.Function | 24 | import Data.Function |
25 | import qualified Data.PacketQueue as PQ | 25 | import qualified Data.PacketQueue as PQ |
26 | ;import Data.PacketQueue (PacketQueue) | 26 | ;import Data.PacketQueue (PacketQueue) |
27 | import qualified Data.CyclicBuffer as CB | ||
28 | ;import Data.CyclicBuffer (CyclicBuffer) | ||
27 | import Data.Serialize as S | 29 | import Data.Serialize as S |
28 | import Data.Word | 30 | import Data.Word |
29 | import Data.Maybe | 31 | import Data.Maybe |