summaryrefslogtreecommitdiff
path: root/Connection/Tox.hs
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2018-06-13 16:11:11 -0400
committerjoe <joe@jerkface.net>2018-06-13 16:43:54 -0400
commitbc8360faf51a058f3c1dd90145d4b87506e2ddfe (patch)
tree7dbf64254ae6690b124d11df9fd8fd5e28ef166e /Connection/Tox.hs
parentfd3f604f8961d7c7db48015c9ed7be40ea872a7c (diff)
tox: Associate a state with each connection management thread.
Diffstat (limited to 'Connection/Tox.hs')
-rw-r--r--Connection/Tox.hs74
1 files changed, 64 insertions, 10 deletions
diff --git a/Connection/Tox.hs b/Connection/Tox.hs
index c8ce9a53..f6f15f3c 100644
--- a/Connection/Tox.hs
+++ b/Connection/Tox.hs
@@ -19,6 +19,8 @@ import Control.Concurrent.Lifted.Instrument
19import Control.Concurrent.Lifted 19import Control.Concurrent.Lifted
20import GHC.Conc (labelThread) 20import GHC.Conc (labelThread)
21#endif 21#endif
22import GHC.Conc (threadStatus,ThreadStatus(..))
23
22 24
23 25
24 26
@@ -35,7 +37,7 @@ data Key = Key NodeId{-me-} NodeId{-them-}
35 37
36instance Show Key where show = show . showKey_ 38instance Show Key where show = show . showKey_
37 39
38 40{-
39-- | A conneciton status that is tagged with a state type that is specific to 41-- | A conneciton status that is tagged with a state type that is specific to
40-- the status. 42-- the status.
41data Transient a where 43data Transient a where
@@ -56,21 +58,48 @@ untag (IsAcquiringCookie :=> _) = G.InProgress AcquiringCookie
56untag (IsAwaitingHandshake :=> _) = G.InProgress AwaitingHandshake 58untag (IsAwaitingHandshake :=> _) = G.InProgress AwaitingHandshake
57untag (IsAwaitingSessionPacket :=> _) = G.InProgress AwaitingSessionPacket 59untag (IsAwaitingSessionPacket :=> _) = G.InProgress AwaitingSessionPacket
58untag (IsEstablished :=> _) = G.Established 60untag (IsEstablished :=> _) = G.Established
61-}
62
63data StatefulTask st = StatefulTask
64 { taskThread :: ThreadId
65 , taskState :: TVar st
66 }
67
68launch :: String -> st -> ((st -> STM ()) -> IO ()) -> IO (StatefulTask st)
69launch lbl st f = do
70 stvar <- newTVarIO st
71 tid <- forkIO (f $ writeTVar stvar)
72 labelThread tid lbl
73 return $ StatefulTask tid stvar
59 74
60 75
76data SessionTasks = SessionTasks
77 { accepting :: StatefulTask (G.Status ToxProgress)
78 , persuing :: StatefulTask (G.Status ToxProgress)
79 , refreshing :: StatefulTask (G.Status ToxProgress)
80 }
81
61data SessionState = SessionState 82data SessionState = SessionState
62 { transient :: TVar (DSum Transient Identity) 83 { connPolicy :: TVar Policy
63 , connPolicy :: TVar Policy
64 , connPingLogic :: PingMachine 84 , connPingLogic :: PingMachine
85 , sessionTasks :: TVar SessionTasks
86 -- , transient :: TVar (DSum Transient Identity)
65 } 87 }
66 88
67sessionStatus :: SessionState -> G.Connection ToxProgress 89sessionStatus :: SessionState -> G.Connection ToxProgress
68sessionStatus st = G.Connection 90sessionStatus st = G.Connection
69 { G.connStatus = untag <$> readTVar (transient st) 91 { G.connStatus = combinedStatus =<< readTVar (sessionTasks st)
70 , G.connPolicy = readTVar (connPolicy st) 92 , G.connPolicy = readTVar (connPolicy st)
71 , G.connPingLogic = connPingLogic st 93 , G.connPingLogic = connPingLogic st
72 } 94 }
73 95
96combinedStatus :: SessionTasks -> STM (G.Status ToxProgress)
97combinedStatus tasks = do
98 a <- readTVar (taskState $ accepting tasks)
99 p <- readTVar (taskState $ persuing tasks)
100 r <- readTVar (taskState $ refreshing tasks)
101 return $ maximum [a,p,r]
102
74lookupForPolicyChange :: TVar (Map.Map Key SessionState) 103lookupForPolicyChange :: TVar (Map.Map Key SessionState)
75 -> Key -> Policy -> IO (Maybe SessionState) 104 -> Key -> Policy -> IO (Maybe SessionState)
76lookupForPolicyChange conmap k policy = atomically $ do 105lookupForPolicyChange conmap k policy = atomically $ do
@@ -93,22 +122,47 @@ setToxPolicy params conmap k policy = case policy of
93 mst <- lookupForPolicyChange conmap k policy 122 mst <- lookupForPolicyChange conmap k policy
94 forM_ mst $ \st -> do 123 forM_ mst $ \st -> do
95 let getPolicy = readTVar $ connPolicy st 124 let getPolicy = readTVar $ connPolicy st
96 --TODO accept_thread may already be started if policy was OpenToConnect 125 tasks <- atomically $ readTVar (sessionTasks st)
97 accept_thread <- forkIO $ acceptContact getPolicy _accept_methods 126 --TODO This check to determine whether to launch the accepting thread
98 persue_thread <- forkIO $ persueContact getPolicy _get_status _persue_methods 127 --is probably racey.
99 freshen_thread <- forkIO $ freshenContact getPolicy _get_status _freshen_methods 128 astat <- threadStatus (taskThread $ accepting tasks)
129 accepting <- if astat /= ThreadRunning
130 then launch ("accept:"++show k)
131 (G.InProgress $ toEnum 0)
132 $ acceptContact getPolicy _accept_methods
133 else return $ accepting tasks
134 persuing <- launch ("persue:"++show k)
135 (G.InProgress $ toEnum 0)
136 $ persueContact getPolicy _get_status _persue_methods
137 refreshing <- launch ("refresh:"++show k)
138 (G.InProgress $ toEnum 0)
139 $ freshenContact getPolicy _get_status _freshen_methods
140 atomically $ writeTVar (sessionTasks st)
141 $ SessionTasks accepting persuing refreshing
100 return () 142 return ()
101 return () 143 return ()
102 RefusingToConnect -> do -- disconnect or cancel any pending connection 144 RefusingToConnect -> do -- disconnect or cancel any pending connection
103 mst <- lookupForPolicyChange conmap k policy 145 mst <- lookupForPolicyChange conmap k policy
104 -- Since the 3 connection threads poll the current policy, they should 146 -- Since the 3 connection threads poll the current policy, they should
105 -- all terminate on their own. 147 -- all terminate on their own.
106 return () 148 --
149 -- Here we block until they finish.
150 forM_ mst $ \st -> do
151 atomically $ do
152 tasks <- readTVar (sessionTasks st)
153 a <- readTVar $ taskState (accepting tasks)
154 p <- readTVar $ taskState (persuing tasks)
155 r <- readTVar $ taskState (refreshing tasks)
156 case (a,p,r) of
157 (G.Dormant,G.Dormant,G.Dormant) -> return ()
158 _ -> retry
107 OpenToConnect -> do -- passively accept connections if they initiate. 159 OpenToConnect -> do -- passively accept connections if they initiate.
108 mst <- lookupForPolicyChange conmap k policy 160 mst <- lookupForPolicyChange conmap k policy
109 forM_ mst $ \st -> do 161 forM_ mst $ \st -> do
110 let getPolicy = readTVar $ connPolicy st 162 let getPolicy = readTVar $ connPolicy st
111 accept_thread <- forkIO $ acceptContact getPolicy _accept_methods 163 accept_thread <- launch ("accept:"++show k)
164 (G.InProgress $ toEnum 0)
165 $ acceptContact getPolicy _accept_methods
112 return () 166 return ()
113 167
114 168