diff options
author | Joe Crayne <joe@jerkface.net> | 2020-01-03 17:12:14 -0500 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-03 17:26:06 -0500 |
commit | 5181c77ce7dd73d622ff3921b90bf2741bedb646 (patch) | |
tree | 16ba93b83ad0c137a013e47f593d7d40ace68ce6 /server | |
parent | 31b799222cb76cd0002d9a3cc5b340a7b6fed139 (diff) |
QueryResponse: Use three-way sum to distinguish Canceled and Timedout.
Diffstat (limited to 'server')
-rw-r--r-- | server/src/Network/QueryResponse.hs | 35 |
1 files changed, 24 insertions, 11 deletions
diff --git a/server/src/Network/QueryResponse.hs b/server/src/Network/QueryResponse.hs index 20e7ecf0..cb65eb47 100644 --- a/server/src/Network/QueryResponse.hs +++ b/server/src/Network/QueryResponse.hs | |||
@@ -2,6 +2,9 @@ | |||
2 | -- with Kademlia implementations in mind. | 2 | -- with Kademlia implementations in mind. |
3 | 3 | ||
4 | {-# LANGUAGE CPP #-} | 4 | {-# LANGUAGE CPP #-} |
5 | {-# LANGUAGE DeriveFoldable #-} | ||
6 | {-# LANGUAGE DeriveFunctor #-} | ||
7 | {-# LANGUAGE DeriveTraversable #-} | ||
5 | {-# LANGUAGE GADTs #-} | 8 | {-# LANGUAGE GADTs #-} |
6 | {-# LANGUAGE LambdaCase #-} | 9 | {-# LANGUAGE LambdaCase #-} |
7 | {-# LANGUAGE PartialTypeSignatures #-} | 10 | {-# LANGUAGE PartialTypeSignatures #-} |
@@ -32,6 +35,7 @@ import qualified Data.IntMap.Strict as IntMap | |||
32 | import qualified Data.Map.Strict as Map | 35 | import qualified Data.Map.Strict as Map |
33 | ;import Data.Map.Strict (Map) | 36 | ;import Data.Map.Strict (Map) |
34 | import Data.Time.Clock.POSIX | 37 | import Data.Time.Clock.POSIX |
38 | import Data.Traversable (Traversable) | ||
35 | import qualified Data.Word64Map as W64Map | 39 | import qualified Data.Word64Map as W64Map |
36 | ;import Data.Word64Map (Word64Map) | 40 | ;import Data.Word64Map (Word64Map) |
37 | import Data.Word | 41 | import Data.Word |
@@ -49,6 +53,15 @@ import DPut | |||
49 | import DebugTag | 53 | import DebugTag |
50 | import Data.TableMethods | 54 | import Data.TableMethods |
51 | 55 | ||
56 | -- | The reply to a query to a remote server or the result of some other IO | ||
57 | -- process that can timeout or be canceled. | ||
58 | data Result a = Success a | TimedOut | Canceled | ||
59 | deriving (Functor, Foldable, Traversable, Eq, Ord, Show) | ||
60 | |||
61 | resultToMaybe :: Result a -> Maybe a | ||
62 | resultToMaybe (Success a) = Just a | ||
63 | resultToMaybe _ = Nothing | ||
64 | |||
52 | -- | An inbound packet or condition raised while monitoring a connection. | 65 | -- | An inbound packet or condition raised while monitoring a connection. |
53 | data Arrival err addr x | 66 | data Arrival err addr x |
54 | = Terminated -- ^ Virtual message that signals EOF. | 67 | = Terminated -- ^ Virtual message that signals EOF. |
@@ -310,7 +323,7 @@ data TransactionMethods d qid addr x = TransactionMethods | |||
310 | -- that can be used to forget the 'MVar' if the remote peer is not | 323 | -- that can be used to forget the 'MVar' if the remote peer is not |
311 | -- responding. | 324 | -- responding. |
312 | dispatchRegister :: POSIXTime -- time of expiry | 325 | dispatchRegister :: POSIXTime -- time of expiry |
313 | -> (Maybe x -> IO ()) -- callback upon response (or timeout) | 326 | -> (Result x -> IO ()) -- callback upon response (or timeout) |
314 | -> addr | 327 | -> addr |
315 | -> d | 328 | -> d |
316 | -> STM (qid, d) | 329 | -> STM (qid, d) |
@@ -394,7 +407,7 @@ asyncQuery_ :: Client err meth tid addr x | |||
394 | -> MethodSerializer tid addr x meth a b | 407 | -> MethodSerializer tid addr x meth a b |
395 | -> a | 408 | -> a |
396 | -> addr | 409 | -> addr |
397 | -> (Maybe b -> IO ()) | 410 | -> (Result b -> IO ()) |
398 | -> IO (tid,POSIXTime,Int) | 411 | -> IO (tid,POSIXTime,Int) |
399 | asyncQuery_ (Client net d err pending whoami _) meth q addr0 withResponse = do | 412 | asyncQuery_ (Client net d err pending whoami _) meth q addr0 withResponse = do |
400 | now <- getPOSIXTime | 413 | now <- getPOSIXTime |
@@ -419,14 +432,14 @@ asyncQuery :: Show meth => Client err meth tid addr x | |||
419 | -> MethodSerializer tid addr x meth a b | 432 | -> MethodSerializer tid addr x meth a b |
420 | -> a | 433 | -> a |
421 | -> addr | 434 | -> addr |
422 | -> (Maybe b -> IO ()) | 435 | -> (Result b -> IO ()) |
423 | -> IO () | 436 | -> IO () |
424 | asyncQuery client meth q addr withResponse0 = do | 437 | asyncQuery client meth q addr withResponse0 = do |
425 | tm <- getSystemTimerManager | 438 | tm <- getSystemTimerManager |
426 | tidvar <- newEmptyMVar | 439 | tidvar <- newEmptyMVar |
427 | timedout <- registerTimeout tm 1000000 $ do | 440 | timedout <- registerTimeout tm 1000000 $ do |
428 | dput XMisc $ "async TIMEDOUT " ++ show (method meth) | 441 | dput XMisc $ "async TIMEDOUT " ++ show (method meth) |
429 | withResponse0 Nothing | 442 | withResponse0 TimedOut |
430 | tid <- takeMVar tidvar | 443 | tid <- takeMVar tidvar |
431 | dput XMisc $ "async TIMEDOUT mvar " ++ show (method meth) | 444 | dput XMisc $ "async TIMEDOUT mvar " ++ show (method meth) |
432 | case client of | 445 | case client of |
@@ -448,16 +461,16 @@ sendQuery :: | |||
448 | -> MethodSerializer tid addr x meth a b -- ^ Information for marshaling the query. | 461 | -> MethodSerializer tid addr x meth a b -- ^ Information for marshaling the query. |
449 | -> a -- ^ The outbound query. | 462 | -> a -- ^ The outbound query. |
450 | -> addr -- ^ Destination address of query. | 463 | -> addr -- ^ Destination address of query. |
451 | -> IO (Maybe b) -- ^ The response, or 'Nothing' if it timed out. | 464 | -> IO (Result b) -- ^ The response or failure condition. |
452 | sendQuery c@(Client net d err pending whoami _) meth q addr0 = do | 465 | sendQuery c@(Client net d err pending whoami _) meth q addr0 = do |
453 | mvar <- newEmptyMVar | 466 | mvar <- newEmptyMVar |
454 | (tid,now,expiry) <- asyncQuery_ c meth q addr0 $ mapM_ (putMVar mvar) | 467 | (tid,now,expiry) <- asyncQuery_ c meth q addr0 $ mapM_ (putMVar mvar) |
455 | mres <- timeout expiry $ takeMVar mvar | 468 | mres <- timeout expiry $ takeMVar mvar |
456 | case mres of | 469 | case mres of |
457 | Just b -> return $ Just b | 470 | Just b -> return $ Success b |
458 | Nothing -> do | 471 | Nothing -> do |
459 | atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending | 472 | atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending |
460 | return Nothing | 473 | return TimedOut |
461 | 474 | ||
462 | contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x | 475 | contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x |
463 | contramapAddr f (MethodHandler p s a) | 476 | contramapAddr f (MethodHandler p s a) |
@@ -495,8 +508,8 @@ dispatchQuery (NoReply unwrapQ f) tid self x addr = | |||
495 | -- table of pending transactions. This also enables multiple 'Client's to | 508 | -- table of pending transactions. This also enables multiple 'Client's to |
496 | -- share a single transaction table. | 509 | -- share a single transaction table. |
497 | transactionMethods' :: | 510 | transactionMethods' :: |
498 | ((Maybe x -> IO ()) -> a) -- ^ store MVar into table entry | 511 | ((Result x -> IO ()) -> a) -- ^ store MVar into table entry |
499 | -> (a -> Maybe x -> IO void) -- ^ load MVar from table entry | 512 | -> (a -> Result x -> IO void) -- ^ load MVar from table entry |
500 | -> TableMethods t tid -- ^ Table methods to lookup values by /tid/. | 513 | -> TableMethods t tid -- ^ Table methods to lookup values by /tid/. |
501 | -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. | 514 | -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. |
502 | -> TransactionMethods (g,t a) tid addr x | 515 | -> TransactionMethods (g,t a) tid addr x |
@@ -509,7 +522,7 @@ transactionMethods' store load (TableMethods insert delete lookup) generate = Tr | |||
509 | , dispatchResponse = \tid x (g,t) -> | 522 | , dispatchResponse = \tid x (g,t) -> |
510 | case lookup tid t of | 523 | case lookup tid t of |
511 | Just v -> let t' = delete tid t | 524 | Just v -> let t' = delete tid t |
512 | in return ((g,t'),void $ load v $ Just x) | 525 | in return ((g,t'),void $ load v $ Success x) |
513 | Nothing -> return ((g,t), return ()) | 526 | Nothing -> return ((g,t), return ()) |
514 | } | 527 | } |
515 | 528 | ||
@@ -518,7 +531,7 @@ transactionMethods' store load (TableMethods insert delete lookup) generate = Tr | |||
518 | transactionMethods :: | 531 | transactionMethods :: |
519 | TableMethods t tid -- ^ Table methods to lookup values by /tid/. | 532 | TableMethods t tid -- ^ Table methods to lookup values by /tid/. |
520 | -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. | 533 | -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. |
521 | -> TransactionMethods (g,t (Maybe x -> IO ())) tid addr x | 534 | -> TransactionMethods (g,t (Result x -> IO ())) tid addr x |
522 | transactionMethods methods generate = transactionMethods' id id methods generate | 535 | transactionMethods methods generate = transactionMethods' id id methods generate |
523 | 536 | ||
524 | -- | Handle a single inbound packet and then invoke the given continuation. | 537 | -- | Handle a single inbound packet and then invoke the given continuation. |