diff options
author | Joe Crayne <joe@jerkface.net> | 2020-01-03 21:27:50 -0500 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-07 13:24:59 -0500 |
commit | c7fb8cfe16f821e4e148d1855a18cb81255743bc (patch) | |
tree | c035afc9ff870ea3bfc5b1dc7c4254ad0c0bf4b3 /dht/src | |
parent | 5ea2de4e858cc89282561922bae257b6f9041d2e (diff) |
Async search.
Diffstat (limited to 'dht/src')
-rw-r--r-- | dht/src/Network/BitTorrent/MainlineDHT.hs | 76 | ||||
-rw-r--r-- | dht/src/Network/Tox.hs | 2 | ||||
-rw-r--r-- | dht/src/Network/Tox/DHT/Handlers.hs | 58 | ||||
-rw-r--r-- | dht/src/Network/Tox/Onion/Handlers.hs | 49 | ||||
-rw-r--r-- | dht/src/Network/Tox/Onion/Routes.hs | 83 | ||||
-rw-r--r-- | dht/src/Network/Tox/TCP.hs | 49 |
6 files changed, 243 insertions, 74 deletions
diff --git a/dht/src/Network/BitTorrent/MainlineDHT.hs b/dht/src/Network/BitTorrent/MainlineDHT.hs index 8532b492..d3904c40 100644 --- a/dht/src/Network/BitTorrent/MainlineDHT.hs +++ b/dht/src/Network/BitTorrent/MainlineDHT.hs | |||
@@ -512,8 +512,8 @@ data Routing = Routing | |||
512 | { tentativeId :: NodeInfo | 512 | { tentativeId :: NodeInfo |
513 | , committee4 :: TriadCommittee NodeId SockAddr | 513 | , committee4 :: TriadCommittee NodeId SockAddr |
514 | , committee6 :: TriadCommittee NodeId SockAddr | 514 | , committee6 :: TriadCommittee NodeId SockAddr |
515 | , refresher4 :: BucketRefresher NodeId NodeInfo | 515 | , refresher4 :: BucketRefresher NodeId NodeInfo TransactionId |
516 | , refresher6 :: BucketRefresher NodeId NodeInfo | 516 | , refresher6 :: BucketRefresher NodeId NodeInfo TransactionId |
517 | } | 517 | } |
518 | 518 | ||
519 | sched4 :: Routing -> TVar (Int.PSQ POSIXTime) | 519 | sched4 :: Routing -> TVar (Int.PSQ POSIXTime) |
@@ -569,7 +569,6 @@ newClient swarms addr = do | |||
569 | -- We defer initializing the refreshSearch and refreshPing until we | 569 | -- We defer initializing the refreshSearch and refreshPing until we |
570 | -- have a client to send queries with. | 570 | -- have a client to send queries with. |
571 | let nullPing = const $ return False | 571 | let nullPing = const $ return False |
572 | nullSearch = mainlineSearch $ \_ _ -> return Canceled | ||
573 | tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info R.defaultBucketCount | 572 | tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info R.defaultBucketCount |
574 | refresher4 <- newBucketRefresher tbl4 nullSearch nullPing | 573 | refresher4 <- newBucketRefresher tbl4 nullSearch nullPing |
575 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount | 574 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount |
@@ -730,7 +729,7 @@ defaultHandler meth = MethodHandler decodePayload errorPayload returnError | |||
730 | 729 | ||
731 | mainlineKademlia :: MainlineClient | 730 | mainlineKademlia :: MainlineClient |
732 | -> TriadCommittee NodeId SockAddr | 731 | -> TriadCommittee NodeId SockAddr |
733 | -> BucketRefresher NodeId NodeInfo | 732 | -> BucketRefresher NodeId NodeInfo TransactionId |
734 | -> Kademlia NodeId NodeInfo | 733 | -> Kademlia NodeId NodeInfo |
735 | mainlineKademlia client committee refresher | 734 | mainlineKademlia client committee refresher |
736 | = Kademlia quietInsertions | 735 | = Kademlia quietInsertions |
@@ -1037,6 +1036,35 @@ announceH (SwarmsDatabase peers toks _) naddr announcement = do | |||
1037 | isReadonlyClient :: MainlineClient -> Bool | 1036 | isReadonlyClient :: MainlineClient -> Bool |
1038 | isReadonlyClient client = False -- TODO | 1037 | isReadonlyClient client = False -- TODO |
1039 | 1038 | ||
1039 | mainlineAsync :: ( BEncode xqry | ||
1040 | , BEncode xrsp | ||
1041 | ) => Method | ||
1042 | -> (xrsp -> rsp) | ||
1043 | -> (qry -> xqry) | ||
1044 | -> MainlineClient | ||
1045 | -> qry | ||
1046 | -> NodeInfo | ||
1047 | -> (TransactionId -> QR.Result rsp -> IO ()) | ||
1048 | -> IO TransactionId | ||
1049 | mainlineAsync meth unwrap msg client nid addr withResult = do | ||
1050 | asyncQuery client serializer (msg nid) addr $ \qid reply -> do | ||
1051 | withResult qid $ case reply of | ||
1052 | Success (Right x) -> Success x | ||
1053 | Success (Left e) -> Canceled -- TODO: Do something with parse errors. | ||
1054 | Canceled -> Canceled | ||
1055 | TimedOut -> TimedOut | ||
1056 | where | ||
1057 | serializer = MethodSerializer | ||
1058 | { methodTimeout = \ni -> return (ni, 5000000) | ||
1059 | , method = meth | ||
1060 | , wrapQuery = encodeQueryPayload meth (isReadonlyClient client) | ||
1061 | , unwrapResponse = (>>= either (Left . Error GenericError . C8.pack) | ||
1062 | (Right . unwrap) | ||
1063 | . BE.fromBEncode) | ||
1064 | . rspPayload | ||
1065 | } | ||
1066 | |||
1067 | |||
1040 | mainlineSend :: ( BEncode xqry | 1068 | mainlineSend :: ( BEncode xqry |
1041 | , BEncode xrsp | 1069 | , BEncode xrsp |
1042 | ) => Method | 1070 | ) => Method |
@@ -1073,30 +1101,54 @@ ping client addr = | |||
1073 | getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ())) | 1101 | getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ())) |
1074 | getNodes = mainlineSend (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both) | 1102 | getNodes = mainlineSend (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both) |
1075 | 1103 | ||
1104 | asyncGetNodes :: MainlineClient -> NodeId -> NodeInfo -> (TransactionId -> QR.Result ([NodeInfo],[NodeInfo],Maybe ()) -> IO ()) | ||
1105 | -> IO TransactionId | ||
1106 | asyncGetNodes = mainlineAsync (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both) | ||
1107 | |||
1076 | unwrapNodes :: NodeFound -> ([NodeInfo], [NodeInfo], Maybe ()) | 1108 | unwrapNodes :: NodeFound -> ([NodeInfo], [NodeInfo], Maybe ()) |
1077 | unwrapNodes (NodeFound ns4 ns6) = (ns4++ns6, ns4++ns6, Just ()) | 1109 | unwrapNodes (NodeFound ns4 ns6) = (ns4++ns6, ns4++ns6, Just ()) |
1078 | 1110 | ||
1079 | getPeers :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[PeerAddr],Maybe Token)) | 1111 | getPeers :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[PeerAddr],Maybe Token)) |
1080 | getPeers = mainlineSend (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce | 1112 | getPeers = mainlineSend (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce |
1081 | 1113 | ||
1114 | asyncGetPeers :: MainlineClient -> NodeId -> NodeInfo -> (TransactionId -> QR.Result ([NodeInfo],[PeerAddr],Maybe Token) -> IO ()) | ||
1115 | -> IO TransactionId | ||
1116 | asyncGetPeers = mainlineAsync (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce | ||
1117 | |||
1082 | unwrapPeers :: GotPeers -> ([NodeInfo], [PeerAddr], Maybe Token) | 1118 | unwrapPeers :: GotPeers -> ([NodeInfo], [PeerAddr], Maybe Token) |
1083 | unwrapPeers (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4++ns6, ps, Just tok) | 1119 | unwrapPeers (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4++ns6, ps, Just tok) |
1084 | 1120 | ||
1085 | mainlineSearch :: (NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo], [r], Maybe tok))) | 1121 | nullTransactionId :: TransactionId |
1086 | -> Search NodeId (IP, PortNumber) tok NodeInfo r | 1122 | nullTransactionId = TransactionId B.empty |
1087 | mainlineSearch qry = Search | 1123 | |
1124 | nullSearch :: Search NodeId (IP, PortNumber) tok NodeInfo r TransactionId | ||
1125 | nullSearch = Search | ||
1126 | { searchSpace = mainlineSpace | ||
1127 | , searchNodeAddress = nodeIP &&& nodePort | ||
1128 | , searchQuery = \_ _ f -> f nullTransactionId Canceled >> return nullTransactionId | ||
1129 | , searchQueryCancel = \_ _ -> return () | ||
1130 | , searchAlpha = 8 | ||
1131 | , searchK = 16 | ||
1132 | } | ||
1133 | |||
1134 | mainlineSearch :: MainlineClient | ||
1135 | -> (MainlineClient -> NodeId -> NodeInfo | ||
1136 | -> (TransactionId -> QR.Result ([NodeInfo], [r], Maybe tok) -> IO ()) -> IO TransactionId) | ||
1137 | -> Search NodeId (IP, PortNumber) tok NodeInfo r TransactionId | ||
1138 | mainlineSearch client qry = Search | ||
1088 | { searchSpace = mainlineSpace | 1139 | { searchSpace = mainlineSpace |
1089 | , searchNodeAddress = nodeIP &&& nodePort | 1140 | , searchNodeAddress = nodeIP &&& nodePort |
1090 | , searchQuery = qry | 1141 | , searchQuery = qry client |
1142 | , searchQueryCancel = cancelQuery client | ||
1091 | , searchAlpha = 8 | 1143 | , searchAlpha = 8 |
1092 | , searchK = 16 | 1144 | , searchK = 16 |
1093 | } | 1145 | } |
1094 | 1146 | ||
1095 | nodeSearch :: MainlineClient -> Search NodeId (IP, PortNumber) () NodeInfo NodeInfo | 1147 | nodeSearch :: MainlineClient -> Search NodeId (IP, PortNumber) () NodeInfo NodeInfo TransactionId |
1096 | nodeSearch client = mainlineSearch (getNodes client) | 1148 | nodeSearch client = mainlineSearch client asyncGetNodes |
1097 | 1149 | ||
1098 | peerSearch :: MainlineClient -> Search NodeId (IP, PortNumber) Token NodeInfo PeerAddr | 1150 | peerSearch :: MainlineClient -> Search NodeId (IP, PortNumber) Token NodeInfo PeerAddr TransactionId |
1099 | peerSearch client = mainlineSearch (getPeers client) | 1151 | peerSearch client = mainlineSearch client asyncGetPeers |
1100 | 1152 | ||
1101 | -- | List of bootstrap nodes maintained by different bittorrent | 1153 | -- | List of bootstrap nodes maintained by different bittorrent |
1102 | -- software authors. | 1154 | -- software authors. |
diff --git a/dht/src/Network/Tox.hs b/dht/src/Network/Tox.hs index f9f35ea4..4aed1c43 100644 --- a/dht/src/Network/Tox.hs +++ b/dht/src/Network/Tox.hs | |||
@@ -480,6 +480,6 @@ announceToLan sock nid = do | |||
480 | saferSendTo sock bs broadcast | 480 | saferSendTo sock bs broadcast |
481 | 481 | ||
482 | 482 | ||
483 | toxQSearch :: Tox extra -> Search NodeId (IP, PortNumber) Nonce32 NodeInfo Onion.Rendezvous | 483 | toxQSearch :: Tox extra -> Search NodeId (IP, PortNumber) Nonce32 NodeInfo Onion.Rendezvous DHT.TransactionId |
484 | toxQSearch tox = Onion.toxidSearch (onionTimeout tox) (toxCryptoKeys tox) (toxOnion tox) | 484 | toxQSearch tox = Onion.toxidSearch (onionTimeout tox) (toxCryptoKeys tox) (toxOnion tox) |
485 | 485 | ||
diff --git a/dht/src/Network/Tox/DHT/Handlers.hs b/dht/src/Network/Tox/DHT/Handlers.hs index d132da88..f4563a3b 100644 --- a/dht/src/Network/Tox/DHT/Handlers.hs +++ b/dht/src/Network/Tox/DHT/Handlers.hs | |||
@@ -133,8 +133,8 @@ data Routing = Routing | |||
133 | { tentativeId :: NodeInfo | 133 | { tentativeId :: NodeInfo |
134 | , committee4 :: TriadCommittee NodeId SockAddr | 134 | , committee4 :: TriadCommittee NodeId SockAddr |
135 | , committee6 :: TriadCommittee NodeId SockAddr | 135 | , committee6 :: TriadCommittee NodeId SockAddr |
136 | , refresher4 :: BucketRefresher NodeId NodeInfo | 136 | , refresher4 :: BucketRefresher NodeId NodeInfo TransactionId |
137 | , refresher6 :: BucketRefresher NodeId NodeInfo | 137 | , refresher6 :: BucketRefresher NodeId NodeInfo TransactionId |
138 | , nodesOfInterest :: TVar (HashMap NodeId [NodeInfoCallback]) | 138 | , nodesOfInterest :: TVar (HashMap NodeId [NodeInfoCallback]) |
139 | } | 139 | } |
140 | 140 | ||
@@ -172,6 +172,20 @@ routing4 Routing { refresher4 = BucketRefresher { refreshBuckets } } = refreshBu | |||
172 | routing6 :: Routing -> TVar (R.BucketList NodeInfo) | 172 | routing6 :: Routing -> TVar (R.BucketList NodeInfo) |
173 | routing6 Routing { refresher6 = BucketRefresher { refreshBuckets } } = refreshBuckets | 173 | routing6 Routing { refresher6 = BucketRefresher { refreshBuckets } } = refreshBuckets |
174 | 174 | ||
175 | nullTransactionId :: TransactionId | ||
176 | nullTransactionId = TransactionId (Nonce8 0) (Nonce24 zeros24) | ||
177 | |||
178 | nullSearch :: Search NodeId (IP, PortNumber) tok NodeInfo r TransactionId | ||
179 | nullSearch = Search | ||
180 | { searchSpace = toxSpace | ||
181 | , searchNodeAddress = nodeIP &&& nodePort | ||
182 | , searchQuery = \_ _ f -> f nullTransactionId Canceled >> return nullTransactionId | ||
183 | , searchQueryCancel = \_ _ -> return () | ||
184 | , searchAlpha = 1 | ||
185 | , searchK = 2 | ||
186 | } | ||
187 | |||
188 | |||
175 | newRouting :: SockAddr -> TransportCrypto | 189 | newRouting :: SockAddr -> TransportCrypto |
176 | -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv4 change | 190 | -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv4 change |
177 | -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv6 change | 191 | -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv6 change |
@@ -195,13 +209,6 @@ newRouting addr crypto update4 update6 = do | |||
195 | -- We defer initializing the refreshSearch and refreshPing until we | 209 | -- We defer initializing the refreshSearch and refreshPing until we |
196 | -- have a client to send queries with. | 210 | -- have a client to send queries with. |
197 | let nullPing = const $ return False | 211 | let nullPing = const $ return False |
198 | nullSearch = Search | ||
199 | { searchSpace = toxSpace | ||
200 | , searchNodeAddress = nodeIP &&& nodePort | ||
201 | , searchQuery = \_ _ -> return Canceled | ||
202 | , searchAlpha = 1 | ||
203 | , searchK = 2 | ||
204 | } | ||
205 | tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info4 R.defaultBucketCount | 212 | tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info4 R.defaultBucketCount |
206 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount | 213 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount |
207 | refresher4 <- newBucketRefresher tbl4 nullSearch nullPing | 214 | refresher4 <- newBucketRefresher tbl4 nullSearch nullPing |
@@ -432,6 +439,30 @@ getNodesUDP :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> NodeId -> N | |||
432 | -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ())) | 439 | -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ())) |
433 | getNodesUDP client cbvar nid addr = getNodes client cbvar nid (Multi.UDP ==> addr) | 440 | getNodesUDP client cbvar nid addr = getNodes client cbvar nid (Multi.UDP ==> addr) |
434 | 441 | ||
442 | asyncGetNodes :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> NodeId -> Multi.NodeInfo | ||
443 | -> (TransactionId -> QR.Result ([NodeInfo],[NodeInfo],Maybe ()) -> IO ()) | ||
444 | -> IO TransactionId | ||
445 | asyncGetNodes client cbvar nid addr withResult = do | ||
446 | QR.asyncQuery client (serializer GetNodesType DHTGetNodes unsendNodes) (GetNodes nid) addr $ | ||
447 | \qid reply -> do | ||
448 | forM_ (join $ resultToMaybe reply) $ \(SendNodes ns) -> | ||
449 | forM_ ns $ \n -> do | ||
450 | now <- getPOSIXTime | ||
451 | atomically $ do | ||
452 | mcbs <- HashMap.lookup (nodeId . udpNodeInfo $ n) <$> readTVar cbvar | ||
453 | forM_ mcbs $ \cbs -> do | ||
454 | forM_ cbs $ \cb -> do | ||
455 | rumoredAddress cb now addr (udpNodeInfo n) | ||
456 | withResult qid $ case reply of | ||
457 | Success x -> maybe Canceled (Success . unwrapNodes) x | ||
458 | _ -> fmap (error "Network.Tox.DHT.Handlers.getNodes: the impossible happened!") reply | ||
459 | |||
460 | asyncGetNodesUDP :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> NodeId -> NodeInfo | ||
461 | -> (TransactionId -> QR.Result ([NodeInfo],[NodeInfo],Maybe ()) -> IO ()) | ||
462 | -> IO TransactionId | ||
463 | asyncGetNodesUDP client cbvar nid addr go = asyncGetNodes client cbvar nid (Multi.UDP ==> addr) go | ||
464 | |||
465 | |||
435 | updateRouting :: Client -> Routing | 466 | updateRouting :: Client -> Routing |
436 | -> (TVar (R.BucketList NodeInfo) -> RoutingTransition NodeInfo -> STM ()) | 467 | -> (TVar (R.BucketList NodeInfo) -> RoutingTransition NodeInfo -> STM ()) |
437 | -> Multi.NodeInfo | 468 | -> Multi.NodeInfo |
@@ -462,7 +493,7 @@ updateTable client routing orouter naddr = do | |||
462 | Want_Both -> do dput XMisc "BUG:unreachable" | 493 | Want_Both -> do dput XMisc "BUG:unreachable" |
463 | error $ "BUG:unreachable at " ++ __FILE__ ++ ":" ++ show __LINE__ | 494 | error $ "BUG:unreachable at " ++ __FILE__ ++ ":" ++ show __LINE__ |
464 | where | 495 | where |
465 | go :: TriadCommittee NodeId SockAddr -> BucketRefresher NodeId NodeInfo -> IO () | 496 | go :: TriadCommittee NodeId SockAddr -> BucketRefresher NodeId NodeInfo TransactionId -> IO () |
466 | go committee refresher = do | 497 | go committee refresher = do |
467 | self <- atomically $ R.thisNode <$> readTVar (refreshBuckets refresher) | 498 | self <- atomically $ R.thisNode <$> readTVar (refreshBuckets refresher) |
468 | -- dput XMisc $ "(tox)updateRouting: " ++ show (nodeIP self, nodeIP naddr) | 499 | -- dput XMisc $ "(tox)updateRouting: " ++ show (nodeIP self, nodeIP naddr) |
@@ -473,7 +504,7 @@ updateTable client routing orouter naddr = do | |||
473 | toxKademlia :: Client | 504 | toxKademlia :: Client |
474 | -> (TVar (R.BucketList NodeInfo) -> RoutingTransition NodeInfo -> STM ()) | 505 | -> (TVar (R.BucketList NodeInfo) -> RoutingTransition NodeInfo -> STM ()) |
475 | -> TriadCommittee NodeId SockAddr | 506 | -> TriadCommittee NodeId SockAddr |
476 | -> BucketRefresher NodeId NodeInfo | 507 | -> BucketRefresher NodeId NodeInfo TransactionId |
477 | -> Kademlia NodeId NodeInfo | 508 | -> Kademlia NodeId NodeInfo |
478 | toxKademlia client orouter committee refresher | 509 | toxKademlia client orouter committee refresher |
479 | = Kademlia quietInsertions | 510 | = Kademlia quietInsertions |
@@ -541,11 +572,12 @@ handlers crypto _ CookieRequestType = Just $ MethodHandler (isCookieReques | |||
541 | handlers _ _ DHTRequestType = Just $ NoReply (isDHTRequest snd) $ dhtRequestH | 572 | handlers _ _ DHTRequestType = Just $ NoReply (isDHTRequest snd) $ dhtRequestH |
542 | handlers _ _ typ = error $ "TODO DHT handlers " ++ show typ | 573 | handlers _ _ typ = error $ "TODO DHT handlers " ++ show typ |
543 | 574 | ||
544 | nodeSearch :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> Search NodeId (IP,PortNumber) () NodeInfo NodeInfo | 575 | nodeSearch :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> Search NodeId (IP,PortNumber) () NodeInfo NodeInfo TransactionId |
545 | nodeSearch client cbvar = Search | 576 | nodeSearch client cbvar = Search |
546 | { searchSpace = toxSpace | 577 | { searchSpace = toxSpace |
547 | , searchNodeAddress = nodeIP &&& nodePort | 578 | , searchNodeAddress = nodeIP &&& nodePort |
548 | , searchQuery = getNodesUDP client cbvar | 579 | , searchQuery = asyncGetNodesUDP client cbvar |
580 | , searchQueryCancel = cancelQuery client | ||
549 | , searchAlpha = 8 | 581 | , searchAlpha = 8 |
550 | , searchK = 16 | 582 | , searchK = 16 |
551 | } | 583 | } |
diff --git a/dht/src/Network/Tox/Onion/Handlers.hs b/dht/src/Network/Tox/Onion/Handlers.hs index 015c758c..45795312 100644 --- a/dht/src/Network/Tox/Onion/Handlers.hs +++ b/dht/src/Network/Tox/Onion/Handlers.hs | |||
@@ -218,13 +218,14 @@ handlers net _ _ keydb _ = Just $ NoReply Right $ dataToRouteH keydb net | |||
218 | toxidSearch :: (OnionDestination r -> STM (OnionDestination r, Int)) | 218 | toxidSearch :: (OnionDestination r -> STM (OnionDestination r, Int)) |
219 | -> TransportCrypto | 219 | -> TransportCrypto |
220 | -> Client r | 220 | -> Client r |
221 | -> Search NodeId (IP, PortNumber) Nonce32 NodeInfo Rendezvous | 221 | -> Search NodeId (IP, PortNumber) Nonce32 NodeInfo Rendezvous TransactionId |
222 | toxidSearch getTimeout crypto client = Search | 222 | toxidSearch getTimeout crypto client = Search |
223 | { searchSpace = toxSpace | 223 | { searchSpace = toxSpace |
224 | , searchNodeAddress = nodeIP &&& nodePort | 224 | , searchNodeAddress = nodeIP &&& nodePort |
225 | , searchQuery = getRendezvous getTimeout crypto client | 225 | , searchQuery = asyncGetRendezvous getTimeout crypto client |
226 | , searchAlpha = 3 | 226 | , searchQueryCancel = cancelQuery client |
227 | , searchK = 6 | 227 | , searchAlpha = 3 |
228 | , searchK = 6 | ||
228 | } | 229 | } |
229 | 230 | ||
230 | announceSerializer :: (OnionDestination r -> STM (OnionDestination r, Int)) | 231 | announceSerializer :: (OnionDestination r -> STM (OnionDestination r, Int)) |
@@ -289,6 +290,25 @@ sendOnion getTimeout client req oaddr unwrap = | |||
289 | Canceled -> return Canceled | 290 | Canceled -> return Canceled |
290 | TimedOut -> re | 291 | TimedOut -> re |
291 | 292 | ||
293 | asyncOnion :: (OnionDestination r -> STM (OnionDestination r, Int)) | ||
294 | -> Client r | ||
295 | -> AnnounceRequest | ||
296 | -> OnionDestination r | ||
297 | -> (NodeInfo -> AnnounceResponse -> t) | ||
298 | -> (TransactionId -> QR.Result t -> IO ()) | ||
299 | -> IO TransactionId | ||
300 | asyncOnion getTimeout client req oaddr unwrap withResult = do | ||
301 | -- TODO: Restore "Four tries and then we tap out" behavior. | ||
302 | qid <- QR.asyncQuery client (announceSerializer getTimeout) req oaddr $ \k mb -> do | ||
303 | forM_ mb $ \r -> dput XAnnounce $ show (onionNodeInfo oaddr) ++ " async sent response: " ++ show r | ||
304 | withResult k $ case mb of | ||
305 | Success x -> maybe (TimedOut) | ||
306 | (Success . unwrap (onionNodeInfo oaddr)) | ||
307 | (x :: Maybe AnnounceResponse) | ||
308 | Canceled -> Canceled | ||
309 | TimedOut -> TimedOut | ||
310 | return qid | ||
311 | |||
292 | 312 | ||
293 | -- | Lookup the secret counterpart for a given alias key. | 313 | -- | Lookup the secret counterpart for a given alias key. |
294 | getRendezvous :: (OnionDestination r -> STM (OnionDestination r, Int)) | 314 | getRendezvous :: (OnionDestination r -> STM (OnionDestination r, Int)) |
@@ -308,6 +328,27 @@ getRendezvous getTimeout crypto client nid ni = do | |||
308 | oaddr | 328 | oaddr |
309 | (unwrapAnnounceResponse rkey) | 329 | (unwrapAnnounceResponse rkey) |
310 | 330 | ||
331 | asyncGetRendezvous :: | ||
332 | (OnionDestination r -> STM (OnionDestination r, Int)) | ||
333 | -> TransportCrypto | ||
334 | -> Client r | ||
335 | -> NodeId | ||
336 | -> NodeInfo | ||
337 | -> (TransactionId -> Result ([NodeInfo],[Rendezvous],Maybe Nonce32) -> IO ()) | ||
338 | -> IO TransactionId | ||
339 | asyncGetRendezvous getTimeout crypto client nid ni withResult = do | ||
340 | asel <- atomically $ selectAlias crypto nid | ||
341 | let oaddr = OnionDestination asel ni Nothing | ||
342 | rkey = case asel of | ||
343 | SearchingAlias -> Nothing | ||
344 | _ -> Just $ key2id $ rendezvousPublic crypto | ||
345 | asyncOnion getTimeout client | ||
346 | (AnnounceRequest zeros32 nid $ fromMaybe zeroID rkey) | ||
347 | oaddr | ||
348 | (unwrapAnnounceResponse rkey) | ||
349 | withResult | ||
350 | |||
351 | |||
311 | putRendezvous :: (OnionDestination r -> STM (OnionDestination r, Int)) | 352 | putRendezvous :: (OnionDestination r -> STM (OnionDestination r, Int)) |
312 | -> TransportCrypto | 353 | -> TransportCrypto |
313 | -> Client r | 354 | -> Client r |
diff --git a/dht/src/Network/Tox/Onion/Routes.hs b/dht/src/Network/Tox/Onion/Routes.hs index 9ce4e316..2f13a513 100644 --- a/dht/src/Network/Tox/Onion/Routes.hs +++ b/dht/src/Network/Tox/Onion/Routes.hs | |||
@@ -88,7 +88,7 @@ data OnionRouter = OnionRouter | |||
88 | , tcpProber :: TCP.TCPProber | 88 | , tcpProber :: TCP.TCPProber |
89 | , tcpProberThread :: ThreadId | 89 | , tcpProberThread :: ThreadId |
90 | -- | Kademlia table of TCP relays. | 90 | -- | Kademlia table of TCP relays. |
91 | , tcpBucketRefresher :: BucketRefresher NodeId TCP.NodeInfo | 91 | , tcpBucketRefresher :: BucketRefresher NodeId TCP.NodeInfo Nonce8 |
92 | , tcpRelayPinger :: RelayPinger | 92 | , tcpRelayPinger :: RelayPinger |
93 | -- | Debug prints are written to this channel which is then flushed to | 93 | -- | Debug prints are written to this channel which is then flushed to |
94 | -- 'routeLogger'. | 94 | -- 'routeLogger'. |
@@ -601,44 +601,49 @@ hookQueries or t8 tmethods = TransactionMethods | |||
601 | modifyTVar' (pendingQueries or) (W64.insert w8 pq) | 601 | modifyTVar' (pendingQueries or) (W64.insert w8 pq) |
602 | writeTChan (routeLog or) $ "ONION query add " ++ unwords [ show (Just $ pendingVersion pq,w8), ":=", show ni ] | 602 | writeTChan (routeLog or) $ "ONION query add " ++ unwords [ show (Just $ pendingVersion pq,w8), ":=", show ni ] |
603 | return (tid,d') | 603 | return (tid,d') |
604 | , dispatchResponse = \tid x d -> {-# SCC "hookQ.dispatchResponse" #-} do -- :: tid -> x -> d -> STM (d, IO ()) | 604 | , dispatchResponse = \tid rx d -> {-# SCC "hookQ.dispatchResponse" #-} do -- :: tid -> x -> d -> STM (d, IO ()) |
605 | let Nonce8 w8 = t8 tid | 605 | case rx of |
606 | mb <- W64.lookup w8 <$> readTVar (pendingQueries or) | 606 | Success x -> do |
607 | modifyTVar' (pendingQueries or) (W64.delete w8) | 607 | let Nonce8 w8 = t8 tid |
608 | forM_ mb $ \pq -> do | 608 | mb <- W64.lookup w8 <$> readTVar (pendingQueries or) |
609 | let od = pendingDestination pq | 609 | modifyTVar' (pendingQueries or) (W64.delete w8) |
610 | RouteId rid = fromMaybe (routeId (nodeId (onionNodeInfo od))) | 610 | forM_ mb $ \pq -> do |
611 | $ onionRouteSpec od | 611 | let od = pendingDestination pq |
612 | modifyArray (routeMap or) (fmap gotResponse) rid | 612 | RouteId rid = fromMaybe (routeId (nodeId (onionNodeInfo od))) |
613 | writeTChan (routeLog or) $ "ONION query del " ++ show (fmap pendingVersion mb, w8) | 613 | $ onionRouteSpec od |
614 | dispatchResponse tmethods tid x d | 614 | modifyArray (routeMap or) (fmap gotResponse) rid |
615 | , dispatchCancel = \tid d -> {-# SCC "hookQ.dispatchCancel" #-} do -- :: tid -> d -> STM d | 615 | writeTChan (routeLog or) $ "ONION query del " ++ show (fmap pendingVersion mb, w8) |
616 | let Nonce8 w8 = t8 tid | 616 | dispatchResponse tmethods tid rx d |
617 | mb <- W64.lookup w8 <$> readTVar (pendingQueries or) | 617 | _ -> do -- Timed out or canceled... |
618 | modifyTVar' (pendingQueries or) (W64.delete w8) | 618 | let Nonce8 w8 = t8 tid |
619 | forM_ mb $ \pq -> do | 619 | mb <- W64.lookup w8 <$> readTVar (pendingQueries or) |
620 | let od = pendingDestination pq | 620 | modifyTVar' (pendingQueries or) (W64.delete w8) |
621 | RouteId rid = fromMaybe (routeId (nodeId (onionNodeInfo od))) | 621 | forM_ mb $ \pq -> do |
622 | $ onionRouteSpec od | 622 | let od = pendingDestination pq |
623 | mrr <- readArray (routeMap or) rid | 623 | RouteId rid = fromMaybe (routeId (nodeId (onionNodeInfo od))) |
624 | forM_ mrr $ \rr -> do | 624 | $ onionRouteSpec od |
625 | when (routeVersion rr == pendingVersion pq) $ do | 625 | mrr <- readArray (routeMap or) rid |
626 | let expireRoute = modifyArray (pendingRoutes or) expire rid | 626 | forM_ mrr $ \rr -> do |
627 | expire ver | ver <= succ (pendingVersion pq) = succ (pendingVersion pq) | 627 | when (routeVersion rr == pendingVersion pq) $ do |
628 | | otherwise = ver | 628 | let expireRoute = modifyArray (pendingRoutes or) expire rid |
629 | modifyArray (routeMap or) (fmap gotTimeout) rid | 629 | expire ver | ver <= succ (pendingVersion pq) = succ (pendingVersion pq) |
630 | case rr of | 630 | | otherwise = ver |
631 | RouteRecord{ responseCount = 0 | 631 | case rx of |
632 | , timeoutCount = c | 632 | TimedOut -> do |
633 | , routeVersion = v } | c >= 5 -> expireRoute | 633 | modifyArray (routeMap or) (fmap gotTimeout) rid |
634 | RouteRecord{ responseCount = 1 | 634 | case rr of |
635 | , timeoutCount = c | 635 | RouteRecord{ responseCount = 0 |
636 | , routeVersion = v } | c >= 10 -> expireRoute | 636 | , timeoutCount = c |
637 | RouteRecord{ timeoutCount = c | 637 | , routeVersion = v } | c >= 5 -> expireRoute |
638 | , routeVersion = v } | c >= 20 -> expireRoute | 638 | RouteRecord{ responseCount = 1 |
639 | _ -> return () | 639 | , timeoutCount = c |
640 | writeTChan (routeLog or) $ "ONION query can " ++ show (fmap pendingVersion mb, w8) | 640 | , routeVersion = v } | c >= 10 -> expireRoute |
641 | dispatchCancel tmethods tid d | 641 | RouteRecord{ timeoutCount = c |
642 | , routeVersion = v } | c >= 20 -> expireRoute | ||
643 | _ -> return () | ||
644 | _ -> return () -- Don't penalize route for canceled queries. | ||
645 | writeTChan (routeLog or) $ "ONION query can " ++ show (fmap pendingVersion mb, w8) | ||
646 | dispatchResponse tmethods tid rx d | ||
642 | } | 647 | } |
643 | 648 | ||
644 | 649 | ||
diff --git a/dht/src/Network/Tox/TCP.hs b/dht/src/Network/Tox/TCP.hs index 932b4ab3..a37c0310 100644 --- a/dht/src/Network/Tox/TCP.hs +++ b/dht/src/Network/Tox/TCP.hs | |||
@@ -223,11 +223,53 @@ getTCPNodes tcp seeking dst = do | |||
223 | getUDPNodes :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (Maybe ([UDP.NodeInfo], [UDP.NodeInfo], Maybe ())) | 223 | getUDPNodes :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (Maybe ([UDP.NodeInfo], [UDP.NodeInfo], Maybe ())) |
224 | getUDPNodes tcp seeking dst = fmap fst . resultToMaybe <$> getUDPNodes' tcp seeking dst | 224 | getUDPNodes tcp seeking dst = fmap fst . resultToMaybe <$> getUDPNodes' tcp seeking dst |
225 | 225 | ||
226 | |||
226 | getUDPNodes' :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (QR.Result (([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()), NodeInfo)) | 227 | getUDPNodes' :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (QR.Result (([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()), NodeInfo)) |
227 | getUDPNodes' tcp seeking dst0 = do | 228 | getUDPNodes' tcp seeking dst0 = do |
229 | goGetUDPNodes tcp seeking dst0 (return Canceled) $ \meth gateway dst -> do | ||
230 | r <- sendQuery (tcpClient tcp) meth (AnnounceRequest zeros32 seeking UDP.zeroID) gateway | ||
231 | forM r $ \response -> do | ||
232 | let (ns,_,mb) = either (const ([],[],Nothing)) (unwrapAnnounceResponse Nothing dst) $ response | ||
233 | return ( (ns,ns, const () <$> mb), gateway ) | ||
234 | |||
235 | -- Failure case, currently not treated as special. | ||
236 | -- The current searchQuery type demands a valid Nonce8 is returned | ||
237 | -- even if we were unable to send a query. | ||
238 | fixmeNonce :: Nonce8 | ||
239 | fixmeNonce = Nonce8 0 | ||
240 | |||
241 | asyncUDPNodes :: TCPClient err Nonce8 | ||
242 | -> NodeId | ||
243 | -> UDP.NodeInfo | ||
244 | -> (Nonce8 | ||
245 | -> QR.Result (([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()), NodeInfo) | ||
246 | -> IO ()) | ||
247 | -> IO Nonce8 | ||
248 | asyncUDPNodes tcp seeking dst0 withResult = | ||
249 | goGetUDPNodes tcp seeking dst0 (return fixmeNonce) $ \meth gateway dst -> do | ||
250 | asyncQuery (tcpClient tcp) meth (AnnounceRequest zeros32 seeking UDP.zeroID) gateway $ | ||
251 | \qid response -> do | ||
252 | let wut response = | ||
253 | let (ns,_,mb) = either (const ([],[],Nothing)) (unwrapAnnounceResponse Nothing dst) $ response | ||
254 | in ( (ns,ns, const () <$> mb), gateway ) | ||
255 | withResult qid $ fmap wut response | ||
256 | |||
257 | type Meth x = MethodSerializer | ||
258 | Nonce8 | ||
259 | x -- NodeInfo | ||
260 | (Bool, RelayPacket) | ||
261 | PacketNumber | ||
262 | AnnounceRequest | ||
263 | (Either String AnnounceResponse) | ||
264 | |||
265 | goGetUDPNodes :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo | ||
266 | -> IO a | ||
267 | -> (Meth x -> NodeInfo -> UDP.NodeInfo -> IO a) | ||
268 | -> IO a | ||
269 | goGetUDPNodes tcp seeking dst0 fail go = do | ||
228 | mgateway <- atomically $ tcpGetGateway tcp dst0 | 270 | mgateway <- atomically $ tcpGetGateway tcp dst0 |
229 | case mgateway of | 271 | case mgateway of |
230 | Nothing -> return Canceled | 272 | Nothing -> fail |
231 | Just gateway -> do | 273 | Just gateway -> do |
232 | (b,c,n24) <- atomically $ do | 274 | (b,c,n24) <- atomically $ do |
233 | b <- transportNewKey (tcpCrypto tcp) | 275 | b <- transportNewKey (tcpCrypto tcp) |
@@ -267,10 +309,7 @@ getUDPNodes' tcp seeking dst0 = do | |||
267 | -> decrypt (wrap0 n24') r >>= decodePlain | 309 | -> decrypt (wrap0 n24') r >>= decodePlain |
268 | x -> Left $ "getUDPNodes: unwrapResponse fail " ++ show x | 310 | x -> Left $ "getUDPNodes: unwrapResponse fail " ++ show x |
269 | } | 311 | } |
270 | r <- sendQuery (tcpClient tcp) meth (AnnounceRequest zeros32 seeking UDP.zeroID) gateway | 312 | go meth gateway dst |
271 | forM r $ \response -> do | ||
272 | let (ns,_,mb) = either (const ([],[],Nothing)) (unwrapAnnounceResponse Nothing dst) $ response | ||
273 | return ( (ns,ns, const () <$> mb), gateway ) | ||
274 | 313 | ||
275 | 314 | ||
276 | handleOOB :: PublicKey -> ByteString -> NodeInfo -> NodeInfo -> IO (Maybe (x -> x)) | 315 | handleOOB :: PublicKey -> ByteString -> NodeInfo -> NodeInfo -> IO (Maybe (x -> x)) |