diff options
-rw-r--r-- | examples/dhtd.hs | 17 | ||||
-rw-r--r-- | src/Data/PacketQueue.hs | 51 |
2 files changed, 57 insertions, 11 deletions
diff --git a/examples/dhtd.hs b/examples/dhtd.hs index fed2976f..57ee8deb 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs | |||
@@ -673,20 +673,23 @@ clientSession s@Session{..} sock cnum h = do | |||
673 | -> cmd0 $ do | 673 | -> cmd0 $ do |
674 | sessions' <- atomically $ readTVar sessions :: IO [PerSession] | 674 | sessions' <- atomically $ readTVar sessions :: IO [PerSession] |
675 | let sessionsReport = mapM showPerSession sessions' | 675 | let sessionsReport = mapM showPerSession sessions' |
676 | headers = ["Key", "NextMsg", "Handled","Unhandled"] | 676 | headers = ["Key", "NextMsg", "Dropped","Handled","Unhandled"] |
677 | showPerSession (PerSession | 677 | showPerSession (PerSession |
678 | { perSessionMsgs = msgQ | 678 | { perSessionMsgs = msgQ |
679 | , perSessionPublicKey = pubKey | 679 | , perSessionPublicKey = pubKey |
680 | , perSessionAddr = sockAddr | 680 | , perSessionAddr = sockAddr |
681 | , perSessionNumVar = msgNumVar | 681 | , perSessionNumVar = msgNumVar |
682 | , perSessionDropCount = dropCntVar | ||
682 | }) = do | 683 | }) = do |
683 | num <- atomically (readTVar msgNumVar) | 684 | num <- atomically (readTVar msgNumVar) |
685 | dropped <- atomically (readTVar dropCntVar) | ||
684 | as <- atomically (packetQueueViewList msgQ) | 686 | as <- atomically (packetQueueViewList msgQ) |
685 | let (h,u) = partition (fst . snd) as | 687 | let (h,u) = partition (fst . snd) as |
686 | countHandled = length h | 688 | countHandled = length h |
687 | countUnhandled = length u | 689 | countUnhandled = length u |
688 | return [ show (Tox.key2id pubKey) -- "Key" | 690 | return [ show (Tox.key2id pubKey) -- "Key" |
689 | , show num -- "NextMsg" | 691 | , show num -- "NextMsg" |
692 | , show dropped -- "Dropped" | ||
690 | , show countHandled -- "Handled" | 693 | , show countHandled -- "Handled" |
691 | , show countUnhandled -- "Unhandled" | 694 | , show countUnhandled -- "Unhandled" |
692 | ] | 695 | ] |
@@ -1248,7 +1251,8 @@ announceToxJabberPeer echan laddr saddr pingflag tsrc tsnk | |||
1248 | data PerSession = PerSession { perSessionMsgs :: PacketQueue (Bool{-Handled?-},Tox.CryptoMessage) | 1251 | data PerSession = PerSession { perSessionMsgs :: PacketQueue (Bool{-Handled?-},Tox.CryptoMessage) |
1249 | , perSessionPublicKey :: PublicKey | 1252 | , perSessionPublicKey :: PublicKey |
1250 | , perSessionAddr :: SockAddr | 1253 | , perSessionAddr :: SockAddr |
1251 | , perSessionNumVar :: TVar Word32 | 1254 | , perSessionNumVar :: TVar Word32 |
1255 | , perSessionDropCount :: TVar Word32 | ||
1252 | } | 1256 | } |
1253 | 1257 | ||
1254 | main :: IO () | 1258 | main :: IO () |
@@ -1572,12 +1576,14 @@ main = runResourceT $ liftBaseWith $ \resT -> do | |||
1572 | -- allsessionsMap <- atomically $ readTVar (netCryptoSessions netCryptoSessionsState) | 1576 | -- allsessionsMap <- atomically $ readTVar (netCryptoSessions netCryptoSessionsState) |
1573 | let sockAddr = Tox.ncSockAddr netcrypto | 1577 | let sockAddr = Tox.ncSockAddr netcrypto |
1574 | pubKey = Tox.ncTheirPublicKey netcrypto | 1578 | pubKey = Tox.ncTheirPublicKey netcrypto |
1575 | msgQ <- atomically (Data.PacketQueue.new 10 0 :: STM (PacketQueue (Bool,Tox.CryptoMessage))) | 1579 | msgQ <- atomically (Data.PacketQueue.newOverwrite 10 0 :: STM (PacketQueue (Bool,Tox.CryptoMessage))) |
1576 | msgNumVar <- atomically (newTVar 0) | 1580 | msgNumVar <- atomically (newTVar 0) |
1581 | dropCntVar <- atomically (newTVar 0) | ||
1577 | let perSession = PerSession { perSessionMsgs = msgQ | 1582 | let perSession = PerSession { perSessionMsgs = msgQ |
1578 | , perSessionPublicKey = pubKey | 1583 | , perSessionPublicKey = pubKey |
1579 | , perSessionAddr = sockAddr | 1584 | , perSessionAddr = sockAddr |
1580 | , perSessionNumVar = msgNumVar | 1585 | , perSessionNumVar = msgNumVar |
1586 | , perSessionDropCount = dropCntVar | ||
1581 | } | 1587 | } |
1582 | atomically $ modifyTVar' sessions (perSession:) | 1588 | atomically $ modifyTVar' sessions (perSession:) |
1583 | tmchan <- atomically newTMChan | 1589 | tmchan <- atomically newTMChan |
@@ -1599,8 +1605,11 @@ main = runResourceT $ liftBaseWith $ \resT -> do | |||
1599 | handleIncoming mTyp session cm = do | 1605 | handleIncoming mTyp session cm = do |
1600 | atomically $ do | 1606 | atomically $ do |
1601 | num <- readTVar msgNumVar | 1607 | num <- readTVar msgNumVar |
1602 | enqueue msgQ num (False,cm) | 1608 | (wraps,offset) <- enqueue msgQ num (False,cm) |
1609 | capacity <- getCapacity msgQ | ||
1610 | let dropped = wraps * capacity + offset | ||
1603 | modifyTVar' msgNumVar (+1) | 1611 | modifyTVar' msgNumVar (+1) |
1612 | writeTVar dropCntVar dropped | ||
1604 | atomically $ writeTMChan tmchan cm -- (Tox.bufferData cd) | 1613 | atomically $ writeTMChan tmchan cm -- (Tox.bufferData cd) |
1605 | return Nothing | 1614 | return Nothing |
1606 | atomically $ writeTVar (Tox.ncUnrecognizedHook netcrypto) handleIncoming | 1615 | atomically $ writeTVar (Tox.ncUnrecognizedHook netcrypto) handleIncoming |
diff --git a/src/Data/PacketQueue.hs b/src/Data/PacketQueue.hs index a4c99cab..4f0f04e3 100644 --- a/src/Data/PacketQueue.hs +++ b/src/Data/PacketQueue.hs | |||
@@ -6,8 +6,11 @@ | |||
6 | {-# LANGUAGE FlexibleContexts #-} | 6 | {-# LANGUAGE FlexibleContexts #-} |
7 | module Data.PacketQueue | 7 | module Data.PacketQueue |
8 | ( PacketQueue | 8 | ( PacketQueue |
9 | , getCapacity | ||
9 | , new | 10 | , new |
11 | , newOverwrite | ||
10 | , dequeue | 12 | , dequeue |
13 | , markButNotDequeue | ||
11 | , enqueue | 14 | , enqueue |
12 | , observeOutOfBand | 15 | , observeOutOfBand |
13 | , PacketOutQueue | 16 | , PacketOutQueue |
@@ -24,6 +27,7 @@ module Data.PacketQueue | |||
24 | import Control.Concurrent.STM | 27 | import Control.Concurrent.STM |
25 | import Control.Concurrent.STM.TArray | 28 | import Control.Concurrent.STM.TArray |
26 | import Control.Monad | 29 | import Control.Monad |
30 | import Control.Applicative | ||
27 | import Data.Word | 31 | import Data.Word |
28 | import Data.Array.MArray | 32 | import Data.Array.MArray |
29 | import Data.Maybe | 33 | import Data.Maybe |
@@ -33,6 +37,7 @@ data PacketQueue a = PacketQueue | |||
33 | , seqno :: TVar Word32 | 37 | , seqno :: TVar Word32 |
34 | , qsize :: Word32 | 38 | , qsize :: Word32 |
35 | , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1 | 39 | , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1 |
40 | , qOverWriteMode :: Bool | ||
36 | } | 41 | } |
37 | 42 | ||
38 | packetQueueViewList :: PacketQueue a -> STM [(Word32,a)] | 43 | packetQueueViewList :: PacketQueue a -> STM [(Word32,a)] |
@@ -41,6 +46,8 @@ packetQueueViewList p = do | |||
41 | f (n,Just x) = Just (n,x) | 46 | f (n,Just x) = Just (n,x) |
42 | catMaybes . map f <$> getAssocs (pktq p) | 47 | catMaybes . map f <$> getAssocs (pktq p) |
43 | 48 | ||
49 | getCapacity :: Applicative m => PacketQueue t -> m Word32 | ||
50 | getCapacity (PacketQueue { qsize }) = pure qsize | ||
44 | 51 | ||
45 | -- | Create a new PacketQueue. | 52 | -- | Create a new PacketQueue. |
46 | new :: Word32 -- ^ Capacity of queue. | 53 | new :: Word32 -- ^ Capacity of queue. |
@@ -56,6 +63,24 @@ new capacity seqstart = do | |||
56 | , seqno = seqv | 63 | , seqno = seqv |
57 | , qsize = cap | 64 | , qsize = cap |
58 | , buffend = bufe | 65 | , buffend = bufe |
66 | , qOverWriteMode = False | ||
67 | } | ||
68 | |||
69 | -- | Create a new PacketQueue with Overwrite on Wrap. | ||
70 | newOverwrite :: Word32 -- ^ Capacity of queue. | ||
71 | -> Word32 -- ^ Initial sequence number. | ||
72 | -> STM (PacketQueue a) | ||
73 | newOverwrite capacity seqstart = do | ||
74 | let cap = if capacity `mod` 2 == 0 then capacity else capacity + 1 | ||
75 | q <- newArray (0,cap - 1) Nothing | ||
76 | seqv <- newTVar seqstart | ||
77 | bufe <- newTVar 0 | ||
78 | return PacketQueue | ||
79 | { pktq = q | ||
80 | , seqno = seqv | ||
81 | , qsize = cap | ||
82 | , buffend = bufe | ||
83 | , qOverWriteMode = True | ||
59 | } | 84 | } |
60 | 85 | ||
61 | observeOutOfBand :: PacketQueue a -> Word32-> STM () | 86 | observeOutOfBand :: PacketQueue a -> Word32-> STM () |
@@ -77,21 +102,33 @@ dequeue PacketQueue { pktq, seqno, qsize } = do | |||
77 | modifyTVar' seqno succ | 102 | modifyTVar' seqno succ |
78 | return x | 103 | return x |
79 | 104 | ||
105 | -- | Like dequeue, but marks as handled rather than removing | ||
106 | markButNotDequeue :: PacketQueue (Bool,a) -> STM a | ||
107 | markButNotDequeue PacketQueue { pktq, seqno, qsize } = do | ||
108 | i0 <- readTVar seqno | ||
109 | let i = i0 `mod` qsize | ||
110 | (b,x) <- maybe retry return =<< readArray pktq i | ||
111 | writeArray pktq i (Just (True,x)) | ||
112 | modifyTVar' seqno succ | ||
113 | return x | ||
114 | |||
80 | -- | Enqueue a packet. Packets need not be enqueued in order as long as there | 115 | -- | Enqueue a packet. Packets need not be enqueued in order as long as there |
81 | -- is spare capacity in the queue. If there is not, the packet will be | 116 | -- is spare capacity in the queue. If there is not, the packet will be |
82 | -- silently discarded without blocking. | 117 | -- silently discarded without blocking. (Unless this is an Overwrite-queue, |
118 | -- in which case, the packets will simply wrap around overwriting the old ones.) | ||
83 | enqueue :: PacketQueue a -- ^ The packet queue. | 119 | enqueue :: PacketQueue a -- ^ The packet queue. |
84 | -> Word32 -- ^ Sequence number of the packet. | 120 | -> Word32 -- ^ Sequence number of the packet. |
85 | -> a -- ^ The packet. | 121 | -> a -- ^ The packet. |
86 | -> STM () | 122 | -> STM (Word32,Word32) |
87 | enqueue PacketQueue{ pktq, seqno, qsize, buffend } no x = do | 123 | enqueue PacketQueue{ pktq, seqno, qsize, buffend,qOverWriteMode } no x = do |
88 | low <- readTVar seqno | 124 | low <- readTVar seqno |
89 | let proj = no - low | 125 | let proj = no - low |
90 | -- Ignore packet if out of range. | 126 | -- Ignore packet if out of range. |
91 | when ( proj < qsize) $ do | 127 | when ( proj < qsize || qOverWriteMode) $ do |
92 | let i = no `mod` qsize | 128 | let i = no `mod` qsize |
93 | writeArray pktq i (Just x) | 129 | writeArray pktq i (Just x) |
94 | modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) | 130 | modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) |
131 | return (proj `divMod` qsize) | ||
95 | 132 | ||
96 | -- lookup :: PacketQueue a -> Word32 -> STM (Maybe a) | 133 | -- lookup :: PacketQueue a -> Word32 -> STM (Maybe a) |
97 | -- lookup PacketQueue{ pktq, seqno, qsize } no = _todo | 134 | -- lookup PacketQueue{ pktq, seqno, qsize } no = _todo |