From fde5005a2d1ef3a0636cff21547d4cda22b7b215 Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Fri, 17 Aug 2018 05:05:17 -0400 Subject: Simplified PacketQueue/PacketBuffer interface. --- src/Network/Tox/Crypto/Handlers.hs | 138 ++++++++++++++++++++++++++----------- 1 file changed, 99 insertions(+), 39 deletions(-) (limited to 'src/Network/Tox/Crypto/Handlers.hs') diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs index 6f440ea5..db7543f7 100644 --- a/src/Network/Tox/Crypto/Handlers.hs +++ b/src/Network/Tox/Crypto/Handlers.hs @@ -22,8 +22,7 @@ import qualified Data.ByteString as B import Data.ByteString (ByteString) import Control.Lens import Data.Function -import qualified Data.PacketQueue as PQ - ;import Data.PacketQueue (PacketQueue) +import Data.PacketBuffer as PB import qualified Data.CyclicBuffer as CB ;import Data.CyclicBuffer (CyclicBuffer) import Data.Serialize as S @@ -354,7 +353,7 @@ data NetCryptoSession = NCrypto -- the case in group chats , ncView :: TVar SessionView -- ^ contains your nick, status etc - , ncPacketQueue :: PacketQueue CryptoData + , ncPacketBuffer :: PacketBuffer CryptoData (CryptoPacket Encrypted) -- ^ a buffer in which incoming packets may be stored out of order -- but from which they may be extracted in sequence, -- helps ensure lossless packets are processed in order @@ -383,13 +382,13 @@ data NetCryptoSession = NCrypto , ncPingThread :: TVar (Maybe ThreadId) -- ^ thread which actually queues outgoing pings , ncIdleEventThread :: TVar (Maybe ThreadId) - , ncOutgoingQueue :: TVar - (UponHandshake + , ncOutgoingQueue :: TVar (UponHandshake NetCryptoOutQueue) + {- (PQ.PacketOutQueue (State,Nonce24,U.RangeMap TArray Word8 TVar) CryptoMessage (CryptoPacket Encrypted) - CryptoData)) + CryptoData)) -} -- ^ To send a message add it to this queue, by calling 'tryAppendQueueOutgoing' -- but remember to call 'readyOutGoing' first, because the shared secret cache -- presently requires the IO monad. @@ -643,7 +642,7 @@ freshCryptoSession sessions insertArrayAt outHooks 0 (A.array (0,64) (map assignHook [0..64])) return (idmap,lossyEsc,losslessEsc,outHooks) ncView0 <- newTVar (sessionView sessions) - pktq <- PQ.new (inboundQueueCapacity sessions) 0 + pktq <- PB.newPacketBuffer bufstart <- newTVar 0 mbpktoq <- case mbtheirSessionKey of @@ -696,7 +695,7 @@ freshCryptoSession sessions , ncOutgoingIdMapEscapedLossy = lossyEscapeIdMap , ncOutgoingIdMapEscapedLossless = losslessEscapeIdMap , ncView = ncView0 - , ncPacketQueue = pktq + , ncPacketBuffer = pktq , ncStoredRequests = ncStoredRequests0 , ncRequestInterval = ncRequestInterval0 , ncAliveInterval = ncAliveInterval0 @@ -720,12 +719,25 @@ freshCryptoSession sessions HaveHandshake pktoq -> return (runUponHandshake netCryptoSession0 addr pktoq) return (myhandshake,maybeLaunchMissles) +{- type NetCryptoOutQueue = PQ.PacketOutQueue (State,Nonce24,U.RangeMap TArray Word8 TVar) CryptoMessage (CryptoPacket Encrypted) CryptoData +-} +data NetCryptoOutQueue = NetCryptoOutQueue + { nqPacketBuffer :: PacketBuffer CryptoData (CryptoPacket Encrypted) + , nqToWire :: STM (State, Nonce24, U.RangeMap TArray Word8 TVar) + -> Word32 + -> Word32 + -> Word32 + -> XMessage + -> STM (Maybe (CryptoPacket Encrypted, Word32)) + , nqToWireIO :: IO (STM (State, Nonce24, U.RangeMap TArray Word8 TVar)) + , nqPacketNo :: TVar Word32 + } -createNetCryptoOutQueue :: NetCryptoSessions -> SecretKey -> PublicKey -> PacketQueue CryptoData +createNetCryptoOutQueue :: NetCryptoSessions -> SecretKey -> PublicKey -> PacketBuffer CryptoData (CryptoPacket Encrypted) -> TVar Nonce24 -> U.RangeMap TArray Word8 TVar -> STM (UponHandshake NetCryptoOutQueue) createNetCryptoOutQueue sessions newsession theirSessionKey pktq ncMyPacketNonce0 ncOutgoingIdMap0 = do let crypto = transportCrypto sessions @@ -739,8 +751,13 @@ createNetCryptoOutQueue sessions newsession theirSessionKey pktq ncMyPacketNonce ++ "\n toWireIO: my public session key = " ++ show (key2id (toPublic newsession)) ) $ writeTVar ncMyPacketNonce0 n24plus1 return (return (f n24, n24, ncOutgoingIdMap0)) - pktoq <- PQ.newOutGoing pktq ncToWire toWireIO 0 (outboundQueueCapacity sessions) 0 - return (HaveHandshake pktoq) + seqnoVar <- newTVar 0 + return (HaveHandshake NetCryptoOutQueue + { nqPacketBuffer = pktq + , nqToWire = ncToWire + , nqToWireIO = toWireIO + , nqPacketNo = seqnoVar + }) -- | add new session to the lookup maps addSessionToMap :: NetCryptoSessions -> SockAddr -> NetCryptoSession -> STM () @@ -788,11 +805,39 @@ addSessionToMapIfNotThere sessions addrRaw netCryptoSession = do -- in case we're using the same long term key on different IPs ... modifyTVar allsessionsByKey (Map.insert remotePublicKey (netCryptoSession:xs)) +data OutGoingResult a = OGSuccess a | OGFull | OGEncodeFail + deriving (Show) + +-- | Convert a message to packet format and append it to the front of a queue +-- used for outgoing messages. (Note that ‘front‛ usually means the higher +-- index in this implementation.) +-- +-- Called from 'runUponHandshake' and 'sendCrypto'. +-- +-- Whenever this is called, you should also send the resulting packet out on +-- the network. + +tryAppendQueueOutgoing :: STM (State, Nonce24, U.RangeMap TArray Word8 TVar) + -> NetCryptoOutQueue + -> CryptoMessage + -> STM (OutGoingResult (CryptoPacket Encrypted)) +tryAppendQueueOutgoing getExtra outq msg = do + pktno <- readTVar (nqPacketNo outq) + nextno <- PB.expectingSequenceNumber (nqPacketBuffer outq) + be <- PB.nextToSendSequenceNumber (nqPacketBuffer outq) + mbWire <- nqToWire outq getExtra nextno be pktno msg + case mbWire of + Just (payload,seqno) -> do + PB.grokPacket (nqPacketBuffer outq) (PacketSent seqno payload) + return $ OGSuccess payload + Nothing -> return OGEncodeFail + + runUponHandshake :: NetCryptoSession -> SockAddr -> NetCryptoOutQueue -> IO () runUponHandshake netCryptoSession0 addr pktoq = do dput XNetCrypto "(((((((runUponHandshake))))))) Launching threads" let sessions = ncAllSessions netCryptoSession0 - pktq = ncPacketQueue netCryptoSession0 + pktq = ncPacketBuffer netCryptoSession0 remotePublicKey = ncTheirPublicKey netCryptoSession0 crypto = transportCrypto sessions allsessions = netCryptoSessions sessions @@ -805,7 +850,7 @@ runUponHandshake netCryptoSession0 addr pktoq = do atomically $ writeTVar (ncDequeueThread netCryptoSession0) (Just tid) labelThread tid ("NetCryptoDequeue." ++ show (key2id remotePublicKey) ++ sidStr) fix $ \loop -> do - cd <- atomically $ PQ.dequeue pktq + cd <- atomically $ PB.awaitReadyPacket pktq if msgID (bufferData cd) == PacketRequest then do dput XNetCrypto $ "Dequeued::PacketRequest seqno=" ++ show (bufferStart cd) ++ " " ++ show (bufferData cd) @@ -824,19 +869,21 @@ runUponHandshake netCryptoSession0 addr pktoq = do labelThread tid ("NetCryptoRequest." ++ show (key2id remotePublicKey) ++ sidStr) fix $ \loop -> do atomically (readTVar (ncRequestInterval netCryptoSession0)) >>= threadDelay . (* 1000) - nums <- atomically $ PQ.getMissing pktq + (nums,seqno) <- atomically $ PB.packetNumbersToRequest pktq dput XNetCrypto $ "(Request Thread) Missing Packets detected:" ++ show nums - getOutGoingParam <- PQ.readyOutGoing pktoq - atomically $ do - seqno <- PQ.getLastDequeuedPlus1 pktq - ogresult <- PQ.tryAppendQueueOutgoing getOutGoingParam pktoq (createRequestPacket seqno nums) + getOutGoingParam <- nqToWireIO pktoq + x <- atomically $ do + ogresult <- tryAppendQueueOutgoing getOutGoingParam pktoq (createRequestPacket seqno nums) case ogresult of - PQ.OGSuccess _ -> return () + OGSuccess x -> return x _ -> retry + sendSessionPacket sessions addr x loop dput XNetCrypto $ "runUponHandshake: " ++ show threadid ++ " = NetCryptoRequest." ++ show (key2id remotePublicKey) ++ sidStr -- launch dequeueOutgoing thread + {- + -- TODO threadidOutgoing <- forkIO $ do tid <- myThreadId atomically $ writeTVar (ncDequeueOutGoingThread netCryptoSession0) (Just tid) @@ -846,7 +893,8 @@ runUponHandshake netCryptoSession0 addr pktoq = do dput XNetCrypto "NetCryptoDequeueOutgoing thread... Sending encrypted Packet" sendSessionPacket sessions addr pkt loop - dput XNetCrypto $ "runUponHandshake: " ++ show threadidOutgoing ++ " = NetCryptoDequeueOutgoing." ++ show (key2id remotePublicKey) ++ sidStr + -} + dput XNetCrypto $ "runUponHandshake: " ++ show "threadidOutgoing" ++ " = NetCryptoDequeueOutgoing." ++ show (key2id remotePublicKey) ++ sidStr -- launch ping Machine thread pingMachine <- forkPingMachineDynamic ("NetCrypto." ++ show (key2id remotePublicKey) ++ sidStr) (ncIdleEvent netCryptoSession0) (ncTimeOut netCryptoSession0) @@ -859,10 +907,10 @@ runUponHandshake netCryptoSession0 addr pktoq = do labelThread tid ("NetCryptoPingSender." ++ show (key2id remotePublicKey) ++ sidStr) fix $ \loop -> do atomically (readTVar (ncAliveInterval netCryptoSession0)) >>= threadDelay . (* 1000) - dput XNetCrypto $ "pingThread (session: " ++ show (ncSessionId netCryptoSession0) ++ ") Sending Alive(PING) Packet" + dput XNetCryptoOut $ "pingThread (session: " ++ show (ncSessionId netCryptoSession0) ++ ") Sending Alive(PING) Packet" lr <- sendPing crypto netCryptoSession0 case lr of - Left s -> dput XNetCrypto $ "(pingThread session: " ++ show (ncSessionId netCryptoSession0) ++ ") " ++ s + Left s -> dput XNetCryptoOut $ "(pingThread session: " ++ show (ncSessionId netCryptoSession0) ++ ") " ++ s Right _ -> return () loop @@ -972,7 +1020,7 @@ updateCryptoSession sessions addr newsession timestamp hp session handshake = do sessions newsession theirSessionPublic - (ncPacketQueue session) + (ncPacketBuffer session) (ncMyPacketNonce session) (ncOutgoingIdMap session) writeTVar (ncOutgoingQueue session) mbpktoq @@ -1055,7 +1103,7 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do Nothing -> do dput XNetCrypto "Dropping packet.. no session" return Nothing -- drop packet, we have no session - Just session@(NCrypto { ncIncomingTypeArray, ncState, ncPacketQueue, ncHooks, + Just session@(NCrypto { ncIncomingTypeArray, ncState, ncPacketBuffer, ncHooks, ncSessionSecret, ncTheirSessionPublic, ncTheirBaseNonce, ncPingMachine, ncSessionId, ncStoredRequests}) -> do -- Unrecognized packets, try them thrice so as to give @@ -1130,9 +1178,10 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do isLossy (GrpMsg KnownLossy _) = True isLossy (Msg mid) | lossyness mid == Lossy = True isLossy _ = False + ack = bufferStart -- Earliest sequence number they've seen from us. if isLossy msgTypMapped then do dput XNetCrypto $ "enqueue ncPacketQueue Lossy " ++ show cm - atomically $ PQ.observeOutOfBand ncPacketQueue bufferEnd + atomically $ PB.grokPacket ncPacketBuffer (PacketReceivedLossy bufferEnd ack) runCryptoHook session (bufferData cd) else do dput XNetCrypto $ "enqueue ncPacketQueue Lossless " ++ show cm when (msgID cm == PING) $ @@ -1143,7 +1192,7 @@ sessionPacketH sessions addrRaw (CryptoPacket nonce16 encrypted) = do -- num <- CB.getNextSequenceNum ncStoredRequests -- CB.enqueue ncStoredRequests num cd handlePacketRequest session cd - atomically $ PQ.enqueue ncPacketQueue bufferEnd cd + atomically $ PB.grokPacket ncPacketBuffer (PacketReceived bufferEnd cd ack) return Nothing where last2Bytes :: Nonce24 -> Word16 @@ -1262,14 +1311,18 @@ sendCrypto crypto session updateLocal cm = do HaveHandshake outq <- atomically $ readTVar (ncOutgoingQueue session) -- XXX: potential race? if shared secret comes out of sync with cache? dput XNetCrypto "sendCrypto: enter " - getOutGoingParam <- PQ.readyOutGoing outq + getOutGoingParam <- nqToWireIO outq dput XNetCrypto "sendCrypto: got the io extra stuff" - atomically $ do - result <- PQ.tryAppendQueueOutgoing getOutGoingParam outq cm + r <- atomically $ do + result <- tryAppendQueueOutgoing getOutGoingParam outq cm case result of - PQ.OGSuccess x -> updateLocal >> return (Right x) - PQ.OGFull -> return (Left "Outgoing packet buffer is full") - PQ.OGEncodeFail -> return (Left "Failed to encode outgoing packet") + OGSuccess x -> updateLocal >> return (Right x) + OGFull -> return (Left "Outgoing packet buffer is full") + OGEncodeFail -> return (Left "Failed to encode outgoing packet") + case ncSockAddr session of + HaveDHTKey saddr -> mapM_ (sendSessionPacket (ncAllSessions session) saddr) r + _ -> return () + return r sendPing :: TransportCrypto -> NetCryptoSession -> IO (Either String (CryptoPacket Encrypted)) sendPing crypto session = do @@ -1327,8 +1380,16 @@ sendCryptoLossy crypto session updateLocal cm = do updateLocal return (Left errmsg) HaveHandshake outq -> do - getOutGoingParam <- PQ.readyOutGoing outq - mbPkt <- atomically $ PQ.peekPacket getOutGoingParam outq cm + getOutGoingParam <- nqToWireIO outq + mbPkt <- atomically $ do + pktno <- readTVar (nqPacketNo outq) + nextno <- PB.expectingSequenceNumber (nqPacketBuffer outq) + be <- PB.nextToSendSequenceNumber (nqPacketBuffer outq) + nqToWire outq getOutGoingParam -- See 'ncToWire' + nextno -- packet number we expect to recieve + be -- buffer_end (for lossy) + pktno -- packet number (for lossless) + cm case mbPkt of Nothing -> do let errmsg = "Error sending lossy packet! (sessionid: " ++ show (ncSessionId session) ++ ") " ++ show cm @@ -1466,11 +1527,10 @@ handlePacketRequest session (CryptoData { bufferStart=num mbOutQ <- atomically $ readTVar (ncOutgoingQueue session) case mbOutQ of HaveHandshake pktoq -> do - getOutGoingParam <-PQ.readyOutGoing pktoq - ps <- atomically $ PQ.getRequested getOutGoingParam pktoq num bs - let resend (Just (n,pkt)) = sendSessionPacket (ncAllSessions session) addr pkt - resend _ = return () - dput XNetCrypto ("Re-Sending Packets: " ++ show (map fst $ catMaybes ps)) + getOutGoingParam <-nqToWireIO pktoq + ps <- atomically $ PB.retrieveForResend (nqPacketBuffer pktoq) $ PB.decompressSequenceNumbers num bs + let resend (n,pkt) = sendSessionPacket (ncAllSessions session) addr pkt + dput XNetCrypto ("Re-Sending Packets: " ++ show (map fst ps)) mapM_ resend ps _ -> return () -- cgit v1.2.3