{-# LANGUAGE CPP #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE NamedFieldPuns #-} module Connection.Tox ( module Connection.Tox , ToxProgress(..) ) where import qualified Connection as G ;import Connection (Manager (..), Policy (..)) import Connection.Tox.Threads import Control.Concurrent.STM import Control.Monad import Crypto.Tox import qualified Data.HashMap.Strict as HashMap import qualified Data.Map as Map import Data.Maybe import Network.Kademlia.Routing as R import Network.Kademlia.Search import Network.Tox.ContactInfo import Network.Tox.Crypto.Handlers import Network.Tox.DHT.Handlers as DHT import Network.Tox.DHT.Transport as DHT import PingMachine import Text.Read #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument #else import Control.Concurrent.Lifted import GHC.Conc (labelThread) #endif import GHC.Conc (ThreadStatus (..), threadStatus) import System.IO data Parameters = Parameters { -- | Various Tox transports and clients. dhtRouting :: Routing , roster :: ContactInfo , sessions :: NetCryptoSessions , dhtClient :: DHT.Client -- | Thread to be forked when a connection is established. -- TODO: this function should accept relevant parameters. , onToxSession :: IO () } {- -- | A conneciton status that is tagged with a state type that is specific to -- the status. data Transient a where IsDormant :: Transient () IsAwaitingDHTKey :: Transient () IsAcquiringIPAddress :: Transient () IsAcquiringCookie :: Transient () IsAwaitingHandshake :: Transient () IsAwaitingSessionPacket :: Transient () IsEstablished :: Transient () untag :: DSum Transient Identity -> G.Status ToxProgress untag (IsDormant :=> _) = G.Dormant untag (IsAwaitingDHTKey :=> _) = G.InProgress AwaitingDHTKey untag (IsAcquiringIPAddress :=> _) = G.InProgress AcquiringIPAddress untag (IsAcquiringCookie :=> _) = G.InProgress AcquiringCookie untag (IsAwaitingHandshake :=> _) = G.InProgress AwaitingHandshake untag (IsAwaitingSessionPacket :=> _) = G.InProgress AwaitingSessionPacket untag (IsEstablished :=> _) = G.Established -} data StatefulTask st = StatefulTask { taskThread :: ThreadId , taskState :: TVar st } launch :: String -> st -> (TVar st -> IO ()) -> IO (StatefulTask st) launch lbl st f = do stvar <- newTVarIO st tid <- forkIO (f stvar) labelThread tid lbl stat <- threadStatus tid hPutStrLn stderr $ "launch "++lbl++" "++show stat return $ StatefulTask tid stvar data SessionState = SessionState { connPolicy :: TVar Policy , connPingLogic :: PingMachine , handshakeTask :: TVar (StatefulTask (G.Status ToxProgress)) } newSessionState :: IO SessionState newSessionState = do pings <- forkPingMachine "SessionState" 25000 -- 25 ms send ping 30000 -- 30 ms timed out a <- fork $ return () atomically $ do av <- newTVar G.Dormant let tasks = StatefulTask a av SessionState <$> newTVar G.RefusingToConnect <*> pure pings <*> newTVar tasks sessionStatus :: SessionState -> G.Connection ToxProgress sessionStatus st = G.Connection { G.connStatus = readTVar . taskState =<< readTVar (handshakeTask st) , G.connPolicy = readTVar (connPolicy st) , G.connPingLogic = connPingLogic st } lookupForPolicyChange :: TVar (Map.Map Key SessionState) -> Key -> Policy -> IO (Maybe SessionState) lookupForPolicyChange conmap k policy = do cons <- atomically $ readTVar conmap st <- case Map.lookup k cons of Nothing -> newSessionState Just st -> return st atomically $ do p <- readTVar (connPolicy st) writeTVar (connPolicy st) policy return $ do guard $ p /= policy return st callbackId :: Int callbackId = 1 lookupContact :: Key -> ContactInfo -> STM (Maybe (SecretKey,Contact)) lookupContact (Key me them) ContactInfo{accounts} = do acnts <- readTVar accounts fmap join $ forM (HashMap.lookup me acnts) $ \Account{userSecret,contacts} -> do cs <- readTVar contacts forM (HashMap.lookup them cs) $ \c -> do return (userSecret,c) -- | This function will fork threads as necessary. setToxPolicy :: Parameters -> TVar (Map.Map Key SessionState) -> Key -> Policy -> IO () setToxPolicy params conmap k@(Key me them) policy = do hPutStrLn stderr $ "C.setToxPolicy "++show (them,policy) case policy of TryingToConnect -> do mst <- lookupForPolicyChange conmap k policy r <- atomically $ lookupContact k (roster params) hPutStrLn stderr $ "C.r="++show (fmap (const ()) r) forM_ r $ \(sec,c) -> do let persue_methods = PersueContactMethods { allsessions = sessions params , myseckey = sec , theirpubkey = id2key them , client = dhtClient params , shortRetryInterval = _todo , longRetryInterval = _todo , contact = c } sch = nodeSearch (dhtClient params) (nodesOfInterest $ dhtRouting params) freshen_methods = FreshenContactMethods { dhtkeyInterval = _todo :: Int , sockAddrInterval = _todo :: Int , nodeSch = sch , getDHTKey = retry :: STM (Maybe NodeId) , getSockAddr = retry -- :: STM (Maybe SockAddr) , nearestNodes = \nid -> do bkts4 <- readTVar $ routing4 $ dhtRouting params bkts6 <- readTVar $ routing6 $ dhtRouting params let interweave [] ys = ys interweave (x:xs) ys = x : interweave ys xs return $ interweave (R.kclosest (searchSpace sch) searchK nid bkts4) (R.kclosest (searchSpace sch) searchK nid bkts6) } get_status = do sbk <- readTVar $ netCryptoSessionsByKey (sessions params) fmap (fromMaybe G.Dormant) $ forM (Map.lookup (id2key them) sbk) $ \ss -> do stats <- mapM (readTVar . ncState) ss return $ maximum stats hPutStrLn stderr $ "C.mst="++show (fmap (const ()) mst) forM_ mst $ \st -> do let getPolicy = readTVar $ connPolicy st tasks <- atomically $ readTVar (handshakeTask st) persuing <- launch ("persue:"++show k) (G.InProgress $ toEnum 0) $ persueContact getPolicy get_status persue_methods atomically $ do writeTVar (handshakeTask st) $ persuing let routing = dhtRouting params Key _ nid = k registerNodeCallback routing $ NodeInfoCallback { interestingNodeId = nid , listenerId = callbackId , observedAddress = \ni -> writeTVar (contactLastSeenAddr c) (Just $ nodeAddr ni) , rumoredAddress = \saddr ni -> do m <- readTVar (contactLastSeenAddr c) -- TODO remember information source and handle multiple rumors. case m of Just _ -> return () Nothing -> writeTVar (contactLastSeenAddr c) (Just $ nodeAddr ni) } return () RefusingToConnect -> do -- disconnect or cancel any pending connection mst <- lookupForPolicyChange conmap k policy -- Since the connection threads poll the current policy, they should -- all terminate on their own. -- -- Here we block until they finish. forM_ mst $ \st -> do atomically $ do let routing = dhtRouting params Key _ nid = k unregisterNodeCallback callbackId routing nid atomically $ do tasks <- readTVar (handshakeTask st) p <- readTVar $ taskState tasks case p of G.Dormant -> return () _ -> retry OpenToConnect -> do -- passively accept connections if they initiate. mst <- lookupForPolicyChange conmap k policy r <- atomically $ lookupContact k (roster params) forM_ r $ \(sec,c) -> do forM_ mst $ \st -> do {- let getPolicy = readTVar $ connPolicy st accept_thread <- launch ("accept:"++show k) (G.InProgress $ toEnum 0) $ acceptContact getPolicy _accept_methods -} atomically $ do let routing = dhtRouting params Key _ nid = k registerNodeCallback routing $ NodeInfoCallback { interestingNodeId = nid , listenerId = callbackId , observedAddress = \ni -> writeTVar (contactLastSeenAddr c) (Just $ nodeAddr ni) , rumoredAddress = \saddr ni -> do m <- readTVar (contactLastSeenAddr c) -- TODO remember information source and handle multiple rumors. case m of Just _ -> return () Nothing -> writeTVar (contactLastSeenAddr c) (Just $ nodeAddr ni) } stringToKey_ :: String -> Maybe Key stringToKey_ s = let (xs,ys) = break (==':') s in if null ys then Nothing else do me <- readMaybe xs them <- readMaybe (drop 1 ys) return $ Key me them toxManager :: Parameters -> IO (Manager ToxProgress Key) toxManager params = do conmap <- newTVarIO Map.empty return Manager { setPolicy = setToxPolicy params conmap -- k -> Policy -> IO () , connections = fmap sessionStatus <$> readTVar conmap -- STM (Map k (Connection status)) , stringToKey = stringToKey_ -- String -> Maybe k , showProgress = show -- status -> String , showKey = showKey_ -- k -> String }