diff options
author | joe <joe@jerkface.net> | 2018-06-13 16:11:11 -0400 |
---|---|---|
committer | joe <joe@jerkface.net> | 2018-06-13 16:43:54 -0400 |
commit | bc8360faf51a058f3c1dd90145d4b87506e2ddfe (patch) | |
tree | 7dbf64254ae6690b124d11df9fd8fd5e28ef166e /Connection/Tox.hs | |
parent | fd3f604f8961d7c7db48015c9ed7be40ea872a7c (diff) |
tox: Associate a state with each connection management thread.
Diffstat (limited to 'Connection/Tox.hs')
-rw-r--r-- | Connection/Tox.hs | 74 |
1 files changed, 64 insertions, 10 deletions
diff --git a/Connection/Tox.hs b/Connection/Tox.hs index c8ce9a53..f6f15f3c 100644 --- a/Connection/Tox.hs +++ b/Connection/Tox.hs | |||
@@ -19,6 +19,8 @@ import Control.Concurrent.Lifted.Instrument | |||
19 | import Control.Concurrent.Lifted | 19 | import Control.Concurrent.Lifted |
20 | import GHC.Conc (labelThread) | 20 | import GHC.Conc (labelThread) |
21 | #endif | 21 | #endif |
22 | import GHC.Conc (threadStatus,ThreadStatus(..)) | ||
23 | |||
22 | 24 | ||
23 | 25 | ||
24 | 26 | ||
@@ -35,7 +37,7 @@ data Key = Key NodeId{-me-} NodeId{-them-} | |||
35 | 37 | ||
36 | instance Show Key where show = show . showKey_ | 38 | instance Show Key where show = show . showKey_ |
37 | 39 | ||
38 | 40 | {- | |
39 | -- | A conneciton status that is tagged with a state type that is specific to | 41 | -- | A conneciton status that is tagged with a state type that is specific to |
40 | -- the status. | 42 | -- the status. |
41 | data Transient a where | 43 | data Transient a where |
@@ -56,21 +58,48 @@ untag (IsAcquiringCookie :=> _) = G.InProgress AcquiringCookie | |||
56 | untag (IsAwaitingHandshake :=> _) = G.InProgress AwaitingHandshake | 58 | untag (IsAwaitingHandshake :=> _) = G.InProgress AwaitingHandshake |
57 | untag (IsAwaitingSessionPacket :=> _) = G.InProgress AwaitingSessionPacket | 59 | untag (IsAwaitingSessionPacket :=> _) = G.InProgress AwaitingSessionPacket |
58 | untag (IsEstablished :=> _) = G.Established | 60 | untag (IsEstablished :=> _) = G.Established |
61 | -} | ||
62 | |||
63 | data StatefulTask st = StatefulTask | ||
64 | { taskThread :: ThreadId | ||
65 | , taskState :: TVar st | ||
66 | } | ||
67 | |||
68 | launch :: String -> st -> ((st -> STM ()) -> IO ()) -> IO (StatefulTask st) | ||
69 | launch lbl st f = do | ||
70 | stvar <- newTVarIO st | ||
71 | tid <- forkIO (f $ writeTVar stvar) | ||
72 | labelThread tid lbl | ||
73 | return $ StatefulTask tid stvar | ||
59 | 74 | ||
60 | 75 | ||
76 | data SessionTasks = SessionTasks | ||
77 | { accepting :: StatefulTask (G.Status ToxProgress) | ||
78 | , persuing :: StatefulTask (G.Status ToxProgress) | ||
79 | , refreshing :: StatefulTask (G.Status ToxProgress) | ||
80 | } | ||
81 | |||
61 | data SessionState = SessionState | 82 | data SessionState = SessionState |
62 | { transient :: TVar (DSum Transient Identity) | 83 | { connPolicy :: TVar Policy |
63 | , connPolicy :: TVar Policy | ||
64 | , connPingLogic :: PingMachine | 84 | , connPingLogic :: PingMachine |
85 | , sessionTasks :: TVar SessionTasks | ||
86 | -- , transient :: TVar (DSum Transient Identity) | ||
65 | } | 87 | } |
66 | 88 | ||
67 | sessionStatus :: SessionState -> G.Connection ToxProgress | 89 | sessionStatus :: SessionState -> G.Connection ToxProgress |
68 | sessionStatus st = G.Connection | 90 | sessionStatus st = G.Connection |
69 | { G.connStatus = untag <$> readTVar (transient st) | 91 | { G.connStatus = combinedStatus =<< readTVar (sessionTasks st) |
70 | , G.connPolicy = readTVar (connPolicy st) | 92 | , G.connPolicy = readTVar (connPolicy st) |
71 | , G.connPingLogic = connPingLogic st | 93 | , G.connPingLogic = connPingLogic st |
72 | } | 94 | } |
73 | 95 | ||
96 | combinedStatus :: SessionTasks -> STM (G.Status ToxProgress) | ||
97 | combinedStatus tasks = do | ||
98 | a <- readTVar (taskState $ accepting tasks) | ||
99 | p <- readTVar (taskState $ persuing tasks) | ||
100 | r <- readTVar (taskState $ refreshing tasks) | ||
101 | return $ maximum [a,p,r] | ||
102 | |||
74 | lookupForPolicyChange :: TVar (Map.Map Key SessionState) | 103 | lookupForPolicyChange :: TVar (Map.Map Key SessionState) |
75 | -> Key -> Policy -> IO (Maybe SessionState) | 104 | -> Key -> Policy -> IO (Maybe SessionState) |
76 | lookupForPolicyChange conmap k policy = atomically $ do | 105 | lookupForPolicyChange conmap k policy = atomically $ do |
@@ -93,22 +122,47 @@ setToxPolicy params conmap k policy = case policy of | |||
93 | mst <- lookupForPolicyChange conmap k policy | 122 | mst <- lookupForPolicyChange conmap k policy |
94 | forM_ mst $ \st -> do | 123 | forM_ mst $ \st -> do |
95 | let getPolicy = readTVar $ connPolicy st | 124 | let getPolicy = readTVar $ connPolicy st |
96 | --TODO accept_thread may already be started if policy was OpenToConnect | 125 | tasks <- atomically $ readTVar (sessionTasks st) |
97 | accept_thread <- forkIO $ acceptContact getPolicy _accept_methods | 126 | --TODO This check to determine whether to launch the accepting thread |
98 | persue_thread <- forkIO $ persueContact getPolicy _get_status _persue_methods | 127 | --is probably racey. |
99 | freshen_thread <- forkIO $ freshenContact getPolicy _get_status _freshen_methods | 128 | astat <- threadStatus (taskThread $ accepting tasks) |
129 | accepting <- if astat /= ThreadRunning | ||
130 | then launch ("accept:"++show k) | ||
131 | (G.InProgress $ toEnum 0) | ||
132 | $ acceptContact getPolicy _accept_methods | ||
133 | else return $ accepting tasks | ||
134 | persuing <- launch ("persue:"++show k) | ||
135 | (G.InProgress $ toEnum 0) | ||
136 | $ persueContact getPolicy _get_status _persue_methods | ||
137 | refreshing <- launch ("refresh:"++show k) | ||
138 | (G.InProgress $ toEnum 0) | ||
139 | $ freshenContact getPolicy _get_status _freshen_methods | ||
140 | atomically $ writeTVar (sessionTasks st) | ||
141 | $ SessionTasks accepting persuing refreshing | ||
100 | return () | 142 | return () |
101 | return () | 143 | return () |
102 | RefusingToConnect -> do -- disconnect or cancel any pending connection | 144 | RefusingToConnect -> do -- disconnect or cancel any pending connection |
103 | mst <- lookupForPolicyChange conmap k policy | 145 | mst <- lookupForPolicyChange conmap k policy |
104 | -- Since the 3 connection threads poll the current policy, they should | 146 | -- Since the 3 connection threads poll the current policy, they should |
105 | -- all terminate on their own. | 147 | -- all terminate on their own. |
106 | return () | 148 | -- |
149 | -- Here we block until they finish. | ||
150 | forM_ mst $ \st -> do | ||
151 | atomically $ do | ||
152 | tasks <- readTVar (sessionTasks st) | ||
153 | a <- readTVar $ taskState (accepting tasks) | ||
154 | p <- readTVar $ taskState (persuing tasks) | ||
155 | r <- readTVar $ taskState (refreshing tasks) | ||
156 | case (a,p,r) of | ||
157 | (G.Dormant,G.Dormant,G.Dormant) -> return () | ||
158 | _ -> retry | ||
107 | OpenToConnect -> do -- passively accept connections if they initiate. | 159 | OpenToConnect -> do -- passively accept connections if they initiate. |
108 | mst <- lookupForPolicyChange conmap k policy | 160 | mst <- lookupForPolicyChange conmap k policy |
109 | forM_ mst $ \st -> do | 161 | forM_ mst $ \st -> do |
110 | let getPolicy = readTVar $ connPolicy st | 162 | let getPolicy = readTVar $ connPolicy st |
111 | accept_thread <- forkIO $ acceptContact getPolicy _accept_methods | 163 | accept_thread <- launch ("accept:"++show k) |
164 | (G.InProgress $ toEnum 0) | ||
165 | $ acceptContact getPolicy _accept_methods | ||
112 | return () | 166 | return () |
113 | 167 | ||
114 | 168 | ||