From c7fb8cfe16f821e4e148d1855a18cb81255743bc Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Fri, 3 Jan 2020 21:27:50 -0500 Subject: Async search. --- dht/Announcer/Tox.hs | 28 ++++++----- dht/TCPProber.hs | 28 ++++++++++- dht/examples/dhtd.hs | 16 ++++-- dht/src/Network/BitTorrent/MainlineDHT.hs | 76 +++++++++++++++++++++++----- dht/src/Network/Tox.hs | 2 +- dht/src/Network/Tox/DHT/Handlers.hs | 58 ++++++++++++++++----- dht/src/Network/Tox/Onion/Handlers.hs | 49 ++++++++++++++++-- dht/src/Network/Tox/Onion/Routes.hs | 83 ++++++++++++++++--------------- dht/src/Network/Tox/TCP.hs | 49 ++++++++++++++++-- kad/src/Network/Kademlia/Search.hs | 35 +++++++------ kad/tests/searchCancel.hs | 2 +- server/src/Network/QueryResponse.hs | 10 ++-- 12 files changed, 320 insertions(+), 116 deletions(-) diff --git a/dht/Announcer/Tox.hs b/dht/Announcer/Tox.hs index e2459e0e..00eb219b 100644 --- a/dht/Announcer/Tox.hs +++ b/dht/Announcer/Tox.hs @@ -27,22 +27,23 @@ import Data.Time.Clock.POSIX announceK :: Int announceK = 8 -data AnnounceState = forall nid addr tok ni r. AnnounceState - { aState :: SearchState nid addr tok ni r +data AnnounceState = forall nid addr tok ni r qk. AnnounceState + { aState :: SearchState nid addr tok ni r qk , aStoringNodes :: TVar (MM.MinMaxPSQ ni (Down POSIXTime)) } -- | This type specifies an item that can be announced on appropriate nodes in -- a Kademlia network. -data AnnounceMethod r = forall nid ni sr addr tok a. +data AnnounceMethod r = forall nid ni sr addr tok a qk. ( Show nid , Hashable nid , Hashable ni , Ord addr , Ord nid , Ord ni + , Ord qk ) => AnnounceMethod - { aSearch :: Search nid addr tok ni sr + { aSearch :: Search nid addr tok ni sr qk -- ^ This is the Kademlia search to run repeatedly to find the -- nearby nodes. A new search is started whenever one is not -- already in progress at announce time. Repeated searches are @@ -72,15 +73,16 @@ data AnnounceMethod r = forall nid ni sr addr tok a. } -- | This type specifies a Kademlia search and an action to perform upon the result. -data SearchMethod r = forall nid ni sr addr tok a. +data SearchMethod r = forall nid ni sr addr tok a qk. ( Show nid , Hashable nid , Hashable ni , Ord addr , Ord nid , Ord ni + , Ord qk ) => SearchMethod - { sSearch :: Search nid addr tok ni sr + { sSearch :: Search nid addr tok ni sr qk -- ^ This is the Kademlia search to run repeatedly to find the -- nearby nodes. A new search is started whenever one is not -- already in progress at announce time. Repeated searches are @@ -155,8 +157,6 @@ scheduleAnnounce announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTar publishToNodes is onResult sr = return True searchAgain = do - -- Canceling a pending search here seems to make announcements more reliable. - searchCancel st return $ void $ do t <- fork search labelThread t ("scheduleAnnounce.sch." ++ show aTarget) @@ -164,7 +164,10 @@ scheduleAnnounce announcer k AnnounceMethod{aSearch,aPublish,aNearestNodes,aTar got <- tryTakeMVar mutex case got of Just () -> do - atomically $ reset aNearestNodes aSearch aTarget st + me <- myThreadId + labelThread me "scheduleAnnounce.reset" + reset aNearestNodes aSearch aTarget st + labelThread me "scheduleAnnounce.searchLoop" searchLoop aSearch aTarget onResult st -- Announce to any nodes we haven't already announced to. is <- atomically $ do @@ -202,8 +205,6 @@ scheduleSearch announcer k SearchMethod{sSearch,sWithResult,sNearestNodes,sTarge return () return True -- True to keep searching. searchAgain = do - -- Canceling a pending search here seems to make announcements more reliable. - searchCancel st return $ void $ do t <- fork search labelThread t ("scheduleSearch.sch." ++ show sTarget) @@ -211,7 +212,10 @@ scheduleSearch announcer k SearchMethod{sSearch,sWithResult,sNearestNodes,sTarge got <- tryTakeMVar mutex case got of Just () -> do - atomically $ reset sNearestNodes sSearch sTarget st + me <- myThreadId + labelThread me "scheduleSearch.reset" + reset sNearestNodes sSearch sTarget st + labelThread me "scheduleSearch.searchLoop" searchLoop sSearch sTarget onResult st putMVar mutex () Nothing -> do diff --git a/dht/TCPProber.hs b/dht/TCPProber.hs index 17b68f64..ccdbd8d1 100644 --- a/dht/TCPProber.hs +++ b/dht/TCPProber.hs @@ -176,11 +176,35 @@ getNodes prober tcp seeking dst = do return $ Success ts _ -> return $ fmap (const $ error "TCPProber.getNodes: The impossible happened!") r -nodeSearch :: TCPProber -> TCP.TCPClient err Nonce8 -> Search NodeId (IP, PortNumber) () TCP.NodeInfo TCP.NodeInfo +asyncGetNodes :: TCPProber -> TCP.TCPClient err Nonce8 -> NodeId -> TCP.NodeInfo + -> (Nonce8 -> Result ([TCP.NodeInfo],[TCP.NodeInfo],Maybe ()) -> IO ()) + -> IO Nonce8 +asyncGetNodes prober tcp seeking dst withResponse = do + TCP.asyncUDPNodes tcp seeking (TCP.udpNodeInfo dst) $ \qid r -> do + dput XTCP $ "Got via TCP nodes: " ++ show r + let tcps (ns,_,mb) = (ns',ns',mb) + where ns' = do + n <- ns + [ TCP.NodeInfo n 0 ] + r' <- case r of + Success (ns,gw) -> do + let ts = tcps ns + if TCP.nodeId gw == TCP.nodeId dst + then return $ Success ts + else do + enqueueProbe prober (TCP.udpNodeInfo dst) + return $ Success ts + return $ Success ts + _ -> return $ fmap (const $ error "TCPProber.getNodes: The impossible happened!") r + withResponse qid r' + + +nodeSearch :: TCPProber -> TCP.TCPClient err Nonce8 -> Search NodeId (IP, PortNumber) () TCP.NodeInfo TCP.NodeInfo Nonce8 nodeSearch prober tcp = Search { searchSpace = TCP.tcpSpace , searchNodeAddress = TCP.nodeIP &&& TCP.tcpPort - , searchQuery = getNodes prober tcp + , searchQuery = asyncGetNodes prober tcp + , searchQueryCancel = cancelQuery (TCP.tcpClient tcp) , searchAlpha = 8 , searchK = 16 } diff --git a/dht/examples/dhtd.hs b/dht/examples/dhtd.hs index 6b057af9..3078831d 100644 --- a/dht/examples/dhtd.hs +++ b/dht/examples/dhtd.hs @@ -811,8 +811,14 @@ clientSession s@Session{..} sock cnum h = do where go | null destination = fmap Right . qhandler self | otherwise = case readEither destination of - Right ni -> fmap (maybe (Left "Timeout.") Right . resultToMaybe) - . flip (searchQuery qsearch) ni -- TODO report canceled + Right ni -> \nid -> do + v <- newEmptyMVar + _ <- searchQuery qsearch nid ni $ \_ r -> putMVar v r + r <- takeMVar v + return $ case r of + Success x -> Right x + Canceled -> Left "Canceled." + TimedOut -> Left "Timeout." Left e -> const $ return $ Left ("Bad destination: "++e) maybe (hPutClient h ("Unsupported method: "++method)) goQuery @@ -938,14 +944,14 @@ clientSession s@Session{..} sock cnum h = do , Typeable ptok , Typeable sni , Typeable pni ) - => Search nid addr stok sni sr + => Search nid addr stok sni sr qk -> (pr -> ptok -> Maybe pni -> IO (Maybe pubr)) -> Maybe (stok :~: ptok, sni :~: pni) matchingResult _ _ = liftA2 (,) eqT eqT matchingResult2 :: ( Typeable sr , Typeable pr ) - => Search nid addr stok sni sr + => Search nid addr stok sni sr qk -> (PublicKey -> pdta -> pr -> IO ()) -> (pdta -> nid) -> Maybe (pr :~: sr) @@ -1913,7 +1919,7 @@ main = do btSaved <- loadNodes netname -- :: IO [Mainline.NodeInfo] putStrLn $ "Loaded "++show (length btSaved)++" nodes for "++netname++"." fallbackNodes <- getBootstrapNodes - let isNodesSearch :: ni :~: r -> Search nid addr tok ni r -> Search nid addr tok ni ni + let isNodesSearch :: ni :~: r -> Search nid addr tok ni r qk -> Search nid addr tok ni ni qk isNodesSearch Refl sch = sch ping = maybe (const $ return False) (\DHTPing{pingQuery} -> fmap (maybe False (const True)) . pingQuery []) diff --git a/dht/src/Network/BitTorrent/MainlineDHT.hs b/dht/src/Network/BitTorrent/MainlineDHT.hs index 8532b492..d3904c40 100644 --- a/dht/src/Network/BitTorrent/MainlineDHT.hs +++ b/dht/src/Network/BitTorrent/MainlineDHT.hs @@ -512,8 +512,8 @@ data Routing = Routing { tentativeId :: NodeInfo , committee4 :: TriadCommittee NodeId SockAddr , committee6 :: TriadCommittee NodeId SockAddr - , refresher4 :: BucketRefresher NodeId NodeInfo - , refresher6 :: BucketRefresher NodeId NodeInfo + , refresher4 :: BucketRefresher NodeId NodeInfo TransactionId + , refresher6 :: BucketRefresher NodeId NodeInfo TransactionId } sched4 :: Routing -> TVar (Int.PSQ POSIXTime) @@ -569,7 +569,6 @@ newClient swarms addr = do -- We defer initializing the refreshSearch and refreshPing until we -- have a client to send queries with. let nullPing = const $ return False - nullSearch = mainlineSearch $ \_ _ -> return Canceled tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info R.defaultBucketCount refresher4 <- newBucketRefresher tbl4 nullSearch nullPing tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount @@ -730,7 +729,7 @@ defaultHandler meth = MethodHandler decodePayload errorPayload returnError mainlineKademlia :: MainlineClient -> TriadCommittee NodeId SockAddr - -> BucketRefresher NodeId NodeInfo + -> BucketRefresher NodeId NodeInfo TransactionId -> Kademlia NodeId NodeInfo mainlineKademlia client committee refresher = Kademlia quietInsertions @@ -1037,6 +1036,35 @@ announceH (SwarmsDatabase peers toks _) naddr announcement = do isReadonlyClient :: MainlineClient -> Bool isReadonlyClient client = False -- TODO +mainlineAsync :: ( BEncode xqry + , BEncode xrsp + ) => Method + -> (xrsp -> rsp) + -> (qry -> xqry) + -> MainlineClient + -> qry + -> NodeInfo + -> (TransactionId -> QR.Result rsp -> IO ()) + -> IO TransactionId +mainlineAsync meth unwrap msg client nid addr withResult = do + asyncQuery client serializer (msg nid) addr $ \qid reply -> do + withResult qid $ case reply of + Success (Right x) -> Success x + Success (Left e) -> Canceled -- TODO: Do something with parse errors. + Canceled -> Canceled + TimedOut -> TimedOut + where + serializer = MethodSerializer + { methodTimeout = \ni -> return (ni, 5000000) + , method = meth + , wrapQuery = encodeQueryPayload meth (isReadonlyClient client) + , unwrapResponse = (>>= either (Left . Error GenericError . C8.pack) + (Right . unwrap) + . BE.fromBEncode) + . rspPayload + } + + mainlineSend :: ( BEncode xqry , BEncode xrsp ) => Method @@ -1073,30 +1101,54 @@ ping client addr = getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ())) getNodes = mainlineSend (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both) +asyncGetNodes :: MainlineClient -> NodeId -> NodeInfo -> (TransactionId -> QR.Result ([NodeInfo],[NodeInfo],Maybe ()) -> IO ()) + -> IO TransactionId +asyncGetNodes = mainlineAsync (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both) + unwrapNodes :: NodeFound -> ([NodeInfo], [NodeInfo], Maybe ()) unwrapNodes (NodeFound ns4 ns6) = (ns4++ns6, ns4++ns6, Just ()) getPeers :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[PeerAddr],Maybe Token)) getPeers = mainlineSend (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce +asyncGetPeers :: MainlineClient -> NodeId -> NodeInfo -> (TransactionId -> QR.Result ([NodeInfo],[PeerAddr],Maybe Token) -> IO ()) + -> IO TransactionId +asyncGetPeers = mainlineAsync (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce + unwrapPeers :: GotPeers -> ([NodeInfo], [PeerAddr], Maybe Token) unwrapPeers (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4++ns6, ps, Just tok) -mainlineSearch :: (NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo], [r], Maybe tok))) - -> Search NodeId (IP, PortNumber) tok NodeInfo r -mainlineSearch qry = Search +nullTransactionId :: TransactionId +nullTransactionId = TransactionId B.empty + +nullSearch :: Search NodeId (IP, PortNumber) tok NodeInfo r TransactionId +nullSearch = Search + { searchSpace = mainlineSpace + , searchNodeAddress = nodeIP &&& nodePort + , searchQuery = \_ _ f -> f nullTransactionId Canceled >> return nullTransactionId + , searchQueryCancel = \_ _ -> return () + , searchAlpha = 8 + , searchK = 16 + } + +mainlineSearch :: MainlineClient + -> (MainlineClient -> NodeId -> NodeInfo + -> (TransactionId -> QR.Result ([NodeInfo], [r], Maybe tok) -> IO ()) -> IO TransactionId) + -> Search NodeId (IP, PortNumber) tok NodeInfo r TransactionId +mainlineSearch client qry = Search { searchSpace = mainlineSpace , searchNodeAddress = nodeIP &&& nodePort - , searchQuery = qry + , searchQuery = qry client + , searchQueryCancel = cancelQuery client , searchAlpha = 8 , searchK = 16 } -nodeSearch :: MainlineClient -> Search NodeId (IP, PortNumber) () NodeInfo NodeInfo -nodeSearch client = mainlineSearch (getNodes client) +nodeSearch :: MainlineClient -> Search NodeId (IP, PortNumber) () NodeInfo NodeInfo TransactionId +nodeSearch client = mainlineSearch client asyncGetNodes -peerSearch :: MainlineClient -> Search NodeId (IP, PortNumber) Token NodeInfo PeerAddr -peerSearch client = mainlineSearch (getPeers client) +peerSearch :: MainlineClient -> Search NodeId (IP, PortNumber) Token NodeInfo PeerAddr TransactionId +peerSearch client = mainlineSearch client asyncGetPeers -- | List of bootstrap nodes maintained by different bittorrent -- software authors. diff --git a/dht/src/Network/Tox.hs b/dht/src/Network/Tox.hs index f9f35ea4..4aed1c43 100644 --- a/dht/src/Network/Tox.hs +++ b/dht/src/Network/Tox.hs @@ -480,6 +480,6 @@ announceToLan sock nid = do saferSendTo sock bs broadcast -toxQSearch :: Tox extra -> Search NodeId (IP, PortNumber) Nonce32 NodeInfo Onion.Rendezvous +toxQSearch :: Tox extra -> Search NodeId (IP, PortNumber) Nonce32 NodeInfo Onion.Rendezvous DHT.TransactionId toxQSearch tox = Onion.toxidSearch (onionTimeout tox) (toxCryptoKeys tox) (toxOnion tox) diff --git a/dht/src/Network/Tox/DHT/Handlers.hs b/dht/src/Network/Tox/DHT/Handlers.hs index d132da88..f4563a3b 100644 --- a/dht/src/Network/Tox/DHT/Handlers.hs +++ b/dht/src/Network/Tox/DHT/Handlers.hs @@ -133,8 +133,8 @@ data Routing = Routing { tentativeId :: NodeInfo , committee4 :: TriadCommittee NodeId SockAddr , committee6 :: TriadCommittee NodeId SockAddr - , refresher4 :: BucketRefresher NodeId NodeInfo - , refresher6 :: BucketRefresher NodeId NodeInfo + , refresher4 :: BucketRefresher NodeId NodeInfo TransactionId + , refresher6 :: BucketRefresher NodeId NodeInfo TransactionId , nodesOfInterest :: TVar (HashMap NodeId [NodeInfoCallback]) } @@ -172,6 +172,20 @@ routing4 Routing { refresher4 = BucketRefresher { refreshBuckets } } = refreshBu routing6 :: Routing -> TVar (R.BucketList NodeInfo) routing6 Routing { refresher6 = BucketRefresher { refreshBuckets } } = refreshBuckets +nullTransactionId :: TransactionId +nullTransactionId = TransactionId (Nonce8 0) (Nonce24 zeros24) + +nullSearch :: Search NodeId (IP, PortNumber) tok NodeInfo r TransactionId +nullSearch = Search + { searchSpace = toxSpace + , searchNodeAddress = nodeIP &&& nodePort + , searchQuery = \_ _ f -> f nullTransactionId Canceled >> return nullTransactionId + , searchQueryCancel = \_ _ -> return () + , searchAlpha = 1 + , searchK = 2 + } + + newRouting :: SockAddr -> TransportCrypto -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv4 change -> (TVar (R.BucketList NodeInfo) -> SockAddr -> STM ()) -- ^ invoked on IPv6 change @@ -195,13 +209,6 @@ newRouting addr crypto update4 update6 = do -- We defer initializing the refreshSearch and refreshPing until we -- have a client to send queries with. let nullPing = const $ return False - nullSearch = Search - { searchSpace = toxSpace - , searchNodeAddress = nodeIP &&& nodePort - , searchQuery = \_ _ -> return Canceled - , searchAlpha = 1 - , searchK = 2 - } tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info4 R.defaultBucketCount tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount refresher4 <- newBucketRefresher tbl4 nullSearch nullPing @@ -432,6 +439,30 @@ getNodesUDP :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> NodeId -> N -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ())) getNodesUDP client cbvar nid addr = getNodes client cbvar nid (Multi.UDP ==> addr) +asyncGetNodes :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> NodeId -> Multi.NodeInfo + -> (TransactionId -> QR.Result ([NodeInfo],[NodeInfo],Maybe ()) -> IO ()) + -> IO TransactionId +asyncGetNodes client cbvar nid addr withResult = do + QR.asyncQuery client (serializer GetNodesType DHTGetNodes unsendNodes) (GetNodes nid) addr $ + \qid reply -> do + forM_ (join $ resultToMaybe reply) $ \(SendNodes ns) -> + forM_ ns $ \n -> do + now <- getPOSIXTime + atomically $ do + mcbs <- HashMap.lookup (nodeId . udpNodeInfo $ n) <$> readTVar cbvar + forM_ mcbs $ \cbs -> do + forM_ cbs $ \cb -> do + rumoredAddress cb now addr (udpNodeInfo n) + withResult qid $ case reply of + Success x -> maybe Canceled (Success . unwrapNodes) x + _ -> fmap (error "Network.Tox.DHT.Handlers.getNodes: the impossible happened!") reply + +asyncGetNodesUDP :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> NodeId -> NodeInfo + -> (TransactionId -> QR.Result ([NodeInfo],[NodeInfo],Maybe ()) -> IO ()) + -> IO TransactionId +asyncGetNodesUDP client cbvar nid addr go = asyncGetNodes client cbvar nid (Multi.UDP ==> addr) go + + updateRouting :: Client -> Routing -> (TVar (R.BucketList NodeInfo) -> RoutingTransition NodeInfo -> STM ()) -> Multi.NodeInfo @@ -462,7 +493,7 @@ updateTable client routing orouter naddr = do Want_Both -> do dput XMisc "BUG:unreachable" error $ "BUG:unreachable at " ++ __FILE__ ++ ":" ++ show __LINE__ where - go :: TriadCommittee NodeId SockAddr -> BucketRefresher NodeId NodeInfo -> IO () + go :: TriadCommittee NodeId SockAddr -> BucketRefresher NodeId NodeInfo TransactionId -> IO () go committee refresher = do self <- atomically $ R.thisNode <$> readTVar (refreshBuckets refresher) -- dput XMisc $ "(tox)updateRouting: " ++ show (nodeIP self, nodeIP naddr) @@ -473,7 +504,7 @@ updateTable client routing orouter naddr = do toxKademlia :: Client -> (TVar (R.BucketList NodeInfo) -> RoutingTransition NodeInfo -> STM ()) -> TriadCommittee NodeId SockAddr - -> BucketRefresher NodeId NodeInfo + -> BucketRefresher NodeId NodeInfo TransactionId -> Kademlia NodeId NodeInfo toxKademlia client orouter committee refresher = Kademlia quietInsertions @@ -541,11 +572,12 @@ handlers crypto _ CookieRequestType = Just $ MethodHandler (isCookieReques handlers _ _ DHTRequestType = Just $ NoReply (isDHTRequest snd) $ dhtRequestH handlers _ _ typ = error $ "TODO DHT handlers " ++ show typ -nodeSearch :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> Search NodeId (IP,PortNumber) () NodeInfo NodeInfo +nodeSearch :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> Search NodeId (IP,PortNumber) () NodeInfo NodeInfo TransactionId nodeSearch client cbvar = Search { searchSpace = toxSpace , searchNodeAddress = nodeIP &&& nodePort - , searchQuery = getNodesUDP client cbvar + , searchQuery = asyncGetNodesUDP client cbvar + , searchQueryCancel = cancelQuery client , searchAlpha = 8 , searchK = 16 } diff --git a/dht/src/Network/Tox/Onion/Handlers.hs b/dht/src/Network/Tox/Onion/Handlers.hs index 015c758c..45795312 100644 --- a/dht/src/Network/Tox/Onion/Handlers.hs +++ b/dht/src/Network/Tox/Onion/Handlers.hs @@ -218,13 +218,14 @@ handlers net _ _ keydb _ = Just $ NoReply Right $ dataToRouteH keydb net toxidSearch :: (OnionDestination r -> STM (OnionDestination r, Int)) -> TransportCrypto -> Client r - -> Search NodeId (IP, PortNumber) Nonce32 NodeInfo Rendezvous + -> Search NodeId (IP, PortNumber) Nonce32 NodeInfo Rendezvous TransactionId toxidSearch getTimeout crypto client = Search { searchSpace = toxSpace , searchNodeAddress = nodeIP &&& nodePort - , searchQuery = getRendezvous getTimeout crypto client - , searchAlpha = 3 - , searchK = 6 + , searchQuery = asyncGetRendezvous getTimeout crypto client + , searchQueryCancel = cancelQuery client + , searchAlpha = 3 + , searchK = 6 } announceSerializer :: (OnionDestination r -> STM (OnionDestination r, Int)) @@ -289,6 +290,25 @@ sendOnion getTimeout client req oaddr unwrap = Canceled -> return Canceled TimedOut -> re +asyncOnion :: (OnionDestination r -> STM (OnionDestination r, Int)) + -> Client r + -> AnnounceRequest + -> OnionDestination r + -> (NodeInfo -> AnnounceResponse -> t) + -> (TransactionId -> QR.Result t -> IO ()) + -> IO TransactionId +asyncOnion getTimeout client req oaddr unwrap withResult = do + -- TODO: Restore "Four tries and then we tap out" behavior. + qid <- QR.asyncQuery client (announceSerializer getTimeout) req oaddr $ \k mb -> do + forM_ mb $ \r -> dput XAnnounce $ show (onionNodeInfo oaddr) ++ " async sent response: " ++ show r + withResult k $ case mb of + Success x -> maybe (TimedOut) + (Success . unwrap (onionNodeInfo oaddr)) + (x :: Maybe AnnounceResponse) + Canceled -> Canceled + TimedOut -> TimedOut + return qid + -- | Lookup the secret counterpart for a given alias key. getRendezvous :: (OnionDestination r -> STM (OnionDestination r, Int)) @@ -308,6 +328,27 @@ getRendezvous getTimeout crypto client nid ni = do oaddr (unwrapAnnounceResponse rkey) +asyncGetRendezvous :: + (OnionDestination r -> STM (OnionDestination r, Int)) + -> TransportCrypto + -> Client r + -> NodeId + -> NodeInfo + -> (TransactionId -> Result ([NodeInfo],[Rendezvous],Maybe Nonce32) -> IO ()) + -> IO TransactionId +asyncGetRendezvous getTimeout crypto client nid ni withResult = do + asel <- atomically $ selectAlias crypto nid + let oaddr = OnionDestination asel ni Nothing + rkey = case asel of + SearchingAlias -> Nothing + _ -> Just $ key2id $ rendezvousPublic crypto + asyncOnion getTimeout client + (AnnounceRequest zeros32 nid $ fromMaybe zeroID rkey) + oaddr + (unwrapAnnounceResponse rkey) + withResult + + putRendezvous :: (OnionDestination r -> STM (OnionDestination r, Int)) -> TransportCrypto -> Client r diff --git a/dht/src/Network/Tox/Onion/Routes.hs b/dht/src/Network/Tox/Onion/Routes.hs index 9ce4e316..2f13a513 100644 --- a/dht/src/Network/Tox/Onion/Routes.hs +++ b/dht/src/Network/Tox/Onion/Routes.hs @@ -88,7 +88,7 @@ data OnionRouter = OnionRouter , tcpProber :: TCP.TCPProber , tcpProberThread :: ThreadId -- | Kademlia table of TCP relays. - , tcpBucketRefresher :: BucketRefresher NodeId TCP.NodeInfo + , tcpBucketRefresher :: BucketRefresher NodeId TCP.NodeInfo Nonce8 , tcpRelayPinger :: RelayPinger -- | Debug prints are written to this channel which is then flushed to -- 'routeLogger'. @@ -601,44 +601,49 @@ hookQueries or t8 tmethods = TransactionMethods modifyTVar' (pendingQueries or) (W64.insert w8 pq) writeTChan (routeLog or) $ "ONION query add " ++ unwords [ show (Just $ pendingVersion pq,w8), ":=", show ni ] return (tid,d') - , dispatchResponse = \tid x d -> {-# SCC "hookQ.dispatchResponse" #-} do -- :: tid -> x -> d -> STM (d, IO ()) - let Nonce8 w8 = t8 tid - mb <- W64.lookup w8 <$> readTVar (pendingQueries or) - modifyTVar' (pendingQueries or) (W64.delete w8) - forM_ mb $ \pq -> do - let od = pendingDestination pq - RouteId rid = fromMaybe (routeId (nodeId (onionNodeInfo od))) - $ onionRouteSpec od - modifyArray (routeMap or) (fmap gotResponse) rid - writeTChan (routeLog or) $ "ONION query del " ++ show (fmap pendingVersion mb, w8) - dispatchResponse tmethods tid x d - , dispatchCancel = \tid d -> {-# SCC "hookQ.dispatchCancel" #-} do -- :: tid -> d -> STM d - let Nonce8 w8 = t8 tid - mb <- W64.lookup w8 <$> readTVar (pendingQueries or) - modifyTVar' (pendingQueries or) (W64.delete w8) - forM_ mb $ \pq -> do - let od = pendingDestination pq - RouteId rid = fromMaybe (routeId (nodeId (onionNodeInfo od))) - $ onionRouteSpec od - mrr <- readArray (routeMap or) rid - forM_ mrr $ \rr -> do - when (routeVersion rr == pendingVersion pq) $ do - let expireRoute = modifyArray (pendingRoutes or) expire rid - expire ver | ver <= succ (pendingVersion pq) = succ (pendingVersion pq) - | otherwise = ver - modifyArray (routeMap or) (fmap gotTimeout) rid - case rr of - RouteRecord{ responseCount = 0 - , timeoutCount = c - , routeVersion = v } | c >= 5 -> expireRoute - RouteRecord{ responseCount = 1 - , timeoutCount = c - , routeVersion = v } | c >= 10 -> expireRoute - RouteRecord{ timeoutCount = c - , routeVersion = v } | c >= 20 -> expireRoute - _ -> return () - writeTChan (routeLog or) $ "ONION query can " ++ show (fmap pendingVersion mb, w8) - dispatchCancel tmethods tid d + , dispatchResponse = \tid rx d -> {-# SCC "hookQ.dispatchResponse" #-} do -- :: tid -> x -> d -> STM (d, IO ()) + case rx of + Success x -> do + let Nonce8 w8 = t8 tid + mb <- W64.lookup w8 <$> readTVar (pendingQueries or) + modifyTVar' (pendingQueries or) (W64.delete w8) + forM_ mb $ \pq -> do + let od = pendingDestination pq + RouteId rid = fromMaybe (routeId (nodeId (onionNodeInfo od))) + $ onionRouteSpec od + modifyArray (routeMap or) (fmap gotResponse) rid + writeTChan (routeLog or) $ "ONION query del " ++ show (fmap pendingVersion mb, w8) + dispatchResponse tmethods tid rx d + _ -> do -- Timed out or canceled... + let Nonce8 w8 = t8 tid + mb <- W64.lookup w8 <$> readTVar (pendingQueries or) + modifyTVar' (pendingQueries or) (W64.delete w8) + forM_ mb $ \pq -> do + let od = pendingDestination pq + RouteId rid = fromMaybe (routeId (nodeId (onionNodeInfo od))) + $ onionRouteSpec od + mrr <- readArray (routeMap or) rid + forM_ mrr $ \rr -> do + when (routeVersion rr == pendingVersion pq) $ do + let expireRoute = modifyArray (pendingRoutes or) expire rid + expire ver | ver <= succ (pendingVersion pq) = succ (pendingVersion pq) + | otherwise = ver + case rx of + TimedOut -> do + modifyArray (routeMap or) (fmap gotTimeout) rid + case rr of + RouteRecord{ responseCount = 0 + , timeoutCount = c + , routeVersion = v } | c >= 5 -> expireRoute + RouteRecord{ responseCount = 1 + , timeoutCount = c + , routeVersion = v } | c >= 10 -> expireRoute + RouteRecord{ timeoutCount = c + , routeVersion = v } | c >= 20 -> expireRoute + _ -> return () + _ -> return () -- Don't penalize route for canceled queries. + writeTChan (routeLog or) $ "ONION query can " ++ show (fmap pendingVersion mb, w8) + dispatchResponse tmethods tid rx d } diff --git a/dht/src/Network/Tox/TCP.hs b/dht/src/Network/Tox/TCP.hs index 932b4ab3..a37c0310 100644 --- a/dht/src/Network/Tox/TCP.hs +++ b/dht/src/Network/Tox/TCP.hs @@ -223,11 +223,53 @@ getTCPNodes tcp seeking dst = do getUDPNodes :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (Maybe ([UDP.NodeInfo], [UDP.NodeInfo], Maybe ())) getUDPNodes tcp seeking dst = fmap fst . resultToMaybe <$> getUDPNodes' tcp seeking dst + getUDPNodes' :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (QR.Result (([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()), NodeInfo)) getUDPNodes' tcp seeking dst0 = do + goGetUDPNodes tcp seeking dst0 (return Canceled) $ \meth gateway dst -> do + r <- sendQuery (tcpClient tcp) meth (AnnounceRequest zeros32 seeking UDP.zeroID) gateway + forM r $ \response -> do + let (ns,_,mb) = either (const ([],[],Nothing)) (unwrapAnnounceResponse Nothing dst) $ response + return ( (ns,ns, const () <$> mb), gateway ) + +-- Failure case, currently not treated as special. +-- The current searchQuery type demands a valid Nonce8 is returned +-- even if we were unable to send a query. +fixmeNonce :: Nonce8 +fixmeNonce = Nonce8 0 + +asyncUDPNodes :: TCPClient err Nonce8 + -> NodeId + -> UDP.NodeInfo + -> (Nonce8 + -> QR.Result (([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()), NodeInfo) + -> IO ()) + -> IO Nonce8 +asyncUDPNodes tcp seeking dst0 withResult = + goGetUDPNodes tcp seeking dst0 (return fixmeNonce) $ \meth gateway dst -> do + asyncQuery (tcpClient tcp) meth (AnnounceRequest zeros32 seeking UDP.zeroID) gateway $ + \qid response -> do + let wut response = + let (ns,_,mb) = either (const ([],[],Nothing)) (unwrapAnnounceResponse Nothing dst) $ response + in ( (ns,ns, const () <$> mb), gateway ) + withResult qid $ fmap wut response + +type Meth x = MethodSerializer + Nonce8 + x -- NodeInfo + (Bool, RelayPacket) + PacketNumber + AnnounceRequest + (Either String AnnounceResponse) + +goGetUDPNodes :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo + -> IO a + -> (Meth x -> NodeInfo -> UDP.NodeInfo -> IO a) + -> IO a +goGetUDPNodes tcp seeking dst0 fail go = do mgateway <- atomically $ tcpGetGateway tcp dst0 case mgateway of - Nothing -> return Canceled + Nothing -> fail Just gateway -> do (b,c,n24) <- atomically $ do b <- transportNewKey (tcpCrypto tcp) @@ -267,10 +309,7 @@ getUDPNodes' tcp seeking dst0 = do -> decrypt (wrap0 n24') r >>= decodePlain x -> Left $ "getUDPNodes: unwrapResponse fail " ++ show x } - r <- sendQuery (tcpClient tcp) meth (AnnounceRequest zeros32 seeking UDP.zeroID) gateway - forM r $ \response -> do - let (ns,_,mb) = either (const ([],[],Nothing)) (unwrapAnnounceResponse Nothing dst) $ response - return ( (ns,ns, const () <$> mb), gateway ) + go meth gateway dst handleOOB :: PublicKey -> ByteString -> NodeInfo -> NodeInfo -> IO (Maybe (x -> x)) diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs index e30c40da..19d0df69 100644 --- a/kad/src/Network/Kademlia/Search.hs +++ b/kad/src/Network/Kademlia/Search.hs @@ -44,7 +44,7 @@ data Search nid addr tok ni r qk = Search { searchSpace :: KademliaSpace nid ni , searchNodeAddress :: ni -> addr , searchQuery :: nid -> ni -> (qk -> Result ([ni], [r], Maybe tok) -> IO ()) -> IO qk - , searchQueryCancel :: qk -> STM () + , searchQueryCancel :: (IO () -> STM ()) -> qk -> STM () , searchAlpha :: Int -- α = 8 -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on -- how fast the queries are. For Tox's much slower onion-routed queries, we @@ -118,19 +118,21 @@ reset :: (Ord ni, Ord nid, Hashable ni, Hashable nid) => -> Search nid addr1 tok1 ni r1 qk -> nid -> SearchState nid addr tok ni r qk - -> STM (SearchState nid addr tok ni r qk) + -> IO (SearchState nid addr tok ni r qk) reset nearestNodes qsearch target st = do - pc <- readTVar (searchPendingCount st) - check (pc == 0) - searchIsFinished st >>= check -- Wait for a search to finish before resetting. - bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) - <$> nearestNodes target - priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st) - writeTVar (searchQueued st) $ Just $ MM.fromList $ priorInformants ++ bktNodes - writeTVar (searchInformant st) MM.empty - writeTVar (searchVisited st) Set.empty - writeTVar (searchPendingCount st) 0 - return st + atomically $ searchCancel st + atomically $ do + pc <- readTVar (searchPendingCount st) + check (pc == 0) + searchIsFinished st >>= check -- Wait for a search to finish before resetting. + bktNodes <- map (\ni -> ni :-> kademliaLocation (searchSpace qsearch) ni) + <$> nearestNodes target + priorInformants <- map stripValue . MM.toList <$> readTVar (searchInformant st) + writeTVar (searchQueued st) $ Just $ MM.fromList $ priorInformants ++ bktNodes + writeTVar (searchInformant st) MM.empty + writeTVar (searchVisited st) Set.empty + writeTVar (searchPendingCount st) 0 + return st grokQuery :: forall addr nid tok ni r qk. ( Ord addr @@ -233,8 +235,9 @@ searchLoop :: ( Ord addr, Ord nid, Ord ni, Show nid, Hashable nid, Hashable ni, -> IO () searchLoop sch@Search{..} target result s@SearchState{..} = do myThreadId >>= flip labelThread ("search."++show target) - withTaskGroup ("search.g."++show target) searchAlpha $ \g -> fix $ \again -> do - join $ atomically $ do + iochan <- atomically newTChan + fix $ \again -> do + join $ atomically $ orElse (fmap (>> again) $ readTChan iochan) $ do cnt <- readTVar $ searchPendingCount check (cnt <= searchAlpha) -- Only searchAlpha pending queries at a time. informants <- readTVar searchInformant @@ -256,6 +259,6 @@ searchLoop sch@Search{..} target result s@SearchState{..} = do return $ do qk <- searchQuery target ni $ \qk reply -> grokQuery sch target result s (ni :-> d) qk reply - atomically $ modifyTVar' searchPending $ Map.insert qk (searchQueryCancel qk) + atomically $ modifyTVar' searchPending $ Map.insert qk (searchQueryCancel (writeTChan iochan) qk) again _ -> searchIsFinished s >>= check >> return (return ()) diff --git a/kad/tests/searchCancel.hs b/kad/tests/searchCancel.hs index 33860a2f..e8aa33c7 100644 --- a/kad/tests/searchCancel.hs +++ b/kad/tests/searchCancel.hs @@ -52,7 +52,7 @@ sch mbv var = Search let qk = maybe 0 (\(ns,_,_) -> head ns) r f qk $ maybe TimedOut Success r return qk - , searchQueryCancel = \_ -> return () + , searchQueryCancel = \_ _ -> return () , searchAlpha = 4 , searchK = 8 } diff --git a/server/src/Network/QueryResponse.hs b/server/src/Network/QueryResponse.hs index 4f14ea3c..94eb4796 100644 --- a/server/src/Network/QueryResponse.hs +++ b/server/src/Network/QueryResponse.hs @@ -332,9 +332,6 @@ data TransactionMethods d qid addr x = TransactionMethods -- will write the packet to the correct 'MVar' thus completing the -- dispatch. , dispatchResponse :: qid -> Result x -> d -> STM (d, IO ()) - -- | When a timeout interval elapses, this method is called to remove the - -- transaction from the table. - , dispatchCancel :: qid -> d -> STM d } -- | A set of methods necessary for dispatching incoming packets. @@ -429,7 +426,9 @@ asyncQuery c@(Client net d err pending whoami _) meth q addr0 withResponse = do tm_key <- registerTimeout tm expiry $ do atomically $ do tbl <- readTVar pending - v <- dispatchCancel (tableMethods d) qid tbl + -- Below, we discard the returned IO action since we will call + -- withResponse directly later. + (v,_) <- dispatchResponse (tableMethods d) qid TimedOut tbl writeTVar pending v m <- takeMVar keyvar forM_ m $ \_ -> withResponse qid TimedOut @@ -505,8 +504,7 @@ transactionMethods' :: -> (g -> (qid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. -> TransactionMethods (g,t a) qid addr x transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods - { dispatchCancel = \tid (g,t) -> return (g, delete tid t) - , dispatchRegister = \nowPlusExpiry v a (g,t) -> do + { dispatchRegister = \nowPlusExpiry v a (g,t) -> do let (tid,g') = generate g let t' = insert tid (store v) nowPlusExpiry t -- (now + microsecondsDiff expiry) t return ( tid, (g',t') ) -- cgit v1.2.3