diff options
-rw-r--r-- | dht-client.cabal | 1 | ||||
-rw-r--r-- | examples/dhtd.hs | 10 | ||||
-rw-r--r-- | src/Data/CyclicBuffer.hs | 24 | ||||
-rw-r--r-- | src/Network/Tox/Crypto/Handlers.hs | 23 |
4 files changed, 30 insertions, 28 deletions
diff --git a/dht-client.cabal b/dht-client.cabal index 9e3c78ae..fa49aff1 100644 --- a/dht-client.cabal +++ b/dht-client.cabal | |||
@@ -90,6 +90,7 @@ library | |||
90 | Network.BitTorrent.MainlineDHT.Symbols | 90 | Network.BitTorrent.MainlineDHT.Symbols |
91 | System.Global6 | 91 | System.Global6 |
92 | Data.PacketQueue | 92 | Data.PacketQueue |
93 | Data.CyclicBuffer | ||
93 | Data.Word64Map | 94 | Data.Word64Map |
94 | Data.Word64RangeMap | 95 | Data.Word64RangeMap |
95 | OnionRouter | 96 | OnionRouter |
diff --git a/examples/dhtd.hs b/examples/dhtd.hs index 4b79b132..ce1e1b16 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs | |||
@@ -108,6 +108,7 @@ import Connection | |||
108 | import ToxToXMPP | 108 | import ToxToXMPP |
109 | import qualified Connection.Tcp as Tcp (ConnectionEvent(..)) | 109 | import qualified Connection.Tcp as Tcp (ConnectionEvent(..)) |
110 | import Control.Concurrent.Supply | 110 | import Control.Concurrent.Supply |
111 | import qualified Data.CyclicBuffer as CB | ||
111 | 112 | ||
112 | 113 | ||
113 | showReport :: [(String,String)] -> String | 114 | showReport :: [(String,String)] -> String |
@@ -708,14 +709,11 @@ clientSession s@Session{..} sock cnum h = do | |||
708 | { ncSessionId = id | 709 | { ncSessionId = id |
709 | , ncMyPublicKey = yourkey | 710 | , ncMyPublicKey = yourkey |
710 | , ncTheirPublicKey = theirkey | 711 | , ncTheirPublicKey = theirkey |
711 | , ncLastNMsgs = msgQ | 712 | , ncLastNMsgs = lastN |
712 | , ncSockAddr = sockAddr | 713 | , ncSockAddr = sockAddr |
713 | , ncMsgNumVar = msgNumVar | ||
714 | , ncDropCntVar = dropCntVar | ||
715 | }) = do | 714 | }) = do |
716 | num <- atomically (readTVar msgNumVar) | 715 | (num,dropped) <- atomically $ liftA2 (,) (CB.getTotal lastN) (CB.getDropped lastN) |
717 | dropped <- atomically (readTVar dropCntVar) | 716 | as <- atomically (CB.cyclicBufferViewList lastN) |
718 | as <- atomically (packetQueueViewList msgQ) | ||
719 | let (h,u) = partition (fst . snd) as | 717 | let (h,u) = partition (fst . snd) as |
720 | countHandled = length h | 718 | countHandled = length h |
721 | countUnhandled = length u | 719 | countUnhandled = length u |
diff --git a/src/Data/CyclicBuffer.hs b/src/Data/CyclicBuffer.hs index ab022e3f..0cc87459 100644 --- a/src/Data/CyclicBuffer.hs +++ b/src/Data/CyclicBuffer.hs | |||
@@ -1,6 +1,7 @@ | |||
1 | {-# LANGUAGE NamedFieldPuns #-} | 1 | {-# LANGUAGE NamedFieldPuns #-} |
2 | {-# LANGUAGE FlexibleContexts #-} | 2 | {-# LANGUAGE FlexibleContexts #-} |
3 | module Data.CyclicBuffer where | 3 | module Data.CyclicBuffer {- TODO: export list -} where |
4 | |||
4 | 5 | ||
5 | import Control.Concurrent.STM | 6 | import Control.Concurrent.STM |
6 | import Control.Concurrent.STM.TArray | 7 | import Control.Concurrent.STM.TArray |
@@ -19,6 +20,7 @@ data CyclicBuffer a = CyclicBuffer | |||
19 | , qsize :: Word32 | 20 | , qsize :: Word32 |
20 | , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1 | 21 | , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1 |
21 | , dropCnt :: TVar Word32 | 22 | , dropCnt :: TVar Word32 |
23 | , totalCnt :: TVar Word32 | ||
22 | } | 24 | } |
23 | 25 | ||
24 | cyclicBufferViewList :: CyclicBuffer a -> STM [(Word32,a)] | 26 | cyclicBufferViewList :: CyclicBuffer a -> STM [(Word32,a)] |
@@ -30,6 +32,15 @@ cyclicBufferViewList p = do | |||
30 | getCapacity :: Applicative m => CyclicBuffer t -> m Word32 | 32 | getCapacity :: Applicative m => CyclicBuffer t -> m Word32 |
31 | getCapacity (CyclicBuffer { qsize }) = pure qsize | 33 | getCapacity (CyclicBuffer { qsize }) = pure qsize |
32 | 34 | ||
35 | getTotal :: CyclicBuffer t -> STM Word32 | ||
36 | getTotal (CyclicBuffer { totalCnt }) = readTVar totalCnt | ||
37 | |||
38 | getDropped :: CyclicBuffer t -> STM Word32 | ||
39 | getDropped (CyclicBuffer { dropCnt }) = readTVar dropCnt | ||
40 | |||
41 | getNextSequenceNum :: CyclicBuffer t -> STM Word32 | ||
42 | getNextSequenceNum (CyclicBuffer { seqno }) = readTVar seqno | ||
43 | |||
33 | -- | Create a new CyclicBuffer with Overwrite on Wrap. | 44 | -- | Create a new CyclicBuffer with Overwrite on Wrap. |
34 | new :: Word32 -- ^ Capacity of queue. | 45 | new :: Word32 -- ^ Capacity of queue. |
35 | -> Word32 -- ^ Initial sequence number. | 46 | -> Word32 -- ^ Initial sequence number. |
@@ -41,6 +52,7 @@ new capacity seqstart = do | |||
41 | seqv <- newTVar seqstart | 52 | seqv <- newTVar seqstart |
42 | bufe <- newTVar 0 | 53 | bufe <- newTVar 0 |
43 | dropped <- newTVar 0 | 54 | dropped <- newTVar 0 |
55 | total <- newTVar 0 | ||
44 | return CyclicBuffer | 56 | return CyclicBuffer |
45 | { vwflgs = flgs | 57 | { vwflgs = flgs |
46 | , pktq = q | 58 | , pktq = q |
@@ -48,6 +60,7 @@ new capacity seqstart = do | |||
48 | , qsize = cap | 60 | , qsize = cap |
49 | , buffend = bufe | 61 | , buffend = bufe |
50 | , dropCnt = dropped | 62 | , dropCnt = dropped |
63 | , totalCnt = total | ||
51 | } | 64 | } |
52 | 65 | ||
53 | observeOutOfBand :: CyclicBuffer a -> Word32-> STM () | 66 | observeOutOfBand :: CyclicBuffer a -> Word32-> STM () |
@@ -79,14 +92,15 @@ markButNotDequeue CyclicBuffer { vwflgs, pktq, seqno, qsize } = do | |||
79 | modifyTVar' seqno succ | 92 | modifyTVar' seqno succ |
80 | return x | 93 | return x |
81 | 94 | ||
82 | -- | Enqueue a packet. Packets need not be enqueued in order as long as there | 95 | -- | Enqueue a packet. If the capacity is exceeded, packets are |
83 | -- is spare capacity in the queue. If the capacity is exceeded, packets are | ||
84 | -- dropped and the drop count increased accordingly. | 96 | -- dropped and the drop count increased accordingly. |
97 | -- TODO: We no longer really support "out of order" | ||
98 | -- So perhaps drop the num parameter | ||
85 | enqueue :: CyclicBuffer a -- ^ The cyclic buffer(queue) | 99 | enqueue :: CyclicBuffer a -- ^ The cyclic buffer(queue) |
86 | -> Word32 -- ^ Sequence number of the packet. | 100 | -> Word32 -- ^ Sequence number of the packet. |
87 | -> a -- ^ The packet. | 101 | -> a -- ^ The packet. |
88 | -> STM () | 102 | -> STM () |
89 | enqueue CyclicBuffer{vwflgs, pktq, seqno, qsize, buffend, dropCnt} no x = do | 103 | enqueue CyclicBuffer{vwflgs, pktq, seqno, qsize, buffend, dropCnt, totalCnt} no x = do |
90 | low <- readTVar seqno | 104 | low <- readTVar seqno |
91 | let proj = no - low | 105 | let proj = no - low |
92 | let i = no `mod` qsize | 106 | let i = no `mod` qsize |
@@ -96,5 +110,7 @@ enqueue CyclicBuffer{vwflgs, pktq, seqno, qsize, buffend, dropCnt} no x = do | |||
96 | modifyTVar' dropCnt (+1) | 110 | modifyTVar' dropCnt (+1) |
97 | writeArray pktq i (Just x) | 111 | writeArray pktq i (Just x) |
98 | writeArray vwflgs i False -- mark as not viewed | 112 | writeArray vwflgs i False -- mark as not viewed |
113 | modifyTVar' totalCnt (+1) | ||
114 | writeTVar seqno (no+1) | ||
99 | modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) | 115 | modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) |
100 | return () | 116 | return () |
diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs index ee8af399..92cb19b8 100644 --- a/src/Network/Tox/Crypto/Handlers.hs +++ b/src/Network/Tox/Crypto/Handlers.hs | |||
@@ -192,17 +192,12 @@ data NetCryptoSession = NCrypto | |||
192 | CryptoMessage | 192 | CryptoMessage |
193 | (CryptoPacket Encrypted) | 193 | (CryptoPacket Encrypted) |
194 | CryptoData | 194 | CryptoData |
195 | , ncLastNMsgs :: PacketQueue (Bool{-Handled?-},(ViewSnapshot,InOrOut CryptoMessage)) | 195 | , ncLastNMsgs :: CyclicBuffer (Bool{-Handled?-},(ViewSnapshot,InOrOut CryptoMessage)) |
196 | -- ^ cyclic buffer, holds the last N non-handshake crypto messages | 196 | -- ^ cyclic buffer, holds the last N non-handshake crypto messages |
197 | -- even if there is no attached user interface. | 197 | -- even if there is no attached user interface. |
198 | , ncListeners :: TVar (IntMap.IntMap (ListenerType,TMChan CryptoMessage)) | 198 | , ncListeners :: TVar (IntMap.IntMap (ListenerType,TMChan CryptoMessage)) |
199 | -- ^ user interfaces may "listen" by inserting themselves into this map | 199 | -- ^ user interfaces may "listen" by inserting themselves into this map |
200 | -- with a unique id and a new TChan, and then reading from the TChan | 200 | -- with a unique id and a new TChan, and then reading from the TChan |
201 | , ncMsgNumVar :: TVar Word32 | ||
202 | -- ^ The number of non-handshake crypto messages written to ncLastNMsgs total | ||
203 | , ncDropCntVar :: TVar Word32 | ||
204 | -- ^ The number of crypto messages that were overwritten in the ncLastNMsgs | ||
205 | -- before anybody got to see them. | ||
206 | } | 201 | } |
207 | 202 | ||
208 | data NetCryptoSessions = NCSessions | 203 | data NetCryptoSessions = NCSessions |
@@ -450,7 +445,7 @@ freshCryptoSession sessions | |||
450 | writeTVar ncMyPacketNonce0 n24plus1 | 445 | writeTVar ncMyPacketNonce0 n24plus1 |
451 | return (return (f n24, n24, ncOutgoingIdMap0)) | 446 | return (return (f n24, n24, ncOutgoingIdMap0)) |
452 | pktoq <- atomically $ PQ.newOutGoing pktq ncToWire toWireIO 0 (outboundQueueCapacity sessions) 0 | 447 | pktoq <- atomically $ PQ.newOutGoing pktq ncToWire toWireIO 0 (outboundQueueCapacity sessions) 0 |
453 | msgQ <- atomically (PQ.newOverwrite 10 0 :: STM (PacketQueue (Bool,(ViewSnapshot,InOrOut CryptoMessage)))) | 448 | lastNQ <- atomically (CB.new 10 0 :: STM (CyclicBuffer (Bool,(ViewSnapshot,InOrOut CryptoMessage)))) |
454 | listeners <- atomically $ newTVar IntMap.empty | 449 | listeners <- atomically $ newTVar IntMap.empty |
455 | msgNum <- atomically $ newTVar 0 | 450 | msgNum <- atomically $ newTVar 0 |
456 | dropNum <- atomically $ newTVar 0 | 451 | dropNum <- atomically $ newTVar 0 |
@@ -478,10 +473,8 @@ freshCryptoSession sessions | |||
478 | , ncDequeueThread = Nothing -- error "you want the NetCrypto-Dequeue thread id, but is it started?" | 473 | , ncDequeueThread = Nothing -- error "you want the NetCrypto-Dequeue thread id, but is it started?" |
479 | , ncPingMachine = Nothing -- error "you want the NetCrypto-PingMachine, but is it started?" | 474 | , ncPingMachine = Nothing -- error "you want the NetCrypto-PingMachine, but is it started?" |
480 | , ncOutgoingQueue = pktoq | 475 | , ncOutgoingQueue = pktoq |
481 | , ncLastNMsgs = msgQ | 476 | , ncLastNMsgs = lastNQ |
482 | , ncListeners = listeners | 477 | , ncListeners = listeners |
483 | , ncMsgNumVar = msgNum | ||
484 | , ncDropCntVar = dropNum | ||
485 | } | 478 | } |
486 | -- launch dequeue thread | 479 | -- launch dequeue thread |
487 | threadid <- forkIO $ do | 480 | threadid <- forkIO $ do |
@@ -886,17 +879,11 @@ hookHelper handledFlg typ session cm = do | |||
886 | addMsgToLastN :: Bool -> MessageType -> NetCryptoSession -> InOrOut CryptoMessage -> IO () | 879 | addMsgToLastN :: Bool -> MessageType -> NetCryptoSession -> InOrOut CryptoMessage -> IO () |
887 | addMsgToLastN handledFlg typ session cm = do | 880 | addMsgToLastN handledFlg typ session cm = do |
888 | let lastNQ = ncLastNMsgs session | 881 | let lastNQ = ncLastNMsgs session |
889 | msgNumVar = ncMsgNumVar session | ||
890 | dropCntVar = ncDropCntVar session | ||
891 | atomically $ do | 882 | atomically $ do |
892 | num <- readTVar msgNumVar | ||
893 | view <- readTVar (ncView session) | 883 | view <- readTVar (ncView session) |
894 | snapshot <- viewSnapshot view | 884 | snapshot <- viewSnapshot view |
895 | (wraps,offset) <- PQ.enqueue lastNQ num (handledFlg,(snapshot,cm)) | 885 | num <- CB.getNextSequenceNum lastNQ |
896 | capacity <- PQ.getCapacity lastNQ | 886 | CB.enqueue lastNQ num (handledFlg,(snapshot,cm)) |
897 | let dropped = wraps * capacity + offset | ||
898 | modifyTVar' msgNumVar (+1) | ||
899 | writeTVar dropCntVar dropped | ||
900 | 887 | ||
901 | 888 | ||
902 | -- | use to add a single hook to a specific session. | 889 | -- | use to add a single hook to a specific session. |