From dbce015d0137152f74f46dea3b00d2b51e7c53f7 Mon Sep 17 00:00:00 2001 From: joe Date: Tue, 7 Nov 2017 18:51:05 -0500 Subject: Moved BucketRefresher construction responsibility for greater encapsulation. --- src/Network/BitTorrent/MainlineDHT.hs | 48 +++++++++++++++++------------------ src/Network/Kademlia/Bootstrap.hs | 34 +++++++++++++++++++++++++ src/Network/Tox/DHT/Handlers.hs | 30 +++++++++++----------- 3 files changed, 72 insertions(+), 40 deletions(-) (limited to 'src/Network') diff --git a/src/Network/BitTorrent/MainlineDHT.hs b/src/Network/BitTorrent/MainlineDHT.hs index 268cacfb..3e7a0eda 100644 --- a/src/Network/BitTorrent/MainlineDHT.hs +++ b/src/Network/BitTorrent/MainlineDHT.hs @@ -530,15 +530,19 @@ traced (TableMethods ins del lkup) type MainlineClient = Client String Method TransactionId NodeInfo (Message BValue) +-- | Like 'nodeInfo' but falls back to 'iNADDR_ANY' for nodeIP' and 'nodePort'. +mkNodeInfo :: NodeId -> SockAddr -> NodeInfo +mkNodeInfo nid addr = NodeInfo + { nodeId = nid + , nodeIP = fromMaybe (toEnum 0) $ fromSockAddr addr + , nodePort = fromMaybe 0 $ sockAddrPort addr + } + newClient :: SwarmsDatabase -> SockAddr -> IO (MainlineClient, Routing) newClient swarms addr = do udp <- udpTransport addr nid <- NodeId <$> getRandomBytes 20 - let tentative_info = NodeInfo - { nodeId = nid - , nodeIP = fromMaybe (toEnum 0) $ fromSockAddr addr - , nodePort = fromMaybe 0 $ sockAddrPort addr - } + let tentative_info = mkNodeInfo nid addr tentative_info6 <- maybe tentative_info (\ip6 -> tentative_info { nodeId = fromMaybe (nodeId tentative_info) @@ -549,37 +553,31 @@ newClient swarms addr = do addr4 <- atomically $ newTChan addr6 <- atomically $ newTChan mkrouting <- atomically $ do - let nobkts = R.defaultBucketCount :: Int - tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info nobkts - tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 nobkts - let updateIPVote tblvar addrvar a = do + -- We defer initializing the refreshSearch and refreshPing until we + -- have a client to send queries with. + let nullPing = const $ return False + nullSearch = mainlineSearch $ \_ _ -> return Nothing + refresher4 <- newBucketRefresher mainlineSpace tentative_info nullSearch nullPing + refresher6 <- newBucketRefresher mainlineSpace tentative_info6 nullSearch nullPing + let tbl4 = refreshBuckets refresher4 + tbl6 = refreshBuckets refresher6 + updateIPVote tblvar addrvar a = do bkts <- readTVar tblvar case bep42 a (nodeId $ R.thisNode bkts) of Just nid -> do let tbl = R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) - (NodeInfo nid - (fromMaybe (toEnum 0) $ fromSockAddr a) - (fromMaybe 0 $ sockAddrPort a)) - nobkts + (mkNodeInfo nid a) + (R.defaultBucketCount) writeTVar tblvar tbl writeTChan addrvar (a,map fst $ concat $ R.toList bkts) Nothing -> return () committee4 <- newTriadCommittee $ updateIPVote tbl4 addr4 committee6 <- newTriadCommittee $ updateIPVote tbl6 addr6 - sched4 <- newTVar Int.empty - sched6 <- newTVar Int.empty return $ \client -> - let refresher sched bkts = BucketRefresher - { refreshInterval = 15 * 60 - , refreshQueue = sched - , refreshSearch = nodeSearch client - , refreshBuckets = bkts - , refreshPing = ping client - } - refresher4 = refresher sched4 tbl4 - refresher6 = refresher sched6 tbl6 - in Routing tentative_info committee4 committee6 refresher4 refresher6 + -- Now we have a client, so tell the BucketRefresher how to search and ping. + let updIO r = updateRefresherIO (nodeSearch client) (ping client) r + in Routing tentative_info committee4 committee6 (updIO refresher4) (updIO refresher6) map_var <- atomically $ newTVar (0, mempty) let routing = mkrouting outgoingClient diff --git a/src/Network/Kademlia/Bootstrap.hs b/src/Network/Kademlia/Bootstrap.hs index 92a20ca5..87fdc22f 100644 --- a/src/Network/Kademlia/Bootstrap.hs +++ b/src/Network/Kademlia/Bootstrap.hs @@ -32,6 +32,7 @@ import Data.IP import Data.Monoid import Data.Serialize (Serialize) import Data.Time.Clock.POSIX (POSIXTime) +import Data.Ord import System.Entropy import System.Timeout import Text.PrettyPrint as PP hiding (($$), (<>)) @@ -72,6 +73,39 @@ data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher , refreshPing :: ni -> IO Bool } +newBucketRefresher :: (Ord addr, Ord a, Hashable a) + => KademliaSpace a ni + -> ni + -> Search nid addr tok ni ni + -> (ni -> IO Bool) + -> STM (BucketRefresher nid ni) +newBucketRefresher spc template_ni sch ping = do + let nodeId = kademliaLocation spc + bkts <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) template_ni R.defaultBucketCount + sched <- newTVar Int.empty + return BucketRefresher + { refreshInterval = 15 * 60 + , refreshQueue = sched + , refreshSearch = sch + , refreshBuckets = bkts + , refreshPing = ping + } + +-- | This was added to avoid the compile error "Record update for +-- insufficiently polymorphic field" when trying to update the existentially +-- quantified field 'refreshSearch'. +updateRefresherIO :: Ord addr + => Search nid addr tok ni ni + -> (ni -> IO Bool) + -> BucketRefresher nid ni -> BucketRefresher nid ni +updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher + { refreshSearch = sch + , refreshPing = ping + , refreshInterval = refreshInterval + , refreshBuckets = refreshBuckets + , refreshQueue = refreshQueue + } + -- | Fork a refresh loop. Kill the returned thread to terminate it. forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId forkPollForRefresh BucketRefresher{ refreshInterval diff --git a/src/Network/Tox/DHT/Handlers.hs b/src/Network/Tox/DHT/Handlers.hs index d3d36525..c1f57177 100644 --- a/src/Network/Tox/DHT/Handlers.hs +++ b/src/Network/Tox/DHT/Handlers.hs @@ -150,24 +150,24 @@ newRouting addr crypto update4 update6 = do SockAddrInet {} -> return Nothing _ -> global6 atomically $ do - let nobkts = R.defaultBucketCount :: Int - tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info4 nobkts - tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 nobkts + -- We defer initializing the refreshSearch and refreshPing until we + -- have a client to send queries with. + let nullPing = const $ return False + nullSearch = Search + { searchSpace = toxSpace + , searchNodeAddress = nodeIP &&& nodePort + , searchQuery = \_ _ -> return Nothing + } + refresher4 <- newBucketRefresher toxSpace tentative_info nullSearch nullPing + refresher6 <- newBucketRefresher toxSpace tentative_info6 nullSearch nullPing + let tbl4 = refreshBuckets refresher4 + tbl6 = refreshBuckets refresher6 committee4 <- newTriadCommittee (update4 tbl4) -- updateIPVote tbl4 addr4 committee6 <- newTriadCommittee (update6 tbl6) -- updateIPVote tbl6 addr6 - sched4 <- newTVar Int.empty - sched6 <- newTVar Int.empty return $ \client -> - let refresher sched bkts = BucketRefresher - { refreshInterval = 15 * 60 - , refreshQueue = sched - , refreshSearch = nodeSearch client - , refreshBuckets = bkts - , refreshPing = ping client - } - refresher4 = refresher sched4 tbl6 - refresher6 = refresher sched6 tbl6 - in Routing tentative_info committee4 committee6 refresher4 refresher6 + -- Now we have a client, so tell the BucketRefresher how to search and ping. + let updIO r = updateRefresherIO (nodeSearch client) (ping client) r + in Routing tentative_info committee4 committee6 (updIO refresher4) (updIO refresher6) -- TODO: This should cover more cases -- cgit v1.2.3