summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2018-06-13 16:11:11 -0400
committerjoe <joe@jerkface.net>2018-06-13 16:43:54 -0400
commitbc8360faf51a058f3c1dd90145d4b87506e2ddfe (patch)
tree7dbf64254ae6690b124d11df9fd8fd5e28ef166e
parentfd3f604f8961d7c7db48015c9ed7be40ea872a7c (diff)
tox: Associate a state with each connection management thread.
-rw-r--r--Connection/Tox.hs74
-rw-r--r--Connection/Tox/Threads.hs50
2 files changed, 99 insertions, 25 deletions
diff --git a/Connection/Tox.hs b/Connection/Tox.hs
index c8ce9a53..f6f15f3c 100644
--- a/Connection/Tox.hs
+++ b/Connection/Tox.hs
@@ -19,6 +19,8 @@ import Control.Concurrent.Lifted.Instrument
19import Control.Concurrent.Lifted 19import Control.Concurrent.Lifted
20import GHC.Conc (labelThread) 20import GHC.Conc (labelThread)
21#endif 21#endif
22import GHC.Conc (threadStatus,ThreadStatus(..))
23
22 24
23 25
24 26
@@ -35,7 +37,7 @@ data Key = Key NodeId{-me-} NodeId{-them-}
35 37
36instance Show Key where show = show . showKey_ 38instance Show Key where show = show . showKey_
37 39
38 40{-
39-- | A conneciton status that is tagged with a state type that is specific to 41-- | A conneciton status that is tagged with a state type that is specific to
40-- the status. 42-- the status.
41data Transient a where 43data Transient a where
@@ -56,21 +58,48 @@ untag (IsAcquiringCookie :=> _) = G.InProgress AcquiringCookie
56untag (IsAwaitingHandshake :=> _) = G.InProgress AwaitingHandshake 58untag (IsAwaitingHandshake :=> _) = G.InProgress AwaitingHandshake
57untag (IsAwaitingSessionPacket :=> _) = G.InProgress AwaitingSessionPacket 59untag (IsAwaitingSessionPacket :=> _) = G.InProgress AwaitingSessionPacket
58untag (IsEstablished :=> _) = G.Established 60untag (IsEstablished :=> _) = G.Established
61-}
62
63data StatefulTask st = StatefulTask
64 { taskThread :: ThreadId
65 , taskState :: TVar st
66 }
67
68launch :: String -> st -> ((st -> STM ()) -> IO ()) -> IO (StatefulTask st)
69launch lbl st f = do
70 stvar <- newTVarIO st
71 tid <- forkIO (f $ writeTVar stvar)
72 labelThread tid lbl
73 return $ StatefulTask tid stvar
59 74
60 75
76data SessionTasks = SessionTasks
77 { accepting :: StatefulTask (G.Status ToxProgress)
78 , persuing :: StatefulTask (G.Status ToxProgress)
79 , refreshing :: StatefulTask (G.Status ToxProgress)
80 }
81
61data SessionState = SessionState 82data SessionState = SessionState
62 { transient :: TVar (DSum Transient Identity) 83 { connPolicy :: TVar Policy
63 , connPolicy :: TVar Policy
64 , connPingLogic :: PingMachine 84 , connPingLogic :: PingMachine
85 , sessionTasks :: TVar SessionTasks
86 -- , transient :: TVar (DSum Transient Identity)
65 } 87 }
66 88
67sessionStatus :: SessionState -> G.Connection ToxProgress 89sessionStatus :: SessionState -> G.Connection ToxProgress
68sessionStatus st = G.Connection 90sessionStatus st = G.Connection
69 { G.connStatus = untag <$> readTVar (transient st) 91 { G.connStatus = combinedStatus =<< readTVar (sessionTasks st)
70 , G.connPolicy = readTVar (connPolicy st) 92 , G.connPolicy = readTVar (connPolicy st)
71 , G.connPingLogic = connPingLogic st 93 , G.connPingLogic = connPingLogic st
72 } 94 }
73 95
96combinedStatus :: SessionTasks -> STM (G.Status ToxProgress)
97combinedStatus tasks = do
98 a <- readTVar (taskState $ accepting tasks)
99 p <- readTVar (taskState $ persuing tasks)
100 r <- readTVar (taskState $ refreshing tasks)
101 return $ maximum [a,p,r]
102
74lookupForPolicyChange :: TVar (Map.Map Key SessionState) 103lookupForPolicyChange :: TVar (Map.Map Key SessionState)
75 -> Key -> Policy -> IO (Maybe SessionState) 104 -> Key -> Policy -> IO (Maybe SessionState)
76lookupForPolicyChange conmap k policy = atomically $ do 105lookupForPolicyChange conmap k policy = atomically $ do
@@ -93,22 +122,47 @@ setToxPolicy params conmap k policy = case policy of
93 mst <- lookupForPolicyChange conmap k policy 122 mst <- lookupForPolicyChange conmap k policy
94 forM_ mst $ \st -> do 123 forM_ mst $ \st -> do
95 let getPolicy = readTVar $ connPolicy st 124 let getPolicy = readTVar $ connPolicy st
96 --TODO accept_thread may already be started if policy was OpenToConnect 125 tasks <- atomically $ readTVar (sessionTasks st)
97 accept_thread <- forkIO $ acceptContact getPolicy _accept_methods 126 --TODO This check to determine whether to launch the accepting thread
98 persue_thread <- forkIO $ persueContact getPolicy _get_status _persue_methods 127 --is probably racey.
99 freshen_thread <- forkIO $ freshenContact getPolicy _get_status _freshen_methods 128 astat <- threadStatus (taskThread $ accepting tasks)
129 accepting <- if astat /= ThreadRunning
130 then launch ("accept:"++show k)
131 (G.InProgress $ toEnum 0)
132 $ acceptContact getPolicy _accept_methods
133 else return $ accepting tasks
134 persuing <- launch ("persue:"++show k)
135 (G.InProgress $ toEnum 0)
136 $ persueContact getPolicy _get_status _persue_methods
137 refreshing <- launch ("refresh:"++show k)
138 (G.InProgress $ toEnum 0)
139 $ freshenContact getPolicy _get_status _freshen_methods
140 atomically $ writeTVar (sessionTasks st)
141 $ SessionTasks accepting persuing refreshing
100 return () 142 return ()
101 return () 143 return ()
102 RefusingToConnect -> do -- disconnect or cancel any pending connection 144 RefusingToConnect -> do -- disconnect or cancel any pending connection
103 mst <- lookupForPolicyChange conmap k policy 145 mst <- lookupForPolicyChange conmap k policy
104 -- Since the 3 connection threads poll the current policy, they should 146 -- Since the 3 connection threads poll the current policy, they should
105 -- all terminate on their own. 147 -- all terminate on their own.
106 return () 148 --
149 -- Here we block until they finish.
150 forM_ mst $ \st -> do
151 atomically $ do
152 tasks <- readTVar (sessionTasks st)
153 a <- readTVar $ taskState (accepting tasks)
154 p <- readTVar $ taskState (persuing tasks)
155 r <- readTVar $ taskState (refreshing tasks)
156 case (a,p,r) of
157 (G.Dormant,G.Dormant,G.Dormant) -> return ()
158 _ -> retry
107 OpenToConnect -> do -- passively accept connections if they initiate. 159 OpenToConnect -> do -- passively accept connections if they initiate.
108 mst <- lookupForPolicyChange conmap k policy 160 mst <- lookupForPolicyChange conmap k policy
109 forM_ mst $ \st -> do 161 forM_ mst $ \st -> do
110 let getPolicy = readTVar $ connPolicy st 162 let getPolicy = readTVar $ connPolicy st
111 accept_thread <- forkIO $ acceptContact getPolicy _accept_methods 163 accept_thread <- launch ("accept:"++show k)
164 (G.InProgress $ toEnum 0)
165 $ acceptContact getPolicy _accept_methods
112 return () 166 return ()
113 167
114 168
diff --git a/Connection/Tox/Threads.hs b/Connection/Tox/Threads.hs
index dcee37d1..2ff058b3 100644
--- a/Connection/Tox/Threads.hs
+++ b/Connection/Tox/Threads.hs
@@ -102,26 +102,33 @@ data AcceptContactMethods = AcceptContactMethods
102-- | Invokes an STM action on each incoming handshake. 102-- | Invokes an STM action on each incoming handshake.
103-- 103--
104-- Does not return until getPolicy yields RefusingToConnect. 104-- Does not return until getPolicy yields RefusingToConnect.
105acceptContact :: STM Policy -> AcceptContactMethods -> IO () 105acceptContact :: STM Policy -> AcceptContactMethods -> (Status ToxProgress -> STM ()) -> IO ()
106acceptContact getPolicy AcceptContactMethods{..} = fix $ \loop -> do 106acceptContact getPolicy AcceptContactMethods{..} writeState = fix $ \loop -> do
107 join $ atomically $ do 107 join $ atomically $ do
108 orElse 108 orElse
109 (getPolicy >>= \case 109 (getPolicy >>= \case
110 RefusingToConnect -> return $ return () -- QUIT Dormant/Established 110 RefusingToConnect -> do writeState Dormant
111 return $ return () -- QUIT Dormant/Established
111 _ -> retry) 112 _ -> retry)
112 (do hs <- getHandshake 113 (do hs <- getHandshake
113 handshakeIsSuitable hs >>= \case 114 handshakeIsSuitable hs >>= \case
114 True -> do 115 True -> do
115 -- Here we allocate a NetCrypto session for handling CryptoPacket. 116 -- Here we allocate a NetCrypto session for handling CryptoPacket.
117 writeState (InProgress AwaitingSessionPacket)
116 transitionToState (InProgress AwaitingSessionPacket) 118 transitionToState (InProgress AwaitingSessionPacket)
117 return loop 119 return loop
118 False -> return loop) 120 False -> return loop)
119 121
120whileTryingAndNotEstablished :: STM Policy -> STM (Status t) -> ((Int -> IO ()) -> STM (IO ())) -> IO () 122whileTryingAndNotEstablished :: STM Policy
121whileTryingAndNotEstablished getPolicy getStatus body = fix $ \loop -> do 123 -> STM (Status t)
124 -> (Status ToxProgress -> STM ())
125 -> ((Int -> IO ()) -> STM (IO ()))
126 -> IO ()
127whileTryingAndNotEstablished getPolicy getStatus writeStatus body = fix $ \loop -> do
122 let retryWhileTrying k = getPolicy >>= \case 128 let retryWhileTrying k = getPolicy >>= \case
123 TryingToConnect -> retry 129 TryingToConnect -> retry
124 _ -> return k 130 _ -> do writeStatus Dormant
131 return k
125 ifEstablished t e = getStatus >>= \case 132 ifEstablished t e = getStatus >>= \case
126 Established -> t 133 Established -> t
127 _ -> e 134 _ -> e
@@ -147,17 +154,23 @@ data PersueContactMethods params = PersueContactMethods
147-- 154--
148-- As long as getPolicy is TryingToConnect and there is no established 155-- As long as getPolicy is TryingToConnect and there is no established
149-- connection, this function will continue. 156-- connection, this function will continue.
150persueContact :: STM Policy -> STM (Status t) -> PersueContactMethods a -> IO () 157persueContact :: STM Policy
151persueContact getPolicy getStatus PersueContactMethods{..} 158 -> STM (Status t)
152 = whileTryingAndNotEstablished getPolicy getStatus 159 -> PersueContactMethods a
160 -> (Status ToxProgress -> STM ())
161 -> IO ()
162persueContact getPolicy getStatus PersueContactMethods{..} writeStatus
163 = whileTryingAndNotEstablished getPolicy getStatus writeStatus
153 $ \retryAfterTimeout -> do 164 $ \retryAfterTimeout -> do
154 -- AwaitingDHTKey 165 -- AwaitingDHTKey
155 -- AcquiringIPAddress 166 -- AcquiringIPAddress
156 params <- getHandshakeParams 167 params <- getHandshakeParams
168 writeStatus (InProgress AcquiringCookie)
157 return $ do -- AcquiringCookie 169 return $ do -- AcquiringCookie
158 -- AwaitingHandshake 170 -- AwaitingHandshake
159 -- AwaitingSessionPacket 171 -- AwaitingSessionPacket
160 sendHandshake params 172 sendHandshake params
173 atomically $ writeStatus $ InProgress AwaitingHandshake
161 retryAfterTimeout retryInterval 174 retryAfterTimeout retryInterval
162 175
163data FreshenContactMethods = FreshenContactMethods 176data FreshenContactMethods = FreshenContactMethods
@@ -189,16 +202,20 @@ data FreshenContactMethods = FreshenContactMethods
189-- 202--
190-- As long as getPolicy is TryingToConnect and there is no established 203-- As long as getPolicy is TryingToConnect and there is no established
191-- connection, this function will continue. 204-- connection, this function will continue.
192freshenContact :: STM Policy -> STM (Status t) -> FreshenContactMethods -> IO () 205freshenContact :: STM Policy -> STM (Status t) -> FreshenContactMethods
193freshenContact getPolicy getStatus FreshenContactMethods{..} 206 -> (Status ToxProgress -> STM ())
194 = whileTryingAndNotEstablished getPolicy getStatus 207 -> IO ()
208freshenContact getPolicy getStatus FreshenContactMethods{..} writeStatus
209 = whileTryingAndNotEstablished getPolicy getStatus writeStatus
195 -- retryAfterTimeout :: Int -> IO () 210 -- retryAfterTimeout :: Int -> IO ()
196 $ \retryAfterTimeout -> 211 $ \retryAfterTimeout ->
197 getDHTKey >>= \case 212 getDHTKey >>= \case
198 Nothing -> -- AwaitingDHTKey 213 Nothing -> -- AwaitingDHTKey
199 retry 214 retry
200 Just dk -> getSockAddr >>= return . \case 215 Just dk -> getSockAddr >>= \case
201 Nothing -> -- AcquiringIPAddress 216 Nothing -> do -- AcquiringIPAddress
217 writeStatus (InProgress AcquiringIPAddress)
218 return $
202 do bkts <- atomically $ getBuckets 219 do bkts <- atomically $ getBuckets
203 st <- search nodeSch bkts dk $ 220 st <- search nodeSch bkts dk $
204 \r -> do -- TODO: store saddr, check for finish 221 \r -> do -- TODO: store saddr, check for finish
@@ -206,7 +223,10 @@ freshenContact getPolicy getStatus FreshenContactMethods{..}
206 atomically $ searchIsFinished st >>= check 223 atomically $ searchIsFinished st >>= check
207 -- TODO: searchCancel on stop condition 224 -- TODO: searchCancel on stop condition
208 retryAfterTimeout sockAddrInterval 225 retryAfterTimeout sockAddrInterval
209 Just a -> -- AcquiringCookie 226 Just a -> do
227 writeStatus (InProgress AcquiringCookie)
228 return $
229 -- AcquiringCookie
210 -- AwaitingHandshake 230 -- AwaitingHandshake
211 -- AwaitingSessionPacket 231 -- AwaitingSessionPacket
212 do _todo_search_toxid_send_dhtkey -- 123 _todo_search_toxid_send_dhtkey :: IO a0 232 do _todo_search_toxid_send_dhtkey -- 123 _todo_search_toxid_send_dhtkey :: IO a0