From d8ac778e803ac6de7c01ab4c8af767647ebc2d07 Mon Sep 17 00:00:00 2001 From: joe Date: Wed, 13 Jun 2018 20:50:00 -0400 Subject: tox: mechanism to register node-info callbacks. --- Connection/Tox.hs | 36 ++++++++++++++++++++++++++++++------ src/Network/Tox/Crypto/Handlers.hs | 2 +- src/Network/Tox/DHT/Handlers.hs | 22 ++++++++++++++++++++++ 3 files changed, 53 insertions(+), 7 deletions(-) diff --git a/Connection/Tox.hs b/Connection/Tox.hs index 41361d8f..de2bb879 100644 --- a/Connection/Tox.hs +++ b/Connection/Tox.hs @@ -13,7 +13,9 @@ import Data.Dependent.Sum import Data.Functor.Identity import qualified Data.Map as Map import Connection.Tox.Threads +import Network.Tox import Network.Tox.NodeId +import Network.Tox.DHT.Handlers import PingMachine import Text.Read #ifdef THREAD_DEBUG @@ -29,10 +31,10 @@ import GHC.Conc (threadStatus,ThreadStatus(..)) data Parameters = Parameters { -- | Various Tox transports and clients. - -- toxTransports :: Tox + toxTransports :: Tox -- | Thread to be forked when a connection is established. -- TODO: this function should accept relevant parameters. - onToxSession :: IO () + , onToxSession :: IO () } data Key = Key NodeId{-me-} NodeId{-them-} @@ -114,6 +116,9 @@ lookupForPolicyChange conmap k policy = atomically $ do guard $ p /= policy return st +callbackId :: Int +callbackId = 1 + -- | This function will fork threads as necessary. setToxPolicy :: Parameters -> TVar (Map.Map Key SessionState) @@ -140,10 +145,17 @@ setToxPolicy params conmap k policy = case policy of refreshing <- launch ("refresh:"++show k) (G.InProgress $ toEnum 0) $ freshenContact getPolicy _get_status _freshen_methods - atomically $ writeTVar (sessionTasks st) - $ SessionTasks accepting persuing refreshing + atomically $ do + writeTVar (sessionTasks st) $ SessionTasks accepting persuing refreshing + let routing = toxRouting $ toxTransports params + Key _ nid = k + registerNodeCallback routing $ NodeInfoCallback + { interestingNodeId = nid + , listenerId = callbackId + , observedAddress = \ni -> return () -- TODO + , rumoredAddress = \saddr ni -> return () -- TODO + } return () - return () RefusingToConnect -> do -- disconnect or cancel any pending connection mst <- lookupForPolicyChange conmap k policy -- Since the 3 connection threads poll the current policy, they should @@ -151,6 +163,10 @@ setToxPolicy params conmap k policy = case policy of -- -- Here we block until they finish. forM_ mst $ \st -> do + atomically $ do + let routing = toxRouting $ toxTransports params + Key _ nid = k + unregisterNodeCallback callbackId routing nid atomically $ do tasks <- readTVar (sessionTasks st) a <- readTVar $ taskState (accepting tasks) @@ -166,7 +182,15 @@ setToxPolicy params conmap k policy = case policy of accept_thread <- launch ("accept:"++show k) (G.InProgress $ toEnum 0) $ acceptContact getPolicy _accept_methods - return () + atomically $ do + let routing = toxRouting $ toxTransports params + Key _ nid = k + registerNodeCallback routing $ NodeInfoCallback + { interestingNodeId = nid + , listenerId = callbackId + , observedAddress = \ni -> return () -- TODO + , rumoredAddress = \saddr ni -> return () -- TODO + } showKey_ :: Key -> String diff --git a/src/Network/Tox/Crypto/Handlers.hs b/src/Network/Tox/Crypto/Handlers.hs index 2fc12559..20560a48 100644 --- a/src/Network/Tox/Crypto/Handlers.hs +++ b/src/Network/Tox/Crypto/Handlers.hs @@ -56,7 +56,7 @@ import Debug.Trace import Text.Printf import Data.Bool import Connection (Status(..)) -import Connection.Tox (ToxProgress(..)) +import Connection.Tox.Threads (ToxProgress(..)) -- * These types are isomorphic to Maybe, but have the advantage of documenting diff --git a/src/Network/Tox/DHT/Handlers.hs b/src/Network/Tox/DHT/Handlers.hs index 25244a9b..091374f5 100644 --- a/src/Network/Tox/DHT/Handlers.hs +++ b/src/Network/Tox/DHT/Handlers.hs @@ -130,6 +130,28 @@ data Routing = Routing , nodesOfInterest :: TVar (HashMap NodeId [NodeInfoCallback]) } +registerNodeCallback :: Routing -> NodeInfoCallback -> STM () +registerNodeCallback Routing{nodesOfInterest} cb = do + cbm <- readTVar nodesOfInterest + let ns = fromMaybe [] $ HashMap.lookup (interestingNodeId cb) cbm + bs = filter nonMatching ns + where nonMatching n = (listenerId n /= listenerId cb) + writeTVar nodesOfInterest $ HashMap.insert (interestingNodeId cb) + (cb : bs) + cbm + +unregisterNodeCallback :: Int -> Routing -> NodeId -> STM () +unregisterNodeCallback callbackId Routing{nodesOfInterest} nid = do + cbm <- readTVar nodesOfInterest + let ns = fromMaybe [] $ HashMap.lookup nid cbm + bs = filter nonMatching ns + where nonMatching n = (listenerId n /= callbackId) + writeTVar nodesOfInterest + $ if null bs + then HashMap.delete nid cbm + else HashMap.insert nid bs cbm + + sched4 :: Routing -> TVar (Int.PSQ POSIXTime) sched4 Routing { refresher4 = BucketRefresher { refreshQueue } } = refreshQueue -- cgit v1.2.3