summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-11-13 05:19:50 -0500
committerJames Crayne <jim.crayne@gmail.com>2017-11-19 23:40:14 +0000
commitadd36aaffd9ef06ca9c6f2a73a185ba531581c25 (patch)
treea2891abc1f0d35486227350994e14aad765bd431 /src
parentf9d32e34a71e91bc2cd09eb23a0c3a06b2f66a35 (diff)
Added PacketQueue for handling inbound netcrypto packets.
Diffstat (limited to 'src')
-rw-r--r--src/Data/PacketQueue.hs65
-rw-r--r--src/Network/Tox/Crypto/Handlers.hs14
2 files changed, 77 insertions, 2 deletions
diff --git a/src/Data/PacketQueue.hs b/src/Data/PacketQueue.hs
new file mode 100644
index 00000000..a617d502
--- /dev/null
+++ b/src/Data/PacketQueue.hs
@@ -0,0 +1,65 @@
1-- | This module is useful for implementing a lossess protocol on top of a
2-- lossy datagram style protocol. It implements a buffer in which packets may
3-- be stored out of order, but from which they are extracted in the proper
4-- sequence.
5{-# LANGUAGE NamedFieldPuns #-}
6module Data.PacketQueue
7 ( PacketQueue
8 , new
9 , dequeue
10 , enqueue
11 ) where
12
13import Control.Concurrent.STM
14import Control.Concurrent.STM.TArray
15import Control.Monad
16import Data.Word
17import Data.Array.MArray
18
19data PacketQueue a = PacketQueue
20 { pktq :: TArray Word32 (Maybe a)
21 , seqno :: TVar Word32
22 , qsize :: Word32
23 }
24
25-- | Create a new PacketQueue.
26new :: Word32 -- ^ Capacity of queue.
27 -> Word32 -- ^ Initial sequence number.
28 -> STM (PacketQueue a)
29new capacity seqstart = do
30 let cap = if capacity `mod` 2 == 0 then capacity else capacity + 1
31 q <- newArray (0,cap - 1) Nothing
32 seqv <- newTVar seqstart
33 return PacketQueue
34 { pktq = q
35 , seqno = seqv
36 , qsize = cap
37 }
38
39-- | Retry until the next expected packet is enqueued. Then return it.
40dequeue :: PacketQueue a -> STM a
41dequeue PacketQueue { pktq, seqno, qsize } = do
42 i0 <- readTVar seqno
43 let i = i0 `mod` qsize
44 x <- maybe retry return =<< readArray pktq i
45 writeArray pktq i Nothing
46 modifyTVar' seqno succ
47 return x
48
49-- | Enqueue a packet. Packets need not be enqueued in order as long as there
50-- is spare capacity in the queue. If there is not, the packet will be
51-- silently discarded without blocking.
52enqueue :: PacketQueue a -- ^ The packet queue.
53 -> Word32 -- ^ Sequence number of the packet.
54 -> a -- ^ The packet.
55 -> STM ()
56enqueue PacketQueue{ pktq, seqno, qsize } no x = do
57 low <- readTVar seqno
58 let proj = no - low
59 -- Ignore packet if out of range.
60 when ( proj < qsize) $ do
61 let i = no `mod` qsize
62 writeArray pktq i (Just x)
63
64-- lookup :: PacketQueue a -> Word32 -> STM (Maybe a)
65-- lookup PacketQueue{ pktq, seqno, qsize } no = _todo
diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs
index 13db02f3..787c69c2 100644
--- a/src/Network/Tox/Crypto/Handlers.hs
+++ b/src/Network/Tox/Crypto/Handlers.hs
@@ -18,6 +18,8 @@ import qualified Data.ByteString as B
18import Data.ByteString (ByteString) 18import Data.ByteString (ByteString)
19import Control.Lens 19import Control.Lens
20import Data.Function 20import Data.Function
21import qualified Data.PacketQueue as PQ
22 ;import Data.PacketQueue (PacketQueue)
21import Data.Serialize as S 23import Data.Serialize as S
22import Data.Word 24import Data.Word
23import GHC.Conc (unsafeIOToSTM) 25import GHC.Conc (unsafeIOToSTM)
@@ -63,7 +65,7 @@ data NetCryptoSession = NCrypto { ncState :: TVar NetCryptoSessionStatus
63 , ncTheirBaseNonce :: TVar Nonce24 -- base nonce + packet number 65 , ncTheirBaseNonce :: TVar Nonce24 -- base nonce + packet number
64 , ncMyPacketNonce :: TVar Nonce24 -- base nonce + packet number 66 , ncMyPacketNonce :: TVar Nonce24 -- base nonce + packet number
65 , ncHandShake :: TVar (Maybe (Handshake Encrypted)) 67 , ncHandShake :: TVar (Maybe (Handshake Encrypted))
66 , ncCookie :: TVar (Maybe Cookie) 68 , ncCookie :: TVar (Maybe Cookie) -- ^ Cookie issued by remote peer
67 , ncTheirDHTKey :: PublicKey 69 , ncTheirDHTKey :: PublicKey
68 , ncTheirSessionPublic :: Maybe PublicKey 70 , ncTheirSessionPublic :: Maybe PublicKey
69 , ncSessionSecret :: SecretKey 71 , ncSessionSecret :: SecretKey
@@ -80,6 +82,8 @@ data NetCryptoSession = NCrypto { ncState :: TVar NetCryptoSessionStatus
80 -- needs to possibly start another, as is 82 -- needs to possibly start another, as is
81 -- the case in group chats 83 -- the case in group chats
82 , ncView :: TVar SessionView 84 , ncView :: TVar SessionView
85 , ncPacketQueue :: PacketQueue CryptoMessage
86 , ncBufferStart :: TVar Word32
83 } 87 }
84 88
85data NetCryptoSessions = NCSessions { netCryptoSessions :: TVar (Map.Map SockAddr NetCryptoSession) 89data NetCryptoSessions = NCSessions { netCryptoSessions :: TVar (Map.Map SockAddr NetCryptoSession)
@@ -88,6 +92,7 @@ data NetCryptoSessions = NCSessions { netCryptoSessions :: TVar (Map.Map SockAdd
88 , defaultUnrecognizedHook :: MessageType -> NetCryptoHook 92 , defaultUnrecognizedHook :: MessageType -> NetCryptoHook
89 , sessionView :: SessionView 93 , sessionView :: SessionView
90 , msgTypeArray :: MsgTypeArray 94 , msgTypeArray :: MsgTypeArray
95 , inboundQueueCapacity :: Word32
91 } 96 }
92 97
93newSessionsState :: TransportCrypto 98newSessionsState :: TransportCrypto
@@ -120,6 +125,7 @@ newSessionsState crypto unrechook hooks = do
120 , svDownloadDir = svDownloadDir0 125 , svDownloadDir = svDownloadDir0
121 } 126 }
122 , msgTypeArray = allMsgTypes -- todo make this a parameter 127 , msgTypeArray = allMsgTypes -- todo make this a parameter
128 , inboundQueueCapacity = 200
123 } 129 }
124 130
125data HandshakeParams 131data HandshakeParams
@@ -195,6 +201,8 @@ freshCryptoSession sessions
195 ncUnrecognizedHook0 <- atomically $ newTVar (defaultUnrecognizedHook sessions) 201 ncUnrecognizedHook0 <- atomically $ newTVar (defaultUnrecognizedHook sessions)
196 ncMessageTypes0 <- atomically $ newTVar (msgTypeArray sessions) 202 ncMessageTypes0 <- atomically $ newTVar (msgTypeArray sessions)
197 ncView0 <- atomically $ newTVar (sessionView sessions) 203 ncView0 <- atomically $ newTVar (sessionView sessions)
204 pktq <- atomically $ PQ.new (inboundQueueCapacity sessions) 0
205 bufstart <- atomically $ newTVar 0
198 let netCryptoSession = 206 let netCryptoSession =
199 NCrypto { ncState = ncState0 207 NCrypto { ncState = ncState0
200 , ncTheirBaseNonce= ncTheirBaseNonce0 208 , ncTheirBaseNonce= ncTheirBaseNonce0
@@ -210,6 +218,8 @@ freshCryptoSession sessions
210 , ncAllSessions = sessions 218 , ncAllSessions = sessions
211 , ncMessageTypes = ncMessageTypes0 219 , ncMessageTypes = ncMessageTypes0
212 , ncView = ncView0 220 , ncView = ncView0
221 , ncPacketQueue = pktq
222 , ncBufferStart = bufstart
213 } 223 }
214 atomically $ modifyTVar allsessions (Map.insert addr netCryptoSession) 224 atomically $ modifyTVar allsessions (Map.insert addr netCryptoSession)
215 225
@@ -224,7 +234,7 @@ updateCryptoSession sessions addr hp session = do
224 -- duplicate handshake packet, otherwise disregard everything, and 234 -- duplicate handshake packet, otherwise disregard everything, and
225 -- refresh all state. 235 -- refresh all state.
226 -- 236 --
227 then when ( Just ncTheirBaseNonce0 /= hpTheirBaseNonce hp 237 then when ( Just ncTheirBaseNonce0 /= hpTheirBaseNonce hp -- XXX: Do we really want to compare base nonce here?
228 || ncTheirDHTKey session /= hpCookieRemoteDhtkey hp 238 || ncTheirDHTKey session /= hpCookieRemoteDhtkey hp
229 ) $ freshCryptoSession sessions addr hp 239 ) $ freshCryptoSession sessions addr hp
230 else if ( Just ncTheirBaseNonce0 /= hpTheirBaseNonce hp) 240 else if ( Just ncTheirBaseNonce0 /= hpTheirBaseNonce hp)