summaryrefslogtreecommitdiff
path: root/server
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2020-01-03 23:08:58 -0500
committerJoe Crayne <joe@jerkface.net>2020-01-07 13:24:59 -0500
commitb411ab66ceee7386e4829e2337c735a08fb3d54d (patch)
treeb4eaa02216188f627371154ff4e1b0a26be29623 /server
parentb2a3bbf92c67b87e491e3d8187577110e7de57fb (diff)
Support for async queries.
Diffstat (limited to 'server')
-rw-r--r--server/src/Network/QueryResponse.hs115
1 files changed, 53 insertions, 62 deletions
diff --git a/server/src/Network/QueryResponse.hs b/server/src/Network/QueryResponse.hs
index cb65eb47..4f14ea3c 100644
--- a/server/src/Network/QueryResponse.hs
+++ b/server/src/Network/QueryResponse.hs
@@ -323,7 +323,7 @@ data TransactionMethods d qid addr x = TransactionMethods
323 -- that can be used to forget the 'MVar' if the remote peer is not 323 -- that can be used to forget the 'MVar' if the remote peer is not
324 -- responding. 324 -- responding.
325 dispatchRegister :: POSIXTime -- time of expiry 325 dispatchRegister :: POSIXTime -- time of expiry
326 -> (Result x -> IO ()) -- callback upon response (or timeout) 326 -> (qid -> Result x -> IO ()) -- callback upon response (or timeout)
327 -> addr 327 -> addr
328 -> d 328 -> d
329 -> STM (qid, d) 329 -> STM (qid, d)
@@ -331,7 +331,7 @@ data TransactionMethods d qid addr x = TransactionMethods
331 -- a response to the transaction with id /qid/. The returned IO action 331 -- a response to the transaction with id /qid/. The returned IO action
332 -- will write the packet to the correct 'MVar' thus completing the 332 -- will write the packet to the correct 'MVar' thus completing the
333 -- dispatch. 333 -- dispatch.
334 , dispatchResponse :: qid -> x -> d -> STM (d, IO ()) 334 , dispatchResponse :: qid -> Result x -> d -> STM (d, IO ())
335 -- | When a timeout interval elapses, this method is called to remove the 335 -- | When a timeout interval elapses, this method is called to remove the
336 -- transaction from the table. 336 -- transaction from the table.
337 , dispatchCancel :: qid -> d -> STM d 337 , dispatchCancel :: qid -> d -> STM d
@@ -403,74 +403,65 @@ type MethodSerializer tid addr x meth a b = MethodSerializerA tid addr x x meth
403microsecondsDiff :: Int -> POSIXTime 403microsecondsDiff :: Int -> POSIXTime
404microsecondsDiff us = fromIntegral us / 1000000 404microsecondsDiff us = fromIntegral us / 1000000
405 405
406asyncQuery_ :: Client err meth tid addr x 406asyncQuery :: Show meth => Client err meth qid addr x
407 -> MethodSerializer tid addr x meth a b 407 -> MethodSerializer qid addr x meth a b
408 -> a 408 -> a
409 -> addr 409 -> addr
410 -> (Result b -> IO ()) 410 -> (qid -> Result b -> IO ())
411 -> IO (tid,POSIXTime,Int) 411 -> IO qid
412asyncQuery_ (Client net d err pending whoami _) meth q addr0 withResponse = do 412asyncQuery c@(Client net d err pending whoami _) meth q addr0 withResponse = do
413 tm <- getSystemTimerManager
413 now <- getPOSIXTime 414 now <- getPOSIXTime
414 (tid,addr,expiry) <- atomically $ do 415 keyvar <- newEmptyMVar
416 (qid,addr,expiry) <- atomically $ do
415 tbl <- readTVar pending 417 tbl <- readTVar pending
416 (addr,expiry) <- methodTimeout meth addr0 418 (addr,expiry) <- methodTimeout meth addr0
417 (tid, tbl') <- dispatchRegister (tableMethods d) 419 (qid, tbl') <- dispatchRegister (tableMethods d)
418 (now + microsecondsDiff expiry) 420 (now + microsecondsDiff expiry)
419 (withResponse . fmap (unwrapResponse meth)) 421 (\qid result -> do
420 addr -- XXX: Should be addr0 or addr? 422 tm_key <- swapMVar keyvar Nothing
423 mapM_ (unregisterTimeout tm) tm_key `catch` (\(SomeException _) -> return ())
424 withResponse qid $ fmap (unwrapResponse meth) result)
425 addr
421 tbl 426 tbl
422 -- (addr,expiry) <- methodTimeout meth tid addr0
423 writeTVar pending tbl' 427 writeTVar pending tbl'
424 return (tid,addr,expiry) 428 return (qid,addr,expiry)
429 tm_key <- registerTimeout tm expiry $ do
430 atomically $ do
431 tbl <- readTVar pending
432 v <- dispatchCancel (tableMethods d) qid tbl
433 writeTVar pending v
434 m <- takeMVar keyvar
435 forM_ m $ \_ -> withResponse qid TimedOut
436 putMVar keyvar (Just tm_key)
425 self <- whoami (Just addr) 437 self <- whoami (Just addr)
426 mres <- do sendMessage net addr (wrapQuery meth tid self addr q) 438 afterward <- newTVarIO $ return () -- Will be overridden by cancelation handler
427 return $ Just () 439 do sendMessage net addr (wrapQuery meth qid self addr q)
428 `catchIOError` (\e -> return Nothing) 440 return ()
429 return (tid,now,expiry) 441 `catchIOError` \e -> atomically $ cancelQuery c (writeTVar afterward) qid
430 442 join $ atomically $ readTVar afterward
431asyncQuery :: Show meth => Client err meth tid addr x 443 return qid
432 -> MethodSerializer tid addr x meth a b 444
433 -> a 445cancelQuery :: ClientA err meth qid addr x y -> (IO () -> STM ()) -> qid -> STM ()
434 -> addr 446cancelQuery c@(Client net d err pending whoami _) runIO qid = do
435 -> (Result b -> IO ()) 447 tbl <- readTVar pending
436 -> IO () 448 (tbl', io) <- dispatchResponse (tableMethods d) qid Canceled tbl
437asyncQuery client meth q addr withResponse0 = do 449 writeTVar pending tbl'
438 tm <- getSystemTimerManager 450 runIO io
439 tidvar <- newEmptyMVar
440 timedout <- registerTimeout tm 1000000 $ do
441 dput XMisc $ "async TIMEDOUT " ++ show (method meth)
442 withResponse0 TimedOut
443 tid <- takeMVar tidvar
444 dput XMisc $ "async TIMEDOUT mvar " ++ show (method meth)
445 case client of
446 Client { clientDispatcher = d, clientPending = pending } -> do
447 atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending
448 (tid,now,expiry) <- asyncQuery_ client meth q addr $ \x -> do
449 unregisterTimeout tm timedout
450 withResponse0 x
451 putMVar tidvar tid
452 updateTimeout tm timedout expiry
453 dput XMisc $ "FIN asyncQuery "++show (method meth)++" TIMEOUT="++show expiry
454 451
455-- | Send a query to a remote peer. Note that this function will always time 452-- | Send a query to a remote peer. Note that this function will always time
456-- out if 'forkListener' was never invoked to spawn a thread to receive and 453-- out if 'forkListener' was never invoked to spawn a thread to receive and
457-- dispatch the response. 454-- dispatch the response.
458sendQuery :: 455sendQuery :: Show meth =>
459 forall err a b tbl x meth tid addr.
460 Client err meth tid addr x -- ^ A query/response implementation. 456 Client err meth tid addr x -- ^ A query/response implementation.
461 -> MethodSerializer tid addr x meth a b -- ^ Information for marshaling the query. 457 -> MethodSerializer tid addr x meth a b -- ^ Information for marshaling the query.
462 -> a -- ^ The outbound query. 458 -> a -- ^ The outbound query.
463 -> addr -- ^ Destination address of query. 459 -> addr -- ^ Destination address of query.
464 -> IO (Result b) -- ^ The response or failure condition. 460 -> IO (Result b) -- ^ The response or failure condition.
465sendQuery c@(Client net d err pending whoami _) meth q addr0 = do 461sendQuery c meth q addr0 = do
466 mvar <- newEmptyMVar 462 got <- newEmptyMVar
467 (tid,now,expiry) <- asyncQuery_ c meth q addr0 $ mapM_ (putMVar mvar) 463 tid <- asyncQuery c meth q addr0 $ \qid r -> putMVar got r
468 mres <- timeout expiry $ takeMVar mvar 464 takeMVar got
469 case mres of
470 Just b -> return $ Success b
471 Nothing -> do
472 atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending
473 return TimedOut
474 465
475contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x 466contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x
476contramapAddr f (MethodHandler p s a) 467contramapAddr f (MethodHandler p s a)
@@ -508,11 +499,11 @@ dispatchQuery (NoReply unwrapQ f) tid self x addr =
508-- table of pending transactions. This also enables multiple 'Client's to 499-- table of pending transactions. This also enables multiple 'Client's to
509-- share a single transaction table. 500-- share a single transaction table.
510transactionMethods' :: 501transactionMethods' ::
511 ((Result x -> IO ()) -> a) -- ^ store MVar into table entry 502 ((qid -> Result x -> IO ()) -> a) -- ^ store MVar into table entry
512 -> (a -> Result x -> IO void) -- ^ load MVar from table entry 503 -> (a -> qid -> Result x -> IO void) -- ^ load MVar from table entry
513 -> TableMethods t tid -- ^ Table methods to lookup values by /tid/. 504 -> TableMethods t qid -- ^ Table methods to lookup values by /tid/.
514 -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. 505 -> (g -> (qid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/.
515 -> TransactionMethods (g,t a) tid addr x 506 -> TransactionMethods (g,t a) qid addr x
516transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods 507transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods
517 { dispatchCancel = \tid (g,t) -> return (g, delete tid t) 508 { dispatchCancel = \tid (g,t) -> return (g, delete tid t)
518 , dispatchRegister = \nowPlusExpiry v a (g,t) -> do 509 , dispatchRegister = \nowPlusExpiry v a (g,t) -> do
@@ -522,16 +513,16 @@ transactionMethods' store load (TableMethods insert delete lookup) generate = Tr
522 , dispatchResponse = \tid x (g,t) -> 513 , dispatchResponse = \tid x (g,t) ->
523 case lookup tid t of 514 case lookup tid t of
524 Just v -> let t' = delete tid t 515 Just v -> let t' = delete tid t
525 in return ((g,t'),void $ load v $ Success x) 516 in return ((g,t'),void $ load v tid x)
526 Nothing -> return ((g,t), return ()) 517 Nothing -> return ((g,t), return ())
527 } 518 }
528 519
529-- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a 520-- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a
530-- function for generating unique transaction ids. 521-- function for generating unique transaction ids.
531transactionMethods :: 522transactionMethods ::
532 TableMethods t tid -- ^ Table methods to lookup values by /tid/. 523 TableMethods t qid -- ^ Table methods to lookup values by /tid/.
533 -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. 524 -> (g -> (qid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/.
534 -> TransactionMethods (g,t (Result x -> IO ())) tid addr x 525 -> TransactionMethods (g,t (qid -> Result x -> IO ())) qid addr x
535transactionMethods methods generate = transactionMethods' id id methods generate 526transactionMethods methods generate = transactionMethods' id id methods generate
536 527
537-- | Handle a single inbound packet and then invoke the given continuation. 528-- | Handle a single inbound packet and then invoke the given continuation.
@@ -566,7 +557,7 @@ handleMessage (Client net d err pending whoami responseID) addr plain = do
566 IsResponse tid -> do 557 IsResponse tid -> do
567 action <- atomically $ do 558 action <- atomically $ do
568 ts0 <- readTVar pending 559 ts0 <- readTVar pending
569 (ts, action) <- dispatchResponse (tableMethods d) tid plain ts0 560 (ts, action) <- dispatchResponse (tableMethods d) tid (Success plain) ts0
570 writeTVar pending ts 561 writeTVar pending ts
571 return action 562 return action
572 action 563 action