{-# LANGUAGE CPP #-} {-# LANGUAGE GADTs #-} module Connection.Tox ( module Connection.Tox , ToxProgress(..) ) where import qualified Connection as G ;import Connection (Manager (..), Policy (..)) import Control.Concurrent.STM import Control.Monad -- import Data.Dependent.Sum import Data.Functor.Identity import qualified Data.Map as Map import Connection.Tox.Threads import Network.Tox.NodeId import Network.Tox.DHT.Handlers import Network.Tox.Crypto.Handlers import PingMachine import Text.Read #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument #else import Control.Concurrent.Lifted import GHC.Conc (labelThread) #endif import GHC.Conc (threadStatus,ThreadStatus(..)) data Parameters = Parameters { -- | Various Tox transports and clients. dhtRouting :: Routing -- | Thread to be forked when a connection is established. -- TODO: this function should accept relevant parameters. , onToxSession :: IO () } {- -- | A conneciton status that is tagged with a state type that is specific to -- the status. data Transient a where IsDormant :: Transient () IsAwaitingDHTKey :: Transient () IsAcquiringIPAddress :: Transient () IsAcquiringCookie :: Transient () IsAwaitingHandshake :: Transient () IsAwaitingSessionPacket :: Transient () IsEstablished :: Transient () untag :: DSum Transient Identity -> G.Status ToxProgress untag (IsDormant :=> _) = G.Dormant untag (IsAwaitingDHTKey :=> _) = G.InProgress AwaitingDHTKey untag (IsAcquiringIPAddress :=> _) = G.InProgress AcquiringIPAddress untag (IsAcquiringCookie :=> _) = G.InProgress AcquiringCookie untag (IsAwaitingHandshake :=> _) = G.InProgress AwaitingHandshake untag (IsAwaitingSessionPacket :=> _) = G.InProgress AwaitingSessionPacket untag (IsEstablished :=> _) = G.Established -} data StatefulTask st = StatefulTask { taskThread :: ThreadId , taskState :: TVar st } launch :: String -> st -> ((st -> STM ()) -> IO ()) -> IO (StatefulTask st) launch lbl st f = do stvar <- newTVarIO st tid <- forkIO (f $ writeTVar stvar) labelThread tid lbl return $ StatefulTask tid stvar data SessionTasks = SessionTasks { persuing :: StatefulTask (G.Status ToxProgress) , refreshing :: StatefulTask (G.Status ToxProgress) } data SessionState = SessionState { connPolicy :: TVar Policy , connPingLogic :: PingMachine , sessionTasks :: TVar SessionTasks -- , transient :: TVar (DSum Transient Identity) } sessionStatus :: SessionState -> G.Connection ToxProgress sessionStatus st = G.Connection { G.connStatus = combinedStatus =<< readTVar (sessionTasks st) , G.connPolicy = readTVar (connPolicy st) , G.connPingLogic = connPingLogic st } combinedStatus :: SessionTasks -> STM (G.Status ToxProgress) combinedStatus tasks = do p <- readTVar (taskState $ persuing tasks) r <- readTVar (taskState $ refreshing tasks) return $ maximum [p,r] lookupForPolicyChange :: TVar (Map.Map Key SessionState) -> Key -> Policy -> IO (Maybe SessionState) lookupForPolicyChange conmap k policy = atomically $ do cons <- readTVar conmap fmap join $ forM (Map.lookup k cons) $ \st -> do p <- readTVar (connPolicy st) writeTVar (connPolicy st) policy return $ do guard $ p /= policy return st callbackId :: Int callbackId = 1 -- | This function will fork threads as necessary. setToxPolicy :: Parameters -> TVar (Map.Map Key SessionState) -> Key -> Policy -> IO () setToxPolicy params conmap k policy = case policy of TryingToConnect -> do mst <- lookupForPolicyChange conmap k policy let accept_methods = AcceptContactMethods { getHandshake = retry -- :: STM (Handshake Identity) , handshakeIsSuitable = (\_ -> return False) -- :: Handshake Identity -> STM Bool , transitionToState = (\_ -> return ()) :: G.Status ToxProgress -> STM () } persue_methods = PersueContactMethods { getHandshakeParams = retry -- :: STM params , sendHandshake = \_ -> return () -- :: params -> IO () , retryInterval = _todo :: Int } freshen_methods = FreshenContactMethods { dhtkeyInterval = _todo :: Int , sockAddrInterval = _todo :: Int , nodeSch = _todo :: NodeSearch , getDHTKey = retry :: STM (Maybe NodeId) , getSockAddr = retry -- :: STM (Maybe SockAddr) , getBuckets = retry -- :: STM (BucketList NodeInfo) } forM_ mst $ \st -> do let getPolicy = readTVar $ connPolicy st tasks <- atomically $ readTVar (sessionTasks st) persuing <- launch ("persue:"++show k) (G.InProgress $ toEnum 0) $ persueContact getPolicy _get_status persue_methods refreshing <- launch ("refresh:"++show k) (G.InProgress $ toEnum 0) $ freshenContact getPolicy _get_status freshen_methods atomically $ do writeTVar (sessionTasks st) $ SessionTasks persuing refreshing let routing = dhtRouting params Key _ nid = k registerNodeCallback routing $ NodeInfoCallback { interestingNodeId = nid , listenerId = callbackId , observedAddress = \ni -> return () -- TODO , rumoredAddress = \saddr ni -> return () -- TODO } return () RefusingToConnect -> do -- disconnect or cancel any pending connection mst <- lookupForPolicyChange conmap k policy -- Since the 3 connection threads poll the current policy, they should -- all terminate on their own. -- -- Here we block until they finish. forM_ mst $ \st -> do atomically $ do let routing = dhtRouting params Key _ nid = k unregisterNodeCallback callbackId routing nid atomically $ do tasks <- readTVar (sessionTasks st) p <- readTVar $ taskState (persuing tasks) r <- readTVar $ taskState (refreshing tasks) case (p,r) of (G.Dormant,G.Dormant) -> return () _ -> retry OpenToConnect -> do -- passively accept connections if they initiate. mst <- lookupForPolicyChange conmap k policy forM_ mst $ \st -> do let getPolicy = readTVar $ connPolicy st accept_thread <- launch ("accept:"++show k) (G.InProgress $ toEnum 0) $ acceptContact getPolicy _accept_methods atomically $ do let routing = dhtRouting params Key _ nid = k registerNodeCallback routing $ NodeInfoCallback { interestingNodeId = nid , listenerId = callbackId , observedAddress = \ni -> return () -- TODO , rumoredAddress = \saddr ni -> return () -- TODO } stringToKey_ :: String -> Maybe Key stringToKey_ s = let (xs,ys) = break (==':') s in if null ys then Nothing else do me <- readMaybe xs them <- readMaybe (drop 1 ys) return $ Key me them toxManager :: Parameters -> IO (Manager ToxProgress Key) toxManager params = do conmap <- newTVarIO Map.empty return Manager { setPolicy = setToxPolicy params conmap -- k -> Policy -> IO () , connections = fmap sessionStatus <$> readTVar conmap -- STM (Map k (Connection status)) , stringToKey = stringToKey_ -- String -> Maybe k , showProgress = show -- status -> String , showKey = showKey_ -- k -> String }