diff options
Diffstat (limited to 'Connection')
-rw-r--r-- | Connection/Tox.hs | 74 | ||||
-rw-r--r-- | Connection/Tox/Threads.hs | 50 |
2 files changed, 99 insertions, 25 deletions
diff --git a/Connection/Tox.hs b/Connection/Tox.hs index c8ce9a53..f6f15f3c 100644 --- a/Connection/Tox.hs +++ b/Connection/Tox.hs | |||
@@ -19,6 +19,8 @@ import Control.Concurrent.Lifted.Instrument | |||
19 | import Control.Concurrent.Lifted | 19 | import Control.Concurrent.Lifted |
20 | import GHC.Conc (labelThread) | 20 | import GHC.Conc (labelThread) |
21 | #endif | 21 | #endif |
22 | import GHC.Conc (threadStatus,ThreadStatus(..)) | ||
23 | |||
22 | 24 | ||
23 | 25 | ||
24 | 26 | ||
@@ -35,7 +37,7 @@ data Key = Key NodeId{-me-} NodeId{-them-} | |||
35 | 37 | ||
36 | instance Show Key where show = show . showKey_ | 38 | instance Show Key where show = show . showKey_ |
37 | 39 | ||
38 | 40 | {- | |
39 | -- | A conneciton status that is tagged with a state type that is specific to | 41 | -- | A conneciton status that is tagged with a state type that is specific to |
40 | -- the status. | 42 | -- the status. |
41 | data Transient a where | 43 | data Transient a where |
@@ -56,21 +58,48 @@ untag (IsAcquiringCookie :=> _) = G.InProgress AcquiringCookie | |||
56 | untag (IsAwaitingHandshake :=> _) = G.InProgress AwaitingHandshake | 58 | untag (IsAwaitingHandshake :=> _) = G.InProgress AwaitingHandshake |
57 | untag (IsAwaitingSessionPacket :=> _) = G.InProgress AwaitingSessionPacket | 59 | untag (IsAwaitingSessionPacket :=> _) = G.InProgress AwaitingSessionPacket |
58 | untag (IsEstablished :=> _) = G.Established | 60 | untag (IsEstablished :=> _) = G.Established |
61 | -} | ||
62 | |||
63 | data StatefulTask st = StatefulTask | ||
64 | { taskThread :: ThreadId | ||
65 | , taskState :: TVar st | ||
66 | } | ||
67 | |||
68 | launch :: String -> st -> ((st -> STM ()) -> IO ()) -> IO (StatefulTask st) | ||
69 | launch lbl st f = do | ||
70 | stvar <- newTVarIO st | ||
71 | tid <- forkIO (f $ writeTVar stvar) | ||
72 | labelThread tid lbl | ||
73 | return $ StatefulTask tid stvar | ||
59 | 74 | ||
60 | 75 | ||
76 | data SessionTasks = SessionTasks | ||
77 | { accepting :: StatefulTask (G.Status ToxProgress) | ||
78 | , persuing :: StatefulTask (G.Status ToxProgress) | ||
79 | , refreshing :: StatefulTask (G.Status ToxProgress) | ||
80 | } | ||
81 | |||
61 | data SessionState = SessionState | 82 | data SessionState = SessionState |
62 | { transient :: TVar (DSum Transient Identity) | 83 | { connPolicy :: TVar Policy |
63 | , connPolicy :: TVar Policy | ||
64 | , connPingLogic :: PingMachine | 84 | , connPingLogic :: PingMachine |
85 | , sessionTasks :: TVar SessionTasks | ||
86 | -- , transient :: TVar (DSum Transient Identity) | ||
65 | } | 87 | } |
66 | 88 | ||
67 | sessionStatus :: SessionState -> G.Connection ToxProgress | 89 | sessionStatus :: SessionState -> G.Connection ToxProgress |
68 | sessionStatus st = G.Connection | 90 | sessionStatus st = G.Connection |
69 | { G.connStatus = untag <$> readTVar (transient st) | 91 | { G.connStatus = combinedStatus =<< readTVar (sessionTasks st) |
70 | , G.connPolicy = readTVar (connPolicy st) | 92 | , G.connPolicy = readTVar (connPolicy st) |
71 | , G.connPingLogic = connPingLogic st | 93 | , G.connPingLogic = connPingLogic st |
72 | } | 94 | } |
73 | 95 | ||
96 | combinedStatus :: SessionTasks -> STM (G.Status ToxProgress) | ||
97 | combinedStatus tasks = do | ||
98 | a <- readTVar (taskState $ accepting tasks) | ||
99 | p <- readTVar (taskState $ persuing tasks) | ||
100 | r <- readTVar (taskState $ refreshing tasks) | ||
101 | return $ maximum [a,p,r] | ||
102 | |||
74 | lookupForPolicyChange :: TVar (Map.Map Key SessionState) | 103 | lookupForPolicyChange :: TVar (Map.Map Key SessionState) |
75 | -> Key -> Policy -> IO (Maybe SessionState) | 104 | -> Key -> Policy -> IO (Maybe SessionState) |
76 | lookupForPolicyChange conmap k policy = atomically $ do | 105 | lookupForPolicyChange conmap k policy = atomically $ do |
@@ -93,22 +122,47 @@ setToxPolicy params conmap k policy = case policy of | |||
93 | mst <- lookupForPolicyChange conmap k policy | 122 | mst <- lookupForPolicyChange conmap k policy |
94 | forM_ mst $ \st -> do | 123 | forM_ mst $ \st -> do |
95 | let getPolicy = readTVar $ connPolicy st | 124 | let getPolicy = readTVar $ connPolicy st |
96 | --TODO accept_thread may already be started if policy was OpenToConnect | 125 | tasks <- atomically $ readTVar (sessionTasks st) |
97 | accept_thread <- forkIO $ acceptContact getPolicy _accept_methods | 126 | --TODO This check to determine whether to launch the accepting thread |
98 | persue_thread <- forkIO $ persueContact getPolicy _get_status _persue_methods | 127 | --is probably racey. |
99 | freshen_thread <- forkIO $ freshenContact getPolicy _get_status _freshen_methods | 128 | astat <- threadStatus (taskThread $ accepting tasks) |
129 | accepting <- if astat /= ThreadRunning | ||
130 | then launch ("accept:"++show k) | ||
131 | (G.InProgress $ toEnum 0) | ||
132 | $ acceptContact getPolicy _accept_methods | ||
133 | else return $ accepting tasks | ||
134 | persuing <- launch ("persue:"++show k) | ||
135 | (G.InProgress $ toEnum 0) | ||
136 | $ persueContact getPolicy _get_status _persue_methods | ||
137 | refreshing <- launch ("refresh:"++show k) | ||
138 | (G.InProgress $ toEnum 0) | ||
139 | $ freshenContact getPolicy _get_status _freshen_methods | ||
140 | atomically $ writeTVar (sessionTasks st) | ||
141 | $ SessionTasks accepting persuing refreshing | ||
100 | return () | 142 | return () |
101 | return () | 143 | return () |
102 | RefusingToConnect -> do -- disconnect or cancel any pending connection | 144 | RefusingToConnect -> do -- disconnect or cancel any pending connection |
103 | mst <- lookupForPolicyChange conmap k policy | 145 | mst <- lookupForPolicyChange conmap k policy |
104 | -- Since the 3 connection threads poll the current policy, they should | 146 | -- Since the 3 connection threads poll the current policy, they should |
105 | -- all terminate on their own. | 147 | -- all terminate on their own. |
106 | return () | 148 | -- |
149 | -- Here we block until they finish. | ||
150 | forM_ mst $ \st -> do | ||
151 | atomically $ do | ||
152 | tasks <- readTVar (sessionTasks st) | ||
153 | a <- readTVar $ taskState (accepting tasks) | ||
154 | p <- readTVar $ taskState (persuing tasks) | ||
155 | r <- readTVar $ taskState (refreshing tasks) | ||
156 | case (a,p,r) of | ||
157 | (G.Dormant,G.Dormant,G.Dormant) -> return () | ||
158 | _ -> retry | ||
107 | OpenToConnect -> do -- passively accept connections if they initiate. | 159 | OpenToConnect -> do -- passively accept connections if they initiate. |
108 | mst <- lookupForPolicyChange conmap k policy | 160 | mst <- lookupForPolicyChange conmap k policy |
109 | forM_ mst $ \st -> do | 161 | forM_ mst $ \st -> do |
110 | let getPolicy = readTVar $ connPolicy st | 162 | let getPolicy = readTVar $ connPolicy st |
111 | accept_thread <- forkIO $ acceptContact getPolicy _accept_methods | 163 | accept_thread <- launch ("accept:"++show k) |
164 | (G.InProgress $ toEnum 0) | ||
165 | $ acceptContact getPolicy _accept_methods | ||
112 | return () | 166 | return () |
113 | 167 | ||
114 | 168 | ||
diff --git a/Connection/Tox/Threads.hs b/Connection/Tox/Threads.hs index dcee37d1..2ff058b3 100644 --- a/Connection/Tox/Threads.hs +++ b/Connection/Tox/Threads.hs | |||
@@ -102,26 +102,33 @@ data AcceptContactMethods = AcceptContactMethods | |||
102 | -- | Invokes an STM action on each incoming handshake. | 102 | -- | Invokes an STM action on each incoming handshake. |
103 | -- | 103 | -- |
104 | -- Does not return until getPolicy yields RefusingToConnect. | 104 | -- Does not return until getPolicy yields RefusingToConnect. |
105 | acceptContact :: STM Policy -> AcceptContactMethods -> IO () | 105 | acceptContact :: STM Policy -> AcceptContactMethods -> (Status ToxProgress -> STM ()) -> IO () |
106 | acceptContact getPolicy AcceptContactMethods{..} = fix $ \loop -> do | 106 | acceptContact getPolicy AcceptContactMethods{..} writeState = fix $ \loop -> do |
107 | join $ atomically $ do | 107 | join $ atomically $ do |
108 | orElse | 108 | orElse |
109 | (getPolicy >>= \case | 109 | (getPolicy >>= \case |
110 | RefusingToConnect -> return $ return () -- QUIT Dormant/Established | 110 | RefusingToConnect -> do writeState Dormant |
111 | return $ return () -- QUIT Dormant/Established | ||
111 | _ -> retry) | 112 | _ -> retry) |
112 | (do hs <- getHandshake | 113 | (do hs <- getHandshake |
113 | handshakeIsSuitable hs >>= \case | 114 | handshakeIsSuitable hs >>= \case |
114 | True -> do | 115 | True -> do |
115 | -- Here we allocate a NetCrypto session for handling CryptoPacket. | 116 | -- Here we allocate a NetCrypto session for handling CryptoPacket. |
117 | writeState (InProgress AwaitingSessionPacket) | ||
116 | transitionToState (InProgress AwaitingSessionPacket) | 118 | transitionToState (InProgress AwaitingSessionPacket) |
117 | return loop | 119 | return loop |
118 | False -> return loop) | 120 | False -> return loop) |
119 | 121 | ||
120 | whileTryingAndNotEstablished :: STM Policy -> STM (Status t) -> ((Int -> IO ()) -> STM (IO ())) -> IO () | 122 | whileTryingAndNotEstablished :: STM Policy |
121 | whileTryingAndNotEstablished getPolicy getStatus body = fix $ \loop -> do | 123 | -> STM (Status t) |
124 | -> (Status ToxProgress -> STM ()) | ||
125 | -> ((Int -> IO ()) -> STM (IO ())) | ||
126 | -> IO () | ||
127 | whileTryingAndNotEstablished getPolicy getStatus writeStatus body = fix $ \loop -> do | ||
122 | let retryWhileTrying k = getPolicy >>= \case | 128 | let retryWhileTrying k = getPolicy >>= \case |
123 | TryingToConnect -> retry | 129 | TryingToConnect -> retry |
124 | _ -> return k | 130 | _ -> do writeStatus Dormant |
131 | return k | ||
125 | ifEstablished t e = getStatus >>= \case | 132 | ifEstablished t e = getStatus >>= \case |
126 | Established -> t | 133 | Established -> t |
127 | _ -> e | 134 | _ -> e |
@@ -147,17 +154,23 @@ data PersueContactMethods params = PersueContactMethods | |||
147 | -- | 154 | -- |
148 | -- As long as getPolicy is TryingToConnect and there is no established | 155 | -- As long as getPolicy is TryingToConnect and there is no established |
149 | -- connection, this function will continue. | 156 | -- connection, this function will continue. |
150 | persueContact :: STM Policy -> STM (Status t) -> PersueContactMethods a -> IO () | 157 | persueContact :: STM Policy |
151 | persueContact getPolicy getStatus PersueContactMethods{..} | 158 | -> STM (Status t) |
152 | = whileTryingAndNotEstablished getPolicy getStatus | 159 | -> PersueContactMethods a |
160 | -> (Status ToxProgress -> STM ()) | ||
161 | -> IO () | ||
162 | persueContact getPolicy getStatus PersueContactMethods{..} writeStatus | ||
163 | = whileTryingAndNotEstablished getPolicy getStatus writeStatus | ||
153 | $ \retryAfterTimeout -> do | 164 | $ \retryAfterTimeout -> do |
154 | -- AwaitingDHTKey | 165 | -- AwaitingDHTKey |
155 | -- AcquiringIPAddress | 166 | -- AcquiringIPAddress |
156 | params <- getHandshakeParams | 167 | params <- getHandshakeParams |
168 | writeStatus (InProgress AcquiringCookie) | ||
157 | return $ do -- AcquiringCookie | 169 | return $ do -- AcquiringCookie |
158 | -- AwaitingHandshake | 170 | -- AwaitingHandshake |
159 | -- AwaitingSessionPacket | 171 | -- AwaitingSessionPacket |
160 | sendHandshake params | 172 | sendHandshake params |
173 | atomically $ writeStatus $ InProgress AwaitingHandshake | ||
161 | retryAfterTimeout retryInterval | 174 | retryAfterTimeout retryInterval |
162 | 175 | ||
163 | data FreshenContactMethods = FreshenContactMethods | 176 | data FreshenContactMethods = FreshenContactMethods |
@@ -189,16 +202,20 @@ data FreshenContactMethods = FreshenContactMethods | |||
189 | -- | 202 | -- |
190 | -- As long as getPolicy is TryingToConnect and there is no established | 203 | -- As long as getPolicy is TryingToConnect and there is no established |
191 | -- connection, this function will continue. | 204 | -- connection, this function will continue. |
192 | freshenContact :: STM Policy -> STM (Status t) -> FreshenContactMethods -> IO () | 205 | freshenContact :: STM Policy -> STM (Status t) -> FreshenContactMethods |
193 | freshenContact getPolicy getStatus FreshenContactMethods{..} | 206 | -> (Status ToxProgress -> STM ()) |
194 | = whileTryingAndNotEstablished getPolicy getStatus | 207 | -> IO () |
208 | freshenContact getPolicy getStatus FreshenContactMethods{..} writeStatus | ||
209 | = whileTryingAndNotEstablished getPolicy getStatus writeStatus | ||
195 | -- retryAfterTimeout :: Int -> IO () | 210 | -- retryAfterTimeout :: Int -> IO () |
196 | $ \retryAfterTimeout -> | 211 | $ \retryAfterTimeout -> |
197 | getDHTKey >>= \case | 212 | getDHTKey >>= \case |
198 | Nothing -> -- AwaitingDHTKey | 213 | Nothing -> -- AwaitingDHTKey |
199 | retry | 214 | retry |
200 | Just dk -> getSockAddr >>= return . \case | 215 | Just dk -> getSockAddr >>= \case |
201 | Nothing -> -- AcquiringIPAddress | 216 | Nothing -> do -- AcquiringIPAddress |
217 | writeStatus (InProgress AcquiringIPAddress) | ||
218 | return $ | ||
202 | do bkts <- atomically $ getBuckets | 219 | do bkts <- atomically $ getBuckets |
203 | st <- search nodeSch bkts dk $ | 220 | st <- search nodeSch bkts dk $ |
204 | \r -> do -- TODO: store saddr, check for finish | 221 | \r -> do -- TODO: store saddr, check for finish |
@@ -206,7 +223,10 @@ freshenContact getPolicy getStatus FreshenContactMethods{..} | |||
206 | atomically $ searchIsFinished st >>= check | 223 | atomically $ searchIsFinished st >>= check |
207 | -- TODO: searchCancel on stop condition | 224 | -- TODO: searchCancel on stop condition |
208 | retryAfterTimeout sockAddrInterval | 225 | retryAfterTimeout sockAddrInterval |
209 | Just a -> -- AcquiringCookie | 226 | Just a -> do |
227 | writeStatus (InProgress AcquiringCookie) | ||
228 | return $ | ||
229 | -- AcquiringCookie | ||
210 | -- AwaitingHandshake | 230 | -- AwaitingHandshake |
211 | -- AwaitingSessionPacket | 231 | -- AwaitingSessionPacket |
212 | do _todo_search_toxid_send_dhtkey -- 123 _todo_search_toxid_send_dhtkey :: IO a0 | 232 | do _todo_search_toxid_send_dhtkey -- 123 _todo_search_toxid_send_dhtkey :: IO a0 |