summaryrefslogtreecommitdiff
path: root/dht/src
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2020-01-03 21:27:50 -0500
committerJoe Crayne <joe@jerkface.net>2020-01-07 13:24:59 -0500
commitc7fb8cfe16f821e4e148d1855a18cb81255743bc (patch)
treec035afc9ff870ea3bfc5b1dc7c4254ad0c0bf4b3 /dht/src
parent5ea2de4e858cc89282561922bae257b6f9041d2e (diff)
Async search.
Diffstat (limited to 'dht/src')
-rw-r--r--dht/src/Network/BitTorrent/MainlineDHT.hs76
-rw-r--r--dht/src/Network/Tox.hs2
-rw-r--r--dht/src/Network/Tox/DHT/Handlers.hs58
-rw-r--r--dht/src/Network/Tox/Onion/Handlers.hs49
-rw-r--r--dht/src/Network/Tox/Onion/Routes.hs83
-rw-r--r--dht/src/Network/Tox/TCP.hs49
6 files changed, 243 insertions, 74 deletions
diff --git a/dht/src/Network/BitTorrent/MainlineDHT.hs b/dht/src/Network/BitTorrent/MainlineDHT.hs
index 8532b492..d3904c40 100644
--- a/dht/src/Network/BitTorrent/MainlineDHT.hs
+++ b/dht/src/Network/BitTorrent/MainlineDHT.hs
@@ -512,8 +512,8 @@ data Routing = Routing
512 { tentativeId :: NodeInfo 512 { tentativeId :: NodeInfo
513 , committee4 :: TriadCommittee NodeId SockAddr 513 , committee4 :: TriadCommittee NodeId SockAddr
514 , committee6 :: TriadCommittee NodeId SockAddr 514 , committee6 :: TriadCommittee NodeId SockAddr
515 , refresher4 :: BucketRefresher NodeId NodeInfo 515 , refresher4 :: BucketRefresher NodeId NodeInfo TransactionId
516 , refresher6 :: BucketRefresher NodeId NodeInfo 516 , refresher6 :: BucketRefresher NodeId NodeInfo TransactionId
517 } 517 }
518 518
519sched4 :: Routing -> TVar (Int.PSQ POSIXTime) 519sched4 :: Routing -> TVar (Int.PSQ POSIXTime)
@@ -569,7 +569,6 @@ newClient swarms addr = do
569 -- We defer initializing the refreshSearch and refreshPing until we 569 -- We defer initializing the refreshSearch and refreshPing until we
570 -- have a client to send queries with. 570 -- have a client to send queries with.
571 let nullPing = const $ return False 571 let nullPing = const $ return False
572 nullSearch = mainlineSearch $ \_ _ -> return Canceled
573 tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info R.defaultBucketCount 572 tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info R.defaultBucketCount
574 refresher4 <- newBucketRefresher tbl4 nullSearch nullPing 573 refresher4 <- newBucketRefresher tbl4 nullSearch nullPing
575 tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount 574 tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount
@@ -730,7 +729,7 @@ defaultHandler meth = MethodHandler decodePayload errorPayload returnError
730 729
731mainlineKademlia :: MainlineClient 730mainlineKademlia :: MainlineClient
732 -> TriadCommittee NodeId SockAddr 731 -> TriadCommittee NodeId SockAddr
733 -> BucketRefresher NodeId NodeInfo 732 -> BucketRefresher NodeId NodeInfo TransactionId
734 -> Kademlia NodeId NodeInfo 733 -> Kademlia NodeId NodeInfo
735mainlineKademlia client committee refresher 734mainlineKademlia client committee refresher
736 = Kademlia quietInsertions 735 = Kademlia quietInsertions
@@ -1037,6 +1036,35 @@ announceH (SwarmsDatabase peers toks _) naddr announcement = do
1037isReadonlyClient :: MainlineClient -> Bool 1036isReadonlyClient :: MainlineClient -> Bool
1038isReadonlyClient client = False -- TODO 1037isReadonlyClient client = False -- TODO
1039 1038
1039mainlineAsync :: ( BEncode xqry
1040 , BEncode xrsp
1041 ) => Method
1042 -> (xrsp -> rsp)
1043 -> (qry -> xqry)
1044 -> MainlineClient
1045 -> qry
1046 -> NodeInfo
1047 -> (TransactionId -> QR.Result rsp -> IO ())
1048 -> IO TransactionId
1049mainlineAsync meth unwrap msg client nid addr withResult = do
1050 asyncQuery client serializer (msg nid) addr $ \qid reply -> do
1051 withResult qid $ case reply of
1052 Success (Right x) -> Success x
1053 Success (Left e) -> Canceled -- TODO: Do something with parse errors.
1054 Canceled -> Canceled
1055 TimedOut -> TimedOut
1056 where
1057 serializer = MethodSerializer
1058 { methodTimeout = \ni -> return (ni, 5000000)
1059 , method = meth
1060 , wrapQuery = encodeQueryPayload meth (isReadonlyClient client)
1061 , unwrapResponse = (>>= either (Left . Error GenericError . C8.pack)
1062 (Right . unwrap)
1063 . BE.fromBEncode)
1064 . rspPayload
1065 }
1066
1067
1040mainlineSend :: ( BEncode xqry 1068mainlineSend :: ( BEncode xqry
1041 , BEncode xrsp 1069 , BEncode xrsp
1042 ) => Method 1070 ) => Method
@@ -1073,30 +1101,54 @@ ping client addr =
1073getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ())) 1101getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ()))
1074getNodes = mainlineSend (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both) 1102getNodes = mainlineSend (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both)
1075 1103
1104asyncGetNodes :: MainlineClient -> NodeId -> NodeInfo -> (TransactionId -> QR.Result ([NodeInfo],[NodeInfo],Maybe ()) -> IO ())
1105 -> IO TransactionId
1106asyncGetNodes = mainlineAsync (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both)
1107
1076unwrapNodes :: NodeFound -> ([NodeInfo], [NodeInfo], Maybe ()) 1108unwrapNodes :: NodeFound -> ([NodeInfo], [NodeInfo], Maybe ())
1077unwrapNodes (NodeFound ns4 ns6) = (ns4++ns6, ns4++ns6, Just ()) 1109unwrapNodes (NodeFound ns4 ns6) = (ns4++ns6, ns4++ns6, Just ())
1078 1110
1079getPeers :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[PeerAddr],Maybe Token)) 1111getPeers :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[PeerAddr],Maybe Token))
1080getPeers = mainlineSend (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce 1112getPeers = mainlineSend (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce
1081 1113
1114asyncGetPeers :: MainlineClient -> NodeId -> NodeInfo -> (TransactionId -> QR.Result ([NodeInfo],[PeerAddr],Maybe Token) -> IO ())
1115 -> IO TransactionId
1116asyncGetPeers = mainlineAsync (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce
1117
1082unwrapPeers :: GotPeers -> ([NodeInfo], [PeerAddr], Maybe Token) 1118unwrapPeers :: GotPeers -> ([NodeInfo], [PeerAddr], Maybe Token)
1083unwrapPeers (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4++ns6, ps, Just tok) 1119unwrapPeers (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4++ns6, ps, Just tok)
1084 1120
1085mainlineSearch :: (NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo], [r], Maybe tok))) 1121nullTransactionId :: TransactionId
1086 -> Search NodeId (IP, PortNumber) tok NodeInfo r 1122nullTransactionId = TransactionId B.empty
1087mainlineSearch qry = Search 1123
1124nullSearch :: Search NodeId (IP, PortNumber) tok NodeInfo r TransactionId
1125nullSearch = Search
1126 { searchSpace = mainlineSpace
1127 , searchNodeAddress = nodeIP &&& nodePort
1128 , searchQuery = \_ _ f -> f nullTransactionId Canceled >> return nullTransactionId
1129 , searchQueryCancel = \_ _ -> return ()
1130 , searchAlpha = 8
1131 , searchK = 16
1132 }
1133
1134mainlineSearch :: MainlineClient
1135 -> (MainlineClient -> NodeId -> NodeInfo
1136 -> (TransactionId -> QR.Result ([NodeInfo], [r], Maybe tok) -> IO ()) -> IO TransactionId)
1137 -> Search NodeId (IP, PortNumber) tok NodeInfo r TransactionId
1138mainlineSearch client qry = Search
1088 { searchSpace = mainlineSpace 1139 { searchSpace = mainlineSpace
1089 , searchNodeAddress = nodeIP &&& nodePort 1140 , searchNodeAddress = nodeIP &&& nodePort
1090 , searchQuery = qry 1141 , searchQuery = qry client
1142 , searchQueryCancel = cancelQuery client
1091 , searchAlpha = 8 1143 , searchAlpha = 8
1092 , searchK = 16 1144 , searchK = 16
1093 } 1145 }
1094 1146
1095nodeSearch :: MainlineClient -> Search NodeId (IP, PortNumber) () NodeInfo NodeInfo 1147nodeSearch :: MainlineClient -> Search NodeId (IP, PortNumber) () NodeInfo NodeInfo TransactionId
1096nodeSearch client = mainlineSearch (getNodes client) 1148nodeSearch client = mainlineSearch client asyncGetNodes
1097 1149
1098peerSearch :: MainlineClient -> Search NodeId (IP, PortNumber) Token NodeInfo PeerAddr 1150peerSearch :: MainlineClient -> Search NodeId (IP, PortNumber) Token NodeInfo PeerAddr TransactionId
1099peerSearch client = mainlineSearch (getPeers client) 1151peerSearch client = mainlineSearch client asyncGetPeers
1100 1152
1101-- | List of bootstrap nodes maintained by different bittorrent 1153-- | List of bootstrap nodes maintained by different bittorrent
1102-- software authors. 1154-- software authors.
diff --git a/dht/src/Network/Tox.hs b/dht/src/Network/Tox.hs
index f9f35ea4..4aed1c43 100644
--- a/dht/src/Network/Tox.hs
+++ b/dht/src/Network/Tox.hs
@@ -480,6 +480,6 @@ announceToLan sock nid = do
480 saferSendTo sock bs broadcast 480 saferSendTo sock bs broadcast
481 481
482 482
483toxQSearch :: Tox extra -> Search NodeId (IP, PortNumber) Nonce32 NodeInfo Onion.Rendezvous 483toxQSearch :: Tox extra -> Search NodeId (IP, PortNumber) Nonce32 NodeInfo Onion.Rendezvous DHT.TransactionId
484toxQSearch tox = Onion.toxidSearch (onionTimeout tox) (toxCryptoKeys tox) (toxOnion tox) 484toxQSearch tox = Onion.toxidSearch (onionTimeout tox) (toxCryptoKeys tox) (toxOnion tox)
485 485
diff --git a/dht/src/Network/Tox/DHT/Handlers.hs b/dht/src/Network/Tox/DHT/Handlers.hs
index d132da88..f4563a3b 100644
--- a/dht/src/Network/Tox/DHT/Handlers.hs
+++ b/dht/src/Network/Tox/DHT/Handlers.hs
@@ -133,8 +133,8 @@ data Routing = Routing
133 { tentativeId :: NodeInfo 133 { tentativeId :: NodeInfo
134 , committee4 :: TriadCommittee NodeId SockAddr 134 , committee4 :: TriadCommittee NodeId SockAddr
135 , committee6 :: TriadCommittee NodeId SockAddr 135 , committee6 :: TriadCommittee NodeId SockAddr
136 , refresher4 :: BucketRefresher NodeId NodeInfo 136 , refresher4 :: BucketRefresher NodeId NodeInfo TransactionId
137 , refresher6 :: BucketRefresher NodeId NodeInfo 137 , refresher6 :: BucketRefresher NodeId NodeInfo TransactionId
138 , nodesOfInterest :: TVar (HashMap NodeId [NodeInfoCallback]) 138 , nodesOfInterest :: TVar (HashMap NodeId [NodeInfoCallback])
139 } 139 }
140 140
@@ -172,6 +172,20 @@ routing4 Routing { refresher4 = BucketRefresher { refreshBuckets } } = refreshBu
172routing6 :: Routing -> TVar (R.BucketList NodeInfo) 172routing6 :: Routing -> TVar (R.BucketList NodeInfo)
173routing6 Routing { refresher6 = BucketRefresher { refreshBuckets } } = refreshBuckets 173routing6 Routing { refresher6 = BucketRefresher { refreshBuckets } } = refreshBuckets
174 174
175nullTransactionId :: TransactionId
176nullTransactionId = TransactionId (Nonce8 0) (Nonce24 zeros24)
177
178nullSearch :: Search NodeId (IP, PortNumber) tok NodeInfo r TransactionId
179nullSearch = Search
180 { searchSpace = toxSpace
181 , searchNodeAddress = nodeIP &&& nodePort
182 , searchQuery = \_ _ f -> f nullTransactionId Canceled >> return nullTransactionId
183 , searchQueryCancel = \_ _ -> return ()
184 , searchAlpha = 1
185 , searchK = 2
186 }
187
188
175newRouting :: SockAddr -> TransportCrypto 189newRouting :: SockAddr -> TransportCrypto
176 -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv4 change 190 -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv4 change
177 -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv6 change 191 -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv6 change
@@ -195,13 +209,6 @@ newRouting addr crypto update4 update6 = do
195 -- We defer initializing the refreshSearch and refreshPing until we 209 -- We defer initializing the refreshSearch and refreshPing until we
196 -- have a client to send queries with. 210 -- have a client to send queries with.
197 let nullPing = const $ return False 211 let nullPing = const $ return False
198 nullSearch = Search
199 { searchSpace = toxSpace
200 , searchNodeAddress = nodeIP &&& nodePort
201 , searchQuery = \_ _ -> return Canceled
202 , searchAlpha = 1
203 , searchK = 2
204 }
205 tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info4 R.defaultBucketCount 212 tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info4 R.defaultBucketCount
206 tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount 213 tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount
207 refresher4 <- newBucketRefresher tbl4 nullSearch nullPing 214 refresher4 <- newBucketRefresher tbl4 nullSearch nullPing
@@ -432,6 +439,30 @@ getNodesUDP :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> NodeId -> N
432 -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ())) 439 -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ()))
433getNodesUDP client cbvar nid addr = getNodes client cbvar nid (Multi.UDP ==> addr) 440getNodesUDP client cbvar nid addr = getNodes client cbvar nid (Multi.UDP ==> addr)
434 441
442asyncGetNodes :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> NodeId -> Multi.NodeInfo
443 -> (TransactionId -> QR.Result ([NodeInfo],[NodeInfo],Maybe ()) -> IO ())
444 -> IO TransactionId
445asyncGetNodes client cbvar nid addr withResult = do
446 QR.asyncQuery client (serializer GetNodesType DHTGetNodes unsendNodes) (GetNodes nid) addr $
447 \qid reply -> do
448 forM_ (join $ resultToMaybe reply) $ \(SendNodes ns) ->
449 forM_ ns $ \n -> do
450 now <- getPOSIXTime
451 atomically $ do
452 mcbs <- HashMap.lookup (nodeId . udpNodeInfo $ n) <$> readTVar cbvar
453 forM_ mcbs $ \cbs -> do
454 forM_ cbs $ \cb -> do
455 rumoredAddress cb now addr (udpNodeInfo n)
456 withResult qid $ case reply of
457 Success x -> maybe Canceled (Success . unwrapNodes) x
458 _ -> fmap (error "Network.Tox.DHT.Handlers.getNodes: the impossible happened!") reply
459
460asyncGetNodesUDP :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> NodeId -> NodeInfo
461 -> (TransactionId -> QR.Result ([NodeInfo],[NodeInfo],Maybe ()) -> IO ())
462 -> IO TransactionId
463asyncGetNodesUDP client cbvar nid addr go = asyncGetNodes client cbvar nid (Multi.UDP ==> addr) go
464
465
435updateRouting :: Client -> Routing 466updateRouting :: Client -> Routing
436 -> (TVar (R.BucketList NodeInfo) -> RoutingTransition NodeInfo -> STM ()) 467 -> (TVar (R.BucketList NodeInfo) -> RoutingTransition NodeInfo -> STM ())
437 -> Multi.NodeInfo 468 -> Multi.NodeInfo
@@ -462,7 +493,7 @@ updateTable client routing orouter naddr = do
462 Want_Both -> do dput XMisc "BUG:unreachable" 493 Want_Both -> do dput XMisc "BUG:unreachable"
463 error $ "BUG:unreachable at " ++ __FILE__ ++ ":" ++ show __LINE__ 494 error $ "BUG:unreachable at " ++ __FILE__ ++ ":" ++ show __LINE__
464 where 495 where
465 go :: TriadCommittee NodeId SockAddr -> BucketRefresher NodeId NodeInfo -> IO () 496 go :: TriadCommittee NodeId SockAddr -> BucketRefresher NodeId NodeInfo TransactionId -> IO ()
466 go committee refresher = do 497 go committee refresher = do
467 self <- atomically $ R.thisNode <$> readTVar (refreshBuckets refresher) 498 self <- atomically $ R.thisNode <$> readTVar (refreshBuckets refresher)
468 -- dput XMisc $ "(tox)updateRouting: " ++ show (nodeIP self, nodeIP naddr) 499 -- dput XMisc $ "(tox)updateRouting: " ++ show (nodeIP self, nodeIP naddr)
@@ -473,7 +504,7 @@ updateTable client routing orouter naddr = do
473toxKademlia :: Client 504toxKademlia :: Client
474 -> (TVar (R.BucketList NodeInfo) -> RoutingTransition NodeInfo -> STM ()) 505 -> (TVar (R.BucketList NodeInfo) -> RoutingTransition NodeInfo -> STM ())
475 -> TriadCommittee NodeId SockAddr 506 -> TriadCommittee NodeId SockAddr
476 -> BucketRefresher NodeId NodeInfo 507 -> BucketRefresher NodeId NodeInfo TransactionId
477 -> Kademlia NodeId NodeInfo 508 -> Kademlia NodeId NodeInfo
478toxKademlia client orouter committee refresher 509toxKademlia client orouter committee refresher
479 = Kademlia quietInsertions 510 = Kademlia quietInsertions
@@ -541,11 +572,12 @@ handlers crypto _ CookieRequestType = Just $ MethodHandler (isCookieReques
541handlers _ _ DHTRequestType = Just $ NoReply (isDHTRequest snd) $ dhtRequestH 572handlers _ _ DHTRequestType = Just $ NoReply (isDHTRequest snd) $ dhtRequestH
542handlers _ _ typ = error $ "TODO DHT handlers " ++ show typ 573handlers _ _ typ = error $ "TODO DHT handlers " ++ show typ
543 574
544nodeSearch :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> Search NodeId (IP,PortNumber) () NodeInfo NodeInfo 575nodeSearch :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> Search NodeId (IP,PortNumber) () NodeInfo NodeInfo TransactionId
545nodeSearch client cbvar = Search 576nodeSearch client cbvar = Search
546 { searchSpace = toxSpace 577 { searchSpace = toxSpace
547 , searchNodeAddress = nodeIP &&& nodePort 578 , searchNodeAddress = nodeIP &&& nodePort
548 , searchQuery = getNodesUDP client cbvar 579 , searchQuery = asyncGetNodesUDP client cbvar
580 , searchQueryCancel = cancelQuery client
549 , searchAlpha = 8 581 , searchAlpha = 8
550 , searchK = 16 582 , searchK = 16
551 } 583 }
diff --git a/dht/src/Network/Tox/Onion/Handlers.hs b/dht/src/Network/Tox/Onion/Handlers.hs
index 015c758c..45795312 100644
--- a/dht/src/Network/Tox/Onion/Handlers.hs
+++ b/dht/src/Network/Tox/Onion/Handlers.hs
@@ -218,13 +218,14 @@ handlers net _ _ keydb _ = Just $ NoReply Right $ dataToRouteH keydb net
218toxidSearch :: (OnionDestination r -> STM (OnionDestination r, Int)) 218toxidSearch :: (OnionDestination r -> STM (OnionDestination r, Int))
219 -> TransportCrypto 219 -> TransportCrypto
220 -> Client r 220 -> Client r
221 -> Search NodeId (IP, PortNumber) Nonce32 NodeInfo Rendezvous 221 -> Search NodeId (IP, PortNumber) Nonce32 NodeInfo Rendezvous TransactionId
222toxidSearch getTimeout crypto client = Search 222toxidSearch getTimeout crypto client = Search
223 { searchSpace = toxSpace 223 { searchSpace = toxSpace
224 , searchNodeAddress = nodeIP &&& nodePort 224 , searchNodeAddress = nodeIP &&& nodePort
225 , searchQuery = getRendezvous getTimeout crypto client 225 , searchQuery = asyncGetRendezvous getTimeout crypto client
226 , searchAlpha = 3 226 , searchQueryCancel = cancelQuery client
227 , searchK = 6 227 , searchAlpha = 3
228 , searchK = 6
228 } 229 }
229 230
230announceSerializer :: (OnionDestination r -> STM (OnionDestination r, Int)) 231announceSerializer :: (OnionDestination r -> STM (OnionDestination r, Int))
@@ -289,6 +290,25 @@ sendOnion getTimeout client req oaddr unwrap =
289 Canceled -> return Canceled 290 Canceled -> return Canceled
290 TimedOut -> re 291 TimedOut -> re
291 292
293asyncOnion :: (OnionDestination r -> STM (OnionDestination r, Int))
294 -> Client r
295 -> AnnounceRequest
296 -> OnionDestination r
297 -> (NodeInfo -> AnnounceResponse -> t)
298 -> (TransactionId -> QR.Result t -> IO ())
299 -> IO TransactionId
300asyncOnion getTimeout client req oaddr unwrap withResult = do
301 -- TODO: Restore "Four tries and then we tap out" behavior.
302 qid <- QR.asyncQuery client (announceSerializer getTimeout) req oaddr $ \k mb -> do
303 forM_ mb $ \r -> dput XAnnounce $ show (onionNodeInfo oaddr) ++ " async sent response: " ++ show r
304 withResult k $ case mb of
305 Success x -> maybe (TimedOut)
306 (Success . unwrap (onionNodeInfo oaddr))
307 (x :: Maybe AnnounceResponse)
308 Canceled -> Canceled
309 TimedOut -> TimedOut
310 return qid
311
292 312
293-- | Lookup the secret counterpart for a given alias key. 313-- | Lookup the secret counterpart for a given alias key.
294getRendezvous :: (OnionDestination r -> STM (OnionDestination r, Int)) 314getRendezvous :: (OnionDestination r -> STM (OnionDestination r, Int))
@@ -308,6 +328,27 @@ getRendezvous getTimeout crypto client nid ni = do
308 oaddr 328 oaddr
309 (unwrapAnnounceResponse rkey) 329 (unwrapAnnounceResponse rkey)
310 330
331asyncGetRendezvous ::
332 (OnionDestination r -> STM (OnionDestination r, Int))
333 -> TransportCrypto
334 -> Client r
335 -> NodeId
336 -> NodeInfo
337 -> (TransactionId -> Result ([NodeInfo],[Rendezvous],Maybe Nonce32) -> IO ())
338 -> IO TransactionId
339asyncGetRendezvous getTimeout crypto client nid ni withResult = do
340 asel <- atomically $ selectAlias crypto nid
341 let oaddr = OnionDestination asel ni Nothing
342 rkey = case asel of
343 SearchingAlias -> Nothing
344 _ -> Just $ key2id $ rendezvousPublic crypto
345 asyncOnion getTimeout client
346 (AnnounceRequest zeros32 nid $ fromMaybe zeroID rkey)
347 oaddr
348 (unwrapAnnounceResponse rkey)
349 withResult
350
351
311putRendezvous :: (OnionDestination r -> STM (OnionDestination r, Int)) 352putRendezvous :: (OnionDestination r -> STM (OnionDestination r, Int))
312 -> TransportCrypto 353 -> TransportCrypto
313 -> Client r 354 -> Client r
diff --git a/dht/src/Network/Tox/Onion/Routes.hs b/dht/src/Network/Tox/Onion/Routes.hs
index 9ce4e316..2f13a513 100644
--- a/dht/src/Network/Tox/Onion/Routes.hs
+++ b/dht/src/Network/Tox/Onion/Routes.hs
@@ -88,7 +88,7 @@ data OnionRouter = OnionRouter
88 , tcpProber :: TCP.TCPProber 88 , tcpProber :: TCP.TCPProber
89 , tcpProberThread :: ThreadId 89 , tcpProberThread :: ThreadId
90 -- | Kademlia table of TCP relays. 90 -- | Kademlia table of TCP relays.
91 , tcpBucketRefresher :: BucketRefresher NodeId TCP.NodeInfo 91 , tcpBucketRefresher :: BucketRefresher NodeId TCP.NodeInfo Nonce8
92 , tcpRelayPinger :: RelayPinger 92 , tcpRelayPinger :: RelayPinger
93 -- | Debug prints are written to this channel which is then flushed to 93 -- | Debug prints are written to this channel which is then flushed to
94 -- 'routeLogger'. 94 -- 'routeLogger'.
@@ -601,44 +601,49 @@ hookQueries or t8 tmethods = TransactionMethods
601 modifyTVar' (pendingQueries or) (W64.insert w8 pq) 601 modifyTVar' (pendingQueries or) (W64.insert w8 pq)
602 writeTChan (routeLog or) $ "ONION query add " ++ unwords [ show (Just $ pendingVersion pq,w8), ":=", show ni ] 602 writeTChan (routeLog or) $ "ONION query add " ++ unwords [ show (Just $ pendingVersion pq,w8), ":=", show ni ]
603 return (tid,d') 603 return (tid,d')
604 , dispatchResponse = \tid x d -> {-# SCC "hookQ.dispatchResponse" #-} do -- :: tid -> x -> d -> STM (d, IO ()) 604 , dispatchResponse = \tid rx d -> {-# SCC "hookQ.dispatchResponse" #-} do -- :: tid -> x -> d -> STM (d, IO ())
605 let Nonce8 w8 = t8 tid 605 case rx of
606 mb <- W64.lookup w8 <$> readTVar (pendingQueries or) 606 Success x -> do
607 modifyTVar' (pendingQueries or) (W64.delete w8) 607 let Nonce8 w8 = t8 tid
608 forM_ mb $ \pq -> do 608 mb <- W64.lookup w8 <$> readTVar (pendingQueries or)
609 let od = pendingDestination pq 609 modifyTVar' (pendingQueries or) (W64.delete w8)
610 RouteId rid = fromMaybe (routeId (nodeId (onionNodeInfo od))) 610 forM_ mb $ \pq -> do
611 $ onionRouteSpec od 611 let od = pendingDestination pq
612 modifyArray (routeMap or) (fmap gotResponse) rid 612 RouteId rid = fromMaybe (routeId (nodeId (onionNodeInfo od)))
613 writeTChan (routeLog or) $ "ONION query del " ++ show (fmap pendingVersion mb, w8) 613 $ onionRouteSpec od
614 dispatchResponse tmethods tid x d 614 modifyArray (routeMap or) (fmap gotResponse) rid
615 , dispatchCancel = \tid d -> {-# SCC "hookQ.dispatchCancel" #-} do -- :: tid -> d -> STM d 615 writeTChan (routeLog or) $ "ONION query del " ++ show (fmap pendingVersion mb, w8)
616 let Nonce8 w8 = t8 tid 616 dispatchResponse tmethods tid rx d
617 mb <- W64.lookup w8 <$> readTVar (pendingQueries or) 617 _ -> do -- Timed out or canceled...
618 modifyTVar' (pendingQueries or) (W64.delete w8) 618 let Nonce8 w8 = t8 tid
619 forM_ mb $ \pq -> do 619 mb <- W64.lookup w8 <$> readTVar (pendingQueries or)
620 let od = pendingDestination pq 620 modifyTVar' (pendingQueries or) (W64.delete w8)
621 RouteId rid = fromMaybe (routeId (nodeId (onionNodeInfo od))) 621 forM_ mb $ \pq -> do
622 $ onionRouteSpec od 622 let od = pendingDestination pq
623 mrr <- readArray (routeMap or) rid 623 RouteId rid = fromMaybe (routeId (nodeId (onionNodeInfo od)))
624 forM_ mrr $ \rr -> do 624 $ onionRouteSpec od
625 when (routeVersion rr == pendingVersion pq) $ do 625 mrr <- readArray (routeMap or) rid
626 let expireRoute = modifyArray (pendingRoutes or) expire rid 626 forM_ mrr $ \rr -> do
627 expire ver | ver <= succ (pendingVersion pq) = succ (pendingVersion pq) 627 when (routeVersion rr == pendingVersion pq) $ do
628 | otherwise = ver 628 let expireRoute = modifyArray (pendingRoutes or) expire rid
629 modifyArray (routeMap or) (fmap gotTimeout) rid 629 expire ver | ver <= succ (pendingVersion pq) = succ (pendingVersion pq)
630 case rr of 630 | otherwise = ver
631 RouteRecord{ responseCount = 0 631 case rx of
632 , timeoutCount = c 632 TimedOut -> do
633 , routeVersion = v } | c >= 5 -> expireRoute 633 modifyArray (routeMap or) (fmap gotTimeout) rid
634 RouteRecord{ responseCount = 1 634 case rr of
635 , timeoutCount = c 635 RouteRecord{ responseCount = 0
636 , routeVersion = v } | c >= 10 -> expireRoute 636 , timeoutCount = c
637 RouteRecord{ timeoutCount = c 637 , routeVersion = v } | c >= 5 -> expireRoute
638 , routeVersion = v } | c >= 20 -> expireRoute 638 RouteRecord{ responseCount = 1
639 _ -> return () 639 , timeoutCount = c
640 writeTChan (routeLog or) $ "ONION query can " ++ show (fmap pendingVersion mb, w8) 640 , routeVersion = v } | c >= 10 -> expireRoute
641 dispatchCancel tmethods tid d 641 RouteRecord{ timeoutCount = c
642 , routeVersion = v } | c >= 20 -> expireRoute
643 _ -> return ()
644 _ -> return () -- Don't penalize route for canceled queries.
645 writeTChan (routeLog or) $ "ONION query can " ++ show (fmap pendingVersion mb, w8)
646 dispatchResponse tmethods tid rx d
642 } 647 }
643 648
644 649
diff --git a/dht/src/Network/Tox/TCP.hs b/dht/src/Network/Tox/TCP.hs
index 932b4ab3..a37c0310 100644
--- a/dht/src/Network/Tox/TCP.hs
+++ b/dht/src/Network/Tox/TCP.hs
@@ -223,11 +223,53 @@ getTCPNodes tcp seeking dst = do
223getUDPNodes :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (Maybe ([UDP.NodeInfo], [UDP.NodeInfo], Maybe ())) 223getUDPNodes :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (Maybe ([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()))
224getUDPNodes tcp seeking dst = fmap fst . resultToMaybe <$> getUDPNodes' tcp seeking dst 224getUDPNodes tcp seeking dst = fmap fst . resultToMaybe <$> getUDPNodes' tcp seeking dst
225 225
226
226getUDPNodes' :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (QR.Result (([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()), NodeInfo)) 227getUDPNodes' :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (QR.Result (([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()), NodeInfo))
227getUDPNodes' tcp seeking dst0 = do 228getUDPNodes' tcp seeking dst0 = do
229 goGetUDPNodes tcp seeking dst0 (return Canceled) $ \meth gateway dst -> do
230 r <- sendQuery (tcpClient tcp) meth (AnnounceRequest zeros32 seeking UDP.zeroID) gateway
231 forM r $ \response -> do
232 let (ns,_,mb) = either (const ([],[],Nothing)) (unwrapAnnounceResponse Nothing dst) $ response
233 return ( (ns,ns, const () <$> mb), gateway )
234
235-- Failure case, currently not treated as special.
236-- The current searchQuery type demands a valid Nonce8 is returned
237-- even if we were unable to send a query.
238fixmeNonce :: Nonce8
239fixmeNonce = Nonce8 0
240
241asyncUDPNodes :: TCPClient err Nonce8
242 -> NodeId
243 -> UDP.NodeInfo
244 -> (Nonce8
245 -> QR.Result (([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()), NodeInfo)
246 -> IO ())
247 -> IO Nonce8
248asyncUDPNodes tcp seeking dst0 withResult =
249 goGetUDPNodes tcp seeking dst0 (return fixmeNonce) $ \meth gateway dst -> do
250 asyncQuery (tcpClient tcp) meth (AnnounceRequest zeros32 seeking UDP.zeroID) gateway $
251 \qid response -> do
252 let wut response =
253 let (ns,_,mb) = either (const ([],[],Nothing)) (unwrapAnnounceResponse Nothing dst) $ response
254 in ( (ns,ns, const () <$> mb), gateway )
255 withResult qid $ fmap wut response
256
257type Meth x = MethodSerializer
258 Nonce8
259 x -- NodeInfo
260 (Bool, RelayPacket)
261 PacketNumber
262 AnnounceRequest
263 (Either String AnnounceResponse)
264
265goGetUDPNodes :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo
266 -> IO a
267 -> (Meth x -> NodeInfo -> UDP.NodeInfo -> IO a)
268 -> IO a
269goGetUDPNodes tcp seeking dst0 fail go = do
228 mgateway <- atomically $ tcpGetGateway tcp dst0 270 mgateway <- atomically $ tcpGetGateway tcp dst0
229 case mgateway of 271 case mgateway of
230 Nothing -> return Canceled 272 Nothing -> fail
231 Just gateway -> do 273 Just gateway -> do
232 (b,c,n24) <- atomically $ do 274 (b,c,n24) <- atomically $ do
233 b <- transportNewKey (tcpCrypto tcp) 275 b <- transportNewKey (tcpCrypto tcp)
@@ -267,10 +309,7 @@ getUDPNodes' tcp seeking dst0 = do
267 -> decrypt (wrap0 n24') r >>= decodePlain 309 -> decrypt (wrap0 n24') r >>= decodePlain
268 x -> Left $ "getUDPNodes: unwrapResponse fail " ++ show x 310 x -> Left $ "getUDPNodes: unwrapResponse fail " ++ show x
269 } 311 }
270 r <- sendQuery (tcpClient tcp) meth (AnnounceRequest zeros32 seeking UDP.zeroID) gateway 312 go meth gateway dst
271 forM r $ \response -> do
272 let (ns,_,mb) = either (const ([],[],Nothing)) (unwrapAnnounceResponse Nothing dst) $ response
273 return ( (ns,ns, const () <$> mb), gateway )
274 313
275 314
276handleOOB :: PublicKey -> ByteString -> NodeInfo -> NodeInfo -> IO (Maybe (x -> x)) 315handleOOB :: PublicKey -> ByteString -> NodeInfo -> NodeInfo -> IO (Maybe (x -> x))