From bc8360faf51a058f3c1dd90145d4b87506e2ddfe Mon Sep 17 00:00:00 2001 From: joe Date: Wed, 13 Jun 2018 16:11:11 -0400 Subject: tox: Associate a state with each connection management thread. --- Connection/Tox.hs | 74 ++++++++++++++++++++++++++++++++++++++++------- Connection/Tox/Threads.hs | 50 ++++++++++++++++++++++---------- 2 files changed, 99 insertions(+), 25 deletions(-) (limited to 'Connection') diff --git a/Connection/Tox.hs b/Connection/Tox.hs index c8ce9a53..f6f15f3c 100644 --- a/Connection/Tox.hs +++ b/Connection/Tox.hs @@ -19,6 +19,8 @@ import Control.Concurrent.Lifted.Instrument import Control.Concurrent.Lifted import GHC.Conc (labelThread) #endif +import GHC.Conc (threadStatus,ThreadStatus(..)) + @@ -35,7 +37,7 @@ data Key = Key NodeId{-me-} NodeId{-them-} instance Show Key where show = show . showKey_ - +{- -- | A conneciton status that is tagged with a state type that is specific to -- the status. data Transient a where @@ -56,21 +58,48 @@ untag (IsAcquiringCookie :=> _) = G.InProgress AcquiringCookie untag (IsAwaitingHandshake :=> _) = G.InProgress AwaitingHandshake untag (IsAwaitingSessionPacket :=> _) = G.InProgress AwaitingSessionPacket untag (IsEstablished :=> _) = G.Established +-} + +data StatefulTask st = StatefulTask + { taskThread :: ThreadId + , taskState :: TVar st + } + +launch :: String -> st -> ((st -> STM ()) -> IO ()) -> IO (StatefulTask st) +launch lbl st f = do + stvar <- newTVarIO st + tid <- forkIO (f $ writeTVar stvar) + labelThread tid lbl + return $ StatefulTask tid stvar +data SessionTasks = SessionTasks + { accepting :: StatefulTask (G.Status ToxProgress) + , persuing :: StatefulTask (G.Status ToxProgress) + , refreshing :: StatefulTask (G.Status ToxProgress) + } + data SessionState = SessionState - { transient :: TVar (DSum Transient Identity) - , connPolicy :: TVar Policy + { connPolicy :: TVar Policy , connPingLogic :: PingMachine + , sessionTasks :: TVar SessionTasks + -- , transient :: TVar (DSum Transient Identity) } sessionStatus :: SessionState -> G.Connection ToxProgress sessionStatus st = G.Connection - { G.connStatus = untag <$> readTVar (transient st) + { G.connStatus = combinedStatus =<< readTVar (sessionTasks st) , G.connPolicy = readTVar (connPolicy st) , G.connPingLogic = connPingLogic st } +combinedStatus :: SessionTasks -> STM (G.Status ToxProgress) +combinedStatus tasks = do + a <- readTVar (taskState $ accepting tasks) + p <- readTVar (taskState $ persuing tasks) + r <- readTVar (taskState $ refreshing tasks) + return $ maximum [a,p,r] + lookupForPolicyChange :: TVar (Map.Map Key SessionState) -> Key -> Policy -> IO (Maybe SessionState) lookupForPolicyChange conmap k policy = atomically $ do @@ -93,22 +122,47 @@ setToxPolicy params conmap k policy = case policy of mst <- lookupForPolicyChange conmap k policy forM_ mst $ \st -> do let getPolicy = readTVar $ connPolicy st - --TODO accept_thread may already be started if policy was OpenToConnect - accept_thread <- forkIO $ acceptContact getPolicy _accept_methods - persue_thread <- forkIO $ persueContact getPolicy _get_status _persue_methods - freshen_thread <- forkIO $ freshenContact getPolicy _get_status _freshen_methods + tasks <- atomically $ readTVar (sessionTasks st) + --TODO This check to determine whether to launch the accepting thread + --is probably racey. + astat <- threadStatus (taskThread $ accepting tasks) + accepting <- if astat /= ThreadRunning + then launch ("accept:"++show k) + (G.InProgress $ toEnum 0) + $ acceptContact getPolicy _accept_methods + else return $ accepting tasks + persuing <- launch ("persue:"++show k) + (G.InProgress $ toEnum 0) + $ persueContact getPolicy _get_status _persue_methods + refreshing <- launch ("refresh:"++show k) + (G.InProgress $ toEnum 0) + $ freshenContact getPolicy _get_status _freshen_methods + atomically $ writeTVar (sessionTasks st) + $ SessionTasks accepting persuing refreshing return () return () RefusingToConnect -> do -- disconnect or cancel any pending connection mst <- lookupForPolicyChange conmap k policy -- Since the 3 connection threads poll the current policy, they should -- all terminate on their own. - return () + -- + -- Here we block until they finish. + forM_ mst $ \st -> do + atomically $ do + tasks <- readTVar (sessionTasks st) + a <- readTVar $ taskState (accepting tasks) + p <- readTVar $ taskState (persuing tasks) + r <- readTVar $ taskState (refreshing tasks) + case (a,p,r) of + (G.Dormant,G.Dormant,G.Dormant) -> return () + _ -> retry OpenToConnect -> do -- passively accept connections if they initiate. mst <- lookupForPolicyChange conmap k policy forM_ mst $ \st -> do let getPolicy = readTVar $ connPolicy st - accept_thread <- forkIO $ acceptContact getPolicy _accept_methods + accept_thread <- launch ("accept:"++show k) + (G.InProgress $ toEnum 0) + $ acceptContact getPolicy _accept_methods return () diff --git a/Connection/Tox/Threads.hs b/Connection/Tox/Threads.hs index dcee37d1..2ff058b3 100644 --- a/Connection/Tox/Threads.hs +++ b/Connection/Tox/Threads.hs @@ -102,26 +102,33 @@ data AcceptContactMethods = AcceptContactMethods -- | Invokes an STM action on each incoming handshake. -- -- Does not return until getPolicy yields RefusingToConnect. -acceptContact :: STM Policy -> AcceptContactMethods -> IO () -acceptContact getPolicy AcceptContactMethods{..} = fix $ \loop -> do +acceptContact :: STM Policy -> AcceptContactMethods -> (Status ToxProgress -> STM ()) -> IO () +acceptContact getPolicy AcceptContactMethods{..} writeState = fix $ \loop -> do join $ atomically $ do orElse (getPolicy >>= \case - RefusingToConnect -> return $ return () -- QUIT Dormant/Established + RefusingToConnect -> do writeState Dormant + return $ return () -- QUIT Dormant/Established _ -> retry) (do hs <- getHandshake handshakeIsSuitable hs >>= \case True -> do -- Here we allocate a NetCrypto session for handling CryptoPacket. + writeState (InProgress AwaitingSessionPacket) transitionToState (InProgress AwaitingSessionPacket) return loop False -> return loop) -whileTryingAndNotEstablished :: STM Policy -> STM (Status t) -> ((Int -> IO ()) -> STM (IO ())) -> IO () -whileTryingAndNotEstablished getPolicy getStatus body = fix $ \loop -> do +whileTryingAndNotEstablished :: STM Policy + -> STM (Status t) + -> (Status ToxProgress -> STM ()) + -> ((Int -> IO ()) -> STM (IO ())) + -> IO () +whileTryingAndNotEstablished getPolicy getStatus writeStatus body = fix $ \loop -> do let retryWhileTrying k = getPolicy >>= \case TryingToConnect -> retry - _ -> return k + _ -> do writeStatus Dormant + return k ifEstablished t e = getStatus >>= \case Established -> t _ -> e @@ -147,17 +154,23 @@ data PersueContactMethods params = PersueContactMethods -- -- As long as getPolicy is TryingToConnect and there is no established -- connection, this function will continue. -persueContact :: STM Policy -> STM (Status t) -> PersueContactMethods a -> IO () -persueContact getPolicy getStatus PersueContactMethods{..} - = whileTryingAndNotEstablished getPolicy getStatus +persueContact :: STM Policy + -> STM (Status t) + -> PersueContactMethods a + -> (Status ToxProgress -> STM ()) + -> IO () +persueContact getPolicy getStatus PersueContactMethods{..} writeStatus + = whileTryingAndNotEstablished getPolicy getStatus writeStatus $ \retryAfterTimeout -> do -- AwaitingDHTKey -- AcquiringIPAddress params <- getHandshakeParams + writeStatus (InProgress AcquiringCookie) return $ do -- AcquiringCookie -- AwaitingHandshake -- AwaitingSessionPacket sendHandshake params + atomically $ writeStatus $ InProgress AwaitingHandshake retryAfterTimeout retryInterval data FreshenContactMethods = FreshenContactMethods @@ -189,16 +202,20 @@ data FreshenContactMethods = FreshenContactMethods -- -- As long as getPolicy is TryingToConnect and there is no established -- connection, this function will continue. -freshenContact :: STM Policy -> STM (Status t) -> FreshenContactMethods -> IO () -freshenContact getPolicy getStatus FreshenContactMethods{..} - = whileTryingAndNotEstablished getPolicy getStatus +freshenContact :: STM Policy -> STM (Status t) -> FreshenContactMethods + -> (Status ToxProgress -> STM ()) + -> IO () +freshenContact getPolicy getStatus FreshenContactMethods{..} writeStatus + = whileTryingAndNotEstablished getPolicy getStatus writeStatus -- retryAfterTimeout :: Int -> IO () $ \retryAfterTimeout -> getDHTKey >>= \case Nothing -> -- AwaitingDHTKey retry - Just dk -> getSockAddr >>= return . \case - Nothing -> -- AcquiringIPAddress + Just dk -> getSockAddr >>= \case + Nothing -> do -- AcquiringIPAddress + writeStatus (InProgress AcquiringIPAddress) + return $ do bkts <- atomically $ getBuckets st <- search nodeSch bkts dk $ \r -> do -- TODO: store saddr, check for finish @@ -206,7 +223,10 @@ freshenContact getPolicy getStatus FreshenContactMethods{..} atomically $ searchIsFinished st >>= check -- TODO: searchCancel on stop condition retryAfterTimeout sockAddrInterval - Just a -> -- AcquiringCookie + Just a -> do + writeStatus (InProgress AcquiringCookie) + return $ + -- AcquiringCookie -- AwaitingHandshake -- AwaitingSessionPacket do _todo_search_toxid_send_dhtkey -- 123 _todo_search_toxid_send_dhtkey :: IO a0 -- cgit v1.2.3