-- | -- -- This module defines three tasks intended to be run in separate threads: -- -- * 'acceptContact' -- -- * 'persueContact' -- -- * 'freshenContact' -- {-# LANGUAGE LambdaCase #-} module Connection.Tox.Threads where import Connection -- import Connection.Tox import Crypto.Tox import Data.IP (IP) import Network.Tox.Crypto.Transport import Network.Tox.Crypto.Handlers import Network.Tox.NodeId import Network.Tox.ContactInfo import Network.Tox.DHT.Handlers {- (nodeSearch) -} as DHT import Network.Tox.DHT.Transport as DHT (dhtpk) import Network.Socket import Network.Kademlia.Search import Network.Kademlia.Routing (BucketList) import Control.Concurrent.STM import Control.Monad import Data.Function import Data.Functor.Identity import Data.Time.Clock.POSIX import System.IO import System.Timeout type NodeSearch = Search NodeId (IP,PortNumber) () NodeInfo NodeInfo data AcceptContactMethods = AcceptContactMethods { getHandshake :: STM (Handshake Identity) , handshakeIsSuitable :: Handshake Identity -> STM Bool , transitionToState :: Status ToxProgress -> STM () } -- | Invokes an STM action on each incoming handshake. -- -- Does not return until getPolicy yields RefusingToConnect. acceptContact :: STM Policy -> AcceptContactMethods -> (Status ToxProgress -> STM ()) -> IO () acceptContact getPolicy AcceptContactMethods{..} writeState = fix $ \loop -> do join $ atomically $ do orElse (getPolicy >>= \case RefusingToConnect -> do writeState Dormant return $ return () -- QUIT Dormant/Established _ -> retry) (do hs <- getHandshake handshakeIsSuitable hs >>= \case True -> do -- Here we allocate a NetCrypto session for handling CryptoPacket. writeState (InProgress AwaitingSessionPacket) transitionToState (InProgress AwaitingSessionPacket) return loop False -> return loop) whileTryingAndNotEstablished :: STM Policy -> STM (Status t) -> TVar (Status ToxProgress) -> ((Int -> IO ()) -> STM (IO ())) -> IO () whileTryingAndNotEstablished getPolicy getStatus statusVar body = fix $ \loop -> do let retryWhileTrying k = getPolicy >>= \case TryingToConnect -> retry _ -> do writeTVar statusVar Dormant return k ifEstablished t e = getStatus >>= \case Established -> t _ -> e retryAfterTimeout interval = do timeout interval $ atomically $ orElse (retryWhileTrying ()) (ifEstablished (return ()) retry) loop join $ atomically $ orElse (retryWhileTrying (return ())) -- QUIT Dormant/Established (ifEstablished retry (body retryAfterTimeout)) data PersueContactMethods = PersueContactMethods { allsessions :: NetCryptoSessions , myseckey :: SecretKey , theirpubkey :: PublicKey , client :: DHT.Client , shortRetryInterval :: Int -- successful cookie, try again soon. , longRetryInterval :: Int -- no cookie, he's offline, give it some time. , contact :: Contact } retryUntilJust :: TVar (Maybe a) -> STM a retryUntilJust tvar = maybe retry return =<< readTVar tvar -- | Continuously attempt to send handshake packets until a connection is -- established. -- -- As long as getPolicy is TryingToConnect and there is no established -- connection, this function will continue. persueContact :: STM Policy -> STM (Status t) -> PersueContactMethods -> TVar (Status ToxProgress) -> IO () persueContact getPolicy getStatus PersueContactMethods{..} statusVar = do -- AwaitingDHTKey atomically $ writeTVar statusVar (InProgress AwaitingDHTKey) whileTryingAndNotEstablished getPolicy getStatus statusVar $ \retryAfterTimeout -> orElse (do readTVar statusVar >>= check . (/= InProgress AcquiringIPAddress) theirDhtKey <- DHT.dhtpk <$> retryUntilJust (contactKeyPacket contact) -- We don't have an IP address yet. maybe (return ()) (const retry) =<< readTVar (contactLastSeenAddr contact) return $ do -- AcquiringIPAddress atomically $ writeTVar statusVar (InProgress AcquiringIPAddress) retryAfterTimeout 0) (do theirDhtKey <- DHT.dhtpk <$> retryUntilJust (contactKeyPacket contact) saddr <- retryUntilJust (contactLastSeenAddr contact) ni <- either (const retry) return $ nodeInfo (key2id theirDhtKey) saddr return $ do -- AcquiringCookie atomically $ writeTVar statusVar (InProgress AcquiringCookie) let mykeyAsId = key2id (toPublic myseckey) theirkeyAsId = key2id theirpubkey crypto = transportCrypto allsessions mbCookie <- -- TODO: Check for recent cached cookie. DHT.cookieRequest crypto client (toPublic myseckey) ni interval <- case mbCookie of Nothing -> do hPutStrLn stderr ("persueContact: (" ++ show mykeyAsId ++") <--> (" ++ show theirkeyAsId ++ ").") hPutStrLn stderr ("persueContact: CookieRequest failed. TODO: dhtpkNodes thingy") return longRetryInterval Just cookie -> do hPutStrLn stderr "Have cookie, creating handshake packet..." let hp = HParam { hpOtherCookie = cookie , hpMySecretKey = myseckey , hpCookieRemotePubkey = theirpubkey , hpCookieRemoteDhtkey = theirDhtKey , hpTheirBaseNonce = Nothing , hpTheirSessionKeyPublic = Nothing } newsession <- generateSecretKey timestamp <- getPOSIXTime (myhandshake,ioAction) <- atomically $ freshCryptoSession allsessions saddr newsession timestamp hp ioAction -- send handshake forM myhandshake $ \response_handshake -> do sendHandshake allsessions saddr response_handshake atomically $ writeTVar statusVar $ InProgress AwaitingHandshake return shortRetryInterval -- AwaitingHandshake -- AwaitingSessionPacket retryAfterTimeout interval) data FreshenContactMethods = FreshenContactMethods { dhtkeyInterval :: Int , sockAddrInterval :: Int , nodeSch :: NodeSearch , getDHTKey :: STM (Maybe NodeId) , getSockAddr :: STM (Maybe SockAddr) , getBuckets :: STM (BucketList NodeInfo) } -- send my dht key -- search for their sockaddr -- monitor new dht key -- monitor new sockaddr -- -- Keep going while TryingToConnect -- pause while Established -- Useful: -- toxidSearch onionTimeout -- newSearch -- searchLoop -- searchCancel -- -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching. -- | Continuously search the DHT to obtain ip addresses and to send your dht -- key to contacts. -- -- As long as getPolicy is TryingToConnect and there is no established -- connection, this function will continue. freshenContact :: STM Policy -> STM (Status t) -> FreshenContactMethods -> TVar (Status ToxProgress) -> IO () freshenContact getPolicy getStatus FreshenContactMethods{..} statusVar = whileTryingAndNotEstablished getPolicy getStatus statusVar -- retryAfterTimeout :: Int -> IO () $ \retryAfterTimeout -> getDHTKey >>= \case Nothing -> -- AwaitingDHTKey retry Just dk -> getSockAddr >>= \case Nothing -> do -- AcquiringIPAddress writeTVar statusVar (InProgress AcquiringIPAddress) return $ do bkts <- atomically $ getBuckets st <- search nodeSch bkts dk $ \r -> do -- TODO: store saddr, check for finish return True atomically $ searchIsFinished st >>= check -- TODO: searchCancel on stop condition retryAfterTimeout sockAddrInterval Just a -> do writeTVar statusVar (InProgress AcquiringCookie) return $ -- AcquiringCookie -- AwaitingHandshake -- AwaitingSessionPacket do _todo_search_toxid_send_dhtkey -- 123 _todo_search_toxid_send_dhtkey :: IO a0 retryAfterTimeout dhtkeyInterval