From 15ab3290ad04280764968ba4760474a8c0cbfa52 Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Fri, 3 Jan 2020 18:22:16 -0500 Subject: Modify kademlia search to distinguish a Canceled from timed-out query. --- dht/TCPProber.hs | 13 ++++++++----- dht/examples/dhtd.hs | 4 ++-- dht/src/Network/BitTorrent/MainlineDHT.hs | 26 +++++++++++++------------- dht/src/Network/Tox.hs | 2 +- dht/src/Network/Tox/DHT/Handlers.hs | 12 ++++++++---- dht/src/Network/Tox/Onion/Handlers.hs | 19 +++++++++++-------- dht/src/Network/Tox/TCP.hs | 8 +++++--- kad/kad.cabal | 1 + kad/src/Network/Kademlia/Search.hs | 11 +++++++---- 9 files changed, 56 insertions(+), 40 deletions(-) diff --git a/dht/TCPProber.hs b/dht/TCPProber.hs index faf8b35c..17b68f64 100644 --- a/dht/TCPProber.hs +++ b/dht/TCPProber.hs @@ -26,6 +26,7 @@ import Data.Wrapper.PSQ as PSQ import Network.Kademlia.Search import Network.Tox.NodeId import qualified Network.Tox.TCP as TCP +import Network.QueryResponse as QR -- Probe TCP ports in a staggered fashion to up the odds of discovering -- a higher priority port like 443. @@ -156,7 +157,7 @@ runProbeQueue prober client maxjobs = do loop -getNodes :: TCPProber -> TCP.TCPClient err Nonce8 -> NodeId -> TCP.NodeInfo -> IO (Maybe ([TCP.NodeInfo],[TCP.NodeInfo],Maybe ())) +getNodes :: TCPProber -> TCP.TCPClient err Nonce8 -> NodeId -> TCP.NodeInfo -> IO (Result ([TCP.NodeInfo],[TCP.NodeInfo],Maybe ())) getNodes prober tcp seeking dst = do r <- TCP.getUDPNodes' tcp seeking (TCP.udpNodeInfo dst) dput XTCP $ "Got via TCP nodes: " ++ show r @@ -164,14 +165,16 @@ getNodes prober tcp seeking dst = do where ns' = do n <- ns [ TCP.NodeInfo n 0 ] - fmap join $ forM r $ \(ns,gw) -> do + case r of + Success (ns,gw) -> do let ts = tcps ns if TCP.nodeId gw == TCP.nodeId dst - then return $ Just ts + then return $ Success ts else do enqueueProbe prober (TCP.udpNodeInfo dst) - return $ Just ts - return $ Just ts + return $ Success ts + return $ Success ts + _ -> return $ fmap (const $ error "TCPProber.getNodes: The impossible happened!") r nodeSearch :: TCPProber -> TCP.TCPClient err Nonce8 -> Search NodeId (IP, PortNumber) () TCP.NodeInfo TCP.NodeInfo nodeSearch prober tcp = Search diff --git a/dht/examples/dhtd.hs b/dht/examples/dhtd.hs index 26f3f149..6b057af9 100644 --- a/dht/examples/dhtd.hs +++ b/dht/examples/dhtd.hs @@ -811,8 +811,8 @@ clientSession s@Session{..} sock cnum h = do where go | null destination = fmap Right . qhandler self | otherwise = case readEither destination of - Right ni -> fmap (maybe (Left "Timeout.") Right) - . flip (searchQuery qsearch) ni + Right ni -> fmap (maybe (Left "Timeout.") Right . resultToMaybe) + . flip (searchQuery qsearch) ni -- TODO report canceled Left e -> const $ return $ Left ("Bad destination: "++e) maybe (hPutClient h ("Unsupported method: "++method)) goQuery diff --git a/dht/src/Network/BitTorrent/MainlineDHT.hs b/dht/src/Network/BitTorrent/MainlineDHT.hs index fc69fedd..8532b492 100644 --- a/dht/src/Network/BitTorrent/MainlineDHT.hs +++ b/dht/src/Network/BitTorrent/MainlineDHT.hs @@ -70,7 +70,7 @@ import Network.Kademlia.Search (Search (..)) import Network.BitTorrent.DHT.Token as Token import qualified Network.Kademlia.Routing as R ;import Network.Kademlia.Routing (getTimestamp) -import Network.QueryResponse +import Network.QueryResponse as QR import Network.Socket import System.IO.Error import System.IO.Unsafe (unsafeInterleaveIO) @@ -569,7 +569,7 @@ newClient swarms addr = do -- We defer initializing the refreshSearch and refreshPing until we -- have a client to send queries with. let nullPing = const $ return False - nullSearch = mainlineSearch $ \_ _ -> return Nothing + nullSearch = mainlineSearch $ \_ _ -> return Canceled tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info R.defaultBucketCount refresher4 <- newBucketRefresher tbl4 nullSearch nullPing tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount @@ -1045,14 +1045,14 @@ mainlineSend :: ( BEncode xqry -> MainlineClient -> qry -> NodeInfo - -> IO (Maybe rsp) + -> IO (QR.Result rsp) mainlineSend meth unwrap msg client nid addr = do reply <- sendQuery client serializer (msg nid) addr return $ case reply of - Success (Right x) -> Just x - Success (Left e) -> Nothing -- TODO: Do something with parse errors. - Canceled -> Nothing - TimedOut -> Nothing + Success (Right x) -> Success x + Success (Left e) -> Canceled -- TODO: Do something with parse errors. + Canceled -> Canceled + TimedOut -> TimedOut where serializer = MethodSerializer { methodTimeout = \ni -> return (ni, 5000000) @@ -1066,23 +1066,23 @@ mainlineSend meth unwrap msg client nid addr = do ping :: MainlineClient -> NodeInfo -> IO Bool ping client addr = - fromMaybe False + fromMaybe False . resultToMaybe <$> mainlineSend (Method "ping") (\Pong -> True) (const Ping) client () addr -- searchQuery :: ni -> IO (Maybe [ni], [r], tok)) -getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[NodeInfo],Maybe ())) +getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ())) getNodes = mainlineSend (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both) unwrapNodes :: NodeFound -> ([NodeInfo], [NodeInfo], Maybe ()) unwrapNodes (NodeFound ns4 ns6) = (ns4++ns6, ns4++ns6, Just ()) -getPeers :: MainlineClient -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[PeerAddr],Maybe Token)) +getPeers :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[PeerAddr],Maybe Token)) getPeers = mainlineSend (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce unwrapPeers :: GotPeers -> ([NodeInfo], [PeerAddr], Maybe Token) unwrapPeers (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4++ns6, ps, Just tok) -mainlineSearch :: (NodeId -> NodeInfo -> IO (Maybe ([NodeInfo], [r], Maybe tok))) +mainlineSearch :: (NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo], [r], Maybe tok))) -> Search NodeId (IP, PortNumber) tok NodeInfo r mainlineSearch qry = Search { searchSpace = mainlineSpace @@ -1140,5 +1140,5 @@ resolve want hostAndPort = do announce :: MainlineClient -> Announce -> NodeInfo -> IO (Maybe Announced) -announce client msg addr = do - mainlineSend (Method "announce_peer") id (\() -> msg) client () addr +announce client msg addr = + resultToMaybe <$> mainlineSend (Method "announce_peer") id (\() -> msg) client () addr diff --git a/dht/src/Network/Tox.hs b/dht/src/Network/Tox.hs index f17bad2c..f9f35ea4 100644 --- a/dht/src/Network/Tox.hs +++ b/dht/src/Network/Tox.hs @@ -349,7 +349,7 @@ newToxOverTransport keydb addr onNewSession (crypto,roster) udp tcp = do -- TODO: Refactor so that these threads are forked when 'forkTox' is invoked. -- This function should only initialize state. orouter' <- forkRouteBuilder orouter - $ \nid ni -> fmap (\(_,ns,_)->ns) + $ \nid ni -> fmap (\(_,ns,_)->ns) . resultToMaybe <$> DHT.getNodes dhtclient (DHT.nodesOfInterest $ mkrouting dhtclient) nid (Multi.UDP ==> ni) toks <- do diff --git a/dht/src/Network/Tox/DHT/Handlers.hs b/dht/src/Network/Tox/DHT/Handlers.hs index dc4ca5fa..d132da88 100644 --- a/dht/src/Network/Tox/DHT/Handlers.hs +++ b/dht/src/Network/Tox/DHT/Handlers.hs @@ -198,7 +198,7 @@ newRouting addr crypto update4 update6 = do nullSearch = Search { searchSpace = toxSpace , searchNodeAddress = nodeIP &&& nodePort - , searchQuery = \_ _ -> return Nothing + , searchQuery = \_ _ -> return Canceled , searchAlpha = 1 , searchK = 2 } @@ -410,7 +410,8 @@ unsendNodes _ = Nothing unwrapNodes :: SendNodes -> ( [NodeInfo], [NodeInfo], Maybe () ) unwrapNodes (SendNodes ns) = (map udpNodeInfo ns,map udpNodeInfo ns,Just ()) -getNodes :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> NodeId -> Multi.NodeInfo -> IO (Maybe ([NodeInfo],[NodeInfo],Maybe ())) +getNodes :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> NodeId -> Multi.NodeInfo + -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ())) getNodes client cbvar nid addr = do -- dput XMisc $ show addr ++ " <-- getnodes " ++ show nid reply <- QR.sendQuery client (serializer GetNodesType DHTGetNodes unsendNodes) (GetNodes nid) addr @@ -423,9 +424,12 @@ getNodes client cbvar nid addr = do forM_ mcbs $ \cbs -> do forM_ cbs $ \cb -> do rumoredAddress cb now addr (udpNodeInfo n) - return $ fmap unwrapNodes $ join $ resultToMaybe reply + return $ case reply of + Success x -> maybe Canceled (Success . unwrapNodes) x + _ -> fmap (error "Network.Tox.DHT.Handlers.getNodes: the impossible happened!") reply -getNodesUDP :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[NodeInfo],Maybe ())) +getNodesUDP :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> NodeId -> NodeInfo + -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ())) getNodesUDP client cbvar nid addr = getNodes client cbvar nid (Multi.UDP ==> addr) updateRouting :: Client -> Routing diff --git a/dht/src/Network/Tox/Onion/Handlers.hs b/dht/src/Network/Tox/Onion/Handlers.hs index fa7bc83c..015c758c 100644 --- a/dht/src/Network/Tox/Onion/Handlers.hs +++ b/dht/src/Network/Tox/Onion/Handlers.hs @@ -277,15 +277,17 @@ sendOnion :: (OnionDestination r -> STM (OnionDestination r, Int)) -> AnnounceRequest -> OnionDestination r -> (NodeInfo -> AnnounceResponse -> t) - -> IO (Maybe t) + -> IO (QR.Result t) sendOnion getTimeout client req oaddr unwrap = -- Four tries and then we tap out. flip fix 4 $ \loop n -> do - mb <- QR.sendQuery client (announceSerializer getTimeout) req oaddr - forM_ mb $ \r -> dput XAnnounce $ show (onionNodeInfo oaddr) ++ " sent response: " ++ show r - maybe (if n>0 then loop $! n - 1 else return Nothing) - (return . Just . unwrap (onionNodeInfo oaddr)) - $ join $ resultToMaybe mb + mb <- QR.sendQuery client (announceSerializer getTimeout) req oaddr + forM_ mb $ \r -> dput XAnnounce $ show (onionNodeInfo oaddr) ++ " sent response: " ++ show r + let re = if n>0 then loop $! n - 1 else return Canceled + case mb of + Success x -> maybe re (return . Success . unwrap (onionNodeInfo oaddr)) x + Canceled -> return Canceled + TimedOut -> re -- | Lookup the secret counterpart for a given alias key. @@ -294,7 +296,7 @@ getRendezvous :: (OnionDestination r -> STM (OnionDestination r, Int)) -> Client r -> NodeId -> NodeInfo - -> IO (Maybe ([NodeInfo],[Rendezvous],Maybe Nonce32)) + -> IO (Result ([NodeInfo],[Rendezvous],Maybe Nonce32)) getRendezvous getTimeout crypto client nid ni = do asel <- atomically $ selectAlias crypto nid let oaddr = OnionDestination asel ni Nothing @@ -319,5 +321,6 @@ putRendezvous getTimeout crypto client pubkey nonce32 ni = do rendezvousKey = key2id rkey asel <- atomically $ selectAlias crypto longTermKey let oaddr = OnionDestination asel ni Nothing - sendOnion getTimeout client (AnnounceRequest nonce32 longTermKey rendezvousKey) oaddr + fmap resultToMaybe + $ sendOnion getTimeout client (AnnounceRequest nonce32 longTermKey rendezvousKey) oaddr $ \ni resp -> (Rendezvous rkey ni, resp) diff --git a/dht/src/Network/Tox/TCP.hs b/dht/src/Network/Tox/TCP.hs index 385da35b..932b4ab3 100644 --- a/dht/src/Network/Tox/TCP.hs +++ b/dht/src/Network/Tox/TCP.hs @@ -221,12 +221,14 @@ getTCPNodes tcp seeking dst = do -} getUDPNodes :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (Maybe ([UDP.NodeInfo], [UDP.NodeInfo], Maybe ())) -getUDPNodes tcp seeking dst = fmap fst <$> getUDPNodes' tcp seeking dst +getUDPNodes tcp seeking dst = fmap fst . resultToMaybe <$> getUDPNodes' tcp seeking dst -getUDPNodes' :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (Maybe (([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()), NodeInfo)) +getUDPNodes' :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (QR.Result (([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()), NodeInfo)) getUDPNodes' tcp seeking dst0 = do mgateway <- atomically $ tcpGetGateway tcp dst0 - fmap (join . fmap resultToMaybe) $ forM mgateway $ \gateway -> do + case mgateway of + Nothing -> return Canceled + Just gateway -> do (b,c,n24) <- atomically $ do b <- transportNewKey (tcpCrypto tcp) c <- transportNewKey (tcpCrypto tcp) diff --git a/kad/kad.cabal b/kad/kad.cabal index 4a86bc4f..7c92f809 100644 --- a/kad/kad.cabal +++ b/kad/kad.cabal @@ -86,6 +86,7 @@ library , network-addr , cereal , tasks + , server hs-source-dirs: src default-language: Haskell2010 diff --git a/kad/src/Network/Kademlia/Search.hs b/kad/src/Network/Kademlia/Search.hs index 03c18d0e..8d9c997b 100644 --- a/kad/src/Network/Kademlia/Search.hs +++ b/kad/src/Network/Kademlia/Search.hs @@ -29,7 +29,8 @@ import qualified Data.MinMaxPSQ as MM ;import Data.MinMaxPSQ (MinMaxPSQ, MinMaxPSQ') import qualified Data.Wrapper.PSQ as PSQ ;import Data.Wrapper.PSQ (pattern (:->), Binding, pattern Binding, Binding', PSQKey) -import Network.Kademlia.Routing as R +import Network.Kademlia.Routing as R +import Network.QueryResponse (Result(..)) #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument #else @@ -40,7 +41,7 @@ import GHC.Conc (labelThread) data Search nid addr tok ni r = Search { searchSpace :: KademliaSpace nid ni , searchNodeAddress :: ni -> addr - , searchQuery :: nid -> ni -> IO (Maybe ([ni], [r], Maybe tok)) + , searchQuery :: nid -> ni -> IO (Result ([ni], [r], Maybe tok)) , searchAlpha :: Int -- α = 8 -- | 'searchK' should be larger than 'searchAlpha'. How much larger depends on -- how fast the queries are. For Tox's much slower onion-routed queries, we @@ -138,12 +139,14 @@ sendQuery :: forall addr nid tok ni r. -> IO () sendQuery Search{..} searchTarget searchResult sch@SearchState{..} (ni :-> d) = do myThreadId >>= flip labelThread ("searchQuery." ++ show searchTarget) - reply <- searchQuery searchTarget ni `catchIOError` const (return Nothing) + reply <- searchQuery searchTarget ni `catchIOError` const (return Canceled) -- (ns,rs) let tok = error "TODO: token" atomically $ do modifyTVar searchPendingCount pred - maybe (return ()) go reply + case reply of + Success x -> go x + _ -> return () where go (ns,rs,tok) = do vs <- readTVar searchVisited -- cgit v1.2.3