From 8d9abc1df036a8184bc2fd88ddf6f1d621e7e4c1 Mon Sep 17 00:00:00 2001 From: James Crayne Date: Sun, 19 Nov 2017 22:52:07 +0000 Subject: Outgoing queue and related --- src/Network/Tox/Crypto/Handlers.hs | 73 +++++++++++++++++++++++++++++++++++--- 1 file changed, 69 insertions(+), 4 deletions(-) (limited to 'src/Network/Tox/Crypto/Handlers.hs') diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs index edfcb260..6a79da1b 100644 --- a/src/Network/Tox/Crypto/Handlers.hs +++ b/src/Network/Tox/Crypto/Handlers.hs @@ -1,5 +1,6 @@ {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE TupleSections #-} +{-# LANGUAGE TypeOperators #-} module Network.Tox.Crypto.Handlers where import Network.Tox.NodeId @@ -22,6 +23,7 @@ import qualified Data.PacketQueue as PQ ;import Data.PacketQueue (PacketQueue) import Data.Serialize as S import Data.Word +import qualified Data.Word64Map as W64 import GHC.Conc (unsafeIOToSTM) import qualified Data.Set as Set import qualified Data.Array.Unboxed as A @@ -32,6 +34,7 @@ import System.Environment import System.Directory import Control.Concurrent import GHC.Conc (labelThread) +import System.IO.Unsafe(unsafeDupablePerformIO {- unsafeIOToSTM -}) -- util, todo: move to another module maybeToEither :: Maybe b -> Either String b @@ -45,7 +48,10 @@ data NetCryptoSessionStatus = Unaccepted | Accepted | Confirmed type IOHook addr x = addr -> x -> IO (Maybe (x -> x)) type NetCryptoHook = IOHook NetCryptoSession CryptoData type MsgTypeArray = A.UArray Word8 Word16 - +type MsgOutMap = W64.Word64Map Word8 +-- type MsgOutMap = A.UArray Word64 Word8 -- if above is too slow, switch to this, but use reasonable bounds +msgOutMapLookup :: Word64 -> MsgOutMap -> Maybe Word8 +msgOutMapLookup = W64.lookup -- | Information, that may be made visible in multiple sessions, as well -- as displayed in some way to the user via mutiple views. @@ -86,7 +92,7 @@ data NetCryptoSession = NCrypto { ncState :: TVar NetCryptoSessionStatu -- may not be in ncHooks yet, but they should appear -- here if ncUnrecognizedHook will add them to ncHooks -- on an as-need basis. - , ncOutgoingTypeArray :: TVar MsgTypeArray + , ncOutgoingIdMap :: TVar MsgOutMap , ncAllSessions :: NetCryptoSessions -- ^ may be needed if one net-crypto session -- needs to possibly start another, as is -- the case in group chats @@ -94,6 +100,7 @@ data NetCryptoSession = NCrypto { ncState :: TVar NetCryptoSessionStatu , ncPacketQueue :: PacketQueue CryptoData , ncBufferStart :: TVar Word32 , ncDequeueThread :: Maybe ThreadId + , ncOutgoingQueue :: PQ.PacketOutQueue (State,Nonce24,TVar MsgOutMap) CryptoMessage (CryptoPacket Encrypted) CryptoData } data NetCryptoSessions = NCSessions { netCryptoSessions :: TVar (Map.Map SockAddr NetCryptoSession) @@ -104,6 +111,7 @@ data NetCryptoSessions = NCSessions { netCryptoSessions :: TVar (Map.Map SockAdd , sessionView :: SessionView , msgTypeArray :: MsgTypeArray , inboundQueueCapacity :: Word32 + , outboundQueueCapacity :: Word32 , nextSessionId :: TVar SessionID } @@ -168,6 +176,7 @@ newSessionsState crypto unrechook hooks = do } , msgTypeArray = allMsgTypes -- todo make this a parameter , inboundQueueCapacity = 200 + , outboundQueueCapacity = 400 , nextSessionId = nextSessionId0 } @@ -208,6 +217,53 @@ newHandShakeData crypto basenonce (HParam {hpOtherCookie,hpMySecretKey,hpCookieR , otherCookie = freshCookie' }) freshCookie +type XMessage = CryptoMessage -- todo + +ncToWire :: STM (State,Nonce24,TVar MsgOutMap) + -> Word32{- packet number we expect to recieve -} + -> Word32{- buffer_end -} + -> Word32{- packet number -} + -> XMessage + -> STM (Maybe (CryptoPacket Encrypted,Word32{-next packet no-})) +ncToWire getState seqno bufend pktno msg = do + let typ = getMessageType msg + typ64 = toWord64 typ + let lsness msg = + case typ of + Msg mid -> lossyness mid + GrpMsg KnownLossy _ -> Lossy + GrpMsg KnownLossless _ -> Lossless + (state,n24,msgOutMapVar) <- getState + msgOutMap <- readTVar msgOutMapVar + case msgOutMapLookup typ64 msgOutMap of + Just outid -> do + let setMessageId (OneByte _) mid = OneByte (toEnum8 mid) + setMessageId (TwoByte _ x) mid = TwoByte (toEnum8 mid) x + setMessageId (UpToN _ x) mid = UpToN (toEnum8 mid) x + msg' = setMessageId msg outid + in case lsness msg of + UnknownLossyness -> return Nothing + Lossy -> let cd = + CryptoData + { bufferStart = seqno + , bufferEnd = bufend + , bufferData = msg' + } + plain = encodePlain cd + encrypted = encrypt state plain + pkt = CryptoPacket { pktNonce = nonce24ToWord16 n24, pktData = encrypted } + in return (Just (pkt, pktno)) + Lossless -> let cd = + CryptoData + { bufferStart = seqno + , bufferEnd = pktno + , bufferData = msg' + } + plain = encodePlain cd + encrypted = encrypt state plain + pkt = CryptoPacket { pktNonce = nonce24ToWord16 n24, pktData = encrypted } + in return (Just (pkt, pktno+1)) + -- | called when we recieve a crypto handshake with valid cookie -- TODO set priority on contact addr to 0 if it is older than ForgetPeriod, -- then increment it regardless. (Keep addr in MinMaxPSQ in Roster.Contact) @@ -250,10 +306,18 @@ freshCryptoSession sessions ncHooks0 <- atomically $ newTVar (defaultHooks sessions) ncUnrecognizedHook0 <- atomically $ newTVar (defaultUnrecognizedHook sessions) ncIncomingTypeArray0 <- atomically $ newTVar (msgTypeArray sessions) - ncOutgoingTypeArray0 <- atomically $ newTVar allMsgTypes + ncOutgoingIdMap0 <- atomically $ newTVar W64.empty ncView0 <- atomically $ newTVar (sessionView sessions) pktq <- atomically $ PQ.new (inboundQueueCapacity sessions) 0 bufstart <- atomically $ newTVar 0 + let toWireIO = do + f <- lookupNonceFunction crypto newsession theirSessionKey + atomically $ do + n24 <- readTVar ncMyPacketNonce0 + let n24plus1 = unsafeDupablePerformIO (incrementNonce24 n24) + writeTVar ncMyPacketNonce0 n24plus1 + return (return (f n24, n24, ncOutgoingIdMap0)) + pktoq <- atomically $ PQ.newOutGoing pktq ncToWire toWireIO 0 (outboundQueueCapacity sessions) 0 let netCryptoSession0 = NCrypto { ncState = ncState0 , ncSessionId = sessionId @@ -270,11 +334,12 @@ freshCryptoSession sessions , ncUnrecognizedHook = ncUnrecognizedHook0 , ncAllSessions = sessions , ncIncomingTypeArray = ncIncomingTypeArray0 - , ncOutgoingTypeArray = ncOutgoingTypeArray0 + , ncOutgoingIdMap = ncOutgoingIdMap0 , ncView = ncView0 , ncPacketQueue = pktq , ncBufferStart = bufstart , ncDequeueThread = Nothing -- error "you want the NetCrypto-Dequeue thread id, but is it started?" + , ncOutgoingQueue = pktoq } threadid <- forkIO $ do tid <- myThreadId -- cgit v1.2.3