From 66ee00b2b74eea4258314a66b7599da7606a6539 Mon Sep 17 00:00:00 2001 From: joe Date: Tue, 12 Jun 2018 00:11:17 -0400 Subject: Started Tox connection management helper threads. --- Connection/Tox/Threads.hs | 150 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 150 insertions(+) create mode 100644 Connection/Tox/Threads.hs (limited to 'Connection/Tox') diff --git a/Connection/Tox/Threads.hs b/Connection/Tox/Threads.hs new file mode 100644 index 00000000..8b19c7cf --- /dev/null +++ b/Connection/Tox/Threads.hs @@ -0,0 +1,150 @@ +-- | +-- +-- This module defines three tasks intended to be run in separate threads: +-- +-- * 'acceptContact' +-- +-- * 'persueContact' +-- +-- * 'freshenContact' +-- +{-# LANGUAGE LambdaCase #-} +module Connection.Tox.Threads where + +import Connection +import Connection.Tox +import Data.IP (IP) +import Network.Tox.Crypto.Transport +import Network.Tox.NodeId +-- import Network.Tox.DHT.Handlers (nodeSearch) +import Network.Socket +import Network.Kademlia.Search +import Network.Kademlia.Routing (BucketList) + +import Control.Concurrent.STM +import Control.Monad +import Data.Function +import Data.Functor.Identity +import System.Timeout + +type NodeSearch = Search NodeId (IP,PortNumber) () NodeInfo NodeInfo + +data AcceptContactMethods = AcceptContactMethods + { getHandshake :: STM (Handshake Identity) + , handshakeIsSuitable :: Handshake Identity -> STM Bool + , transitionToState :: Status ToxProgress -> STM () + } + +-- | Invokes an STM action on each incoming handshake. +-- +-- Does not return until getPolicy yields RefusingToConnect. +acceptContact :: STM Policy -> AcceptContactMethods -> IO () +acceptContact getPolicy AcceptContactMethods{..} = fix $ \loop -> do + join $ atomically $ do + orElse + (getPolicy >>= \case + RefusingToConnect -> return $ return () -- QUIT Dormant/Established + _ -> retry) + (do hs <- getHandshake + handshakeIsSuitable hs >>= \case + True -> do + -- Here we allocate a NetCrypto session for handling CryptoPacket. + transitionToState (InProgress AwaitingSessionPacket) + return loop + False -> return loop) + +whileTryingAndNotEstablished :: STM Policy -> STM (Status t) -> ((Int -> IO ()) -> STM (IO ())) -> IO () +whileTryingAndNotEstablished getPolicy getStatus body = fix $ \loop -> do + let retryWhileTrying k = getPolicy >>= \case + TryingToConnect -> retry + _ -> return k + ifEstablished t e = getStatus >>= \case + Established -> t + _ -> e + retryAfterTimeout interval = do + timeout interval $ atomically + $ orElse + (retryWhileTrying ()) + (ifEstablished (return ()) retry) + loop + join $ atomically $ orElse + (retryWhileTrying (return ())) -- QUIT Dormant/Established + (ifEstablished retry + (body retryAfterTimeout)) + +data PersueContactMethods params = PersueContactMethods + { getHandshakeParams :: STM params + , sendHandshake :: params -> IO () + , retryInterval :: Int + } + +-- | Continuously attempt to send handshake packets until a connection is +-- established. +-- +-- As long as getPolicy is TryingToConnect and there is no established +-- connection, this function will continue. +persueContact :: STM Policy -> STM (Status t) -> PersueContactMethods a -> IO () +persueContact getPolicy getStatus PersueContactMethods{..} + = whileTryingAndNotEstablished getPolicy getStatus + $ \retryAfterTimeout -> do + -- AwaitingDHTKey + -- AcquiringIPAddress + params <- getHandshakeParams + return $ do -- AcquiringCookie + -- AwaitingHandshake + -- AwaitingSessionPacket + sendHandshake params + retryAfterTimeout retryInterval + +data FreshenContactMethods = FreshenContactMethods + { dhtkeyInterval :: Int + , sockAddrInterval :: Int + , nodeSch :: NodeSearch + , getDHTKey :: STM (Maybe NodeId) + , getSockAddr :: STM (Maybe SockAddr) + , getBuckets :: STM (BucketList NodeInfo) + } + +-- send my dht key +-- search for their sockaddr +-- monitor new dht key +-- monitor new sockaddr +-- +-- Keep going while TryingToConnect +-- pause while Established + +-- Useful: +-- toxidSearch onionTimeout +-- newSearch +-- searchLoop +-- searchCancel +-- -> (r -> STM Bool) -- ^ Invoked on each result. Return False to quit searching. + +-- | Continuously search the DHT to obtain ip addresses and to send your dht +-- key to contacts. +-- +-- As long as getPolicy is TryingToConnect and there is no established +-- connection, this function will continue. +freshenContact :: STM Policy -> STM (Status t) -> FreshenContactMethods -> IO () +freshenContact getPolicy getStatus FreshenContactMethods{..} + = whileTryingAndNotEstablished getPolicy getStatus + -- retryAfterTimeout :: Int -> IO () + $ \retryAfterTimeout -> + getDHTKey >>= \case + Nothing -> -- AwaitingDHTKey + retry + Just dk -> getSockAddr >>= return . \case + Nothing -> -- AcquiringIPAddress + do bkts <- atomically $ getBuckets + st <- search nodeSch bkts dk $ + \r -> do -- TODO: store saddr, check for finish + return True + atomically $ searchIsFinished st >>= check + -- TODO: searchCancel on stop condition + retryAfterTimeout sockAddrInterval + Just a -> -- AcquiringCookie + -- AwaitingHandshake + -- AwaitingSessionPacket + do _todo_search_toxid_send_dhtkey -- 123 _todo_search_toxid_send_dhtkey :: IO a0 + retryAfterTimeout dhtkeyInterval + -- cgit v1.2.3