diff options
author | Joe Crayne <joe@jerkface.net> | 2020-01-03 21:27:50 -0500 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-07 13:24:59 -0500 |
commit | c7fb8cfe16f821e4e148d1855a18cb81255743bc (patch) | |
tree | c035afc9ff870ea3bfc5b1dc7c4254ad0c0bf4b3 /dht | |
parent | 5ea2de4e858cc89282561922bae257b6f9041d2e (diff) |
Async search.
Diffstat (limited to 'dht')
-rw-r--r-- | dht/Announcer/Tox.hs | 28 | ||||
-rw-r--r-- | dht/TCPProber.hs | 28 | ||||
-rw-r--r-- | dht/examples/dhtd.hs | 16 | ||||
-rw-r--r-- | dht/src/Network/BitTorrent/MainlineDHT.hs | 76 | ||||
-rw-r--r-- | dht/src/Network/Tox.hs | 2 | ||||
-rw-r--r-- | dht/src/Network/Tox/DHT/Handlers.hs | 58 | ||||
-rw-r--r-- | dht/src/Network/Tox/Onion/Handlers.hs | 49 | ||||
-rw-r--r-- | dht/src/Network/Tox/Onion/Routes.hs | 83 | ||||
-rw-r--r-- | dht/src/Network/Tox/TCP.hs | 49 |
9 files changed, 296 insertions, 93 deletions
diff --git a/dht/Announcer/Tox.hs b/dht/Announcer/Tox.hs index e2459e0e..00eb219b 100644 --- a/dht/Announcer/Tox.hs +++ b/dht/Announcer/Tox.hs | |||
@@ -27,22 +27,23 @@ import Data.Time.Clock.POSIX | |||
27 | announceK :: Int | 27 | announceK :: Int |
28 | announceK = 8 | 28 | announceK = 8 |
29 | 29 | ||
30 | data AnnounceState = forall nid addr tok ni r. AnnounceState | 30 | data AnnounceState = forall nid addr tok ni r qk. AnnounceState |
31 | { aState :: SearchState nid addr tok ni r | 31 | { aState :: SearchState nid addr tok ni r qk |
32 | , aStoringNodes :: TVar (MM.MinMaxPSQ ni (Down POSIXTime)) | 32 | , aStoringNodes :: TVar (MM.MinMaxPSQ ni (Down POSIXTime)) |
33 | } | 33 | } |
34 | 34 | ||
35 | -- | This type specifies an item that can be announced on appropriate nodes in | 35 | -- | This type specifies an item that can be announced on appropriate nodes in |
36 | -- a Kademlia network. | 36 | -- a Kademlia network. |
37 | data AnnounceMethod r = forall nid ni sr addr tok a. | 37 | data AnnounceMethod r = forall nid ni sr addr tok a qk. |
38 | ( Show nid | 38 | ( Show nid |
39 | , Hashable nid | 39 | , Hashable nid |
40 | , Hashable ni | 40 | , Hashable ni |
41 | , Ord addr | 41 | , Ord addr |
42 | , Ord nid | 42 | , Ord nid |
43 | , Ord ni | 43 | , Ord ni |
44 | , Ord qk | ||
44 | ) => AnnounceMethod | 45 | ) => AnnounceMethod |
45 | { aSearch :: Search nid addr tok ni sr | 46 | { aSearch :: Search nid addr tok ni sr qk |
46 | -- ^ This is the Kademlia search to run repeatedly to find the | 47 | -- ^ This is the Kademlia search to run repeatedly to find the |
47 | -- nearby nodes. A new search is started whenever one is not | 48 | -- nearby nodes. A new search is started whenever one is not |
48 | -- already in progress at announce time. Repeated searches are | 49 | -- already in progress at announce time. Repeated searches are |
@@ -72,15 +73,16 @@ data AnnounceMethod r = forall nid ni sr addr tok a. | |||
72 | } | 73 | } |
73 | 74 | ||
74 | -- | This type specifies a Kademlia search and an action to perform upon the result. | 75 | -- | This type specifies a Kademlia search and an action to perform upon the result. |
75 | data SearchMethod r = forall nid ni sr addr tok a. | 76 | data SearchMethod r = forall nid ni sr addr tok a qk. |
76 | ( Show nid | 77 | ( Show nid |
77 | , Hashable nid | 78 | , Hashable nid |
78 | , Hashable ni | 79 | , Hashable ni |
79 | , Ord addr | 80 | , Ord addr |
80 | , Ord nid | 81 | , Ord nid |
81 | , Ord ni | 82 | , Ord ni |
83 | , Ord qk | ||
82 | ) => SearchMethod | 84 | ) => SearchMethod |
83 | { sSearch :: Search nid addr tok ni sr | 85 | { sSearch :: Search nid addr tok ni sr qk |
84 | -- ^ This is the Kademlia search to run repeatedly to find the | 86 | -- ^ This is the Kademlia search to run repeatedly to find the |
85 | -- nearby nodes. A new search is started whenever one is not | 87 | -- nearby nodes. A new search is started whenever one is not |
86 | -- already in progress at announce time. Repeated searches are | 88 | -- already in progress at announce time. Repeated searches are |
@@ -155,8 +157,6 @@ scheduleAnnounce announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTar | |||
155 | publishToNodes is | 157 | publishToNodes is |
156 | onResult sr = return True | 158 | onResult sr = return True |
157 | searchAgain = do | 159 | searchAgain = do |
158 | -- Canceling a pending search here seems to make announcements more reliable. | ||
159 | searchCancel st | ||
160 | return $ void $ do | 160 | return $ void $ do |
161 | t <- fork search | 161 | t <- fork search |
162 | labelThread t ("scheduleAnnounce.sch." ++ show aTarget) | 162 | labelThread t ("scheduleAnnounce.sch." ++ show aTarget) |
@@ -164,7 +164,10 @@ scheduleAnnounce announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTar | |||
164 | got <- tryTakeMVar mutex | 164 | got <- tryTakeMVar mutex |
165 | case got of | 165 | case got of |
166 | Just () -> do | 166 | Just () -> do |
167 | atomically $ reset aNearestNodes aSearch aTarget st | 167 | me <- myThreadId |
168 | labelThread me "scheduleAnnounce.reset" | ||
169 | reset aNearestNodes aSearch aTarget st | ||
170 | labelThread me "scheduleAnnounce.searchLoop" | ||
168 | searchLoop aSearch aTarget onResult st | 171 | searchLoop aSearch aTarget onResult st |
169 | -- Announce to any nodes we haven't already announced to. | 172 | -- Announce to any nodes we haven't already announced to. |
170 | is <- atomically $ do | 173 | is <- atomically $ do |
@@ -202,8 +205,6 @@ scheduleSearch announcer k SearchMethod{sSearch,sWithResult,sNearestNodes,sTarge | |||
202 | return () | 205 | return () |
203 | return True -- True to keep searching. | 206 | return True -- True to keep searching. |
204 | searchAgain = do | 207 | searchAgain = do |
205 | -- Canceling a pending search here seems to make announcements more reliable. | ||
206 | searchCancel st | ||
207 | return $ void $ do | 208 | return $ void $ do |
208 | t <- fork search | 209 | t <- fork search |
209 | labelThread t ("scheduleSearch.sch." ++ show sTarget) | 210 | labelThread t ("scheduleSearch.sch." ++ show sTarget) |
@@ -211,7 +212,10 @@ scheduleSearch announcer k SearchMethod{sSearch,sWithResult,sNearestNodes,sTarge | |||
211 | got <- tryTakeMVar mutex | 212 | got <- tryTakeMVar mutex |
212 | case got of | 213 | case got of |
213 | Just () -> do | 214 | Just () -> do |
214 | atomically $ reset sNearestNodes sSearch sTarget st | 215 | me <- myThreadId |
216 | labelThread me "scheduleSearch.reset" | ||
217 | reset sNearestNodes sSearch sTarget st | ||
218 | labelThread me "scheduleSearch.searchLoop" | ||
215 | searchLoop sSearch sTarget onResult st | 219 | searchLoop sSearch sTarget onResult st |
216 | putMVar mutex () | 220 | putMVar mutex () |
217 | Nothing -> do | 221 | Nothing -> do |
diff --git a/dht/TCPProber.hs b/dht/TCPProber.hs index 17b68f64..ccdbd8d1 100644 --- a/dht/TCPProber.hs +++ b/dht/TCPProber.hs | |||
@@ -176,11 +176,35 @@ getNodes prober tcp seeking dst = do | |||
176 | return $ Success ts | 176 | return $ Success ts |
177 | _ -> return $ fmap (const $ error "TCPProber.getNodes: The impossible happened!") r | 177 | _ -> return $ fmap (const $ error "TCPProber.getNodes: The impossible happened!") r |
178 | 178 | ||
179 | nodeSearch :: TCPProber -> TCP.TCPClient err Nonce8 -> Search NodeId (IP, PortNumber) () TCP.NodeInfo TCP.NodeInfo | 179 | asyncGetNodes :: TCPProber -> TCP.TCPClient err Nonce8 -> NodeId -> TCP.NodeInfo |
180 | -> (Nonce8 -> Result ([TCP.NodeInfo],[TCP.NodeInfo],Maybe ()) -> IO ()) | ||
181 | -> IO Nonce8 | ||
182 | asyncGetNodes prober tcp seeking dst withResponse = do | ||
183 | TCP.asyncUDPNodes tcp seeking (TCP.udpNodeInfo dst) $ \qid r -> do | ||
184 | dput XTCP $ "Got via TCP nodes: " ++ show r | ||
185 | let tcps (ns,_,mb) = (ns',ns',mb) | ||
186 | where ns' = do | ||
187 | n <- ns | ||
188 | [ TCP.NodeInfo n 0 ] | ||
189 | r' <- case r of | ||
190 | Success (ns,gw) -> do | ||
191 | let ts = tcps ns | ||
192 | if TCP.nodeId gw == TCP.nodeId dst | ||
193 | then return $ Success ts | ||
194 | else do | ||
195 | enqueueProbe prober (TCP.udpNodeInfo dst) | ||
196 | return $ Success ts | ||
197 | return $ Success ts | ||
198 | _ -> return $ fmap (const $ error "TCPProber.getNodes: The impossible happened!") r | ||
199 | withResponse qid r' | ||
200 | |||
201 | |||
202 | nodeSearch :: TCPProber -> TCP.TCPClient err Nonce8 -> Search NodeId (IP, PortNumber) () TCP.NodeInfo TCP.NodeInfo Nonce8 | ||
180 | nodeSearch prober tcp = Search | 203 | nodeSearch prober tcp = Search |
181 | { searchSpace = TCP.tcpSpace | 204 | { searchSpace = TCP.tcpSpace |
182 | , searchNodeAddress = TCP.nodeIP &&& TCP.tcpPort | 205 | , searchNodeAddress = TCP.nodeIP &&& TCP.tcpPort |
183 | , searchQuery = getNodes prober tcp | 206 | , searchQuery = asyncGetNodes prober tcp |
207 | , searchQueryCancel = cancelQuery (TCP.tcpClient tcp) | ||
184 | , searchAlpha = 8 | 208 | , searchAlpha = 8 |
185 | , searchK = 16 | 209 | , searchK = 16 |
186 | } | 210 | } |
diff --git a/dht/examples/dhtd.hs b/dht/examples/dhtd.hs index 6b057af9..3078831d 100644 --- a/dht/examples/dhtd.hs +++ b/dht/examples/dhtd.hs | |||
@@ -811,8 +811,14 @@ clientSession s@Session{..} sock cnum h = do | |||
811 | where | 811 | where |
812 | go | null destination = fmap Right . qhandler self | 812 | go | null destination = fmap Right . qhandler self |
813 | | otherwise = case readEither destination of | 813 | | otherwise = case readEither destination of |
814 | Right ni -> fmap (maybe (Left "Timeout.") Right . resultToMaybe) | 814 | Right ni -> \nid -> do |
815 | . flip (searchQuery qsearch) ni -- TODO report canceled | 815 | v <- newEmptyMVar |
816 | _ <- searchQuery qsearch nid ni $ \_ r -> putMVar v r | ||
817 | r <- takeMVar v | ||
818 | return $ case r of | ||
819 | Success x -> Right x | ||
820 | Canceled -> Left "Canceled." | ||
821 | TimedOut -> Left "Timeout." | ||
816 | Left e -> const $ return $ Left ("Bad destination: "++e) | 822 | Left e -> const $ return $ Left ("Bad destination: "++e) |
817 | maybe (hPutClient h ("Unsupported method: "++method)) | 823 | maybe (hPutClient h ("Unsupported method: "++method)) |
818 | goQuery | 824 | goQuery |
@@ -938,14 +944,14 @@ clientSession s@Session{..} sock cnum h = do | |||
938 | , Typeable ptok | 944 | , Typeable ptok |
939 | , Typeable sni | 945 | , Typeable sni |
940 | , Typeable pni ) | 946 | , Typeable pni ) |
941 | => Search nid addr stok sni sr | 947 | => Search nid addr stok sni sr qk |
942 | -> (pr -> ptok -> Maybe pni -> IO (Maybe pubr)) | 948 | -> (pr -> ptok -> Maybe pni -> IO (Maybe pubr)) |
943 | -> Maybe (stok :~: ptok, sni :~: pni) | 949 | -> Maybe (stok :~: ptok, sni :~: pni) |
944 | matchingResult _ _ = liftA2 (,) eqT eqT | 950 | matchingResult _ _ = liftA2 (,) eqT eqT |
945 | matchingResult2 :: | 951 | matchingResult2 :: |
946 | ( Typeable sr | 952 | ( Typeable sr |
947 | , Typeable pr ) | 953 | , Typeable pr ) |
948 | => Search nid addr stok sni sr | 954 | => Search nid addr stok sni sr qk |
949 | -> (PublicKey -> pdta -> pr -> IO ()) | 955 | -> (PublicKey -> pdta -> pr -> IO ()) |
950 | -> (pdta -> nid) | 956 | -> (pdta -> nid) |
951 | -> Maybe (pr :~: sr) | 957 | -> Maybe (pr :~: sr) |
@@ -1913,7 +1919,7 @@ main = do | |||
1913 | btSaved <- loadNodes netname -- :: IO [Mainline.NodeInfo] | 1919 | btSaved <- loadNodes netname -- :: IO [Mainline.NodeInfo] |
1914 | putStrLn $ "Loaded "++show (length btSaved)++" nodes for "++netname++"." | 1920 | putStrLn $ "Loaded "++show (length btSaved)++" nodes for "++netname++"." |
1915 | fallbackNodes <- getBootstrapNodes | 1921 | fallbackNodes <- getBootstrapNodes |
1916 | let isNodesSearch :: ni :~: r -> Search nid addr tok ni r -> Search nid addr tok ni ni | 1922 | let isNodesSearch :: ni :~: r -> Search nid addr tok ni r qk -> Search nid addr tok ni ni qk |
1917 | isNodesSearch Refl sch = sch | 1923 | isNodesSearch Refl sch = sch |
1918 | ping = maybe (const $ return False) | 1924 | ping = maybe (const $ return False) |
1919 | (\DHTPing{pingQuery} -> fmap (maybe False (const True)) . pingQuery []) | 1925 | (\DHTPing{pingQuery} -> fmap (maybe False (const True)) . pingQuery []) |
diff --git a/dht/src/Network/BitTorrent/MainlineDHT.hs b/dht/src/Network/BitTorrent/MainlineDHT.hs index 8532b492..d3904c40 100644 --- a/dht/src/Network/BitTorrent/MainlineDHT.hs +++ b/dht/src/Network/BitTorrent/MainlineDHT.hs | |||
@@ -512,8 +512,8 @@ data Routing = Routing | |||
512 | { tentativeId :: NodeInfo | 512 | { tentativeId :: NodeInfo |
513 | , committee4 :: TriadCommittee NodeId SockAddr | 513 | , committee4 :: TriadCommittee NodeId SockAddr |
514 | , committee6 :: TriadCommittee NodeId SockAddr | 514 | , committee6 :: TriadCommittee NodeId SockAddr |
515 | , refresher4 :: BucketRefresher NodeId NodeInfo | 515 | , refresher4 :: BucketRefresher NodeId NodeInfo TransactionId |
516 | , refresher6 :: BucketRefresher NodeId NodeInfo | 516 | , refresher6 :: BucketRefresher NodeId NodeInfo TransactionId |
517 | } | 517 | } |
518 | 518 | ||
519 | sched4 :: Routing -> TVar (Int.PSQ POSIXTime) | 519 | sched4 :: Routing -> TVar (Int.PSQ POSIXTime) |
@@ -569,7 +569,6 @@ newClient swarms addr = do | |||
569 | -- We defer initializing the refreshSearch and refreshPing until we | 569 | -- We defer initializing the refreshSearch and refreshPing until we |
570 | -- have a client to send queries with. | 570 | -- have a client to send queries with. |
571 | let nullPing = const $ return False | 571 | let nullPing = const $ return False |
572 | nullSearch = mainlineSearch $ \_ _ -> return Canceled | ||
573 | tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info R.defaultBucketCount | 572 | tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info R.defaultBucketCount |
574 | refresher4 <- newBucketRefresher tbl4 nullSearch nullPing | 573 | refresher4 <- newBucketRefresher tbl4 nullSearch nullPing |
575 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount | 574 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount |
@@ -730,7 +729,7 @@ defaultHandler meth = MethodHandler decodePayload errorPayload returnError | |||
730 | 729 | ||
731 | mainlineKademlia :: MainlineClient | 730 | mainlineKademlia :: MainlineClient |
732 | -> TriadCommittee NodeId SockAddr | 731 | -> TriadCommittee NodeId SockAddr |
733 | -> BucketRefresher NodeId NodeInfo | 732 | -> BucketRefresher NodeId NodeInfo TransactionId |
734 | -> Kademlia NodeId NodeInfo | 733 | -> Kademlia NodeId NodeInfo |
735 | mainlineKademlia client committee refresher | 734 | mainlineKademlia client committee refresher |
736 | = Kademlia quietInsertions | 735 | = Kademlia quietInsertions |
@@ -1037,6 +1036,35 @@ announceH (SwarmsDatabase peers toks _) naddr announcement = do | |||
1037 | isReadonlyClient :: MainlineClient -> Bool | 1036 | isReadonlyClient :: MainlineClient -> Bool |
1038 | isReadonlyClient client = False -- TODO | 1037 | isReadonlyClient client = False -- TODO |
1039 | 1038 | ||
1039 | mainlineAsync :: ( BEncode xqry | ||
1040 | , BEncode xrsp | ||
1041 | ) => Method | ||
1042 | -> (xrsp -> rsp) | ||
1043 | -> (qry -> xqry) | ||
1044 | -> MainlineClient | ||
1045 | -> qry | ||
1046 | -> NodeInfo | ||
1047 | -> (TransactionId -> QR.Result rsp -> IO ()) | ||
1048 | -> IO TransactionId | ||
1049 | mainlineAsync meth unwrap msg client nid addr withResult = do | ||
1050 | asyncQuery client serializer (msg nid) addr $ \qid reply -> do | ||
1051 | withResult qid $ case reply of | ||
1052 | Success (Right x) -> Success x | ||
1053 | Success (Left e) -> Canceled -- TODO: Do something with parse errors. | ||
1054 | Canceled -> Canceled | ||
1055 | TimedOut -> TimedOut | ||
1056 | where | ||
1057 | serializer = MethodSerializer | ||
1058 | { methodTimeout = \ni -> return (ni, 5000000) | ||
1059 | , method = meth | ||
1060 | , wrapQuery = encodeQueryPayload meth (isReadonlyClient client) | ||
1061 | , unwrapResponse = (>>= either (Left . Error GenericError . C8.pack) | ||
1062 | (Right . unwrap) | ||
1063 | . BE.fromBEncode) | ||
1064 | . rspPayload | ||
1065 | } | ||
1066 | |||
1067 | |||
1040 | mainlineSend :: ( BEncode xqry | 1068 | mainlineSend :: ( BEncode xqry |
1041 | , BEncode xrsp | 1069 | , BEncode xrsp |
1042 | ) => Method | 1070 | ) => Method |
@@ -1073,30 +1101,54 @@ ping client addr = | |||
1073 | getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ())) | 1101 | getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ())) |
1074 | getNodes = mainlineSend (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both) | 1102 | getNodes = mainlineSend (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both) |
1075 | 1103 | ||
1104 | asyncGetNodes :: MainlineClient -> NodeId -> NodeInfo -> (TransactionId -> QR.Result ([NodeInfo],[NodeInfo],Maybe ()) -> IO ()) | ||
1105 | -> IO TransactionId | ||
1106 | asyncGetNodes = mainlineAsync (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both) | ||
1107 | |||
1076 | unwrapNodes :: NodeFound -> ([NodeInfo], [NodeInfo], Maybe ()) | 1108 | unwrapNodes :: NodeFound -> ([NodeInfo], [NodeInfo], Maybe ()) |
1077 | unwrapNodes (NodeFound ns4 ns6) = (ns4++ns6, ns4++ns6, Just ()) | 1109 | unwrapNodes (NodeFound ns4 ns6) = (ns4++ns6, ns4++ns6, Just ()) |
1078 | 1110 | ||
1079 | getPeers :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[PeerAddr],Maybe Token)) | 1111 | getPeers :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[PeerAddr],Maybe Token)) |
1080 | getPeers = mainlineSend (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce | 1112 | getPeers = mainlineSend (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce |
1081 | 1113 | ||
1114 | asyncGetPeers :: MainlineClient -> NodeId -> NodeInfo -> (TransactionId -> QR.Result ([NodeInfo],[PeerAddr],Maybe Token) -> IO ()) | ||
1115 | -> IO TransactionId | ||
1116 | asyncGetPeers = mainlineAsync (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce | ||
1117 | |||
1082 | unwrapPeers :: GotPeers -> ([NodeInfo], [PeerAddr], Maybe Token) | 1118 | unwrapPeers :: GotPeers -> ([NodeInfo], [PeerAddr], Maybe Token) |
1083 | unwrapPeers (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4++ns6, ps, Just tok) | 1119 | unwrapPeers (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4++ns6, ps, Just tok) |
1084 | 1120 | ||
1085 | mainlineSearch :: (NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo], [r], Maybe tok))) | 1121 | nullTransactionId :: TransactionId |
1086 | -> Search NodeId (IP, PortNumber) tok NodeInfo r | 1122 | nullTransactionId = TransactionId B.empty |
1087 | mainlineSearch qry = Search | 1123 | |
1124 | nullSearch :: Search NodeId (IP, PortNumber) tok NodeInfo r TransactionId | ||
1125 | nullSearch = Search | ||
1126 | { searchSpace = mainlineSpace | ||
1127 | , searchNodeAddress = nodeIP &&& nodePort | ||
1128 | , searchQuery = \_ _ f -> f nullTransactionId Canceled >> return nullTransactionId | ||
1129 | , searchQueryCancel = \_ _ -> return () | ||
1130 | , searchAlpha = 8 | ||
1131 | , searchK = 16 | ||
1132 | } | ||
1133 | |||
1134 | mainlineSearch :: MainlineClient | ||
1135 | -> (MainlineClient -> NodeId -> NodeInfo | ||
1136 | -> (TransactionId -> QR.Result ([NodeInfo], [r], Maybe tok) -> IO ()) -> IO TransactionId) | ||
1137 | -> Search NodeId (IP, PortNumber) tok NodeInfo r TransactionId | ||
1138 | mainlineSearch client qry = Search | ||
1088 | { searchSpace = mainlineSpace | 1139 | { searchSpace = mainlineSpace |
1089 | , searchNodeAddress = nodeIP &&& nodePort | 1140 | , searchNodeAddress = nodeIP &&& nodePort |
1090 | , searchQuery = qry | 1141 | , searchQuery = qry client |
1142 | , searchQueryCancel = cancelQuery client | ||
1091 | , searchAlpha = 8 | 1143 | , searchAlpha = 8 |
1092 | , searchK = 16 | 1144 | , searchK = 16 |
1093 | } | 1145 | } |
1094 | 1146 | ||
1095 | nodeSearch :: MainlineClient -> Search NodeId (IP, PortNumber) () NodeInfo NodeInfo | 1147 | nodeSearch :: MainlineClient -> Search NodeId (IP, PortNumber) () NodeInfo NodeInfo TransactionId |
1096 | nodeSearch client = mainlineSearch (getNodes client) | 1148 | nodeSearch client = mainlineSearch client asyncGetNodes |
1097 | 1149 | ||
1098 | peerSearch :: MainlineClient -> Search NodeId (IP, PortNumber) Token NodeInfo PeerAddr | 1150 | peerSearch :: MainlineClient -> Search NodeId (IP, PortNumber) Token NodeInfo PeerAddr TransactionId |
1099 | peerSearch client = mainlineSearch (getPeers client) | 1151 | peerSearch client = mainlineSearch client asyncGetPeers |
1100 | 1152 | ||
1101 | -- | List of bootstrap nodes maintained by different bittorrent | 1153 | -- | List of bootstrap nodes maintained by different bittorrent |
1102 | -- software authors. | 1154 | -- software authors. |
diff --git a/dht/src/Network/Tox.hs b/dht/src/Network/Tox.hs index f9f35ea4..4aed1c43 100644 --- a/dht/src/Network/Tox.hs +++ b/dht/src/Network/Tox.hs | |||
@@ -480,6 +480,6 @@ announceToLan sock nid = do | |||
480 | saferSendTo sock bs broadcast | 480 | saferSendTo sock bs broadcast |
481 | 481 | ||
482 | 482 | ||
483 | toxQSearch :: Tox extra -> Search NodeId (IP, PortNumber) Nonce32 NodeInfo Onion.Rendezvous | 483 | toxQSearch :: Tox extra -> Search NodeId (IP, PortNumber) Nonce32 NodeInfo Onion.Rendezvous DHT.TransactionId |
484 | toxQSearch tox = Onion.toxidSearch (onionTimeout tox) (toxCryptoKeys tox) (toxOnion tox) | 484 | toxQSearch tox = Onion.toxidSearch (onionTimeout tox) (toxCryptoKeys tox) (toxOnion tox) |
485 | 485 | ||
diff --git a/dht/src/Network/Tox/DHT/Handlers.hs b/dht/src/Network/Tox/DHT/Handlers.hs index d132da88..f4563a3b 100644 --- a/dht/src/Network/Tox/DHT/Handlers.hs +++ b/dht/src/Network/Tox/DHT/Handlers.hs | |||
@@ -133,8 +133,8 @@ data Routing = Routing | |||
133 | { tentativeId :: NodeInfo | 133 | { tentativeId :: NodeInfo |
134 | , committee4 :: TriadCommittee NodeId SockAddr | 134 | , committee4 :: TriadCommittee NodeId SockAddr |
135 | , committee6 :: TriadCommittee NodeId SockAddr | 135 | , committee6 :: TriadCommittee NodeId SockAddr |
136 | , refresher4 :: BucketRefresher NodeId NodeInfo | 136 | , refresher4 :: BucketRefresher NodeId NodeInfo TransactionId |
137 | , refresher6 :: BucketRefresher NodeId NodeInfo | 137 | , refresher6 :: BucketRefresher NodeId NodeInfo TransactionId |
138 | , nodesOfInterest :: TVar (HashMap NodeId [NodeInfoCallback]) | 138 | , nodesOfInterest :: TVar (HashMap NodeId [NodeInfoCallback]) |
139 | } | 139 | } |
140 | 140 | ||
@@ -172,6 +172,20 @@ routing4 Routing { refresher4 = BucketRefresher { refreshBuckets } } = refreshBu | |||
172 | routing6 :: Routing -> TVar (R.BucketList NodeInfo) | 172 | routing6 :: Routing -> TVar (R.BucketList NodeInfo) |
173 | routing6 Routing { refresher6 = BucketRefresher { refreshBuckets } } = refreshBuckets | 173 | routing6 Routing { refresher6 = BucketRefresher { refreshBuckets } } = refreshBuckets |
174 | 174 | ||
175 | nullTransactionId :: TransactionId | ||
176 | nullTransactionId = TransactionId (Nonce8 0) (Nonce24 zeros24) | ||
177 | |||
178 | nullSearch :: Search NodeId (IP, PortNumber) tok NodeInfo r TransactionId | ||
179 | nullSearch = Search | ||
180 | { searchSpace = toxSpace | ||
181 | , searchNodeAddress = nodeIP &&& nodePort | ||
182 | , searchQuery = \_ _ f -> f nullTransactionId Canceled >> return nullTransactionId | ||
183 | , searchQueryCancel = \_ _ -> return () | ||
184 | , searchAlpha = 1 | ||
185 | , searchK = 2 | ||
186 | } | ||
187 | |||
188 | |||
175 | newRouting :: SockAddr -> TransportCrypto | 189 | newRouting :: SockAddr -> TransportCrypto |
176 | -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv4 change | 190 | -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv4 change |
177 | -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv6 change | 191 | -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv6 change |
@@ -195,13 +209,6 @@ newRouting addr crypto update4 update6 = do | |||
195 | -- We defer initializing the refreshSearch and refreshPing until we | 209 | -- We defer initializing the refreshSearch and refreshPing until we |
196 | -- have a client to send queries with. | 210 | -- have a client to send queries with. |
197 | let nullPing = const $ return False | 211 | let nullPing = const $ return False |
198 | nullSearch = Search | ||
199 | { searchSpace = toxSpace | ||
200 | , searchNodeAddress = nodeIP &&& nodePort | ||
201 | , searchQuery = \_ _ -> return Canceled | ||
202 | , searchAlpha = 1 | ||
203 | , searchK = 2 | ||
204 | } | ||
205 | tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info4 R.defaultBucketCount | 212 | tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info4 R.defaultBucketCount |
206 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount | 213 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount |
207 | refresher4 <- newBucketRefresher tbl4 nullSearch nullPing | 214 | refresher4 <- newBucketRefresher tbl4 nullSearch nullPing |
@@ -432,6 +439,30 @@ getNodesUDP :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> NodeId -> N | |||
432 | -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ())) | 439 | -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ())) |
433 | getNodesUDP client cbvar nid addr = getNodes client cbvar nid (Multi.UDP ==> addr) | 440 | getNodesUDP client cbvar nid addr = getNodes client cbvar nid (Multi.UDP ==> addr) |
434 | 441 | ||
442 | asyncGetNodes :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> NodeId -> Multi.NodeInfo | ||
443 | -> (TransactionId -> QR.Result ([NodeInfo],[NodeInfo],Maybe ()) -> IO ()) | ||
444 | -> IO TransactionId | ||
445 | asyncGetNodes client cbvar nid addr withResult = do | ||
446 | QR.asyncQuery client (serializer GetNodesType DHTGetNodes unsendNodes) (GetNodes nid) addr $ | ||
447 | \qid reply -> do | ||
448 | forM_ (join $ resultToMaybe reply) $ \(SendNodes ns) -> | ||
449 | forM_ ns $ \n -> do | ||
450 | now <- getPOSIXTime | ||
451 | atomically $ do | ||
452 | mcbs <- HashMap.lookup (nodeId . udpNodeInfo $ n) <$> readTVar cbvar | ||
453 | forM_ mcbs $ \cbs -> do | ||
454 | forM_ cbs $ \cb -> do | ||
455 | rumoredAddress cb now addr (udpNodeInfo n) | ||
456 | withResult qid $ case reply of | ||
457 | Success x -> maybe Canceled (Success . unwrapNodes) x | ||
458 | _ -> fmap (error "Network.Tox.DHT.Handlers.getNodes: the impossible happened!") reply | ||
459 | |||
460 | asyncGetNodesUDP :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> NodeId -> NodeInfo | ||
461 | -> (TransactionId -> QR.Result ([NodeInfo],[NodeInfo],Maybe ()) -> IO ()) | ||
462 | -> IO TransactionId | ||
463 | asyncGetNodesUDP client cbvar nid addr go = asyncGetNodes client cbvar nid (Multi.UDP ==> addr) go | ||
464 | |||
465 | |||
435 | updateRouting :: Client -> Routing | 466 | updateRouting :: Client -> Routing |
436 | -> (TVar (R.BucketList NodeInfo) -> RoutingTransition NodeInfo -> STM ()) | 467 | -> (TVar (R.BucketList NodeInfo) -> RoutingTransition NodeInfo -> STM ()) |
437 | -> Multi.NodeInfo | 468 | -> Multi.NodeInfo |
@@ -462,7 +493,7 @@ updateTable client routing orouter naddr = do | |||
462 | Want_Both -> do dput XMisc "BUG:unreachable" | 493 | Want_Both -> do dput XMisc "BUG:unreachable" |
463 | error $ "BUG:unreachable at " ++ __FILE__ ++ ":" ++ show __LINE__ | 494 | error $ "BUG:unreachable at " ++ __FILE__ ++ ":" ++ show __LINE__ |
464 | where | 495 | where |
465 | go :: TriadCommittee NodeId SockAddr -> BucketRefresher NodeId NodeInfo -> IO () | 496 | go :: TriadCommittee NodeId SockAddr -> BucketRefresher NodeId NodeInfo TransactionId -> IO () |
466 | go committee refresher = do | 497 | go committee refresher = do |
467 | self <- atomically $ R.thisNode <$> readTVar (refreshBuckets refresher) | 498 | self <- atomically $ R.thisNode <$> readTVar (refreshBuckets refresher) |
468 | -- dput XMisc $ "(tox)updateRouting: " ++ show (nodeIP self, nodeIP naddr) | 499 | -- dput XMisc $ "(tox)updateRouting: " ++ show (nodeIP self, nodeIP naddr) |
@@ -473,7 +504,7 @@ updateTable client routing orouter naddr = do | |||
473 | toxKademlia :: Client | 504 | toxKademlia :: Client |
474 | -> (TVar (R.BucketList NodeInfo) -> RoutingTransition NodeInfo -> STM ()) | 505 | -> (TVar (R.BucketList NodeInfo) -> RoutingTransition NodeInfo -> STM ()) |
475 | -> TriadCommittee NodeId SockAddr | 506 | -> TriadCommittee NodeId SockAddr |
476 | -> BucketRefresher NodeId NodeInfo | 507 | -> BucketRefresher NodeId NodeInfo TransactionId |
477 | -> Kademlia NodeId NodeInfo | 508 | -> Kademlia NodeId NodeInfo |
478 | toxKademlia client orouter committee refresher | 509 | toxKademlia client orouter committee refresher |
479 | = Kademlia quietInsertions | 510 | = Kademlia quietInsertions |
@@ -541,11 +572,12 @@ handlers crypto _ CookieRequestType = Just $ MethodHandler (isCookieReques | |||
541 | handlers _ _ DHTRequestType = Just $ NoReply (isDHTRequest snd) $ dhtRequestH | 572 | handlers _ _ DHTRequestType = Just $ NoReply (isDHTRequest snd) $ dhtRequestH |
542 | handlers _ _ typ = error $ "TODO DHT handlers " ++ show typ | 573 | handlers _ _ typ = error $ "TODO DHT handlers " ++ show typ |
543 | 574 | ||
544 | nodeSearch :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> Search NodeId (IP,PortNumber) () NodeInfo NodeInfo | 575 | nodeSearch :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> Search NodeId (IP,PortNumber) () NodeInfo NodeInfo TransactionId |
545 | nodeSearch client cbvar = Search | 576 | nodeSearch client cbvar = Search |
546 | { searchSpace = toxSpace | 577 | { searchSpace = toxSpace |
547 | , searchNodeAddress = nodeIP &&& nodePort | 578 | , searchNodeAddress = nodeIP &&& nodePort |
548 | , searchQuery = getNodesUDP client cbvar | 579 | , searchQuery = asyncGetNodesUDP client cbvar |
580 | , searchQueryCancel = cancelQuery client | ||
549 | , searchAlpha = 8 | 581 | , searchAlpha = 8 |
550 | , searchK = 16 | 582 | , searchK = 16 |
551 | } | 583 | } |
diff --git a/dht/src/Network/Tox/Onion/Handlers.hs b/dht/src/Network/Tox/Onion/Handlers.hs index 015c758c..45795312 100644 --- a/dht/src/Network/Tox/Onion/Handlers.hs +++ b/dht/src/Network/Tox/Onion/Handlers.hs | |||
@@ -218,13 +218,14 @@ handlers net _ _ keydb _ = Just $ NoReply Right $ dataToRouteH keydb net | |||
218 | toxidSearch :: (OnionDestination r -> STM (OnionDestination r, Int)) | 218 | toxidSearch :: (OnionDestination r -> STM (OnionDestination r, Int)) |
219 | -> TransportCrypto | 219 | -> TransportCrypto |
220 | -> Client r | 220 | -> Client r |
221 | -> Search NodeId (IP, PortNumber) Nonce32 NodeInfo Rendezvous | 221 | -> Search NodeId (IP, PortNumber) Nonce32 NodeInfo Rendezvous TransactionId |
222 | toxidSearch getTimeout crypto client = Search | 222 | toxidSearch getTimeout crypto client = Search |
223 | { searchSpace = toxSpace | 223 | { searchSpace = toxSpace |
224 | , searchNodeAddress = nodeIP &&& nodePort | 224 | , searchNodeAddress = nodeIP &&& nodePort |
225 | , searchQuery = getRendezvous getTimeout crypto client | 225 | , searchQuery = asyncGetRendezvous getTimeout crypto client |
226 | , searchAlpha = 3 | 226 | , searchQueryCancel = cancelQuery client |
227 | , searchK = 6 | 227 | , searchAlpha = 3 |
228 | , searchK = 6 | ||
228 | } | 229 | } |
229 | 230 | ||
230 | announceSerializer :: (OnionDestination r -> STM (OnionDestination r, Int)) | 231 | announceSerializer :: (OnionDestination r -> STM (OnionDestination r, Int)) |
@@ -289,6 +290,25 @@ sendOnion getTimeout client req oaddr unwrap = | |||
289 | Canceled -> return Canceled | 290 | Canceled -> return Canceled |
290 | TimedOut -> re | 291 | TimedOut -> re |
291 | 292 | ||
293 | asyncOnion :: (OnionDestination r -> STM (OnionDestination r, Int)) | ||
294 | -> Client r | ||
295 | -> AnnounceRequest | ||
296 | -> OnionDestination r | ||
297 | -> (NodeInfo -> AnnounceResponse -> t) | ||
298 | -> (TransactionId -> QR.Result t -> IO ()) | ||
299 | -> IO TransactionId | ||
300 | asyncOnion getTimeout client req oaddr unwrap withResult = do | ||
301 | -- TODO: Restore "Four tries and then we tap out" behavior. | ||
302 | qid <- QR.asyncQuery client (announceSerializer getTimeout) req oaddr $ \k mb -> do | ||
303 | forM_ mb $ \r -> dput XAnnounce $ show (onionNodeInfo oaddr) ++ " async sent response: " ++ show r | ||
304 | withResult k $ case mb of | ||
305 | Success x -> maybe (TimedOut) | ||
306 | (Success . unwrap (onionNodeInfo oaddr)) | ||
307 | (x :: Maybe AnnounceResponse) | ||
308 | Canceled -> Canceled | ||
309 | TimedOut -> TimedOut | ||
310 | return qid | ||
311 | |||
292 | 312 | ||
293 | -- | Lookup the secret counterpart for a given alias key. | 313 | -- | Lookup the secret counterpart for a given alias key. |
294 | getRendezvous :: (OnionDestination r -> STM (OnionDestination r, Int)) | 314 | getRendezvous :: (OnionDestination r -> STM (OnionDestination r, Int)) |
@@ -308,6 +328,27 @@ getRendezvous getTimeout crypto client nid ni = do | |||
308 | oaddr | 328 | oaddr |
309 | (unwrapAnnounceResponse rkey) | 329 | (unwrapAnnounceResponse rkey) |
310 | 330 | ||
331 | asyncGetRendezvous :: | ||
332 | (OnionDestination r -> STM (OnionDestination r, Int)) | ||
333 | -> TransportCrypto | ||
334 | -> Client r | ||
335 | -> NodeId | ||
336 | -> NodeInfo | ||
337 | -> (TransactionId -> Result ([NodeInfo],[Rendezvous],Maybe Nonce32) -> IO ()) | ||
338 | -> IO TransactionId | ||
339 | asyncGetRendezvous getTimeout crypto client nid ni withResult = do | ||
340 | asel <- atomically $ selectAlias crypto nid | ||
341 | let oaddr = OnionDestination asel ni Nothing | ||
342 | rkey = case asel of | ||
343 | SearchingAlias -> Nothing | ||
344 | _ -> Just $ key2id $ rendezvousPublic crypto | ||
345 | asyncOnion getTimeout client | ||
346 | (AnnounceRequest zeros32 nid $ fromMaybe zeroID rkey) | ||
347 | oaddr | ||
348 | (unwrapAnnounceResponse rkey) | ||
349 | withResult | ||
350 | |||
351 | |||
311 | putRendezvous :: (OnionDestination r -> STM (OnionDestination r, Int)) | 352 | putRendezvous :: (OnionDestination r -> STM (OnionDestination r, Int)) |
312 | -> TransportCrypto | 353 | -> TransportCrypto |
313 | -> Client r | 354 | -> Client r |
diff --git a/dht/src/Network/Tox/Onion/Routes.hs b/dht/src/Network/Tox/Onion/Routes.hs index 9ce4e316..2f13a513 100644 --- a/dht/src/Network/Tox/Onion/Routes.hs +++ b/dht/src/Network/Tox/Onion/Routes.hs | |||
@@ -88,7 +88,7 @@ data OnionRouter = OnionRouter | |||
88 | , tcpProber :: TCP.TCPProber | 88 | , tcpProber :: TCP.TCPProber |
89 | , tcpProberThread :: ThreadId | 89 | , tcpProberThread :: ThreadId |
90 | -- | Kademlia table of TCP relays. | 90 | -- | Kademlia table of TCP relays. |
91 | , tcpBucketRefresher :: BucketRefresher NodeId TCP.NodeInfo | 91 | , tcpBucketRefresher :: BucketRefresher NodeId TCP.NodeInfo Nonce8 |
92 | , tcpRelayPinger :: RelayPinger | 92 | , tcpRelayPinger :: RelayPinger |
93 | -- | Debug prints are written to this channel which is then flushed to | 93 | -- | Debug prints are written to this channel which is then flushed to |
94 | -- 'routeLogger'. | 94 | -- 'routeLogger'. |
@@ -601,44 +601,49 @@ hookQueries or t8 tmethods = TransactionMethods | |||
601 | modifyTVar' (pendingQueries or) (W64.insert w8 pq) | 601 | modifyTVar' (pendingQueries or) (W64.insert w8 pq) |
602 | writeTChan (routeLog or) $ "ONION query add " ++ unwords [ show (Just $ pendingVersion pq,w8), ":=", show ni ] | 602 | writeTChan (routeLog or) $ "ONION query add " ++ unwords [ show (Just $ pendingVersion pq,w8), ":=", show ni ] |
603 | return (tid,d') | 603 | return (tid,d') |
604 | , dispatchResponse = \tid x d -> {-# SCC "hookQ.dispatchResponse" #-} do -- :: tid -> x -> d -> STM (d, IO ()) | 604 | , dispatchResponse = \tid rx d -> {-# SCC "hookQ.dispatchResponse" #-} do -- :: tid -> x -> d -> STM (d, IO ()) |
605 | let Nonce8 w8 = t8 tid | 605 | case rx of |
606 | mb <- W64.lookup w8 <$> readTVar (pendingQueries or) | 606 | Success x -> do |
607 | modifyTVar' (pendingQueries or) (W64.delete w8) | 607 | let Nonce8 w8 = t8 tid |
608 | forM_ mb $ \pq -> do | 608 | mb <- W64.lookup w8 <$> readTVar (pendingQueries or) |
609 | let od = pendingDestination pq | 609 | modifyTVar' (pendingQueries or) (W64.delete w8) |
610 | RouteId rid = fromMaybe (routeId (nodeId (onionNodeInfo od))) | 610 | forM_ mb $ \pq -> do |
611 | $ onionRouteSpec od | 611 | let od = pendingDestination pq |
612 | modifyArray (routeMap or) (fmap gotResponse) rid | 612 | RouteId rid = fromMaybe (routeId (nodeId (onionNodeInfo od))) |
613 | writeTChan (routeLog or) $ "ONION query del " ++ show (fmap pendingVersion mb, w8) | 613 | $ onionRouteSpec od |
614 | dispatchResponse tmethods tid x d | 614 | modifyArray (routeMap or) (fmap gotResponse) rid |
615 | , dispatchCancel = \tid d -> {-# SCC "hookQ.dispatchCancel" #-} do -- :: tid -> d -> STM d | 615 | writeTChan (routeLog or) $ "ONION query del " ++ show (fmap pendingVersion mb, w8) |
616 | let Nonce8 w8 = t8 tid | 616 | dispatchResponse tmethods tid rx d |
617 | mb <- W64.lookup w8 <$> readTVar (pendingQueries or) | 617 | _ -> do -- Timed out or canceled... |
618 | modifyTVar' (pendingQueries or) (W64.delete w8) | 618 | let Nonce8 w8 = t8 tid |
619 | forM_ mb $ \pq -> do | 619 | mb <- W64.lookup w8 <$> readTVar (pendingQueries or) |
620 | let od = pendingDestination pq | 620 | modifyTVar' (pendingQueries or) (W64.delete w8) |
621 | RouteId rid = fromMaybe (routeId (nodeId (onionNodeInfo od))) | 621 | forM_ mb $ \pq -> do |
622 | $ onionRouteSpec od | 622 | let od = pendingDestination pq |
623 | mrr <- readArray (routeMap or) rid | 623 | RouteId rid = fromMaybe (routeId (nodeId (onionNodeInfo od))) |
624 | forM_ mrr $ \rr -> do | 624 | $ onionRouteSpec od |
625 | when (routeVersion rr == pendingVersion pq) $ do | 625 | mrr <- readArray (routeMap or) rid |
626 | let expireRoute = modifyArray (pendingRoutes or) expire rid | 626 | forM_ mrr $ \rr -> do |
627 | expire ver | ver <= succ (pendingVersion pq) = succ (pendingVersion pq) | 627 | when (routeVersion rr == pendingVersion pq) $ do |
628 | | otherwise = ver | 628 | let expireRoute = modifyArray (pendingRoutes or) expire rid |
629 | modifyArray (routeMap or) (fmap gotTimeout) rid | 629 | expire ver | ver <= succ (pendingVersion pq) = succ (pendingVersion pq) |
630 | case rr of | 630 | | otherwise = ver |
631 | RouteRecord{ responseCount = 0 | 631 | case rx of |
632 | , timeoutCount = c | 632 | TimedOut -> do |
633 | , routeVersion = v } | c >= 5 -> expireRoute | 633 | modifyArray (routeMap or) (fmap gotTimeout) rid |
634 | RouteRecord{ responseCount = 1 | 634 | case rr of |
635 | , timeoutCount = c | 635 | RouteRecord{ responseCount = 0 |
636 | , routeVersion = v } | c >= 10 -> expireRoute | 636 | , timeoutCount = c |
637 | RouteRecord{ timeoutCount = c | 637 | , routeVersion = v } | c >= 5 -> expireRoute |
638 | , routeVersion = v } | c >= 20 -> expireRoute | 638 | RouteRecord{ responseCount = 1 |
639 | _ -> return () | 639 | , timeoutCount = c |
640 | writeTChan (routeLog or) $ "ONION query can " ++ show (fmap pendingVersion mb, w8) | 640 | , routeVersion = v } | c >= 10 -> expireRoute |
641 | dispatchCancel tmethods tid d | 641 | RouteRecord{ timeoutCount = c |
642 | , routeVersion = v } | c >= 20 -> expireRoute | ||
643 | _ -> return () | ||
644 | _ -> return () -- Don't penalize route for canceled queries. | ||
645 | writeTChan (routeLog or) $ "ONION query can " ++ show (fmap pendingVersion mb, w8) | ||
646 | dispatchResponse tmethods tid rx d | ||
642 | } | 647 | } |
643 | 648 | ||
644 | 649 | ||
diff --git a/dht/src/Network/Tox/TCP.hs b/dht/src/Network/Tox/TCP.hs index 932b4ab3..a37c0310 100644 --- a/dht/src/Network/Tox/TCP.hs +++ b/dht/src/Network/Tox/TCP.hs | |||
@@ -223,11 +223,53 @@ getTCPNodes tcp seeking dst = do | |||
223 | getUDPNodes :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (Maybe ([UDP.NodeInfo], [UDP.NodeInfo], Maybe ())) | 223 | getUDPNodes :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (Maybe ([UDP.NodeInfo], [UDP.NodeInfo], Maybe ())) |
224 | getUDPNodes tcp seeking dst = fmap fst . resultToMaybe <$> getUDPNodes' tcp seeking dst | 224 | getUDPNodes tcp seeking dst = fmap fst . resultToMaybe <$> getUDPNodes' tcp seeking dst |
225 | 225 | ||
226 | |||
226 | getUDPNodes' :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (QR.Result (([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()), NodeInfo)) | 227 | getUDPNodes' :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (QR.Result (([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()), NodeInfo)) |
227 | getUDPNodes' tcp seeking dst0 = do | 228 | getUDPNodes' tcp seeking dst0 = do |
229 | goGetUDPNodes tcp seeking dst0 (return Canceled) $ \meth gateway dst -> do | ||
230 | r <- sendQuery (tcpClient tcp) meth (AnnounceRequest zeros32 seeking UDP.zeroID) gateway | ||
231 | forM r $ \response -> do | ||
232 | let (ns,_,mb) = either (const ([],[],Nothing)) (unwrapAnnounceResponse Nothing dst) $ response | ||
233 | return ( (ns,ns, const () <$> mb), gateway ) | ||
234 | |||
235 | -- Failure case, currently not treated as special. | ||
236 | -- The current searchQuery type demands a valid Nonce8 is returned | ||
237 | -- even if we were unable to send a query. | ||
238 | fixmeNonce :: Nonce8 | ||
239 | fixmeNonce = Nonce8 0 | ||
240 | |||
241 | asyncUDPNodes :: TCPClient err Nonce8 | ||
242 | -> NodeId | ||
243 | -> UDP.NodeInfo | ||
244 | -> (Nonce8 | ||
245 | -> QR.Result (([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()), NodeInfo) | ||
246 | -> IO ()) | ||
247 | -> IO Nonce8 | ||
248 | asyncUDPNodes tcp seeking dst0 withResult = | ||
249 | goGetUDPNodes tcp seeking dst0 (return fixmeNonce) $ \meth gateway dst -> do | ||
250 | asyncQuery (tcpClient tcp) meth (AnnounceRequest zeros32 seeking UDP.zeroID) gateway $ | ||
251 | \qid response -> do | ||
252 | let wut response = | ||
253 | let (ns,_,mb) = either (const ([],[],Nothing)) (unwrapAnnounceResponse Nothing dst) $ response | ||
254 | in ( (ns,ns, const () <$> mb), gateway ) | ||
255 | withResult qid $ fmap wut response | ||
256 | |||
257 | type Meth x = MethodSerializer | ||
258 | Nonce8 | ||
259 | x -- NodeInfo | ||
260 | (Bool, RelayPacket) | ||
261 | PacketNumber | ||
262 | AnnounceRequest | ||
263 | (Either String AnnounceResponse) | ||
264 | |||
265 | goGetUDPNodes :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo | ||
266 | -> IO a | ||
267 | -> (Meth x -> NodeInfo -> UDP.NodeInfo -> IO a) | ||
268 | -> IO a | ||
269 | goGetUDPNodes tcp seeking dst0 fail go = do | ||
228 | mgateway <- atomically $ tcpGetGateway tcp dst0 | 270 | mgateway <- atomically $ tcpGetGateway tcp dst0 |
229 | case mgateway of | 271 | case mgateway of |
230 | Nothing -> return Canceled | 272 | Nothing -> fail |
231 | Just gateway -> do | 273 | Just gateway -> do |
232 | (b,c,n24) <- atomically $ do | 274 | (b,c,n24) <- atomically $ do |
233 | b <- transportNewKey (tcpCrypto tcp) | 275 | b <- transportNewKey (tcpCrypto tcp) |
@@ -267,10 +309,7 @@ getUDPNodes' tcp seeking dst0 = do | |||
267 | -> decrypt (wrap0 n24') r >>= decodePlain | 309 | -> decrypt (wrap0 n24') r >>= decodePlain |
268 | x -> Left $ "getUDPNodes: unwrapResponse fail " ++ show x | 310 | x -> Left $ "getUDPNodes: unwrapResponse fail " ++ show x |
269 | } | 311 | } |
270 | r <- sendQuery (tcpClient tcp) meth (AnnounceRequest zeros32 seeking UDP.zeroID) gateway | 312 | go meth gateway dst |
271 | forM r $ \response -> do | ||
272 | let (ns,_,mb) = either (const ([],[],Nothing)) (unwrapAnnounceResponse Nothing dst) $ response | ||
273 | return ( (ns,ns, const () <$> mb), gateway ) | ||
274 | 313 | ||
275 | 314 | ||
276 | handleOOB :: PublicKey -> ByteString -> NodeInfo -> NodeInfo -> IO (Maybe (x -> x)) | 315 | handleOOB :: PublicKey -> ByteString -> NodeInfo -> NodeInfo -> IO (Maybe (x -> x)) |