diff options
-rw-r--r-- | examples/dhtd.hs | 9 | ||||
-rw-r--r-- | src/Network/BitTorrent/MainlineDHT.hs | 70 | ||||
-rw-r--r-- | src/Network/Kademlia/Bootstrap.hs | 227 | ||||
-rw-r--r-- | src/Network/Tox.hs | 30 | ||||
-rw-r--r-- | src/Network/Tox/DHT/Handlers.hs | 61 |
5 files changed, 240 insertions, 157 deletions
diff --git a/examples/dhtd.hs b/examples/dhtd.hs index d1db1938..7d3661e6 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs | |||
@@ -61,7 +61,7 @@ import Network.Address hiding (NodeId, NodeInfo(..)) | |||
61 | import Network.QueryResponse | 61 | import Network.QueryResponse |
62 | import Network.StreamServer | 62 | import Network.StreamServer |
63 | import Network.Kademlia | 63 | import Network.Kademlia |
64 | import Network.Kademlia.Bootstrap (bootstrap) | 64 | import Network.Kademlia.Bootstrap |
65 | import Network.Kademlia.Search | 65 | import Network.Kademlia.Search |
66 | import qualified Network.BitTorrent.MainlineDHT as Mainline | 66 | import qualified Network.BitTorrent.MainlineDHT as Mainline |
67 | import qualified Network.Tox as Tox | 67 | import qualified Network.Tox as Tox |
@@ -1323,7 +1323,12 @@ main = do | |||
1323 | case Map.lookup "node" qrys of | 1323 | case Map.lookup "node" qrys of |
1324 | Just DHTQuery { qsearch = srch } -> do | 1324 | Just DHTQuery { qsearch = srch } -> do |
1325 | case eqT of | 1325 | case eqT of |
1326 | Just witness -> bootstrap (isNodesSearch witness srch) bkts ping btSaved fallbackNodes | 1326 | Just witness -> let strapper = BucketRefresher |
1327 | { refreshSearch = isNodesSearch witness srch | ||
1328 | , refreshBuckets = bkts | ||
1329 | , refreshPing = ping | ||
1330 | } | ||
1331 | in bootstrap strapper btSaved fallbackNodes | ||
1327 | _ -> error $ "Missing node-search for "++netname++"." | 1332 | _ -> error $ "Missing node-search for "++netname++"." |
1328 | saveNodes netname dht | 1333 | saveNodes netname dht |
1329 | Nothing -> return () | 1334 | Nothing -> return () |
diff --git a/src/Network/BitTorrent/MainlineDHT.hs b/src/Network/BitTorrent/MainlineDHT.hs index c7ef560c..268cacfb 100644 --- a/src/Network/BitTorrent/MainlineDHT.hs +++ b/src/Network/BitTorrent/MainlineDHT.hs | |||
@@ -6,6 +6,7 @@ | |||
6 | {-# LANGUAGE FlexibleInstances #-} | 6 | {-# LANGUAGE FlexibleInstances #-} |
7 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | 7 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} |
8 | {-# LANGUAGE LambdaCase #-} | 8 | {-# LANGUAGE LambdaCase #-} |
9 | {-# LANGUAGE NamedFieldPuns #-} | ||
9 | {-# LANGUAGE PatternSynonyms #-} | 10 | {-# LANGUAGE PatternSynonyms #-} |
10 | {-# LANGUAGE StandaloneDeriving #-} | 11 | {-# LANGUAGE StandaloneDeriving #-} |
11 | {-# LANGUAGE TupleSections #-} | 12 | {-# LANGUAGE TupleSections #-} |
@@ -502,14 +503,24 @@ newSwarmsDatabase = do | |||
502 | 503 | ||
503 | data Routing = Routing | 504 | data Routing = Routing |
504 | { tentativeId :: NodeInfo | 505 | { tentativeId :: NodeInfo |
505 | , sched4 :: !( TVar (Int.PSQ POSIXTime) ) | ||
506 | , routing4 :: !( TVar (R.BucketList NodeInfo) ) | ||
507 | , committee4 :: TriadCommittee NodeId SockAddr | 506 | , committee4 :: TriadCommittee NodeId SockAddr |
508 | , sched6 :: !( TVar (Int.PSQ POSIXTime) ) | ||
509 | , routing6 :: !( TVar (R.BucketList NodeInfo) ) | ||
510 | , committee6 :: TriadCommittee NodeId SockAddr | 507 | , committee6 :: TriadCommittee NodeId SockAddr |
508 | , refresher4 :: BucketRefresher NodeId NodeInfo | ||
509 | , refresher6 :: BucketRefresher NodeId NodeInfo | ||
511 | } | 510 | } |
512 | 511 | ||
512 | sched4 :: Routing -> TVar (Int.PSQ POSIXTime) | ||
513 | sched4 Routing { refresher4 = BucketRefresher { refreshQueue } } = refreshQueue | ||
514 | |||
515 | sched6 :: Routing -> TVar (Int.PSQ POSIXTime) | ||
516 | sched6 Routing { refresher6 = BucketRefresher { refreshQueue } } = refreshQueue | ||
517 | |||
518 | routing4 :: Routing -> TVar (R.BucketList NodeInfo) | ||
519 | routing4 Routing { refresher4 = BucketRefresher { refreshBuckets } } = refreshBuckets | ||
520 | |||
521 | routing6 :: Routing -> TVar (R.BucketList NodeInfo) | ||
522 | routing6 Routing { refresher6 = BucketRefresher { refreshBuckets } } = refreshBuckets | ||
523 | |||
513 | traced :: Show tid => TableMethods t tid -> TableMethods t tid | 524 | traced :: Show tid => TableMethods t tid -> TableMethods t tid |
514 | traced (TableMethods ins del lkup) | 525 | traced (TableMethods ins del lkup) |
515 | = TableMethods (\tid mvar t -> trace ("insert "++show tid) $ ins tid mvar t) | 526 | = TableMethods (\tid mvar t -> trace ("insert "++show tid) $ ins tid mvar t) |
@@ -537,7 +548,7 @@ newClient swarms addr = do | |||
537 | <$> global6 | 548 | <$> global6 |
538 | addr4 <- atomically $ newTChan | 549 | addr4 <- atomically $ newTChan |
539 | addr6 <- atomically $ newTChan | 550 | addr6 <- atomically $ newTChan |
540 | routing <- atomically $ do | 551 | mkrouting <- atomically $ do |
541 | let nobkts = R.defaultBucketCount :: Int | 552 | let nobkts = R.defaultBucketCount :: Int |
542 | tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info nobkts | 553 | tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info nobkts |
543 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 nobkts | 554 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 nobkts |
@@ -558,9 +569,22 @@ newClient swarms addr = do | |||
558 | committee6 <- newTriadCommittee $ updateIPVote tbl6 addr6 | 569 | committee6 <- newTriadCommittee $ updateIPVote tbl6 addr6 |
559 | sched4 <- newTVar Int.empty | 570 | sched4 <- newTVar Int.empty |
560 | sched6 <- newTVar Int.empty | 571 | sched6 <- newTVar Int.empty |
561 | return $ Routing tentative_info sched4 tbl4 committee4 sched6 tbl6 committee6 | 572 | return $ \client -> |
573 | let refresher sched bkts = BucketRefresher | ||
574 | { refreshInterval = 15 * 60 | ||
575 | , refreshQueue = sched | ||
576 | , refreshSearch = nodeSearch client | ||
577 | , refreshBuckets = bkts | ||
578 | , refreshPing = ping client | ||
579 | } | ||
580 | refresher4 = refresher sched4 tbl4 | ||
581 | refresher6 = refresher sched6 tbl6 | ||
582 | in Routing tentative_info committee4 committee6 refresher4 refresher6 | ||
562 | map_var <- atomically $ newTVar (0, mempty) | 583 | map_var <- atomically $ newTVar (0, mempty) |
563 | let net = onInbound (updateRouting outgoingClient routing) | 584 | |
585 | let routing = mkrouting outgoingClient | ||
586 | |||
587 | net = onInbound (updateRouting outgoingClient routing) | ||
564 | $ layerTransport parsePacket encodePacket | 588 | $ layerTransport parsePacket encodePacket |
565 | $ udp | 589 | $ udp |
566 | 590 | ||
@@ -625,14 +649,9 @@ newClient swarms addr = do | |||
625 | -- TODO: trigger bootstrap ipv6 | 649 | -- TODO: trigger bootstrap ipv6 |
626 | again | 650 | again |
627 | 651 | ||
628 | refresh_thread4 <- forkPollForRefresh | 652 | |
629 | (15*60) | 653 | refresh_thread4 <- forkPollForRefresh $ refresher4 routing |
630 | (sched4 routing) | 654 | refresh_thread6 <- forkPollForRefresh $ refresher6 routing |
631 | (refreshBucket (nodeSearch client) (routing4 routing)) | ||
632 | refresh_thread6 <- forkPollForRefresh | ||
633 | (15*60) | ||
634 | (sched6 routing) | ||
635 | (refreshBucket (nodeSearch client) (routing6 routing)) | ||
636 | 655 | ||
637 | return (client, routing) | 656 | return (client, routing) |
638 | 657 | ||
@@ -676,14 +695,17 @@ defaultHandler meth = MethodHandler decodePayload errorPayload returnError | |||
676 | returnError :: NodeInfo -> BValue -> IO Error | 695 | returnError :: NodeInfo -> BValue -> IO Error |
677 | returnError _ _ = return $ Error MethodUnknown ("Unknown method " <> meth) | 696 | returnError _ _ = return $ Error MethodUnknown ("Unknown method " <> meth) |
678 | 697 | ||
679 | mainlineKademlia :: MainlineClient -> TriadCommittee NodeId SockAddr -> TVar (R.BucketList NodeInfo) -> TVar (Int.PSQ POSIXTime) -> Kademlia NodeId NodeInfo | 698 | mainlineKademlia :: MainlineClient |
680 | mainlineKademlia client committee var sched | 699 | -> TriadCommittee NodeId SockAddr |
700 | -> BucketRefresher NodeId NodeInfo | ||
701 | -> Kademlia NodeId NodeInfo | ||
702 | mainlineKademlia client committee refresher | ||
681 | = Kademlia quietInsertions | 703 | = Kademlia quietInsertions |
682 | mainlineSpace | 704 | mainlineSpace |
683 | (vanillaIO var $ ping client) | 705 | (vanillaIO (refreshBuckets refresher) $ ping client) |
684 | { tblTransition = \tr -> do | 706 | { tblTransition = \tr -> do |
685 | io1 <- transitionCommittee committee tr | 707 | io1 <- transitionCommittee committee tr |
686 | io2 <- touchBucket mainlineSpace (15*60) var sched tr | 708 | io2 <- touchBucket refresher tr |
687 | return $ do | 709 | return $ do |
688 | io1 >> io2 | 710 | io1 >> io2 |
689 | {- noisy (timestamp updates are currently reported as transitions to Accepted) | 711 | {- noisy (timestamp updates are currently reported as transitions to Accepted) |
@@ -712,11 +734,11 @@ transitionCommittee committee _ = return $ return () | |||
712 | updateRouting :: MainlineClient -> Routing -> NodeInfo -> Message BValue -> IO () | 734 | updateRouting :: MainlineClient -> Routing -> NodeInfo -> Message BValue -> IO () |
713 | updateRouting client routing naddr msg = do | 735 | updateRouting client routing naddr msg = do |
714 | case prefer4or6 naddr Nothing of | 736 | case prefer4or6 naddr Nothing of |
715 | Want_IP4 -> go (routing4 routing) (committee4 routing) (sched4 routing) | 737 | Want_IP4 -> go (committee4 routing) (refresher4 routing) |
716 | Want_IP6 -> go (routing6 routing) (committee6 routing) (sched6 routing) | 738 | Want_IP6 -> go (committee6 routing) (refresher6 routing) |
717 | where | 739 | where |
718 | go tbl committee sched = do | 740 | go committee refresher = do |
719 | self <- atomically $ R.thisNode <$> readTVar tbl | 741 | self <- atomically $ R.thisNode <$> readTVar (refreshBuckets refresher) |
720 | when (nodeIP self /= nodeIP naddr) $ do | 742 | when (nodeIP self /= nodeIP naddr) $ do |
721 | case msg of | 743 | case msg of |
722 | R { rspReflectedIP = Just sockaddr } | 744 | R { rspReflectedIP = Just sockaddr } |
@@ -724,7 +746,7 @@ updateRouting client routing naddr msg = do | |||
724 | -- hPutStrLn stderr $ "External: "++show (nodeId naddr,sockaddr) | 746 | -- hPutStrLn stderr $ "External: "++show (nodeId naddr,sockaddr) |
725 | atomically $ addVote committee (nodeId naddr) sockaddr | 747 | atomically $ addVote committee (nodeId naddr) sockaddr |
726 | _ -> return () | 748 | _ -> return () |
727 | insertNode (mainlineKademlia client committee tbl sched) naddr | 749 | insertNode (mainlineKademlia client committee refresher) naddr |
728 | 750 | ||
729 | data Ping = Ping deriving Show | 751 | data Ping = Ping deriving Show |
730 | 752 | ||
diff --git a/src/Network/Kademlia/Bootstrap.hs b/src/Network/Kademlia/Bootstrap.hs index 283e054a..42bff665 100644 --- a/src/Network/Kademlia/Bootstrap.hs +++ b/src/Network/Kademlia/Bootstrap.hs | |||
@@ -1,11 +1,14 @@ | |||
1 | {-# LANGUAGE CPP #-} | 1 | {-# LANGUAGE CPP #-} |
2 | {-# LANGUAGE ConstraintKinds #-} | ||
2 | {-# LANGUAGE DeriveFunctor #-} | 3 | {-# LANGUAGE DeriveFunctor #-} |
3 | {-# LANGUAGE DeriveTraversable #-} | 4 | {-# LANGUAGE DeriveTraversable #-} |
4 | {-# LANGUAGE FlexibleContexts #-} | 5 | {-# LANGUAGE FlexibleContexts #-} |
5 | {-# LANGUAGE GADTs #-} | 6 | {-# LANGUAGE GADTs #-} |
6 | {-# LANGUAGE KindSignatures #-} | 7 | {-# LANGUAGE KindSignatures #-} |
8 | {-# LANGUAGE NamedFieldPuns #-} | ||
7 | {-# LANGUAGE PartialTypeSignatures #-} | 9 | {-# LANGUAGE PartialTypeSignatures #-} |
8 | {-# LANGUAGE PatternSynonyms #-} | 10 | {-# LANGUAGE PatternSynonyms #-} |
11 | {-# LANGUAGE RankNTypes #-} | ||
9 | {-# LANGUAGE ScopedTypeVariables #-} | 12 | {-# LANGUAGE ScopedTypeVariables #-} |
10 | module Network.Kademlia.Bootstrap where | 13 | module Network.Kademlia.Bootstrap where |
11 | 14 | ||
@@ -42,34 +45,90 @@ import Network.Kademlia.Search | |||
42 | import Control.Concurrent.Tasks | 45 | import Control.Concurrent.Tasks |
43 | import Network.Kademlia | 46 | import Network.Kademlia |
44 | 47 | ||
45 | 48 | -- From BEP 05: | |
46 | -- | > pollForRefresh interval queue refresh | 49 | -- |
50 | -- Each bucket should maintain a "last changed" property to indicate how | ||
51 | -- "fresh" the contents are. | ||
52 | -- | ||
53 | -- Note: We will use a "time to next refresh" property instead and store it in | ||
54 | -- a priority search queue. | ||
55 | -- | ||
56 | -- When... | ||
57 | -- | ||
58 | -- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus | ||
59 | -- >>> bucketEvents = | ||
60 | -- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds, | ||
61 | -- >>> | ||
62 | -- >>> , Stranger :--> Accepted -- or a node is added to a bucket, | ||
63 | -- >>> | ||
64 | -- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced | ||
65 | -- >>> , Applicant :--> Accepted -- with another node, | ||
66 | -- >>> ] | ||
67 | -- | ||
68 | -- the bucket's last changed property should be updated. Buckets | ||
69 | -- that have not been changed in 15 minutes should be "refreshed." This is done | ||
70 | -- by picking a random ID in the range of the bucket and performing a | ||
71 | -- find_nodes search on it. | ||
72 | -- | ||
73 | -- The only other possible BucketTouchEvents are as follows: | ||
47 | -- | 74 | -- |
48 | -- Fork a refresh loop. Kill the returned thread to terminate it. The | 75 | -- >>> not_handled = |
49 | -- arguments are: a staleness threshold (if a bucket goes this long without | 76 | -- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered: |
50 | -- being touched, a refresh will be triggered), a TVar with the time-to-refresh | 77 | -- >>> -- (Applicant :--> Stranger) |
51 | -- schedule for each bucket, and a refresh action to be forked when a bucket | 78 | -- >>> -- (Applicant :--> Accepted) |
52 | -- excedes the staleness threshold. | 79 | -- >>> , Accepted :--> Applicant -- Never happens |
80 | -- >>> ] | ||
53 | -- | 81 | -- |
54 | -- TO "touch" a bucket and prevent it from being refreshed, reschedule it's | 82 | |
55 | -- refresh time to some time into the future by modifying the 'Int.PSQ' in the | 83 | type SensibleNodeId nid ni = |
56 | -- TVar. | 84 | ( Show nid |
57 | forkPollForRefresh :: POSIXTime -> TVar (Int.PSQ POSIXTime) -> (Int -> IO a) -> IO ThreadId | 85 | , Ord nid |
58 | forkPollForRefresh interval psq refresh = do | 86 | , Ord ni |
59 | fork $ do | 87 | , Hashable nid |
88 | , Hashable ni ) | ||
89 | |||
90 | data BucketRefresher nid ni = forall tok addr. Ord addr => BucketRefresher | ||
91 | { -- | A staleness threshold (if a bucket goes this long without being | ||
92 | -- touched, a refresh will be triggered). | ||
93 | refreshInterval :: POSIXTime | ||
94 | -- | A TVar with the time-to-refresh schedule for each bucket. | ||
95 | -- | ||
96 | -- To "touch" a bucket and prevent it from being refreshed, reschedule | ||
97 | -- it's refresh time to some time into the future by modifying the | ||
98 | -- 'Int.PSQ' in the TVar. (See 'touchBucket'). | ||
99 | , refreshQueue :: TVar (Int.PSQ POSIXTime) | ||
100 | -- | This is the kademlia node search specification. | ||
101 | , refreshSearch :: Search nid addr tok ni ni | ||
102 | -- | The current kademlia routing table buckets. | ||
103 | , refreshBuckets :: TVar (R.BucketList ni) | ||
104 | -- | Action to ping a node. This is used only during initial bootstrap | ||
105 | -- to get some nodes in our table. A 'True' result is interpreted as a a | ||
106 | -- pong, where 'False' is a non-response. | ||
107 | , refreshPing :: ni -> IO Bool | ||
108 | } | ||
109 | |||
110 | -- | Fork a refresh loop. Kill the returned thread to terminate it. | ||
111 | forkPollForRefresh :: SensibleNodeId nid ni => BucketRefresher nid ni -> IO ThreadId | ||
112 | forkPollForRefresh BucketRefresher{ refreshInterval | ||
113 | , refreshQueue | ||
114 | , refreshBuckets | ||
115 | , refreshSearch } = fork $ do | ||
60 | myThreadId >>= flip labelThread "pollForRefresh" | 116 | myThreadId >>= flip labelThread "pollForRefresh" |
61 | fix $ \again -> do | 117 | fix $ \again -> do |
62 | join $ atomically $ do | 118 | join $ atomically $ do |
63 | nextup <- Int.findMin <$> readTVar psq | 119 | nextup <- Int.findMin <$> readTVar refreshQueue |
64 | maybe retry (return . go again) nextup | 120 | maybe retry (return . go again) nextup |
65 | where | 121 | where |
122 | refresh :: Int -> IO Int | ||
123 | refresh = refreshBucket refreshSearch refreshBuckets | ||
124 | |||
66 | go again ( bktnum :-> refresh_time ) = do | 125 | go again ( bktnum :-> refresh_time ) = do |
67 | now <- getPOSIXTime | 126 | now <- getPOSIXTime |
68 | case fromEnum (refresh_time - now) of | 127 | case fromEnum (refresh_time - now) of |
69 | x | x <= 0 -> do -- Refresh time! | 128 | x | x <= 0 -> do -- Refresh time! |
70 | -- Move it to the back of the refresh queue. | 129 | -- Move it to the back of the refresh queue. |
71 | atomically $ modifyTVar' psq | 130 | atomically $ modifyTVar' refreshQueue |
72 | $ Int.insert bktnum (now + interval) | 131 | $ Int.insert bktnum (now + refreshInterval) |
73 | -- Now fork the refresh operation. | 132 | -- Now fork the refresh operation. |
74 | -- TODO: We should probably propogate the kill signal to this thread. | 133 | -- TODO: We should probably propogate the kill signal to this thread. |
75 | fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum) | 134 | fork $ do myThreadId >>= flip labelThread ("refresh."++show bktnum) |
@@ -79,15 +138,37 @@ forkPollForRefresh interval psq refresh = do | |||
79 | seconds -> threadDelay ( seconds * 1000000 ) | 138 | seconds -> threadDelay ( seconds * 1000000 ) |
80 | again | 139 | again |
81 | 140 | ||
82 | refreshBucket :: forall nid tok ni addr. | 141 | |
83 | ( Show nid | 142 | -- | This is a helper to 'refreshBucket' which does some book keeping to decide |
84 | , Serialize nid | 143 | -- whether or not a bucket is sufficiently refreshed or not. It will return |
85 | , Ord nid | 144 | -- false when we can terminate a node search. |
86 | , Ord ni | 145 | checkBucketFull :: Ord ni => KademliaSpace nid ni -- ^ Obtain a node id from a node. |
87 | , Ord addr | 146 | -> TVar (BucketList ni) -- ^ The current routing table. |
88 | , Hashable nid | 147 | -> TVar (Set.Set ni) -- ^ In-range nodes found so far. |
89 | , Hashable ni ) => | 148 | -> TVar Bool -- ^ The result will also be written here. |
90 | Search nid addr tok ni ni -> TVar (BucketList ni) -> Int -> IO Int | 149 | -> Int -- ^ The bucket number of interest. |
150 | -> ni -- ^ A newly found node. | ||
151 | -> STM Bool | ||
152 | checkBucketFull space var resultCounter fin n found_node = do | ||
153 | let fullcount = R.defaultBucketSize | ||
154 | saveit True = writeTVar fin True >> return True | ||
155 | saveit _ = return False | ||
156 | tbl <- readTVar var | ||
157 | let counts = R.shape tbl | ||
158 | nid = kademliaLocation space found_node | ||
159 | -- Update the result set with every found node that is in the | ||
160 | -- bucket of interest. | ||
161 | when (n == R.bucketNumber space nid tbl) | ||
162 | $ modifyTVar' resultCounter (Set.insert found_node) | ||
163 | resultCount <- readTVar resultCounter | ||
164 | saveit $ case drop (n - 1) counts of | ||
165 | (cnt:_) | cnt < fullcount -> True -- bucket not full, keep going | ||
166 | _ | Set.size resultCount < fullcount -> True -- we haven't got many results, keep going | ||
167 | _ -> False -- okay, good enough, let's quit. | ||
168 | |||
169 | |||
170 | refreshBucket :: (Hashable a, Hashable t, Ord t, Ord addr, Ord a, Show t) => | ||
171 | Search t addr tok a a -> TVar (BucketList a) -> Int -> IO Int | ||
91 | refreshBucket sch var n = do | 172 | refreshBucket sch var n = do |
92 | tbl <- atomically (readTVar var) | 173 | tbl <- atomically (readTVar var) |
93 | let count = bktCount tbl | 174 | let count = bktCount tbl |
@@ -100,27 +181,13 @@ refreshBucket sch var n = do | |||
100 | (bucketRange n (n + 1 < count)) | 181 | (bucketRange n (n + 1 < count)) |
101 | fin <- atomically $ newTVar False | 182 | fin <- atomically $ newTVar False |
102 | resultCounter <- atomically $ newTVar Set.empty | 183 | resultCounter <- atomically $ newTVar Set.empty |
103 | let fullcount = R.defaultBucketSize | ||
104 | saveit True = writeTVar fin True >> return True | ||
105 | saveit _ = return False | ||
106 | checkBucketFull :: ni -> STM Bool | ||
107 | checkBucketFull found_node = do | ||
108 | tbl <- readTVar var | ||
109 | let counts = R.shape tbl | ||
110 | when (n == R.bucketNumber (searchSpace sch) (kademliaLocation (searchSpace sch) found_node) tbl) | ||
111 | $ modifyTVar' resultCounter (Set.insert found_node) | ||
112 | resultCount <- readTVar resultCounter | ||
113 | saveit $ case drop (n - 1) counts of | ||
114 | (cnt:_) | cnt < fullcount -> True | ||
115 | _ | Set.size resultCount < fullcount -> True | ||
116 | _ -> False | ||
117 | 184 | ||
118 | hPutStrLn stderr $ "Start refresh " ++ show (n,sample) | 185 | hPutStrLn stderr $ "Start refresh " ++ show (n,sample) |
119 | 186 | ||
120 | -- Set 15 minute timeout in order to avoid overlapping refreshes. | 187 | -- Set 15 minute timeout in order to avoid overlapping refreshes. |
121 | s <- search sch tbl sample $ if n+1 == R.defaultBucketCount | 188 | s <- search sch tbl sample $ if n+1 == R.defaultBucketCount |
122 | then const $ return True -- Never short-circuit the last bucket. | 189 | then const $ return True -- Never short-circuit the last bucket. |
123 | else checkBucketFull | 190 | else checkBucketFull (searchSpace sch) var resultCounter fin n |
124 | _ <- timeout (15*60*1000000) $ do | 191 | _ <- timeout (15*60*1000000) $ do |
125 | atomically $ searchIsFinished s >>= check | 192 | atomically $ searchIsFinished s >>= check |
126 | atomically $ searchCancel s | 193 | atomically $ searchCancel s |
@@ -131,20 +198,11 @@ refreshBucket sch var n = do | |||
131 | return $ if b then 1 else c | 198 | return $ if b then 1 else c |
132 | return rcount | 199 | return rcount |
133 | 200 | ||
134 | 201 | bootstrap :: (Ord ni, Ord nid, Hashable nid, Hashable ni, Foldable t, Foldable t1, Show nid) => | |
135 | bootstrap :: | 202 | BucketRefresher nid ni -> t1 ni -> t ni -> IO () |
136 | ( Show nid | 203 | bootstrap BucketRefresher { refreshSearch = sch |
137 | , Serialize nid | 204 | , refreshBuckets = var |
138 | -- , FiniteBits nid | 205 | , refreshPing = ping } ns ns0 = do |
139 | , Hashable ni | ||
140 | , Hashable nid | ||
141 | , Ord ni | ||
142 | , Ord addr | ||
143 | , Ord nid | ||
144 | , Traversable t1 | ||
145 | , Traversable t | ||
146 | ) => Search nid addr tok ni ni -> TVar (BucketList ni) -> (ni -> IO Bool) -> t ni -> t1 ni -> IO () | ||
147 | bootstrap sch var ping ns ns0 = do | ||
148 | gotPing <- atomically $ newTVar False | 206 | gotPing <- atomically $ newTVar False |
149 | 207 | ||
150 | -- First, ping the given nodes so that they are added to | 208 | -- First, ping the given nodes so that they are added to |
@@ -193,6 +251,9 @@ bootstrap sch var ping ns ns0 = do | |||
193 | cnt <- refreshBucket sch var num | 251 | cnt <- refreshBucket sch var num |
194 | again cnt | 252 | again cnt |
195 | 253 | ||
254 | -- | Reschedule a bucket's refresh-time. It should be called whenever a bucket | ||
255 | -- changes. This will typically be invoked from 'tblTransition'. | ||
256 | -- | ||
196 | -- XXX: This will be redundantly triggered twice upon every node replacement | 257 | -- XXX: This will be redundantly triggered twice upon every node replacement |
197 | -- because we do not currently distinguish between standalone | 258 | -- because we do not currently distinguish between standalone |
198 | -- insertion/deletion events and an insertion/deletion pair constituting | 259 | -- insertion/deletion events and an insertion/deletion pair constituting |
@@ -201,55 +262,21 @@ bootstrap sch var ping ns ns0 = do | |||
201 | -- It might also be better to pass the timestamp of the transition here and | 262 | -- It might also be better to pass the timestamp of the transition here and |
202 | -- keep the refresh queue in better sync with the routing table by updating it | 263 | -- keep the refresh queue in better sync with the routing table by updating it |
203 | -- within the STM monad. | 264 | -- within the STM monad. |
204 | touchBucket :: KademliaSpace nid ni | 265 | touchBucket :: BucketRefresher nid ni |
205 | -> POSIXTime | 266 | -> RoutingTransition ni -- ^ What happened to the bucket? |
206 | -> TVar (BucketList ni) | ||
207 | -> TVar (Int.PSQ POSIXTime) | ||
208 | -> RoutingTransition ni | ||
209 | -> STM (IO ()) | 267 | -> STM (IO ()) |
210 | touchBucket space interval bkts psq tr | 268 | touchBucket BucketRefresher{ refreshSearch |
269 | , refreshInterval | ||
270 | , refreshBuckets | ||
271 | , refreshQueue | ||
272 | } | ||
273 | tr | ||
211 | | (transitionedTo tr == Applicant) | 274 | | (transitionedTo tr == Applicant) |
212 | = return $ return () | 275 | = return $ return () |
213 | | otherwise = return $ do | 276 | | otherwise = return $ do |
214 | now <- getPOSIXTime | 277 | now <- getPOSIXTime |
215 | atomically $ do | 278 | atomically $ do |
216 | let nid = kademliaLocation space (transitioningNode tr) | 279 | let space = searchSpace refreshSearch |
217 | num <- R.bucketNumber space nid <$> readTVar bkts | 280 | nid = kademliaLocation space (transitioningNode tr) |
218 | modifyTVar' psq $ Int.insert num (now + interval) | 281 | num <- R.bucketNumber space nid <$> readTVar refreshBuckets |
219 | 282 | modifyTVar' refreshQueue $ Int.insert num (now + refreshInterval) | |
220 | -- TODO: Bootstrap/Refresh | ||
221 | -- | ||
222 | -- From BEP 05: | ||
223 | -- | ||
224 | -- Each bucket should maintain a "last changed" property to indicate how | ||
225 | -- "fresh" the contents are. | ||
226 | -- | ||
227 | -- Note: We will use a "time to next refresh" property instead and store it in | ||
228 | -- a priority search queue. | ||
229 | -- | ||
230 | -- When... | ||
231 | -- | ||
232 | -- >>> data BucketTouchEvent = RoutingStatus :--> RoutingStatus | ||
233 | -- >>> bucketEvents = | ||
234 | -- >>> [ Applicant :--> Stranger -- a node in a bucket is pinged and it responds, | ||
235 | -- >>> | ||
236 | -- >>> , Stranger :--> Accepted -- or a node is added to a bucket, | ||
237 | -- >>> | ||
238 | -- >>> , Accepted :--> Stranger -- or a node in a bucket is replaced | ||
239 | -- >>> , Applicant :--> Accepted -- with another node, | ||
240 | -- >>> ] | ||
241 | -- | ||
242 | -- the bucket's last changed property should be updated. Buckets | ||
243 | -- that have not been changed in 15 minutes should be "refreshed." This is done | ||
244 | -- by picking a random ID in the range of the bucket and performing a | ||
245 | -- find_nodes search on it. | ||
246 | -- | ||
247 | -- The only other possible BucketTouchEvents are as follows: | ||
248 | -- | ||
249 | -- >>> not_handled = | ||
250 | -- >>> , Stranger :--> Applicant -- A ping is pending, it's result is covered: | ||
251 | -- >>> -- (Applicant :--> Stranger) | ||
252 | -- >>> -- (Applicant :--> Accepted) | ||
253 | -- >>> , Accepted :--> Applicant -- Never happens | ||
254 | -- >>> ] | ||
255 | -- | ||
diff --git a/src/Network/Tox.hs b/src/Network/Tox.hs index 35eaebb5..15b00780 100644 --- a/src/Network/Tox.hs +++ b/src/Network/Tox.hs | |||
@@ -167,11 +167,15 @@ w64Key (DHT.TransactionId (Nonce8 w) _) = w | |||
167 | nonceKey :: DHT.TransactionId -> Nonce8 | 167 | nonceKey :: DHT.TransactionId -> Nonce8 |
168 | nonceKey (DHT.TransactionId n _) = n | 168 | nonceKey (DHT.TransactionId n _) = n |
169 | 169 | ||
170 | myAddr :: DHT.Routing -> Maybe NodeInfo -> IO NodeInfo | 170 | -- | Return my own address. |
171 | myAddr routing maddr = atomically $ do | 171 | myAddr :: TVar (R.BucketList NodeInfo) -- ^ IPv4 buckets |
172 | -> TVar (R.BucketList NodeInfo) -- ^ IPv6 buckets | ||
173 | -> Maybe NodeInfo -- ^ Interested remote address | ||
174 | -> IO NodeInfo | ||
175 | myAddr routing4 routing6 maddr = atomically $ do | ||
172 | let var = case flip DHT.prefer4or6 Nothing <$> maddr of | 176 | let var = case flip DHT.prefer4or6 Nothing <$> maddr of |
173 | Just Want_IP6 -> DHT.routing6 routing | 177 | Just Want_IP6 -> routing4 |
174 | _ -> DHT.routing4 routing | 178 | _ -> routing6 |
175 | a <- readTVar var | 179 | a <- readTVar var |
176 | return $ R.thisNode a | 180 | return $ R.thisNode a |
177 | 181 | ||
@@ -179,7 +183,7 @@ newClient :: (DRG g, Show addr, Show meth) => | |||
179 | g -> Transport String addr x | 183 | g -> Transport String addr x |
180 | -> (Client String meth DHT.TransactionId addr x -> x -> MessageClass String meth DHT.TransactionId addr x) | 184 | -> (Client String meth DHT.TransactionId addr x -> x -> MessageClass String meth DHT.TransactionId addr x) |
181 | -> (Maybe addr -> IO addr) | 185 | -> (Maybe addr -> IO addr) |
182 | -> (meth -> Maybe (MethodHandler String DHT.TransactionId addr x)) | 186 | -> (Client String meth DHT.TransactionId addr x -> meth -> Maybe (MethodHandler String DHT.TransactionId addr x)) |
183 | -> (forall d. TransactionMethods d DHT.TransactionId addr x -> TransactionMethods d DHT.TransactionId addr x) | 187 | -> (forall d. TransactionMethods d DHT.TransactionId addr x -> TransactionMethods d DHT.TransactionId addr x) |
184 | -> (Client String meth DHT.TransactionId addr x -> Transport String addr x -> Transport String addr x) | 188 | -> (Client String meth DHT.TransactionId addr x -> Transport String addr x -> Transport String addr x) |
185 | -> IO (Client String meth DHT.TransactionId addr x) | 189 | -> IO (Client String meth DHT.TransactionId addr x) |
@@ -207,7 +211,7 @@ newClient drg net classify selfAddr handlers modifytbl modifynet = do | |||
207 | mkclient (tbl,var) handlers = | 211 | mkclient (tbl,var) handlers = |
208 | let client = Client | 212 | let client = Client |
209 | { clientNet = addHandler (reportParseError eprinter) (handleMessage client) $ modifynet client net | 213 | { clientNet = addHandler (reportParseError eprinter) (handleMessage client) $ modifynet client net |
210 | , clientDispatcher = dispatch tbl var handlers client | 214 | , clientDispatcher = dispatch tbl var (handlers client) client |
211 | , clientErrorReporter = eprinter { reportTimeout = reportTimeout ignoreErrors } | 215 | , clientErrorReporter = eprinter { reportTimeout = reportTimeout ignoreErrors } |
212 | , clientPending = var | 216 | , clientPending = var |
213 | , clientAddress = selfAddr | 217 | , clientAddress = selfAddr |
@@ -308,13 +312,15 @@ newTox keydb addr mbSessionsState suppliedDHTKey = do | |||
308 | drg <- drgNew | 312 | drg <- drgNew |
309 | let lookupClose _ = return Nothing | 313 | let lookupClose _ = return Nothing |
310 | 314 | ||
311 | routing <- DHT.newRouting addr crypto updateIP updateIP | 315 | mkrouting <- DHT.newRouting addr crypto updateIP updateIP |
312 | let ignoreErrors _ = return () -- Set this to (hPutStrLn stderr) to debug onion route building. | 316 | let ignoreErrors _ = return () -- Set this to (hPutStrLn stderr) to debug onion route building. |
313 | orouter <- newOnionRouter ignoreErrors | 317 | orouter <- newOnionRouter ignoreErrors |
314 | (dhtcrypt,onioncrypt,dtacrypt,cryptonet) <- toxTransport crypto orouter lookupClose udp | 318 | (dhtcrypt,onioncrypt,dtacrypt,cryptonet) <- toxTransport crypto orouter lookupClose udp |
315 | let dhtnet0 = layerTransportM (DHT.decrypt crypto) (DHT.encrypt crypto) dhtcrypt | 319 | let dhtnet0 = layerTransportM (DHT.decrypt crypto) (DHT.encrypt crypto) dhtcrypt |
316 | dhtclient <- newClient drg dhtnet0 DHT.classify (myAddr routing) (DHT.handlers crypto routing) id | 320 | tbl4 = DHT.routing4 $ mkrouting (error "missing client") |
317 | $ \client net -> onInbound (DHT.updateRouting client routing orouter) net | 321 | tbl6 = DHT.routing6 $ mkrouting (error "missing client") |
322 | dhtclient <- newClient drg dhtnet0 DHT.classify (myAddr tbl4 tbl6) (DHT.handlers crypto . mkrouting) id | ||
323 | $ \client net -> onInbound (DHT.updateRouting client (mkrouting client) orouter) net | ||
318 | 324 | ||
319 | orouter <- forkRouteBuilder orouter $ \nid ni -> fmap (\(_,ns,_)->ns) <$> DHT.getNodes dhtclient nid ni | 325 | orouter <- forkRouteBuilder orouter $ \nid ni -> fmap (\(_,ns,_)->ns) <$> DHT.getNodes dhtclient nid ni |
320 | 326 | ||
@@ -324,8 +330,8 @@ newTox keydb addr mbSessionsState suppliedDHTKey = do | |||
324 | oniondrg <- drgNew | 330 | oniondrg <- drgNew |
325 | let onionnet = layerTransportM (Onion.decrypt crypto) (Onion.encrypt crypto) onioncrypt | 331 | let onionnet = layerTransportM (Onion.decrypt crypto) (Onion.encrypt crypto) onioncrypt |
326 | onionclient <- newClient oniondrg onionnet (const Onion.classify) | 332 | onionclient <- newClient oniondrg onionnet (const Onion.classify) |
327 | (getOnionAlias crypto $ R.thisNode <$> readTVar (DHT.routing4 routing)) | 333 | (getOnionAlias crypto $ R.thisNode <$> readTVar (DHT.routing4 $ mkrouting dhtclient)) |
328 | (Onion.handlers onionnet routing toks keydb) | 334 | (const $ Onion.handlers onionnet (mkrouting dhtclient) toks keydb) |
329 | (hookQueries orouter DHT.transactionKey) | 335 | (hookQueries orouter DHT.transactionKey) |
330 | (const id) | 336 | (const id) |
331 | 337 | ||
@@ -337,7 +343,7 @@ newTox keydb addr mbSessionsState suppliedDHTKey = do | |||
337 | , toxCrypto = addHandler (hPutStrLn stderr) (cryptoNetHandler sessionsState) cryptonet | 343 | , toxCrypto = addHandler (hPutStrLn stderr) (cryptoNetHandler sessionsState) cryptonet |
338 | , toxCryptoSessions = sessionsState | 344 | , toxCryptoSessions = sessionsState |
339 | , toxCryptoKeys = crypto | 345 | , toxCryptoKeys = crypto |
340 | , toxRouting = routing | 346 | , toxRouting = mkrouting dhtclient |
341 | , toxTokens = toks | 347 | , toxTokens = toks |
342 | , toxAnnouncedKeys = keydb | 348 | , toxAnnouncedKeys = keydb |
343 | , toxOnionRoutes = orouter | 349 | , toxOnionRoutes = orouter |
diff --git a/src/Network/Tox/DHT/Handlers.hs b/src/Network/Tox/DHT/Handlers.hs index 494e319b..d3d36525 100644 --- a/src/Network/Tox/DHT/Handlers.hs +++ b/src/Network/Tox/DHT/Handlers.hs | |||
@@ -1,7 +1,8 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
1 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | 2 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} |
3 | {-# LANGUAGE NamedFieldPuns #-} | ||
2 | {-# LANGUAGE PatternSynonyms #-} | 4 | {-# LANGUAGE PatternSynonyms #-} |
3 | {-# LANGUAGE TupleSections #-} | 5 | {-# LANGUAGE TupleSections #-} |
4 | {-# LANGUAGE CPP #-} | ||
5 | module Network.Tox.DHT.Handlers where | 6 | module Network.Tox.DHT.Handlers where |
6 | 7 | ||
7 | import Network.Tox.DHT.Transport as DHTTransport | 8 | import Network.Tox.DHT.Transport as DHTTransport |
@@ -11,7 +12,7 @@ import Crypto.Tox | |||
11 | import Network.Kademlia.Search | 12 | import Network.Kademlia.Search |
12 | import qualified Data.Wrapper.PSQInt as Int | 13 | import qualified Data.Wrapper.PSQInt as Int |
13 | import Network.Kademlia | 14 | import Network.Kademlia |
14 | import Network.Kademlia.Bootstrap (touchBucket) | 15 | import Network.Kademlia.Bootstrap |
15 | import Network.Address (WantIP (..), ipFamily, testIdBit,fromSockAddr, sockAddrPort) | 16 | import Network.Address (WantIP (..), ipFamily, testIdBit,fromSockAddr, sockAddrPort) |
16 | import qualified Network.Kademlia.Routing as R | 17 | import qualified Network.Kademlia.Routing as R |
17 | import Control.TriadCommittee | 18 | import Control.TriadCommittee |
@@ -111,18 +112,28 @@ classify client msg = fromMaybe (IsUnknown "unknown") | |||
111 | 112 | ||
112 | data Routing = Routing | 113 | data Routing = Routing |
113 | { tentativeId :: NodeInfo | 114 | { tentativeId :: NodeInfo |
114 | , sched4 :: !( TVar (Int.PSQ POSIXTime) ) | ||
115 | , routing4 :: !( TVar (R.BucketList NodeInfo) ) | ||
116 | , committee4 :: TriadCommittee NodeId SockAddr | 115 | , committee4 :: TriadCommittee NodeId SockAddr |
117 | , sched6 :: !( TVar (Int.PSQ POSIXTime) ) | ||
118 | , routing6 :: !( TVar (R.BucketList NodeInfo) ) | ||
119 | , committee6 :: TriadCommittee NodeId SockAddr | 116 | , committee6 :: TriadCommittee NodeId SockAddr |
117 | , refresher4 :: BucketRefresher NodeId NodeInfo | ||
118 | , refresher6 :: BucketRefresher NodeId NodeInfo | ||
120 | } | 119 | } |
121 | 120 | ||
121 | sched4 :: Routing -> TVar (Int.PSQ POSIXTime) | ||
122 | sched4 Routing { refresher4 = BucketRefresher { refreshQueue } } = refreshQueue | ||
123 | |||
124 | sched6 :: Routing -> TVar (Int.PSQ POSIXTime) | ||
125 | sched6 Routing { refresher6 = BucketRefresher { refreshQueue } } = refreshQueue | ||
126 | |||
127 | routing4 :: Routing -> TVar (R.BucketList NodeInfo) | ||
128 | routing4 Routing { refresher4 = BucketRefresher { refreshBuckets } } = refreshBuckets | ||
129 | |||
130 | routing6 :: Routing -> TVar (R.BucketList NodeInfo) | ||
131 | routing6 Routing { refresher6 = BucketRefresher { refreshBuckets } } = refreshBuckets | ||
132 | |||
122 | newRouting :: SockAddr -> TransportCrypto | 133 | newRouting :: SockAddr -> TransportCrypto |
123 | -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv4 change | 134 | -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv4 change |
124 | -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv6 change | 135 | -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv6 change |
125 | -> IO Routing | 136 | -> IO (Client -> Routing) |
126 | newRouting addr crypto update4 update6 = do | 137 | newRouting addr crypto update4 update6 = do |
127 | let tentative_ip4 = fromMaybe (IPv4 $ toEnum 0) (IPv4 <$> fromSockAddr addr) | 138 | let tentative_ip4 = fromMaybe (IPv4 $ toEnum 0) (IPv4 <$> fromSockAddr addr) |
128 | tentative_ip6 = fromMaybe (IPv6 $ toEnum 0) (IPv6 <$> fromSockAddr addr) | 139 | tentative_ip6 = fromMaybe (IPv6 $ toEnum 0) (IPv6 <$> fromSockAddr addr) |
@@ -146,7 +157,17 @@ newRouting addr crypto update4 update6 = do | |||
146 | committee6 <- newTriadCommittee (update6 tbl6) -- updateIPVote tbl6 addr6 | 157 | committee6 <- newTriadCommittee (update6 tbl6) -- updateIPVote tbl6 addr6 |
147 | sched4 <- newTVar Int.empty | 158 | sched4 <- newTVar Int.empty |
148 | sched6 <- newTVar Int.empty | 159 | sched6 <- newTVar Int.empty |
149 | return $ Routing tentative_info sched4 tbl4 committee4 sched6 tbl6 committee6 | 160 | return $ \client -> |
161 | let refresher sched bkts = BucketRefresher | ||
162 | { refreshInterval = 15 * 60 | ||
163 | , refreshQueue = sched | ||
164 | , refreshSearch = nodeSearch client | ||
165 | , refreshBuckets = bkts | ||
166 | , refreshPing = ping client | ||
167 | } | ||
168 | refresher4 = refresher sched4 tbl6 | ||
169 | refresher6 = refresher sched6 tbl6 | ||
170 | in Routing tentative_info committee4 committee6 refresher4 refresher6 | ||
150 | 171 | ||
151 | 172 | ||
152 | -- TODO: This should cover more cases | 173 | -- TODO: This should cover more cases |
@@ -322,26 +343,28 @@ updateRouting client routing orouter naddr msg | |||
322 | | PacketKind 0x21 <- msgType msg = return () -- ignore lan discovery | 343 | | PacketKind 0x21 <- msgType msg = return () -- ignore lan discovery |
323 | | otherwise = do | 344 | | otherwise = do |
324 | case prefer4or6 naddr Nothing of | 345 | case prefer4or6 naddr Nothing of |
325 | Want_IP4 -> updateTable client naddr orouter (routing4 routing) (committee4 routing) (sched4 routing) | 346 | Want_IP4 -> updateTable client naddr orouter (committee4 routing) (refresher4 routing) |
326 | Want_IP6 -> updateTable client naddr orouter (routing6 routing) (committee6 routing) (sched6 routing) | 347 | Want_IP6 -> updateTable client naddr orouter (committee6 routing) (refresher4 routing) |
327 | Want_Both -> error $ "BUG:unreachable at " ++ __FILE__ ++ ":" ++ show __LINE__ | 348 | Want_Both -> error $ "BUG:unreachable at " ++ __FILE__ ++ ":" ++ show __LINE__ |
328 | 349 | ||
329 | updateTable :: Client -> NodeInfo -> OnionRouter -> TVar (R.BucketList NodeInfo) -> TriadCommittee NodeId SockAddr -> TVar (Int.PSQ POSIXTime) -> IO () | 350 | updateTable :: Client -> NodeInfo -> OnionRouter -> TriadCommittee NodeId SockAddr -> BucketRefresher NodeId NodeInfo -> IO () |
330 | updateTable client naddr orouter tbl committee sched = do | 351 | updateTable client naddr orouter committee refresher = do |
331 | self <- atomically $ R.thisNode <$> readTVar tbl | 352 | self <- atomically $ R.thisNode <$> readTVar (refreshBuckets refresher) |
332 | when (nodeIP self /= nodeIP naddr) $ do | 353 | when (nodeIP self /= nodeIP naddr) $ do |
333 | -- TODO: IP address vote? | 354 | -- TODO: IP address vote? |
334 | insertNode (toxKademlia client committee orouter tbl sched) naddr | 355 | insertNode (toxKademlia client committee orouter refresher) naddr |
335 | 356 | ||
336 | toxKademlia :: Client -> TriadCommittee NodeId SockAddr -> OnionRouter -> TVar (R.BucketList NodeInfo) -> TVar (Int.PSQ POSIXTime) -> Kademlia NodeId NodeInfo | 357 | toxKademlia :: Client -> TriadCommittee NodeId SockAddr -> OnionRouter |
337 | toxKademlia client committee orouter var sched | 358 | -> BucketRefresher NodeId NodeInfo |
359 | -> Kademlia NodeId NodeInfo | ||
360 | toxKademlia client committee orouter refresher | ||
338 | = Kademlia quietInsertions | 361 | = Kademlia quietInsertions |
339 | toxSpace | 362 | toxSpace |
340 | (vanillaIO var $ ping client) | 363 | (vanillaIO (refreshBuckets refresher) $ ping client) |
341 | { tblTransition = \tr -> do | 364 | { tblTransition = \tr -> do |
342 | io1 <- transitionCommittee committee tr | 365 | io1 <- transitionCommittee committee tr |
343 | io2 <- touchBucket toxSpace (15*60) var sched tr | 366 | io2 <- touchBucket refresher tr -- toxSpace (15*60) var sched tr |
344 | hookBucketList toxSpace var orouter tr | 367 | hookBucketList toxSpace (refreshBuckets refresher) orouter tr |
345 | return $ do | 368 | return $ do |
346 | io1 >> io2 | 369 | io1 >> io2 |
347 | {- | 370 | {- |