summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/dhtd.hs17
-rw-r--r--src/Data/PacketQueue.hs51
2 files changed, 57 insertions, 11 deletions
diff --git a/examples/dhtd.hs b/examples/dhtd.hs
index fed2976f..57ee8deb 100644
--- a/examples/dhtd.hs
+++ b/examples/dhtd.hs
@@ -673,20 +673,23 @@ clientSession s@Session{..} sock cnum h = do
673 -> cmd0 $ do 673 -> cmd0 $ do
674 sessions' <- atomically $ readTVar sessions :: IO [PerSession] 674 sessions' <- atomically $ readTVar sessions :: IO [PerSession]
675 let sessionsReport = mapM showPerSession sessions' 675 let sessionsReport = mapM showPerSession sessions'
676 headers = ["Key", "NextMsg", "Handled","Unhandled"] 676 headers = ["Key", "NextMsg", "Dropped","Handled","Unhandled"]
677 showPerSession (PerSession 677 showPerSession (PerSession
678 { perSessionMsgs = msgQ 678 { perSessionMsgs = msgQ
679 , perSessionPublicKey = pubKey 679 , perSessionPublicKey = pubKey
680 , perSessionAddr = sockAddr 680 , perSessionAddr = sockAddr
681 , perSessionNumVar = msgNumVar 681 , perSessionNumVar = msgNumVar
682 , perSessionDropCount = dropCntVar
682 }) = do 683 }) = do
683 num <- atomically (readTVar msgNumVar) 684 num <- atomically (readTVar msgNumVar)
685 dropped <- atomically (readTVar dropCntVar)
684 as <- atomically (packetQueueViewList msgQ) 686 as <- atomically (packetQueueViewList msgQ)
685 let (h,u) = partition (fst . snd) as 687 let (h,u) = partition (fst . snd) as
686 countHandled = length h 688 countHandled = length h
687 countUnhandled = length u 689 countUnhandled = length u
688 return [ show (Tox.key2id pubKey) -- "Key" 690 return [ show (Tox.key2id pubKey) -- "Key"
689 , show num -- "NextMsg" 691 , show num -- "NextMsg"
692 , show dropped -- "Dropped"
690 , show countHandled -- "Handled" 693 , show countHandled -- "Handled"
691 , show countUnhandled -- "Unhandled" 694 , show countUnhandled -- "Unhandled"
692 ] 695 ]
@@ -1248,7 +1251,8 @@ announceToxJabberPeer echan laddr saddr pingflag tsrc tsnk
1248data PerSession = PerSession { perSessionMsgs :: PacketQueue (Bool{-Handled?-},Tox.CryptoMessage) 1251data PerSession = PerSession { perSessionMsgs :: PacketQueue (Bool{-Handled?-},Tox.CryptoMessage)
1249 , perSessionPublicKey :: PublicKey 1252 , perSessionPublicKey :: PublicKey
1250 , perSessionAddr :: SockAddr 1253 , perSessionAddr :: SockAddr
1251 , perSessionNumVar :: TVar Word32 1254 , perSessionNumVar :: TVar Word32
1255 , perSessionDropCount :: TVar Word32
1252 } 1256 }
1253 1257
1254main :: IO () 1258main :: IO ()
@@ -1572,12 +1576,14 @@ main = runResourceT $ liftBaseWith $ \resT -> do
1572 -- allsessionsMap <- atomically $ readTVar (netCryptoSessions netCryptoSessionsState) 1576 -- allsessionsMap <- atomically $ readTVar (netCryptoSessions netCryptoSessionsState)
1573 let sockAddr = Tox.ncSockAddr netcrypto 1577 let sockAddr = Tox.ncSockAddr netcrypto
1574 pubKey = Tox.ncTheirPublicKey netcrypto 1578 pubKey = Tox.ncTheirPublicKey netcrypto
1575 msgQ <- atomically (Data.PacketQueue.new 10 0 :: STM (PacketQueue (Bool,Tox.CryptoMessage))) 1579 msgQ <- atomically (Data.PacketQueue.newOverwrite 10 0 :: STM (PacketQueue (Bool,Tox.CryptoMessage)))
1576 msgNumVar <- atomically (newTVar 0) 1580 msgNumVar <- atomically (newTVar 0)
1581 dropCntVar <- atomically (newTVar 0)
1577 let perSession = PerSession { perSessionMsgs = msgQ 1582 let perSession = PerSession { perSessionMsgs = msgQ
1578 , perSessionPublicKey = pubKey 1583 , perSessionPublicKey = pubKey
1579 , perSessionAddr = sockAddr 1584 , perSessionAddr = sockAddr
1580 , perSessionNumVar = msgNumVar 1585 , perSessionNumVar = msgNumVar
1586 , perSessionDropCount = dropCntVar
1581 } 1587 }
1582 atomically $ modifyTVar' sessions (perSession:) 1588 atomically $ modifyTVar' sessions (perSession:)
1583 tmchan <- atomically newTMChan 1589 tmchan <- atomically newTMChan
@@ -1599,8 +1605,11 @@ main = runResourceT $ liftBaseWith $ \resT -> do
1599 handleIncoming mTyp session cm = do 1605 handleIncoming mTyp session cm = do
1600 atomically $ do 1606 atomically $ do
1601 num <- readTVar msgNumVar 1607 num <- readTVar msgNumVar
1602 enqueue msgQ num (False,cm) 1608 (wraps,offset) <- enqueue msgQ num (False,cm)
1609 capacity <- getCapacity msgQ
1610 let dropped = wraps * capacity + offset
1603 modifyTVar' msgNumVar (+1) 1611 modifyTVar' msgNumVar (+1)
1612 writeTVar dropCntVar dropped
1604 atomically $ writeTMChan tmchan cm -- (Tox.bufferData cd) 1613 atomically $ writeTMChan tmchan cm -- (Tox.bufferData cd)
1605 return Nothing 1614 return Nothing
1606 atomically $ writeTVar (Tox.ncUnrecognizedHook netcrypto) handleIncoming 1615 atomically $ writeTVar (Tox.ncUnrecognizedHook netcrypto) handleIncoming
diff --git a/src/Data/PacketQueue.hs b/src/Data/PacketQueue.hs
index a4c99cab..4f0f04e3 100644
--- a/src/Data/PacketQueue.hs
+++ b/src/Data/PacketQueue.hs
@@ -6,8 +6,11 @@
6{-# LANGUAGE FlexibleContexts #-} 6{-# LANGUAGE FlexibleContexts #-}
7module Data.PacketQueue 7module Data.PacketQueue
8 ( PacketQueue 8 ( PacketQueue
9 , getCapacity
9 , new 10 , new
11 , newOverwrite
10 , dequeue 12 , dequeue
13 , markButNotDequeue
11 , enqueue 14 , enqueue
12 , observeOutOfBand 15 , observeOutOfBand
13 , PacketOutQueue 16 , PacketOutQueue
@@ -24,6 +27,7 @@ module Data.PacketQueue
24import Control.Concurrent.STM 27import Control.Concurrent.STM
25import Control.Concurrent.STM.TArray 28import Control.Concurrent.STM.TArray
26import Control.Monad 29import Control.Monad
30import Control.Applicative
27import Data.Word 31import Data.Word
28import Data.Array.MArray 32import Data.Array.MArray
29import Data.Maybe 33import Data.Maybe
@@ -33,6 +37,7 @@ data PacketQueue a = PacketQueue
33 , seqno :: TVar Word32 37 , seqno :: TVar Word32
34 , qsize :: Word32 38 , qsize :: Word32
35 , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1 39 , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1
40 , qOverWriteMode :: Bool
36 } 41 }
37 42
38packetQueueViewList :: PacketQueue a -> STM [(Word32,a)] 43packetQueueViewList :: PacketQueue a -> STM [(Word32,a)]
@@ -41,6 +46,8 @@ packetQueueViewList p = do
41 f (n,Just x) = Just (n,x) 46 f (n,Just x) = Just (n,x)
42 catMaybes . map f <$> getAssocs (pktq p) 47 catMaybes . map f <$> getAssocs (pktq p)
43 48
49getCapacity :: Applicative m => PacketQueue t -> m Word32
50getCapacity (PacketQueue { qsize }) = pure qsize
44 51
45-- | Create a new PacketQueue. 52-- | Create a new PacketQueue.
46new :: Word32 -- ^ Capacity of queue. 53new :: Word32 -- ^ Capacity of queue.
@@ -56,6 +63,24 @@ new capacity seqstart = do
56 , seqno = seqv 63 , seqno = seqv
57 , qsize = cap 64 , qsize = cap
58 , buffend = bufe 65 , buffend = bufe
66 , qOverWriteMode = False
67 }
68
69-- | Create a new PacketQueue with Overwrite on Wrap.
70newOverwrite :: Word32 -- ^ Capacity of queue.
71 -> Word32 -- ^ Initial sequence number.
72 -> STM (PacketQueue a)
73newOverwrite capacity seqstart = do
74 let cap = if capacity `mod` 2 == 0 then capacity else capacity + 1
75 q <- newArray (0,cap - 1) Nothing
76 seqv <- newTVar seqstart
77 bufe <- newTVar 0
78 return PacketQueue
79 { pktq = q
80 , seqno = seqv
81 , qsize = cap
82 , buffend = bufe
83 , qOverWriteMode = True
59 } 84 }
60 85
61observeOutOfBand :: PacketQueue a -> Word32-> STM () 86observeOutOfBand :: PacketQueue a -> Word32-> STM ()
@@ -77,21 +102,33 @@ dequeue PacketQueue { pktq, seqno, qsize } = do
77 modifyTVar' seqno succ 102 modifyTVar' seqno succ
78 return x 103 return x
79 104
105-- | Like dequeue, but marks as handled rather than removing
106markButNotDequeue :: PacketQueue (Bool,a) -> STM a
107markButNotDequeue PacketQueue { pktq, seqno, qsize } = do
108 i0 <- readTVar seqno
109 let i = i0 `mod` qsize
110 (b,x) <- maybe retry return =<< readArray pktq i
111 writeArray pktq i (Just (True,x))
112 modifyTVar' seqno succ
113 return x
114
80-- | Enqueue a packet. Packets need not be enqueued in order as long as there 115-- | Enqueue a packet. Packets need not be enqueued in order as long as there
81-- is spare capacity in the queue. If there is not, the packet will be 116-- is spare capacity in the queue. If there is not, the packet will be
82-- silently discarded without blocking. 117-- silently discarded without blocking. (Unless this is an Overwrite-queue,
118-- in which case, the packets will simply wrap around overwriting the old ones.)
83enqueue :: PacketQueue a -- ^ The packet queue. 119enqueue :: PacketQueue a -- ^ The packet queue.
84 -> Word32 -- ^ Sequence number of the packet. 120 -> Word32 -- ^ Sequence number of the packet.
85 -> a -- ^ The packet. 121 -> a -- ^ The packet.
86 -> STM () 122 -> STM (Word32,Word32)
87enqueue PacketQueue{ pktq, seqno, qsize, buffend } no x = do 123enqueue PacketQueue{ pktq, seqno, qsize, buffend,qOverWriteMode } no x = do
88 low <- readTVar seqno 124 low <- readTVar seqno
89 let proj = no - low 125 let proj = no - low
90 -- Ignore packet if out of range. 126 -- Ignore packet if out of range.
91 when ( proj < qsize) $ do 127 when ( proj < qsize || qOverWriteMode) $ do
92 let i = no `mod` qsize 128 let i = no `mod` qsize
93 writeArray pktq i (Just x) 129 writeArray pktq i (Just x)
94 modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) 130 modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be)
131 return (proj `divMod` qsize)
95 132
96-- lookup :: PacketQueue a -> Word32 -> STM (Maybe a) 133-- lookup :: PacketQueue a -> Word32 -> STM (Maybe a)
97-- lookup PacketQueue{ pktq, seqno, qsize } no = _todo 134-- lookup PacketQueue{ pktq, seqno, qsize } no = _todo