From 4aeaf247a25fbe80598ce54e4142a707ec5b9951 Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Tue, 4 Sep 2018 23:27:07 -0400 Subject: Tox Alternative sessions: Outgoing seqno/ack reversed. --- src/Network/Lossless.hs | 30 +++++++++++++++++++++++------- src/Network/SessionTransports.hs | 33 +++++++++++++++++---------------- src/Network/Tox/Session.hs | 36 ++++++++++++++++++++++++++++-------- 3 files changed, 68 insertions(+), 31 deletions(-) (limited to 'src') diff --git a/src/Network/Lossless.hs b/src/Network/Lossless.hs index 45241b6d..bdbeb3a2 100644 --- a/src/Network/Lossless.hs +++ b/src/Network/Lossless.hs @@ -10,6 +10,7 @@ import Data.Function import Data.Word import Data.PacketBuffer as PB +import DPut import Network.QueryResponse #ifdef THREAD_DEBUG @@ -24,11 +25,12 @@ data SequenceInfo = SequenceInfo } deriving (Eq,Ord,Show) -lossless :: (x -> addr -> IO (PacketInboundEvent (x',addr'))) +lossless :: Show addr => + (x -> addr -> IO (PacketInboundEvent (x',addr'))) -> (SequenceInfo -> x' -> addr' -> IO y) -> addr - -> TransportA err addr x y - -> IO ( Transport err addr' x' + -> TransportA String addr x y + -> IO ( Transport String addr' x' , [Word32] -> IO () , IO ([Word32],Word32) ) @@ -40,6 +42,7 @@ lossless isLossless encode saddr udp = do rloop <- forkIO $ fix $ \loop -> do -- This thread enqueues inbound packets or writes them to the oob -- channel. + myThreadId >>= flip labelThread ("lossless."++show saddr) awaitMessage udp $ \m -> do forM_ m $ \raw -> do m' <- mapM (uncurry isLossless) raw @@ -53,7 +56,9 @@ lossless isLossless encode saddr udp = do PB.grokInboundPacket pb event case event of PacketReceivedLossy {} -> writeTChan oob (Right $ peReceivedPayload event) - _ -> return () + _ -> do + report <- pbReport "enqueued" pb + writeTChan oob (Left report) loop let tr = Transport { awaitMessage = \kont -> do @@ -61,14 +66,22 @@ lossless isLossless encode saddr udp = do (do x <- readTChan oob return $ kont $! Just x) (do x <- PB.awaitReadyPacket pb - return $ kont $! Just (Right x)) + report <- pbReport "dequeued" pb + return $ do + dput XNetCrypto report + kont $! Just (Right x)) , sendMessage = \a' x' -> do seqno <- atomically $ do seqno <- PB.nextToSendSequenceNumber pb ack <- PB.expectingSequenceNumber pb return $ SequenceInfo seqno ack x <- encode seqno x' a' - atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) + (isfull,nn) <- atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) + when isfull $ do + dput XNetCrypto $ shows saddr $ " <-- Outbound queue is full! Retrying... " ++ show (nn,seqno) + atomically $ do + (isfull,_) <- PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) + when isfull retry sendMessage udp saddr x , closeTransport = do killThread rloop @@ -76,5 +89,8 @@ lossless isLossless encode saddr udp = do } resend ns = do xs <- atomically $ retrieveForResend pb ns - mapM_ (sendMessage udp saddr . snd) xs + dput XNetCrypto $ shows saddr $ " <-- Resending " ++ show (length xs) ++ " packets." + forM_ xs $ \x -> do + dput XNetCrypto $ shows saddr $ " <-- Resending packet." + sendMessage udp saddr . snd $ x return (tr, resend, atomically $ PB.packetNumbersToRequest pb) diff --git a/src/Network/SessionTransports.hs b/src/Network/SessionTransports.hs index 17763e4e..e9daf6c1 100644 --- a/src/Network/SessionTransports.hs +++ b/src/Network/SessionTransports.hs @@ -49,7 +49,7 @@ newSession :: Sessions raw -> (addr -> y -> IO raw) -> (SockAddr -> raw -> IO (Maybe (x, addr))) -> SockAddr - -> IO (Maybe (TransportA err addr x y)) + -> IO (Maybe (Int,TransportA err addr x y)) newSession Sessions{sessionsByAddr,sessionsById,sessionIds,sessionsSendRaw} unwrap wrap addr0 = do mvar <- newEmptyMVar let saddr = -- Canonical in case of 6-mapped-4 addresses. @@ -69,21 +69,22 @@ newSession Sessions{sessionsByAddr,sessionsById,sessionIds,sessionsSendRaw} unwr $ IntMap.singleton sid handlePacket return sid forM msid $ \sid -> do - return Transport - { awaitMessage = \kont -> do - x <- takeMVar mvar - kont $! Right <$> x - , sendMessage = \addr x -> do - x' <- unwrap addr x - sessionsSendRaw saddr x' - , closeTransport = do - tryTakeMVar mvar - putMVar mvar Nothing - atomically $ do - modifyTVar' sessionIds $ S.delete sid - modifyTVar' sessionsById $ IntMap.delete sid - modifyTVar' sessionsByAddr $ Map.alter (rmSession sid) saddr - } + let tr = Transport + { awaitMessage = \kont -> do + x <- takeMVar mvar + kont $! Right <$> x + , sendMessage = \addr x -> do + x' <- unwrap addr x + sessionsSendRaw saddr x' + , closeTransport = do + tryTakeMVar mvar + putMVar mvar Nothing + atomically $ do + modifyTVar' sessionIds $ S.delete sid + modifyTVar' sessionsById $ IntMap.delete sid + modifyTVar' sessionsByAddr $ Map.alter (rmSession sid) saddr + } + return (sid,tr) sessionHandler :: Sessions x -> (SockAddr -> x -> IO (Maybe (x -> x))) sessionHandler Sessions{sessionsByAddr} = \addr0 x -> do diff --git a/src/Network/Tox/Session.hs b/src/Network/Tox/Session.hs index a52e9478..88221b11 100644 --- a/src/Network/Tox/Session.hs +++ b/src/Network/Tox/Session.hs @@ -10,6 +10,7 @@ import Network.Socket import Crypto.Tox import Data.PacketBuffer (PacketInboundEvent (..)) import Data.Tox.Message +import DPut import Network.Lossless import Network.QueryResponse import Network.SessionTransports @@ -43,6 +44,7 @@ data Session = Session -- convenience, a lower bound for the numbers in the list is also -- returned. Suggested polling interval: a few seconds. , sTransport :: Transport String () CryptoMessage + , sSessionID :: Int } handshakeH :: SessionParams @@ -63,7 +65,11 @@ plainHandshakeH :: SessionParams -> IO () plainHandshakeH sp saddr skey handshake = do let hd = runIdentity $ handshakeData handshake + prelude = show saddr ++ " --> " + dput XNetCrypto $ prelude ++ "handshake: " ++ show (otherCookie hd, baseNonce hd) sent <- spGetSentHandshake sp skey saddr (handshakeCookie handshake) (otherCookie hd) + -- TODO: this is always returning sent = Nothing + dput XNetCrypto $ prelude ++ "plainHandshakeH: cached outgoing: " ++ show (fmap (baseNonce . snd) sent) forM_ sent $ \(hd_skey,hd_sent) -> do sk <- SessionKeys (spCrypto sp) hd_skey @@ -71,7 +77,8 @@ plainHandshakeH sp saddr skey handshake = do <$> atomically (newTVar $ baseNonce hd) <*> atomically (newTVar $ baseNonce hd_sent) m <- newSession (spSessions sp) (\() p -> return p) (decryptPacket sk) saddr - forM_ m $ \t -> do + dput XNetCrypto $ prelude ++ "plainHandshakeH: session " ++ maybe "Nothing" (const "Just") m + forM_ m $ \(sid, t) -> do (t2,resend,getMissing) <- lossless (\cp a -> return $ fmap (,a) $ checkLossless $ runIdentity $ pktData cp) (\seqno p _ -> encryptPacket sk $ bookKeeping seqno p) @@ -79,6 +86,7 @@ plainHandshakeH sp saddr skey handshake = do t let _ = t :: TransportA String () (CryptoPacket Identity) (CryptoPacket Encrypted) _ = t2 :: Transport String () CryptoMessage + sendMessage t2 () $ OneByte ONLINE spOnNewSession sp Session { sOurKey = skey , sTheirAddr = saddr @@ -87,6 +95,7 @@ plainHandshakeH sp saddr skey handshake = do , sResendPackets = resend , sMissingInbound = getMissing , sTransport = t2 + , sSessionID = sid } return () @@ -122,17 +131,28 @@ data SessionKeys = SessionKeys , skNonceOutgoing :: TVar Nonce24 -- +1 on every packet } + +-- From spec.md: +-- +-- Data in the encrypted packets: +-- +-- [our recvbuffers buffer_start, (highest packet number handled + 1), (big endian)] +-- [uint32_t packet number if lossless, sendbuffer buffer_end if lossy, (big endian)] +-- [data] + + bookKeeping :: SequenceInfo -> CryptoMessage -> CryptoData bookKeeping (SequenceInfo seqno ack) m = CryptoData - { bufferStart = seqno :: Word32 - , bufferEnd = ack :: Word32 + { bufferStart = ack :: Word32 + , bufferEnd = seqno :: Word32 , bufferData = m } checkLossless :: CryptoData -> PacketInboundEvent CryptoMessage -checkLossless CryptoData{ bufferStart = ack - , bufferEnd = no - , bufferData = x } = tag no x ack +checkLossless cd@CryptoData{ bufferStart = ack + , bufferEnd = no + , bufferData = x } = tag no x' ack where - tag = case lossyness (msgID x) of Lossy -> PacketReceivedLossy - _ -> PacketReceived + x' = decodeRawCryptoMsg cd + tag = case lossyness (msgID x') of Lossy -> PacketReceivedLossy + _ -> PacketReceived -- cgit v1.2.3