From 17c32ae0dcb1c10cfd3b82ceaf10ca5d3990b10b Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Fri, 17 Jan 2020 00:22:08 -0500 Subject: Discard unusable Dormant AggregateSession objects. --- dht/ToxManager.hs | 20 +++++++++++++------- dht/examples/dhtd.hs | 19 +++++++++++++------ dht/src/Network/Tox/AggregateSession.hs | 8 +++++++- 3 files changed, 33 insertions(+), 14 deletions(-) diff --git a/dht/ToxManager.hs b/dht/ToxManager.hs index 25a4f0f2..126efa36 100644 --- a/dht/ToxManager.hs +++ b/dht/ToxManager.hs @@ -134,7 +134,8 @@ toxman ssvar announcer tox presence = ToxManager toxAnnounceInterval) pub - forkAccountWatcher ssvar + forkIO $ do + accountWatcher ssvar (TCP.tcpClient $ tcpKademliaClient $ toxOnionRoutes tox) acnt tox @@ -155,6 +156,11 @@ toxman ssvar announcer tox presence = ToxManager -- Remove this xmpp client /k/ from the set holding this -- account active. modifyTVar' (accountExtra acnt) $ Map.delete k + is_last <- Map.null <$> readTVar (accountExtra acnt) + when is_last $ do + -- Forget secret key if this was the last client. + -- This ensures that incoming connections are rejected. + modifyTVar' accounts $ HashMap.delete pubid return rs return $ if (Map.null $ Map.delete k refs) then @@ -165,7 +171,7 @@ toxman ssvar announcer tox presence = ToxManager let Just pubid = mpubid pub = Tox.id2key pubid -- Stop the announce-toxid task for this account. Note that other - -- announced tasks will be stopped by the forkAccountWatcher thread + -- announced tasks will be stopped by the accountWatcher thread -- when it terminates. cancel announcer akey @@ -705,12 +711,12 @@ closeSessions me them ssvar = do -- Just True <- checkCompatible ag (id2key me) (id2key them) closeAll ag -forkAccountWatcher :: TVar (Map.Map Uniq24 AggregateSession) +accountWatcher :: TVar (Map.Map Uniq24 AggregateSession) -> TCP.RelayClient - -> Account JabberClients -> Tox JabberClients -> PresenceState Pending -> Announcer -> IO ThreadId -forkAccountWatcher ssvar tcp acc tox st announcer = forkIO $ do + -> Account JabberClients -> Tox JabberClients -> PresenceState Pending -> Announcer -> IO () +accountWatcher ssvar tcp acc tox st announcer = do let me = key2id $ toPublic $ userSecret acc - dput XMan $ "forkAccountWatcher(" ++ show me ++") started" + dput XMan $ "accountWatcher(" ++ show me ++") started" myThreadId >>= flip labelThread ("online:" ++ show (key2id $ toPublic $ userSecret acc)) (chan,cs) <- atomically $ do @@ -743,7 +749,7 @@ forkAccountWatcher ssvar tcp acc tox st announcer = forkIO $ do forM_ (HashMap.toList cs) $ \(them,c) -> do stopConnecting tx (id2key them) "disabled account" closeSessions me them ssvar - dput XMan $ "forkAccountWatcher(" ++ show me ++") stopped" + dput XMan $ "accountWatcher(" ++ show me ++") stopped" toxAnnounceInterval :: POSIXTime diff --git a/dht/examples/dhtd.hs b/dht/examples/dhtd.hs index f4d04761..de315e35 100644 --- a/dht/examples/dhtd.hs +++ b/dht/examples/dhtd.hs @@ -1326,17 +1326,24 @@ onNewToxSession :: (IO () -> STM ()) -> Tox.Session -> IO () onNewToxSession runio sv ssvar invc ContactInfo{accounts} addrTox netcrypto = do + dput XMan "onNewToxSession" let them s = Tox.longTermKey $ runIdentity cookie -- remote tox key where Tox.Cookie _ cookie = Tox.handshakeCookie (sReceivedHandshake s) me s = toPublic $ sOurKey s - onStatusChange :: (Tox.Session -> Tcp.ConnectionEvent XML.Event -> STM ()) + uniqkey <- xor24 <$> hash24 (them netcrypto) <*> hash24 (me netcrypto) + + let onStatusChange :: (Tox.Session -> Tcp.ConnectionEvent XML.Event -> STM ()) -> AggregateSession -> Tox.Session -> Status Tox.ToxProgress -> STM () onStatusChange announce c s Established = onConnect announce c s onStatusChange announce _ s status = onEOF announce s status onEOF announce s status = do + case status of + Dormant -> -- Dormant AggregateSession is useless, so discard it. + modifyTVar' ssvar $ Map.delete uniqkey + _ -> return () runio $ dput XMan $ "EOF(" ++ take 16 (showKey256 $ them s) ++ "): " ++ show status HashMap.lookup (Tox.key2id $ me s) <$> readTVar accounts >>= mapM_ (setTerminated $ them s) @@ -1364,12 +1371,10 @@ onNewToxSession runio sv ssvar invc ContactInfo{accounts} addrTox netcrypto = do Chunk x -> Just (Nothing,x)) .| toxSnk - uniqkey <- xor24 <$> hash24 (them netcrypto) <*> hash24 (me netcrypto) - let me_dot_tox = xmppHostname $ me netcrypto them_dot_tox = xmppHostname $ them netcrypto - c <- atomically $ do + c <- join $ atomically $ do mc <- Map.lookup uniqkey <$> readTVar ssvar case mc of Nothing -> do @@ -1387,8 +1392,10 @@ onNewToxSession runio sv ssvar invc ContactInfo{accounts} addrTox netcrypto = do return $ \s e -> writeTChan (xmppEventChannel sv) ( (ck, condta s), e) c <- newAggregateSession $ onStatusChange announce modifyTVar' ssvar $ Map.insert uniqkey c - return c - Just c -> return c + return $ do + dput XMan $ "New AggregateSession!" + return c + Just c -> return $ return c addSession c netcrypto diff --git a/dht/src/Network/Tox/AggregateSession.hs b/dht/src/Network/Tox/AggregateSession.hs index d1f42e91..feb634f0 100644 --- a/dht/src/Network/Tox/AggregateSession.hs +++ b/dht/src/Network/Tox/AggregateSession.hs @@ -31,6 +31,7 @@ import Data.Function import qualified Data.IntMap.Strict as IntMap ;import Data.IntMap.Strict (IntMap) import Data.List +import Data.Maybe import Data.Time.Clock.POSIX import System.IO.Error @@ -108,6 +109,7 @@ newAggregateSession notify = do data AddResult = FirstSession -- ^ Initial connection with this contact. | AddedSession -- ^ Added another connection to active session. | RejectedSession -- ^ Failed to add session (wrong contact / closed session). + deriving (Eq,Show) -- | The 'keepAlive' thread juggles three scheduled tasks. data KeepAliveEvents = DoTimeout -- ^ A session timed-out, close it. @@ -244,7 +246,9 @@ addSession c s = do return (result,Just con,singleSession <$> s0) mapM_ sClose rejected - forM_ (mcon :: Maybe SingleCon) $ \con -> + when (isNothing mcon) $ dput XMan "addSession: Rejected session!" + forM_ (mcon :: Maybe SingleCon) $ \con -> do + dput XMan $ "addSession: forkSession! " ++ show result forkSession c s $ \progress -> do status0 <- aggregateStatus c writeTVar (singleStatus con) progress @@ -314,6 +318,8 @@ closeAll :: AggregateSession -> IO () closeAll c = join $ atomically $ do imap <- readTVar (contactSession c) closeTMChan (contactChannel c) + forM_ (listToMaybe $ IntMap.elems imap) $ \(SingleCon s _) -> do + notifyState c c s Dormant return $ forM_ (IntMap.toList imap) $ \(sid,SingleCon s _) -> do sClose s delSession c sid -- cgit v1.2.3