summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames Crayne <jim.crayne@gmail.com>2018-05-28 19:54:59 +0000
committerJames Crayne <jim.crayne@gmail.com>2018-05-28 19:54:59 +0000
commit8fe574a8f1f2f71b68c89aeacccd05d084a4003d (patch)
treedc665a9b6cc2928f63630f8401af3573338ec7a9
parent714f05030737b1df78a67ba694ab33fd605d55ed (diff)
ncLastNMsgs is now CyclicBuffer type
-rw-r--r--dht-client.cabal1
-rw-r--r--examples/dhtd.hs10
-rw-r--r--src/Data/CyclicBuffer.hs24
-rw-r--r--src/Network/Tox/Crypto/Handlers.hs23
4 files changed, 30 insertions, 28 deletions
diff --git a/dht-client.cabal b/dht-client.cabal
index 9e3c78ae..fa49aff1 100644
--- a/dht-client.cabal
+++ b/dht-client.cabal
@@ -90,6 +90,7 @@ library
90 Network.BitTorrent.MainlineDHT.Symbols 90 Network.BitTorrent.MainlineDHT.Symbols
91 System.Global6 91 System.Global6
92 Data.PacketQueue 92 Data.PacketQueue
93 Data.CyclicBuffer
93 Data.Word64Map 94 Data.Word64Map
94 Data.Word64RangeMap 95 Data.Word64RangeMap
95 OnionRouter 96 OnionRouter
diff --git a/examples/dhtd.hs b/examples/dhtd.hs
index 4b79b132..ce1e1b16 100644
--- a/examples/dhtd.hs
+++ b/examples/dhtd.hs
@@ -108,6 +108,7 @@ import Connection
108import ToxToXMPP 108import ToxToXMPP
109import qualified Connection.Tcp as Tcp (ConnectionEvent(..)) 109import qualified Connection.Tcp as Tcp (ConnectionEvent(..))
110import Control.Concurrent.Supply 110import Control.Concurrent.Supply
111import qualified Data.CyclicBuffer as CB
111 112
112 113
113showReport :: [(String,String)] -> String 114showReport :: [(String,String)] -> String
@@ -708,14 +709,11 @@ clientSession s@Session{..} sock cnum h = do
708 { ncSessionId = id 709 { ncSessionId = id
709 , ncMyPublicKey = yourkey 710 , ncMyPublicKey = yourkey
710 , ncTheirPublicKey = theirkey 711 , ncTheirPublicKey = theirkey
711 , ncLastNMsgs = msgQ 712 , ncLastNMsgs = lastN
712 , ncSockAddr = sockAddr 713 , ncSockAddr = sockAddr
713 , ncMsgNumVar = msgNumVar
714 , ncDropCntVar = dropCntVar
715 }) = do 714 }) = do
716 num <- atomically (readTVar msgNumVar) 715 (num,dropped) <- atomically $ liftA2 (,) (CB.getTotal lastN) (CB.getDropped lastN)
717 dropped <- atomically (readTVar dropCntVar) 716 as <- atomically (CB.cyclicBufferViewList lastN)
718 as <- atomically (packetQueueViewList msgQ)
719 let (h,u) = partition (fst . snd) as 717 let (h,u) = partition (fst . snd) as
720 countHandled = length h 718 countHandled = length h
721 countUnhandled = length u 719 countUnhandled = length u
diff --git a/src/Data/CyclicBuffer.hs b/src/Data/CyclicBuffer.hs
index ab022e3f..0cc87459 100644
--- a/src/Data/CyclicBuffer.hs
+++ b/src/Data/CyclicBuffer.hs
@@ -1,6 +1,7 @@
1{-# LANGUAGE NamedFieldPuns #-} 1{-# LANGUAGE NamedFieldPuns #-}
2{-# LANGUAGE FlexibleContexts #-} 2{-# LANGUAGE FlexibleContexts #-}
3module Data.CyclicBuffer where 3module Data.CyclicBuffer {- TODO: export list -} where
4
4 5
5import Control.Concurrent.STM 6import Control.Concurrent.STM
6import Control.Concurrent.STM.TArray 7import Control.Concurrent.STM.TArray
@@ -19,6 +20,7 @@ data CyclicBuffer a = CyclicBuffer
19 , qsize :: Word32 20 , qsize :: Word32
20 , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1 21 , buffend :: TVar Word32 -- on incoming, highest packet number handled + 1
21 , dropCnt :: TVar Word32 22 , dropCnt :: TVar Word32
23 , totalCnt :: TVar Word32
22 } 24 }
23 25
24cyclicBufferViewList :: CyclicBuffer a -> STM [(Word32,a)] 26cyclicBufferViewList :: CyclicBuffer a -> STM [(Word32,a)]
@@ -30,6 +32,15 @@ cyclicBufferViewList p = do
30getCapacity :: Applicative m => CyclicBuffer t -> m Word32 32getCapacity :: Applicative m => CyclicBuffer t -> m Word32
31getCapacity (CyclicBuffer { qsize }) = pure qsize 33getCapacity (CyclicBuffer { qsize }) = pure qsize
32 34
35getTotal :: CyclicBuffer t -> STM Word32
36getTotal (CyclicBuffer { totalCnt }) = readTVar totalCnt
37
38getDropped :: CyclicBuffer t -> STM Word32
39getDropped (CyclicBuffer { dropCnt }) = readTVar dropCnt
40
41getNextSequenceNum :: CyclicBuffer t -> STM Word32
42getNextSequenceNum (CyclicBuffer { seqno }) = readTVar seqno
43
33-- | Create a new CyclicBuffer with Overwrite on Wrap. 44-- | Create a new CyclicBuffer with Overwrite on Wrap.
34new :: Word32 -- ^ Capacity of queue. 45new :: Word32 -- ^ Capacity of queue.
35 -> Word32 -- ^ Initial sequence number. 46 -> Word32 -- ^ Initial sequence number.
@@ -41,6 +52,7 @@ new capacity seqstart = do
41 seqv <- newTVar seqstart 52 seqv <- newTVar seqstart
42 bufe <- newTVar 0 53 bufe <- newTVar 0
43 dropped <- newTVar 0 54 dropped <- newTVar 0
55 total <- newTVar 0
44 return CyclicBuffer 56 return CyclicBuffer
45 { vwflgs = flgs 57 { vwflgs = flgs
46 , pktq = q 58 , pktq = q
@@ -48,6 +60,7 @@ new capacity seqstart = do
48 , qsize = cap 60 , qsize = cap
49 , buffend = bufe 61 , buffend = bufe
50 , dropCnt = dropped 62 , dropCnt = dropped
63 , totalCnt = total
51 } 64 }
52 65
53observeOutOfBand :: CyclicBuffer a -> Word32-> STM () 66observeOutOfBand :: CyclicBuffer a -> Word32-> STM ()
@@ -79,14 +92,15 @@ markButNotDequeue CyclicBuffer { vwflgs, pktq, seqno, qsize } = do
79 modifyTVar' seqno succ 92 modifyTVar' seqno succ
80 return x 93 return x
81 94
82-- | Enqueue a packet. Packets need not be enqueued in order as long as there 95-- | Enqueue a packet. If the capacity is exceeded, packets are
83-- is spare capacity in the queue. If the capacity is exceeded, packets are
84-- dropped and the drop count increased accordingly. 96-- dropped and the drop count increased accordingly.
97-- TODO: We no longer really support "out of order"
98-- So perhaps drop the num parameter
85enqueue :: CyclicBuffer a -- ^ The cyclic buffer(queue) 99enqueue :: CyclicBuffer a -- ^ The cyclic buffer(queue)
86 -> Word32 -- ^ Sequence number of the packet. 100 -> Word32 -- ^ Sequence number of the packet.
87 -> a -- ^ The packet. 101 -> a -- ^ The packet.
88 -> STM () 102 -> STM ()
89enqueue CyclicBuffer{vwflgs, pktq, seqno, qsize, buffend, dropCnt} no x = do 103enqueue CyclicBuffer{vwflgs, pktq, seqno, qsize, buffend, dropCnt, totalCnt} no x = do
90 low <- readTVar seqno 104 low <- readTVar seqno
91 let proj = no - low 105 let proj = no - low
92 let i = no `mod` qsize 106 let i = no `mod` qsize
@@ -96,5 +110,7 @@ enqueue CyclicBuffer{vwflgs, pktq, seqno, qsize, buffend, dropCnt} no x = do
96 modifyTVar' dropCnt (+1) 110 modifyTVar' dropCnt (+1)
97 writeArray pktq i (Just x) 111 writeArray pktq i (Just x)
98 writeArray vwflgs i False -- mark as not viewed 112 writeArray vwflgs i False -- mark as not viewed
113 modifyTVar' totalCnt (+1)
114 writeTVar seqno (no+1)
99 modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be) 115 modifyTVar' buffend (\be -> if be - low <= proj then no + 1 else be)
100 return () 116 return ()
diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs
index ee8af399..92cb19b8 100644
--- a/src/Network/Tox/Crypto/Handlers.hs
+++ b/src/Network/Tox/Crypto/Handlers.hs
@@ -192,17 +192,12 @@ data NetCryptoSession = NCrypto
192 CryptoMessage 192 CryptoMessage
193 (CryptoPacket Encrypted) 193 (CryptoPacket Encrypted)
194 CryptoData 194 CryptoData
195 , ncLastNMsgs :: PacketQueue (Bool{-Handled?-},(ViewSnapshot,InOrOut CryptoMessage)) 195 , ncLastNMsgs :: CyclicBuffer (Bool{-Handled?-},(ViewSnapshot,InOrOut CryptoMessage))
196 -- ^ cyclic buffer, holds the last N non-handshake crypto messages 196 -- ^ cyclic buffer, holds the last N non-handshake crypto messages
197 -- even if there is no attached user interface. 197 -- even if there is no attached user interface.
198 , ncListeners :: TVar (IntMap.IntMap (ListenerType,TMChan CryptoMessage)) 198 , ncListeners :: TVar (IntMap.IntMap (ListenerType,TMChan CryptoMessage))
199 -- ^ user interfaces may "listen" by inserting themselves into this map 199 -- ^ user interfaces may "listen" by inserting themselves into this map
200 -- with a unique id and a new TChan, and then reading from the TChan 200 -- with a unique id and a new TChan, and then reading from the TChan
201 , ncMsgNumVar :: TVar Word32
202 -- ^ The number of non-handshake crypto messages written to ncLastNMsgs total
203 , ncDropCntVar :: TVar Word32
204 -- ^ The number of crypto messages that were overwritten in the ncLastNMsgs
205 -- before anybody got to see them.
206 } 201 }
207 202
208data NetCryptoSessions = NCSessions 203data NetCryptoSessions = NCSessions
@@ -450,7 +445,7 @@ freshCryptoSession sessions
450 writeTVar ncMyPacketNonce0 n24plus1 445 writeTVar ncMyPacketNonce0 n24plus1
451 return (return (f n24, n24, ncOutgoingIdMap0)) 446 return (return (f n24, n24, ncOutgoingIdMap0))
452 pktoq <- atomically $ PQ.newOutGoing pktq ncToWire toWireIO 0 (outboundQueueCapacity sessions) 0 447 pktoq <- atomically $ PQ.newOutGoing pktq ncToWire toWireIO 0 (outboundQueueCapacity sessions) 0
453 msgQ <- atomically (PQ.newOverwrite 10 0 :: STM (PacketQueue (Bool,(ViewSnapshot,InOrOut CryptoMessage)))) 448 lastNQ <- atomically (CB.new 10 0 :: STM (CyclicBuffer (Bool,(ViewSnapshot,InOrOut CryptoMessage))))
454 listeners <- atomically $ newTVar IntMap.empty 449 listeners <- atomically $ newTVar IntMap.empty
455 msgNum <- atomically $ newTVar 0 450 msgNum <- atomically $ newTVar 0
456 dropNum <- atomically $ newTVar 0 451 dropNum <- atomically $ newTVar 0
@@ -478,10 +473,8 @@ freshCryptoSession sessions
478 , ncDequeueThread = Nothing -- error "you want the NetCrypto-Dequeue thread id, but is it started?" 473 , ncDequeueThread = Nothing -- error "you want the NetCrypto-Dequeue thread id, but is it started?"
479 , ncPingMachine = Nothing -- error "you want the NetCrypto-PingMachine, but is it started?" 474 , ncPingMachine = Nothing -- error "you want the NetCrypto-PingMachine, but is it started?"
480 , ncOutgoingQueue = pktoq 475 , ncOutgoingQueue = pktoq
481 , ncLastNMsgs = msgQ 476 , ncLastNMsgs = lastNQ
482 , ncListeners = listeners 477 , ncListeners = listeners
483 , ncMsgNumVar = msgNum
484 , ncDropCntVar = dropNum
485 } 478 }
486 -- launch dequeue thread 479 -- launch dequeue thread
487 threadid <- forkIO $ do 480 threadid <- forkIO $ do
@@ -886,17 +879,11 @@ hookHelper handledFlg typ session cm = do
886addMsgToLastN :: Bool -> MessageType -> NetCryptoSession -> InOrOut CryptoMessage -> IO () 879addMsgToLastN :: Bool -> MessageType -> NetCryptoSession -> InOrOut CryptoMessage -> IO ()
887addMsgToLastN handledFlg typ session cm = do 880addMsgToLastN handledFlg typ session cm = do
888 let lastNQ = ncLastNMsgs session 881 let lastNQ = ncLastNMsgs session
889 msgNumVar = ncMsgNumVar session
890 dropCntVar = ncDropCntVar session
891 atomically $ do 882 atomically $ do
892 num <- readTVar msgNumVar
893 view <- readTVar (ncView session) 883 view <- readTVar (ncView session)
894 snapshot <- viewSnapshot view 884 snapshot <- viewSnapshot view
895 (wraps,offset) <- PQ.enqueue lastNQ num (handledFlg,(snapshot,cm)) 885 num <- CB.getNextSequenceNum lastNQ
896 capacity <- PQ.getCapacity lastNQ 886 CB.enqueue lastNQ num (handledFlg,(snapshot,cm))
897 let dropped = wraps * capacity + offset
898 modifyTVar' msgNumVar (+1)
899 writeTVar dropCntVar dropped
900 887
901 888
902-- | use to add a single hook to a specific session. 889-- | use to add a single hook to a specific session.