From bc8360faf51a058f3c1dd90145d4b87506e2ddfe Mon Sep 17 00:00:00 2001 From: joe Date: Wed, 13 Jun 2018 16:11:11 -0400 Subject: tox: Associate a state with each connection management thread. --- Connection/Tox.hs | 74 +++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 64 insertions(+), 10 deletions(-) (limited to 'Connection/Tox.hs') diff --git a/Connection/Tox.hs b/Connection/Tox.hs index c8ce9a53..f6f15f3c 100644 --- a/Connection/Tox.hs +++ b/Connection/Tox.hs @@ -19,6 +19,8 @@ import Control.Concurrent.Lifted.Instrument import Control.Concurrent.Lifted import GHC.Conc (labelThread) #endif +import GHC.Conc (threadStatus,ThreadStatus(..)) + @@ -35,7 +37,7 @@ data Key = Key NodeId{-me-} NodeId{-them-} instance Show Key where show = show . showKey_ - +{- -- | A conneciton status that is tagged with a state type that is specific to -- the status. data Transient a where @@ -56,21 +58,48 @@ untag (IsAcquiringCookie :=> _) = G.InProgress AcquiringCookie untag (IsAwaitingHandshake :=> _) = G.InProgress AwaitingHandshake untag (IsAwaitingSessionPacket :=> _) = G.InProgress AwaitingSessionPacket untag (IsEstablished :=> _) = G.Established +-} + +data StatefulTask st = StatefulTask + { taskThread :: ThreadId + , taskState :: TVar st + } + +launch :: String -> st -> ((st -> STM ()) -> IO ()) -> IO (StatefulTask st) +launch lbl st f = do + stvar <- newTVarIO st + tid <- forkIO (f $ writeTVar stvar) + labelThread tid lbl + return $ StatefulTask tid stvar +data SessionTasks = SessionTasks + { accepting :: StatefulTask (G.Status ToxProgress) + , persuing :: StatefulTask (G.Status ToxProgress) + , refreshing :: StatefulTask (G.Status ToxProgress) + } + data SessionState = SessionState - { transient :: TVar (DSum Transient Identity) - , connPolicy :: TVar Policy + { connPolicy :: TVar Policy , connPingLogic :: PingMachine + , sessionTasks :: TVar SessionTasks + -- , transient :: TVar (DSum Transient Identity) } sessionStatus :: SessionState -> G.Connection ToxProgress sessionStatus st = G.Connection - { G.connStatus = untag <$> readTVar (transient st) + { G.connStatus = combinedStatus =<< readTVar (sessionTasks st) , G.connPolicy = readTVar (connPolicy st) , G.connPingLogic = connPingLogic st } +combinedStatus :: SessionTasks -> STM (G.Status ToxProgress) +combinedStatus tasks = do + a <- readTVar (taskState $ accepting tasks) + p <- readTVar (taskState $ persuing tasks) + r <- readTVar (taskState $ refreshing tasks) + return $ maximum [a,p,r] + lookupForPolicyChange :: TVar (Map.Map Key SessionState) -> Key -> Policy -> IO (Maybe SessionState) lookupForPolicyChange conmap k policy = atomically $ do @@ -93,22 +122,47 @@ setToxPolicy params conmap k policy = case policy of mst <- lookupForPolicyChange conmap k policy forM_ mst $ \st -> do let getPolicy = readTVar $ connPolicy st - --TODO accept_thread may already be started if policy was OpenToConnect - accept_thread <- forkIO $ acceptContact getPolicy _accept_methods - persue_thread <- forkIO $ persueContact getPolicy _get_status _persue_methods - freshen_thread <- forkIO $ freshenContact getPolicy _get_status _freshen_methods + tasks <- atomically $ readTVar (sessionTasks st) + --TODO This check to determine whether to launch the accepting thread + --is probably racey. + astat <- threadStatus (taskThread $ accepting tasks) + accepting <- if astat /= ThreadRunning + then launch ("accept:"++show k) + (G.InProgress $ toEnum 0) + $ acceptContact getPolicy _accept_methods + else return $ accepting tasks + persuing <- launch ("persue:"++show k) + (G.InProgress $ toEnum 0) + $ persueContact getPolicy _get_status _persue_methods + refreshing <- launch ("refresh:"++show k) + (G.InProgress $ toEnum 0) + $ freshenContact getPolicy _get_status _freshen_methods + atomically $ writeTVar (sessionTasks st) + $ SessionTasks accepting persuing refreshing return () return () RefusingToConnect -> do -- disconnect or cancel any pending connection mst <- lookupForPolicyChange conmap k policy -- Since the 3 connection threads poll the current policy, they should -- all terminate on their own. - return () + -- + -- Here we block until they finish. + forM_ mst $ \st -> do + atomically $ do + tasks <- readTVar (sessionTasks st) + a <- readTVar $ taskState (accepting tasks) + p <- readTVar $ taskState (persuing tasks) + r <- readTVar $ taskState (refreshing tasks) + case (a,p,r) of + (G.Dormant,G.Dormant,G.Dormant) -> return () + _ -> retry OpenToConnect -> do -- passively accept connections if they initiate. mst <- lookupForPolicyChange conmap k policy forM_ mst $ \st -> do let getPolicy = readTVar $ connPolicy st - accept_thread <- forkIO $ acceptContact getPolicy _accept_methods + accept_thread <- launch ("accept:"++show k) + (G.InProgress $ toEnum 0) + $ acceptContact getPolicy _accept_methods return () -- cgit v1.2.3