From 43219aa4da88fbf3187230af0b5ad5b2d17f177a Mon Sep 17 00:00:00 2001 From: James Crayne Date: Tue, 14 Nov 2017 05:17:02 +0000 Subject: netcrypto dequeue thread --- src/Data/PacketQueue.hs | 1 + src/Network/Tox/Crypto/Handlers.hs | 34 +++++++++++++++++++++++++++++----- 2 files changed, 30 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/Data/PacketQueue.hs b/src/Data/PacketQueue.hs index 66cf3383..b349bf4b 100644 --- a/src/Data/PacketQueue.hs +++ b/src/Data/PacketQueue.hs @@ -8,6 +8,7 @@ module Data.PacketQueue , new , dequeue , enqueue + , observeOutOfBand ) where import Control.Concurrent.STM diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs index 787c69c2..b92d4805 100644 --- a/src/Network/Tox/Crypto/Handlers.hs +++ b/src/Network/Tox/Crypto/Handlers.hs @@ -30,6 +30,8 @@ import System.FilePath import System.IO.Temp import System.Environment import System.Directory +import Control.Concurrent +import GHC.Conc (labelThread) -- util, todo: move to another module maybeToEither :: Maybe b -> Either String b @@ -82,8 +84,9 @@ data NetCryptoSession = NCrypto { ncState :: TVar NetCryptoSessionStatus -- needs to possibly start another, as is -- the case in group chats , ncView :: TVar SessionView - , ncPacketQueue :: PacketQueue CryptoMessage + , ncPacketQueue :: PacketQueue CryptoData , ncBufferStart :: TVar Word32 + , ncDequeueThread :: ThreadId } data NetCryptoSessions = NCSessions { netCryptoSessions :: TVar (Map.Map SockAddr NetCryptoSession) @@ -107,7 +110,7 @@ newSessionsState crypto unrechook hooks = do grps <- atomically $ newTVar Map.empty pname <- getProgName cachedir <- sensibleCacheDirCreateIfMissing pname - tmpdir <- ( pname) <$> getCanonicalTemporaryDirectory + tmpdir <- ( pname) <$> (getTemporaryDirectory >>= canonicalizePath) -- getCanonicalTemporaryDirectory configdir <- sensibleVarLib pname homedir <- getHomeDirectory svDownloadDir0 <- atomically $ newTVar (homedir "Downloads") @@ -203,7 +206,7 @@ freshCryptoSession sessions ncView0 <- atomically $ newTVar (sessionView sessions) pktq <- atomically $ PQ.new (inboundQueueCapacity sessions) 0 bufstart <- atomically $ newTVar 0 - let netCryptoSession = + let netCryptoSession0 = NCrypto { ncState = ncState0 , ncTheirBaseNonce= ncTheirBaseNonce0 , ncMyPacketNonce = ncMyPacketNonce0 @@ -220,7 +223,16 @@ freshCryptoSession sessions , ncView = ncView0 , ncPacketQueue = pktq , ncBufferStart = bufstart + , ncDequeueThread = error "you want the NetCrypto-Dequeue thread id, but is it started?" } + threadid <- forkIO $ do + tid <- myThreadId + labelThread tid ("NetCryptoDequeue." ++ show (key2id remotePublicKey)) + fix $ \loop -> do + cd <- atomically $ PQ.dequeue pktq + _ <- runCryptoHook (netCryptoSession0 {ncDequeueThread=tid}) cd + loop + let netCryptoSession = netCryptoSession0 {ncDequeueThread=threadid} atomically $ modifyTVar allsessions (Map.insert addr netCryptoSession) -- | Called when we get a handshake, but there's already a session entry. @@ -304,7 +316,7 @@ cryptoNetHandler sessions addr (NetCrypto (CryptoPacket nonce16 encrypted)) = do -- Handle Encrypted Message case Map.lookup addr sessionsmap of Nothing -> return Nothing -- drop packet, we have no session - Just session@(NCrypto {ncState, ncHooks,ncSessionSecret,ncTheirSessionPublic,ncTheirBaseNonce}) -> do + Just session@(NCrypto {ncMessageTypes, ncState, ncPacketQueue, ncHooks,ncSessionSecret,ncTheirSessionPublic,ncTheirBaseNonce}) -> do theirBaseNonce <- atomically $ readTVar ncTheirBaseNonce -- Try to decrypt message let diff :: Word16 @@ -333,7 +345,18 @@ cryptoNetHandler sessions addr (NetCrypto (CryptoPacket nonce16 encrypted)) = do writeTVar ncTheirBaseNonce y -- then set session confirmed, atomically $ writeTVar ncState Confirmed - runCryptoHook session cd + msgTypes <- atomically $ readTVar ncMessageTypes + let msgTyp = cd ^. messageType + msgTypMapped16 = msgTypes A.! fromEnum8 (msgID cm) + msgTypMapped = fromWord16 $ msgTypMapped16 + isLossy (GrpMsg KnownLossy _) = True + isLossy (Msg mid) | lossyness mid == Lossy = True + isLossy _ = False + if isLossy msgTypMapped + then do atomically $ PQ.observeOutOfBand ncPacketQueue bufferEnd + runCryptoHook session cd + else do atomically $ PQ.enqueue ncPacketQueue bufferEnd cd + return Nothing where last2Bytes :: Nonce24 -> Word last2Bytes (Nonce24 bs) = case S.decode (B.drop 22 bs) of @@ -341,6 +364,7 @@ cryptoNetHandler sessions addr (NetCrypto (CryptoPacket nonce16 encrypted)) = do _ -> error "unreachable-last2Bytes" dATA_NUM_THRESHOLD = 21845 -- = 65535 / 3 +-- | TODO: make this accept CrytpoMessage instead runCryptoHook :: NetCryptoSession -> CryptoData -> IO (Maybe (x -> x)) runCryptoHook session@(NCrypto {ncState, ncHooks,ncSessionSecret,ncTheirSessionPublic,ncTheirBaseNonce,ncMessageTypes}) cd@(CryptoData {bufferStart, bufferEnd, bufferData=cm}) = do -- cgit v1.2.3