From 5181c77ce7dd73d622ff3921b90bf2741bedb646 Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Fri, 3 Jan 2020 17:12:14 -0500 Subject: QueryResponse: Use three-way sum to distinguish Canceled and Timedout. --- dht/src/Network/BitTorrent/MainlineDHT.hs | 21 ++++++++++--------- dht/src/Network/Tox/DHT/Handlers.hs | 8 +++---- dht/src/Network/Tox/Onion/Handlers.hs | 2 +- dht/src/Network/Tox/Onion/Routes.hs | 2 +- dht/src/Network/Tox/TCP.hs | 10 ++++----- server/src/Network/QueryResponse.hs | 35 +++++++++++++++++++++---------- 6 files changed, 46 insertions(+), 32 deletions(-) diff --git a/dht/src/Network/BitTorrent/MainlineDHT.hs b/dht/src/Network/BitTorrent/MainlineDHT.hs index bb556bc6..e604f5e5 100644 --- a/dht/src/Network/BitTorrent/MainlineDHT.hs +++ b/dht/src/Network/BitTorrent/MainlineDHT.hs @@ -1033,21 +1033,22 @@ announceH (SwarmsDatabase peers toks _) naddr announcement = do isReadonlyClient :: MainlineClient -> Bool isReadonlyClient client = False -- TODO -mainlineSend :: ( BEncode a - , BEncode a2 +mainlineSend :: ( BEncode xqry + , BEncode xrsp ) => Method - -> (a2 -> b) - -> (t -> a) + -> (xrsp -> rsp) + -> (qry -> xqry) -> MainlineClient - -> t + -> qry -> NodeInfo - -> IO (Maybe b) + -> IO (Maybe rsp) mainlineSend meth unwrap msg client nid addr = do reply <- sendQuery client serializer (msg nid) addr - -- sendQuery will return (Just (Left _)) on a parse error. We're going to - -- blow it away with the join-either sequence. - -- TODO: Do something with parse errors. - return $ join $ either (const Nothing) Just <$> reply + return $ case reply of + Success (Right x) -> Just x + Success (Left e) -> Nothing -- TODO: Do something with parse errors. + Canceled -> Nothing + TimedOut -> Nothing where serializer = MethodSerializer { methodTimeout = \ni -> return (ni, 5000000) diff --git a/dht/src/Network/Tox/DHT/Handlers.hs b/dht/src/Network/Tox/DHT/Handlers.hs index 7806da78..dc4ca5fa 100644 --- a/dht/src/Network/Tox/DHT/Handlers.hs +++ b/dht/src/Network/Tox/DHT/Handlers.hs @@ -353,7 +353,7 @@ ping client addr = do dput XPing $ show addr ++ " <-- ping" reply <- QR.sendQuery client (serializer PingType DHTPing unpong) Ping addr dput XPing $ show addr ++ " -pong-> " ++ show reply - maybe (return False) (\Pong -> return True) $ join reply + maybe (return False) (\Pong -> return True) $ join $ resultToMaybe reply saveCookieKey :: TVar [(SockAddr, (Int, PublicKey))] -> SockAddr -> PublicKey -> STM () @@ -396,7 +396,7 @@ cookieRequest crypto client localUserKey addr = do reply <- QR.sendQuery client cookieSerializer cookieRequest addr runlast dput XNetCrypto $ show addr ++ " -cookieResponse-> " ++ show reply - return $ join reply + return $ join $ resultToMaybe reply unCookie :: DHTMessage t -> Maybe (t (Cookie Encrypted)) unCookie (DHTCookie n24 fcookie) = Just fcookie @@ -415,7 +415,7 @@ getNodes client cbvar nid addr = do -- dput XMisc $ show addr ++ " <-- getnodes " ++ show nid reply <- QR.sendQuery client (serializer GetNodesType DHTGetNodes unsendNodes) (GetNodes nid) addr -- dput XMisc $ show addr ++ " -sendnodes-> " ++ show reply - forM_ (join reply) $ \(SendNodes ns) -> + forM_ (join $ resultToMaybe reply) $ \(SendNodes ns) -> forM_ ns $ \n -> do now <- getPOSIXTime atomically $ do @@ -423,7 +423,7 @@ getNodes client cbvar nid addr = do forM_ mcbs $ \cbs -> do forM_ cbs $ \cb -> do rumoredAddress cb now addr (udpNodeInfo n) - return $ fmap unwrapNodes $ join reply + return $ fmap unwrapNodes $ join $ resultToMaybe reply getNodesUDP :: Client -> TVar (HashMap NodeId [NodeInfoCallback]) -> NodeId -> NodeInfo -> IO (Maybe ([NodeInfo],[NodeInfo],Maybe ())) getNodesUDP client cbvar nid addr = getNodes client cbvar nid (Multi.UDP ==> addr) diff --git a/dht/src/Network/Tox/Onion/Handlers.hs b/dht/src/Network/Tox/Onion/Handlers.hs index 65ec846c..fa7bc83c 100644 --- a/dht/src/Network/Tox/Onion/Handlers.hs +++ b/dht/src/Network/Tox/Onion/Handlers.hs @@ -285,7 +285,7 @@ sendOnion getTimeout client req oaddr unwrap = forM_ mb $ \r -> dput XAnnounce $ show (onionNodeInfo oaddr) ++ " sent response: " ++ show r maybe (if n>0 then loop $! n - 1 else return Nothing) (return . Just . unwrap (onionNodeInfo oaddr)) - $ join mb + $ join $ resultToMaybe mb -- | Lookup the secret counterpart for a given alias key. diff --git a/dht/src/Network/Tox/Onion/Routes.hs b/dht/src/Network/Tox/Onion/Routes.hs index b20ad7dd..7c11227a 100644 --- a/dht/src/Network/Tox/Onion/Routes.hs +++ b/dht/src/Network/Tox/Onion/Routes.hs @@ -171,7 +171,7 @@ newOnionRouter crypto perror tcp_enabled = do ((tbl,(tcptbl,tcpcons,relaynet,onionnet)),tcp) <- do (tcptbl, client) <- TCP.newClient crypto id - (. (Just . (,) False)) + (. (Success . (,) False)) (lookupSender' pq rlog) (\_ (RouteId rid) -> atomically $ fmap storedRoute <$> readArray rm rid) diff --git a/dht/src/Network/Tox/TCP.hs b/dht/src/Network/Tox/TCP.hs index 9f0af976..0850ce51 100644 --- a/dht/src/Network/Tox/TCP.hs +++ b/dht/src/Network/Tox/TCP.hs @@ -46,7 +46,7 @@ import DPut import Network.Address (setPort,PortNumber,localhost4,fromSockAddr,nullAddress4) import Network.Kademlia.Routing import Network.Kademlia.Search hiding (sendQuery) -import Network.QueryResponse +import Network.QueryResponse as QR import Network.QueryResponse.TCP import Network.Tox.TCP.NodeId () import Network.Tox.DHT.Transport (toxSpace) @@ -226,7 +226,7 @@ getUDPNodes tcp seeking dst = fmap fst <$> getUDPNodes' tcp seeking dst getUDPNodes' :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (Maybe (([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()), NodeInfo)) getUDPNodes' tcp seeking dst0 = do mgateway <- atomically $ tcpGetGateway tcp dst0 - fmap join $ forM mgateway $ \gateway -> do + fmap (join . fmap resultToMaybe) $ forM mgateway $ \gateway -> do (b,c,n24) <- atomically $ do b <- transportNewKey (tcpCrypto tcp) c <- transportNewKey (tcpCrypto tcp) @@ -284,7 +284,7 @@ handle2route o src dst = do tcpPing :: Show addr => Client err PacketNumber Nonce8 addr (Bool,RelayPacket) -> addr -> IO (Maybe ()) tcpPing client dst = do dput XTCP $ "tcpPing " ++ show dst - sendQuery client meth () dst + resultToMaybe <$> sendQuery client meth () dst where meth = MethodSerializer { wrapQuery = \n8 src dst () -> (True,RelayPing n8) , unwrapResponse = \_ -> () @@ -295,7 +295,7 @@ tcpPing client dst = do tcpConnectionRequest_ :: Client err PacketNumber tid addr (Bool, RelayPacket) -> PublicKey -> addr -> IO (Maybe ConId) tcpConnectionRequest_ client pubkey ni = do - sendQuery client meth pubkey ni + resultToMaybe <$> sendQuery client meth pubkey ni where meth = MethodSerializer { wrapQuery = \n8 src dst pubkey -> (True,RoutingRequest pubkey) @@ -319,7 +319,7 @@ type RelayCache = TCPCache (SessionProtocol (SessionData,RelayPacket) RelayPacke -- defaults are 'id' and 'tryPutMVar'. The resulting customized table state -- will be returned to the caller along with the new client. newClient :: TransportCrypto - -> ((Maybe (Bool,RelayPacket) -> IO ()) -> a) -- ^ store mvar for relay query + -> ((QR.Result (Bool,RelayPacket) -> IO ()) -> a) -- ^ store mvar for relay query -> (a -> RelayPacket -> IO void) -- ^ load mvar for relay query -> (SockAddr -> Nonce8 -> IO (Maybe (OnionDestination RouteId))) -- ^ lookup sender of onion query -> (UDP.NodeInfo -> RouteId -> IO (Maybe OnionRoute)) -- ^ lookup OnionRoute by id diff --git a/server/src/Network/QueryResponse.hs b/server/src/Network/QueryResponse.hs index 20e7ecf0..cb65eb47 100644 --- a/server/src/Network/QueryResponse.hs +++ b/server/src/Network/QueryResponse.hs @@ -2,6 +2,9 @@ -- with Kademlia implementations in mind. {-# LANGUAGE CPP #-} +{-# LANGUAGE DeriveFoldable #-} +{-# LANGUAGE DeriveFunctor #-} +{-# LANGUAGE DeriveTraversable #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE PartialTypeSignatures #-} @@ -32,6 +35,7 @@ import qualified Data.IntMap.Strict as IntMap import qualified Data.Map.Strict as Map ;import Data.Map.Strict (Map) import Data.Time.Clock.POSIX +import Data.Traversable (Traversable) import qualified Data.Word64Map as W64Map ;import Data.Word64Map (Word64Map) import Data.Word @@ -49,6 +53,15 @@ import DPut import DebugTag import Data.TableMethods +-- | The reply to a query to a remote server or the result of some other IO +-- process that can timeout or be canceled. +data Result a = Success a | TimedOut | Canceled + deriving (Functor, Foldable, Traversable, Eq, Ord, Show) + +resultToMaybe :: Result a -> Maybe a +resultToMaybe (Success a) = Just a +resultToMaybe _ = Nothing + -- | An inbound packet or condition raised while monitoring a connection. data Arrival err addr x = Terminated -- ^ Virtual message that signals EOF. @@ -310,7 +323,7 @@ data TransactionMethods d qid addr x = TransactionMethods -- that can be used to forget the 'MVar' if the remote peer is not -- responding. dispatchRegister :: POSIXTime -- time of expiry - -> (Maybe x -> IO ()) -- callback upon response (or timeout) + -> (Result x -> IO ()) -- callback upon response (or timeout) -> addr -> d -> STM (qid, d) @@ -394,7 +407,7 @@ asyncQuery_ :: Client err meth tid addr x -> MethodSerializer tid addr x meth a b -> a -> addr - -> (Maybe b -> IO ()) + -> (Result b -> IO ()) -> IO (tid,POSIXTime,Int) asyncQuery_ (Client net d err pending whoami _) meth q addr0 withResponse = do now <- getPOSIXTime @@ -419,14 +432,14 @@ asyncQuery :: Show meth => Client err meth tid addr x -> MethodSerializer tid addr x meth a b -> a -> addr - -> (Maybe b -> IO ()) + -> (Result b -> IO ()) -> IO () asyncQuery client meth q addr withResponse0 = do tm <- getSystemTimerManager tidvar <- newEmptyMVar timedout <- registerTimeout tm 1000000 $ do dput XMisc $ "async TIMEDOUT " ++ show (method meth) - withResponse0 Nothing + withResponse0 TimedOut tid <- takeMVar tidvar dput XMisc $ "async TIMEDOUT mvar " ++ show (method meth) case client of @@ -448,16 +461,16 @@ sendQuery :: -> MethodSerializer tid addr x meth a b -- ^ Information for marshaling the query. -> a -- ^ The outbound query. -> addr -- ^ Destination address of query. - -> IO (Maybe b) -- ^ The response, or 'Nothing' if it timed out. + -> IO (Result b) -- ^ The response or failure condition. sendQuery c@(Client net d err pending whoami _) meth q addr0 = do mvar <- newEmptyMVar (tid,now,expiry) <- asyncQuery_ c meth q addr0 $ mapM_ (putMVar mvar) mres <- timeout expiry $ takeMVar mvar case mres of - Just b -> return $ Just b + Just b -> return $ Success b Nothing -> do atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending - return Nothing + return TimedOut contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x contramapAddr f (MethodHandler p s a) @@ -495,8 +508,8 @@ dispatchQuery (NoReply unwrapQ f) tid self x addr = -- table of pending transactions. This also enables multiple 'Client's to -- share a single transaction table. transactionMethods' :: - ((Maybe x -> IO ()) -> a) -- ^ store MVar into table entry - -> (a -> Maybe x -> IO void) -- ^ load MVar from table entry + ((Result x -> IO ()) -> a) -- ^ store MVar into table entry + -> (a -> Result x -> IO void) -- ^ load MVar from table entry -> TableMethods t tid -- ^ Table methods to lookup values by /tid/. -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. -> TransactionMethods (g,t a) tid addr x @@ -509,7 +522,7 @@ transactionMethods' store load (TableMethods insert delete lookup) generate = Tr , dispatchResponse = \tid x (g,t) -> case lookup tid t of Just v -> let t' = delete tid t - in return ((g,t'),void $ load v $ Just x) + in return ((g,t'),void $ load v $ Success x) Nothing -> return ((g,t), return ()) } @@ -518,7 +531,7 @@ transactionMethods' store load (TableMethods insert delete lookup) generate = Tr transactionMethods :: TableMethods t tid -- ^ Table methods to lookup values by /tid/. -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. - -> TransactionMethods (g,t (Maybe x -> IO ())) tid addr x + -> TransactionMethods (g,t (Result x -> IO ())) tid addr x transactionMethods methods generate = transactionMethods' id id methods generate -- | Handle a single inbound packet and then invoke the given continuation. -- cgit v1.2.3