From 6f72701a1f67132649236513959791d8ff4a884f Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Thu, 16 Jan 2020 21:50:34 -0500 Subject: Improved netcrypto session cleanup. --- dht/ToxManager.hs | 4 +++- dht/src/Data/Tox/DHT/Multi.hs | 10 ++++++++-- dht/src/Network/Lossless.hs | 27 +++++++++++++++++---------- dht/src/Network/Tox/AggregateSession.hs | 27 +++++++++++++++------------ dht/src/Network/Tox/Session.hs | 6 ++++-- server/src/Network/StreamServer.hs | 5 ++++- 6 files changed, 51 insertions(+), 28 deletions(-) diff --git a/dht/ToxManager.hs b/dht/ToxManager.hs index b0990430..25a4f0f2 100644 --- a/dht/ToxManager.hs +++ b/dht/ToxManager.hs @@ -709,6 +709,8 @@ forkAccountWatcher :: TVar (Map.Map Uniq24 AggregateSession) -> TCP.RelayClient -> Account JabberClients -> Tox JabberClients -> PresenceState Pending -> Announcer -> IO ThreadId forkAccountWatcher ssvar tcp acc tox st announcer = forkIO $ do + let me = key2id $ toPublic $ userSecret acc + dput XMan $ "forkAccountWatcher(" ++ show me ++") started" myThreadId >>= flip labelThread ("online:" ++ show (key2id $ toPublic $ userSecret acc)) (chan,cs) <- atomically $ do @@ -738,10 +740,10 @@ forkAccountWatcher ssvar tcp acc tox st announcer = forkIO $ do -- Stop tasks associated with each contact for this account. cs <- atomically $ readTVar (contacts acc) - let me = key2id $ toPublic $ userSecret acc forM_ (HashMap.toList cs) $ \(them,c) -> do stopConnecting tx (id2key them) "disabled account" closeSessions me them ssvar + dput XMan $ "forkAccountWatcher(" ++ show me ++") stopped" toxAnnounceInterval :: POSIXTime diff --git a/dht/src/Data/Tox/DHT/Multi.hs b/dht/src/Data/Tox/DHT/Multi.hs index d31ae4b8..878b47e6 100644 --- a/dht/src/Data/Tox/DHT/Multi.hs +++ b/dht/src/Data/Tox/DHT/Multi.hs @@ -13,7 +13,7 @@ import qualified Network.Tox.NodeId as UDP import qualified Network.Tox.TCP.NodeId as TCP import Data.Tox.Onion (OnionDestination,RouteId) import Data.Tox.Relay hiding (NodeInfo) -import Network.Address (either4or6) +import Network.Address as SockAddr (canonize) import Network.Tox.TCP as TCP (ViaRelay(..), tcpConnectionRequest_) import Network.QueryResponse as QR (Tagged(..), Client) @@ -88,7 +88,7 @@ untagOnion (OnionTCP :=> Identity o) = o -- Canonical in case of 6-mapped-4 addresses. canonize :: DSum S Identity -> DSum S Identity -canonize (SessionUDP :=> Identity saddr) = SessionUDP ==> either id id (either4or6 saddr) +canonize (SessionUDP :=> Identity saddr) = SessionUDP ==> SockAddr.canonize saddr canonize taddr = taddr type NodeInfo = DSum T Identity @@ -144,3 +144,9 @@ tcpConnectionRequest :: QR.Client err PacketNumber tid TCP.NodeInfo (Bool, Relay tcpConnectionRequest client pubkey ni = do mcon <- tcpConnectionRequest_ client pubkey ni return $ fmap (\conid -> TCP ==> ViaRelay (Just conid) (UDP.key2id pubkey) ni) mcon + +showSessionAddr :: SessionAddress -> String +showSessionAddr (SessionUDP :=> Identity udp) = + show (SockAddr.canonize udp) +showSessionAddr (SessionTCP :=> Identity (ViaRelay mcon _ tcp)) = + "TCP:" ++ maybe "" (\(ConId con) -> "(" ++ show con ++ ")") mcon ++ show tcp diff --git a/dht/src/Network/Lossless.hs b/dht/src/Network/Lossless.hs index 5a313aed..079f4d07 100644 --- a/dht/src/Network/Lossless.hs +++ b/dht/src/Network/Lossless.hs @@ -9,6 +9,7 @@ {-# LANGUAGE TupleSections #-} module Network.Lossless where +import Control.Concurrent.STM import Control.Concurrent.STM.TChan import Control.Monad import Control.Monad.STM @@ -42,7 +43,8 @@ data OutgoingInfo y = OutgoingInfo -- | Obtain a reliable transport form an unreliable one. lossless :: Show addr => - (x -> addr -> IO (PacketInboundEvent (x',addr'))) -- ^ Used to classify newly arrived packets. + String -- ^ Label for debugging. + -> (x -> addr -> IO (PacketInboundEvent (x',addr'))) -- ^ Used to classify newly arrived packets. -> (SequenceInfo -> x' -> addr' -> IO (OutgoingInfo y)) -- ^ Used to encode and classify outbound packets. -> addr -- ^ The remote address for this session. -> TransportA String addr x y -- ^ An unreliable lossy transport. @@ -51,7 +53,7 @@ lossless :: Show addr => , [Word32] -> IO () -- ^ Use this to request lost packets be re-sent. , IO ([Word32],Word32) -- ^ Use this to discover missing packets to request. ) -lossless isLossless encode saddr udp = do +lossless lbl isLossless encode saddr udp = do pb <- atomically newPacketBuffer oob <- atomically newTChan -- Out-of-band channel, these packets (or -- errors) bypass the packet buffer to be @@ -80,11 +82,14 @@ lossless isLossless encode saddr udp = do report <- pbReport "enqueued" pb writeTChan oob (ParseError report) loop - labelThread rloop ("lossless."++show saddr) + labelThread rloop ("lossless."++lbl) + term <- newTVarIO retry -- In case awaitMessage is called multiple times beyond termination, + -- we will use this STM action stop it from waiting on the oob TChan. + -- XXX: This shouldn't be neccessary and might be costly. let tr = Transport { awaitMessage = \kont -> orElse - (do x <- readTChan oob + (do x <- readTChan oob `orElse` join (readTVar term) return $ kont $! x) (do x <- PB.awaitReadyPacket pb report <- pbReport "dequeued" pb @@ -100,13 +105,13 @@ lossless isLossless encode saddr udp = do (isfull,nn) <- if islossy then do - dput XNetCrypto $ shows saddr $ " <-- Lossy packet " ++ show seqno + dput XNetCrypto $ mappend lbl $ " <-- Lossy packet " ++ show seqno return (False,(0,0)) -- avoid updating seqno on lossy packets. else do - dput XNetCrypto $ shows saddr $ " <-- Lossless packet " ++ show seqno + dput XNetCrypto $ mappend lbl $ " <-- Lossless packet " ++ show seqno atomically $ PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) when isfull $ do - dput XNetCrypto $ shows saddr $ " <-- Outbound queue is full! Retrying... " ++ show (nn,seqno) + dput XNetCrypto $ mappend lbl $ " <-- Outbound queue is full! Retrying... " ++ show (nn,seqno) atomically $ do (isfull,_) <- PB.grokOutboundPacket pb (PacketSent (sequenceNumber seqno) x) when isfull retry @@ -114,14 +119,16 @@ lossless isLossless encode saddr udp = do maybe sendit (catchIOError sendit) oops , setActive = \case False -> do - atomically $ writeTChan oob Terminated -- quit rloop thread + atomically $ do + writeTChan oob Terminated -- quit rloop thread + writeTVar term (return Terminated) setActive udp False True -> return () } resend ns = do xs <- atomically $ retrieveForResend pb ns - dput XNetCrypto $ shows saddr $ " <-- Resending " ++ show (length xs) ++ " packets." + dput XNetCrypto $ mappend lbl $ " <-- Resending " ++ show (length xs) ++ " packets." forM_ xs $ \x -> do - dput XNetCrypto $ shows saddr $ " <-- Resending packet." + dput XNetCrypto $ mappend lbl $ " <-- Resending packet." sendMessage udp saddr . snd $ x return (tr, resend, atomically $ PB.packetNumbersToRequest pb) diff --git a/dht/src/Network/Tox/AggregateSession.hs b/dht/src/Network/Tox/AggregateSession.hs index 9a784291..d1f42e91 100644 --- a/dht/src/Network/Tox/AggregateSession.hs +++ b/dht/src/Network/Tox/AggregateSession.hs @@ -24,6 +24,7 @@ module Network.Tox.AggregateSession import Control.Concurrent.STM import Control.Concurrent.STM.TMChan +import Control.Exception import Control.Monad import Data.Dependent.Sum import Data.Function @@ -114,7 +115,7 @@ data KeepAliveEvents = DoTimeout -- ^ A session timed-out, close it. | DoRequestMissing -- ^ Detect and request lost packets. deriving Enum --- | This call loops until the provided sesison is closed or times out. It +-- | This call loops until the provided session is closed or times out. It -- monitors the provided (non-empty) priority queue for scheduled tasks (see -- 'KeepAliveEvents') to perform for the connection. keepAlive :: Session -> TVar (PSQ POSIXTime) -> IO () @@ -142,23 +143,23 @@ keepAlive s q = do now <- getPOSIXTime atomically $ modifyTVar' q $ PSQ.insert (fromEnum DoTimeout) now - re tm again e io = do + re tm e io = do io atomically $ modifyTVar' q $ PSQ.insert (fromEnum e) tm - again doEvent again now e = case e of DoTimeout -> do dput XNetCrypto $ "TIMEOUT: " ++ show (sSessionID s) sClose s - DoAlive -> re (now + 10) again e doAlive - DoRequestMissing -> re (now + 5) again e doRequestMissing -- tox-core does this at 1 second intervals + DoAlive -> re (now + 10) e doAlive >> again + DoRequestMissing -> re (now + 5{- toxcore uses 1sec -}) e doRequestMissing >> again fix $ \again -> do now <- getPOSIXTime join $ atomically $ do PSQ.findMin <$> readTVar q >>= \case - Nothing -> error "keepAlive: unexpected empty PSQ." + Nothing -> return $ do dput XUnexpected "keepAlive: unexpected empty PSQ." + sClose s Just ( k :-> tm ) -> return $ if now < tm then threadDelay (toMicroseconds $ tm - now) >> again else doEvent again now (toEnum k) @@ -202,7 +203,7 @@ forkSession c s setStatus = forkIO $ do atomically $ do setStatus Established sendPacket online bump - beacon <- forkIO $ keepAlive s q + beacon <- forkIO $ keepAlive s q `finally` sClose s awaitPacket $ \awaitNext x -> do bump case msgID x of @@ -223,7 +224,7 @@ forkSession c s setStatus = forkIO $ do -- one active session). addSession :: AggregateSession -> Session -> IO AddResult addSession c s = do - (result,mcon,replaced) <- atomically $ do + (result,mcon,rejected) <- atomically $ do let them = sTheirUserKey s me = toPublic $ sOurKey s compat <- checkCompatible me them c @@ -232,7 +233,7 @@ addSession c s = do Just True -> AddedSession Just False -> RejectedSession case result of - RejectedSession -> return (result,Nothing,Nothing) + RejectedSession -> return (result,Nothing,Just s) _ -> do statvar <- newTVar Dormant imap <- readTVar (contactSession c) @@ -240,9 +241,9 @@ addSession c s = do s0 = IntMap.lookup (sSessionID s) imap imap' = IntMap.insert (sSessionID s) con imap writeTVar (contactSession c) imap' - return (result,Just con,s0) + return (result,Just con,singleSession <$> s0) - mapM_ (sClose . singleSession) replaced + mapM_ sClose rejected forM_ (mcon :: Maybe SingleCon) $ \con -> forkSession c s $ \progress -> do status0 <- aggregateStatus c @@ -313,7 +314,9 @@ closeAll :: AggregateSession -> IO () closeAll c = join $ atomically $ do imap <- readTVar (contactSession c) closeTMChan (contactChannel c) - return $ forM_ (IntMap.keys imap) $ \sid -> delSession c sid + return $ forM_ (IntMap.toList imap) $ \(sid,SingleCon s _) -> do + sClose s + delSession c sid -- | Query the current status of the aggregate, there are three possible -- values: diff --git a/dht/src/Network/Tox/Session.hs b/dht/src/Network/Tox/Session.hs index ff86e502..9bd12c69 100644 --- a/dht/src/Network/Tox/Session.hs +++ b/dht/src/Network/Tox/Session.hs @@ -27,7 +27,7 @@ import Network.Lossless import Network.QueryResponse import Network.SessionTransports import Network.Tox.Crypto.Transport -import Network.Tox.DHT.Transport (Cookie (..), key2id, longTermKey) +import Network.Tox.DHT.Transport (Cookie (..), key2id, longTermKey, CookieData (..)) import Network.Tox.Handshake -- | Alias for 'SecretKey' to document that it is used as the temporary Tox @@ -128,6 +128,7 @@ plainHandshakeH sp saddr0 skey handshake = do -- TODO: this is always returning sent = Nothing dput XNetCrypto $ " <-- (cached) handshake baseNonce " ++ show (fmap (baseNonce . snd . snd) sent) forM_ sent $ \(saddr, (hd_skey,hd_sent)) -> do + let Cookie _ (Identity CookieData{ longTermKey = them }) = handshakeCookie handshake sk <- SessionKeys (spCrypto sp) hd_skey (sessionKey hd) @@ -137,7 +138,8 @@ plainHandshakeH sp saddr0 skey handshake = do dput XNetCrypto $ prelude ++ "plainHandshakeH: session " ++ maybe "Nothing" (const "Just") m forM_ m $ \(sid, t) -> do (t2,resend,getMissing) - <- lossless (\cp a -> return $ fmap (,a) $ checkLossless $ runIdentity $ pktData cp) + <- lossless (take 8 (showKey256 them) ++ "." ++ Multi.showSessionAddr saddr) + (\cp a -> return $ fmap (,a) $ checkLossless $ runIdentity $ pktData cp) (\seqno p@(Pkt m :=> _) _ -> do y <- encryptPacket sk $ bookKeeping seqno p return OutgoingInfo diff --git a/server/src/Network/StreamServer.hs b/server/src/Network/StreamServer.hs index 8ebdf678..eda5212f 100644 --- a/server/src/Network/StreamServer.hs +++ b/server/src/Network/StreamServer.hs @@ -161,8 +161,11 @@ acceptLoop cfg sock n = handle (acceptException cfg n sock) $ do let conkey = n + 1 laddr <- Socket.getSocketName con h <- socketToHandle con ReadWriteMode + let tlbl = case canonize laddr of + SockAddrUnix {} -> show laddr ++ "," ++ show n + _ -> show raddr forkIO $ do - myThreadId >>= flip labelThread ("stream.session." ++ show (canonize raddr)) + myThreadId >>= flip labelThread ("stream.session." ++ tlbl) serverSession cfg (restrictHandleSocket h con, (Local laddr, Remote raddr)) conkey h acceptLoop cfg sock (n + 1) -- cgit v1.2.3