From 5181c77ce7dd73d622ff3921b90bf2741bedb646 Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Fri, 3 Jan 2020 17:12:14 -0500 Subject: QueryResponse: Use three-way sum to distinguish Canceled and Timedout. --- server/src/Network/QueryResponse.hs | 35 ++++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) (limited to 'server') diff --git a/server/src/Network/QueryResponse.hs b/server/src/Network/QueryResponse.hs index 20e7ecf0..cb65eb47 100644 --- a/server/src/Network/QueryResponse.hs +++ b/server/src/Network/QueryResponse.hs @@ -2,6 +2,9 @@ -- with Kademlia implementations in mind. {-# LANGUAGE CPP #-} +{-# LANGUAGE DeriveFoldable #-} +{-# LANGUAGE DeriveFunctor #-} +{-# LANGUAGE DeriveTraversable #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE PartialTypeSignatures #-} @@ -32,6 +35,7 @@ import qualified Data.IntMap.Strict as IntMap import qualified Data.Map.Strict as Map ;import Data.Map.Strict (Map) import Data.Time.Clock.POSIX +import Data.Traversable (Traversable) import qualified Data.Word64Map as W64Map ;import Data.Word64Map (Word64Map) import Data.Word @@ -49,6 +53,15 @@ import DPut import DebugTag import Data.TableMethods +-- | The reply to a query to a remote server or the result of some other IO +-- process that can timeout or be canceled. +data Result a = Success a | TimedOut | Canceled + deriving (Functor, Foldable, Traversable, Eq, Ord, Show) + +resultToMaybe :: Result a -> Maybe a +resultToMaybe (Success a) = Just a +resultToMaybe _ = Nothing + -- | An inbound packet or condition raised while monitoring a connection. data Arrival err addr x = Terminated -- ^ Virtual message that signals EOF. @@ -310,7 +323,7 @@ data TransactionMethods d qid addr x = TransactionMethods -- that can be used to forget the 'MVar' if the remote peer is not -- responding. dispatchRegister :: POSIXTime -- time of expiry - -> (Maybe x -> IO ()) -- callback upon response (or timeout) + -> (Result x -> IO ()) -- callback upon response (or timeout) -> addr -> d -> STM (qid, d) @@ -394,7 +407,7 @@ asyncQuery_ :: Client err meth tid addr x -> MethodSerializer tid addr x meth a b -> a -> addr - -> (Maybe b -> IO ()) + -> (Result b -> IO ()) -> IO (tid,POSIXTime,Int) asyncQuery_ (Client net d err pending whoami _) meth q addr0 withResponse = do now <- getPOSIXTime @@ -419,14 +432,14 @@ asyncQuery :: Show meth => Client err meth tid addr x -> MethodSerializer tid addr x meth a b -> a -> addr - -> (Maybe b -> IO ()) + -> (Result b -> IO ()) -> IO () asyncQuery client meth q addr withResponse0 = do tm <- getSystemTimerManager tidvar <- newEmptyMVar timedout <- registerTimeout tm 1000000 $ do dput XMisc $ "async TIMEDOUT " ++ show (method meth) - withResponse0 Nothing + withResponse0 TimedOut tid <- takeMVar tidvar dput XMisc $ "async TIMEDOUT mvar " ++ show (method meth) case client of @@ -448,16 +461,16 @@ sendQuery :: -> MethodSerializer tid addr x meth a b -- ^ Information for marshaling the query. -> a -- ^ The outbound query. -> addr -- ^ Destination address of query. - -> IO (Maybe b) -- ^ The response, or 'Nothing' if it timed out. + -> IO (Result b) -- ^ The response or failure condition. sendQuery c@(Client net d err pending whoami _) meth q addr0 = do mvar <- newEmptyMVar (tid,now,expiry) <- asyncQuery_ c meth q addr0 $ mapM_ (putMVar mvar) mres <- timeout expiry $ takeMVar mvar case mres of - Just b -> return $ Just b + Just b -> return $ Success b Nothing -> do atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending - return Nothing + return TimedOut contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x contramapAddr f (MethodHandler p s a) @@ -495,8 +508,8 @@ dispatchQuery (NoReply unwrapQ f) tid self x addr = -- table of pending transactions. This also enables multiple 'Client's to -- share a single transaction table. transactionMethods' :: - ((Maybe x -> IO ()) -> a) -- ^ store MVar into table entry - -> (a -> Maybe x -> IO void) -- ^ load MVar from table entry + ((Result x -> IO ()) -> a) -- ^ store MVar into table entry + -> (a -> Result x -> IO void) -- ^ load MVar from table entry -> TableMethods t tid -- ^ Table methods to lookup values by /tid/. -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. -> TransactionMethods (g,t a) tid addr x @@ -509,7 +522,7 @@ transactionMethods' store load (TableMethods insert delete lookup) generate = Tr , dispatchResponse = \tid x (g,t) -> case lookup tid t of Just v -> let t' = delete tid t - in return ((g,t'),void $ load v $ Just x) + in return ((g,t'),void $ load v $ Success x) Nothing -> return ((g,t), return ()) } @@ -518,7 +531,7 @@ transactionMethods' store load (TableMethods insert delete lookup) generate = Tr transactionMethods :: TableMethods t tid -- ^ Table methods to lookup values by /tid/. -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. - -> TransactionMethods (g,t (Maybe x -> IO ())) tid addr x + -> TransactionMethods (g,t (Result x -> IO ())) tid addr x transactionMethods methods generate = transactionMethods' id id methods generate -- | Handle a single inbound packet and then invoke the given continuation. -- cgit v1.2.3