diff options
author | joe <joe@jerkface.net> | 2017-11-07 18:51:05 -0500 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-11-08 02:30:43 -0500 |
commit | dbce015d0137152f74f46dea3b00d2b51e7c53f7 (patch) | |
tree | 39947fb5a0d0d0aedb0121f4bdd95c41caf0e152 | |
parent | 8c94bb53cc2eb09a5e1c550c3430935701c6f090 (diff) |
Moved BucketRefresher construction responsibility for greater
encapsulation.
-rw-r--r-- | src/Network/BitTorrent/MainlineDHT.hs | 48 | ||||
-rw-r--r-- | src/Network/Kademlia/Bootstrap.hs | 34 | ||||
-rw-r--r-- | src/Network/Tox/DHT/Handlers.hs | 30 |
3 files changed, 72 insertions, 40 deletions
diff --git a/src/Network/BitTorrent/MainlineDHT.hs b/src/Network/BitTorrent/MainlineDHT.hs index 268cacfb..3e7a0eda 100644 --- a/src/Network/BitTorrent/MainlineDHT.hs +++ b/src/Network/BitTorrent/MainlineDHT.hs | |||
@@ -530,15 +530,19 @@ traced (TableMethods ins del lkup) | |||
530 | 530 | ||
531 | type MainlineClient = Client String Method TransactionId NodeInfo (Message BValue) | 531 | type MainlineClient = Client String Method TransactionId NodeInfo (Message BValue) |
532 | 532 | ||
533 | -- | Like 'nodeInfo' but falls back to 'iNADDR_ANY' for nodeIP' and 'nodePort'. | ||
534 | mkNodeInfo :: NodeId -> SockAddr -> NodeInfo | ||
535 | mkNodeInfo nid addr = NodeInfo | ||
536 | { nodeId = nid | ||
537 | , nodeIP = fromMaybe (toEnum 0) $ fromSockAddr addr | ||
538 | , nodePort = fromMaybe 0 $ sockAddrPort addr | ||
539 | } | ||
540 | |||
533 | newClient :: SwarmsDatabase -> SockAddr -> IO (MainlineClient, Routing) | 541 | newClient :: SwarmsDatabase -> SockAddr -> IO (MainlineClient, Routing) |
534 | newClient swarms addr = do | 542 | newClient swarms addr = do |
535 | udp <- udpTransport addr | 543 | udp <- udpTransport addr |
536 | nid <- NodeId <$> getRandomBytes 20 | 544 | nid <- NodeId <$> getRandomBytes 20 |
537 | let tentative_info = NodeInfo | 545 | let tentative_info = mkNodeInfo nid addr |
538 | { nodeId = nid | ||
539 | , nodeIP = fromMaybe (toEnum 0) $ fromSockAddr addr | ||
540 | , nodePort = fromMaybe 0 $ sockAddrPort addr | ||
541 | } | ||
542 | tentative_info6 <- | 546 | tentative_info6 <- |
543 | maybe tentative_info | 547 | maybe tentative_info |
544 | (\ip6 -> tentative_info { nodeId = fromMaybe (nodeId tentative_info) | 548 | (\ip6 -> tentative_info { nodeId = fromMaybe (nodeId tentative_info) |
@@ -549,37 +553,31 @@ newClient swarms addr = do | |||
549 | addr4 <- atomically $ newTChan | 553 | addr4 <- atomically $ newTChan |
550 | addr6 <- atomically $ newTChan | 554 | addr6 <- atomically $ newTChan |
551 | mkrouting <- atomically $ do | 555 | mkrouting <- atomically $ do |
552 | let nobkts = R.defaultBucketCount :: Int | 556 | -- We defer initializing the refreshSearch and refreshPing until we |
553 | tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info nobkts | 557 | -- have a client to send queries with. |
554 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 nobkts | 558 | let nullPing = const $ return False |
555 | let updateIPVote tblvar addrvar a = do | 559 | nullSearch = mainlineSearch $ \_ _ -> return Nothing |
560 | refresher4 <- newBucketRefresher mainlineSpace tentative_info nullSearch nullPing | ||
561 | refresher6 <- newBucketRefresher mainlineSpace tentative_info6 nullSearch nullPing | ||
562 | let tbl4 = refreshBuckets refresher4 | ||
563 | tbl6 = refreshBuckets refresher6 | ||
564 | updateIPVote tblvar addrvar a = do | ||
556 | bkts <- readTVar tblvar | 565 | bkts <- readTVar tblvar |
557 | case bep42 a (nodeId $ R.thisNode bkts) of | 566 | case bep42 a (nodeId $ R.thisNode bkts) of |
558 | Just nid -> do | 567 | Just nid -> do |
559 | let tbl = R.nullTable (comparing nodeId) | 568 | let tbl = R.nullTable (comparing nodeId) |
560 | (\s -> hashWithSalt s . nodeId) | 569 | (\s -> hashWithSalt s . nodeId) |
561 | (NodeInfo nid | 570 | (mkNodeInfo nid a) |
562 | (fromMaybe (toEnum 0) $ fromSockAddr a) | 571 | (R.defaultBucketCount) |
563 | (fromMaybe 0 $ sockAddrPort a)) | ||
564 | nobkts | ||
565 | writeTVar tblvar tbl | 572 | writeTVar tblvar tbl |
566 | writeTChan addrvar (a,map fst $ concat $ R.toList bkts) | 573 | writeTChan addrvar (a,map fst $ concat $ R.toList bkts) |
567 | Nothing -> return () | 574 | Nothing -> return () |
568 | committee4 <- newTriadCommittee $ updateIPVote tbl4 addr4 | 575 | committee4 <- newTriadCommittee $ updateIPVote tbl4 addr4 |
569 | committee6 <- newTriadCommittee $ updateIPVote tbl6 addr6 | 576 | committee6 <- newTriadCommittee $ updateIPVote tbl6 addr6 |
570 | sched4 <- newTVar Int.empty | ||
571 | sched6 <- newTVar Int.empty | ||
572 | return $ \client -> | 577 | return $ \client -> |
573 | let refresher sched bkts = BucketRefresher | 578 | -- Now we have a client, so tell the BucketRefresher how to search and ping. |
574 | { refreshInterval = 15 * 60 | 579 | let updIO r = updateRefresherIO (nodeSearch client) (ping client) r |
575 | , refreshQueue = sched | 580 | in Routing tentative_info committee4 committee6 (updIO refresher4) (updIO refresher6) |
576 | , refreshSearch = nodeSearch client | ||
577 | , refreshBuckets = bkts | ||
578 | , refreshPing = ping client | ||
579 | } | ||
580 | refresher4 = refresher sched4 tbl4 | ||
581 | refresher6 = refresher sched6 tbl6 | ||
582 | in Routing tentative_info committee4 committee6 refresher4 refresher6 | ||
583 | map_var <- atomically $ newTVar (0, mempty) | 581 | map_var <- atomically $ newTVar (0, mempty) |
584 | 582 | ||
585 | let routing = mkrouting outgoingClient | 583 | let routing = mkrouting outgoingClient |
diff --git a/src/Network/Kademlia/Bootstrap.hs b/src/Network/Kademlia/Bootstrap.hs index 92a20ca5..87fdc22f 100644 --- a/src/Network/Kademlia/Bootstrap.hs +++ b/src/Network/Kademlia/Bootstrap.hs | |||
@@ -32,6 +32,7 @@ import Data.IP | |||
32 | import Data.Monoid | 32 | import Data.Monoid |
33 | import Data.Serialize (Serialize) | 33 | import Data.Serialize (Serialize) |
34 | import Data.Time.Clock.POSIX (POSIXTime) | 34 | import Data.Time.Clock.POSIX (POSIXTime) |
35 | import Data.Ord | ||
35 | import System.Entropy | 36 | import System.Entropy |
36 | import System.Timeout | 37 | import System.Timeout |
37 | import Text.PrettyPrint as PP hiding (($$), (<>)) | 38 | import Text.PrettyPrint as PP hiding (($$), (<>)) |
@@ -72,6 +73,39 @@ data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher | |||
72 | , refreshPing :: ni -> IO Bool | 73 | , refreshPing :: ni -> IO Bool |
73 | } | 74 | } |
74 | 75 | ||
76 | newBucketRefresher :: (Ord addr, Ord a, Hashable a) | ||
77 | => KademliaSpace a ni | ||
78 | -> ni | ||
79 | -> Search nid addr tok ni ni | ||
80 | -> (ni -> IO Bool) | ||
81 | -> STM (BucketRefresher nid ni) | ||
82 | newBucketRefresher spc template_ni sch ping = do | ||
83 | let nodeId = kademliaLocation spc | ||
84 | bkts <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) template_ni R.defaultBucketCount | ||
85 | sched <- newTVar Int.empty | ||
86 | return BucketRefresher | ||
87 | { refreshInterval = 15 * 60 | ||
88 | , refreshQueue = sched | ||
89 | , refreshSearch = sch | ||
90 | , refreshBuckets = bkts | ||
91 | , refreshPing = ping | ||
92 | } | ||
93 | |||
94 | -- | This was added to avoid the compile error "Record update for | ||
95 | -- insufficiently polymorphic field" when trying to update the existentially | ||
96 | -- quantified field 'refreshSearch'. | ||
97 | updateRefresherIO :: Ord addr | ||
98 | => Search nid addr tok ni ni | ||
99 | -> (ni -> IO Bool) | ||
100 | -> BucketRefresher nid ni -> BucketRefresher nid ni | ||
101 | updateRefresherIO sch ping BucketRefresher{..} = BucketRefresher | ||
102 | { refreshSearch = sch | ||
103 | , refreshPing = ping | ||
104 | , refreshInterval = refreshInterval | ||
105 | , refreshBuckets = refreshBuckets | ||
106 | , refreshQueue = refreshQueue | ||
107 | } | ||
108 | |||
75 | -- | Fork a refresh loop. Kill the returned thread to terminate it. | 109 | -- | Fork a refresh loop. Kill the returned thread to terminate it. |
76 | forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId | 110 | forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId |
77 | forkPollForRefresh BucketRefresher{ refreshInterval | 111 | forkPollForRefresh BucketRefresher{ refreshInterval |
diff --git a/src/Network/Tox/DHT/Handlers.hs b/src/Network/Tox/DHT/Handlers.hs index d3d36525..c1f57177 100644 --- a/src/Network/Tox/DHT/Handlers.hs +++ b/src/Network/Tox/DHT/Handlers.hs | |||
@@ -150,24 +150,24 @@ newRouting addr crypto update4 update6 = do | |||
150 | SockAddrInet {} -> return Nothing | 150 | SockAddrInet {} -> return Nothing |
151 | _ -> global6 | 151 | _ -> global6 |
152 | atomically $ do | 152 | atomically $ do |
153 | let nobkts = R.defaultBucketCount :: Int | 153 | -- We defer initializing the refreshSearch and refreshPing until we |
154 | tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info4 nobkts | 154 | -- have a client to send queries with. |
155 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 nobkts | 155 | let nullPing = const $ return False |
156 | nullSearch = Search | ||
157 | { searchSpace = toxSpace | ||
158 | , searchNodeAddress = nodeIP &&& nodePort | ||
159 | , searchQuery = \_ _ -> return Nothing | ||
160 | } | ||
161 | refresher4 <- newBucketRefresher toxSpace tentative_info nullSearch nullPing | ||
162 | refresher6 <- newBucketRefresher toxSpace tentative_info6 nullSearch nullPing | ||
163 | let tbl4 = refreshBuckets refresher4 | ||
164 | tbl6 = refreshBuckets refresher6 | ||
156 | committee4 <- newTriadCommittee (update4 tbl4) -- updateIPVote tbl4 addr4 | 165 | committee4 <- newTriadCommittee (update4 tbl4) -- updateIPVote tbl4 addr4 |
157 | committee6 <- newTriadCommittee (update6 tbl6) -- updateIPVote tbl6 addr6 | 166 | committee6 <- newTriadCommittee (update6 tbl6) -- updateIPVote tbl6 addr6 |
158 | sched4 <- newTVar Int.empty | ||
159 | sched6 <- newTVar Int.empty | ||
160 | return $ \client -> | 167 | return $ \client -> |
161 | let refresher sched bkts = BucketRefresher | 168 | -- Now we have a client, so tell the BucketRefresher how to search and ping. |
162 | { refreshInterval = 15 * 60 | 169 | let updIO r = updateRefresherIO (nodeSearch client) (ping client) r |
163 | , refreshQueue = sched | 170 | in Routing tentative_info committee4 committee6 (updIO refresher4) (updIO refresher6) |
164 | , refreshSearch = nodeSearch client | ||
165 | , refreshBuckets = bkts | ||
166 | , refreshPing = ping client | ||
167 | } | ||
168 | refresher4 = refresher sched4 tbl6 | ||
169 | refresher6 = refresher sched6 tbl6 | ||
170 | in Routing tentative_info committee4 committee6 refresher4 refresher6 | ||
171 | 171 | ||
172 | 172 | ||
173 | -- TODO: This should cover more cases | 173 | -- TODO: This should cover more cases |