From b411ab66ceee7386e4829e2337c735a08fb3d54d Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Fri, 3 Jan 2020 23:08:58 -0500 Subject: Support for async queries. --- server/src/Network/QueryResponse.hs | 115 +++++++++++++++++------------------- 1 file changed, 53 insertions(+), 62 deletions(-) (limited to 'server') diff --git a/server/src/Network/QueryResponse.hs b/server/src/Network/QueryResponse.hs index cb65eb47..4f14ea3c 100644 --- a/server/src/Network/QueryResponse.hs +++ b/server/src/Network/QueryResponse.hs @@ -323,7 +323,7 @@ data TransactionMethods d qid addr x = TransactionMethods -- that can be used to forget the 'MVar' if the remote peer is not -- responding. dispatchRegister :: POSIXTime -- time of expiry - -> (Result x -> IO ()) -- callback upon response (or timeout) + -> (qid -> Result x -> IO ()) -- callback upon response (or timeout) -> addr -> d -> STM (qid, d) @@ -331,7 +331,7 @@ data TransactionMethods d qid addr x = TransactionMethods -- a response to the transaction with id /qid/. The returned IO action -- will write the packet to the correct 'MVar' thus completing the -- dispatch. - , dispatchResponse :: qid -> x -> d -> STM (d, IO ()) + , dispatchResponse :: qid -> Result x -> d -> STM (d, IO ()) -- | When a timeout interval elapses, this method is called to remove the -- transaction from the table. , dispatchCancel :: qid -> d -> STM d @@ -403,74 +403,65 @@ type MethodSerializer tid addr x meth a b = MethodSerializerA tid addr x x meth microsecondsDiff :: Int -> POSIXTime microsecondsDiff us = fromIntegral us / 1000000 -asyncQuery_ :: Client err meth tid addr x - -> MethodSerializer tid addr x meth a b +asyncQuery :: Show meth => Client err meth qid addr x + -> MethodSerializer qid addr x meth a b -> a -> addr - -> (Result b -> IO ()) - -> IO (tid,POSIXTime,Int) -asyncQuery_ (Client net d err pending whoami _) meth q addr0 withResponse = do + -> (qid -> Result b -> IO ()) + -> IO qid +asyncQuery c@(Client net d err pending whoami _) meth q addr0 withResponse = do + tm <- getSystemTimerManager now <- getPOSIXTime - (tid,addr,expiry) <- atomically $ do + keyvar <- newEmptyMVar + (qid,addr,expiry) <- atomically $ do tbl <- readTVar pending (addr,expiry) <- methodTimeout meth addr0 - (tid, tbl') <- dispatchRegister (tableMethods d) + (qid, tbl') <- dispatchRegister (tableMethods d) (now + microsecondsDiff expiry) - (withResponse . fmap (unwrapResponse meth)) - addr -- XXX: Should be addr0 or addr? + (\qid result -> do + tm_key <- swapMVar keyvar Nothing + mapM_ (unregisterTimeout tm) tm_key `catch` (\(SomeException _) -> return ()) + withResponse qid $ fmap (unwrapResponse meth) result) + addr tbl - -- (addr,expiry) <- methodTimeout meth tid addr0 writeTVar pending tbl' - return (tid,addr,expiry) + return (qid,addr,expiry) + tm_key <- registerTimeout tm expiry $ do + atomically $ do + tbl <- readTVar pending + v <- dispatchCancel (tableMethods d) qid tbl + writeTVar pending v + m <- takeMVar keyvar + forM_ m $ \_ -> withResponse qid TimedOut + putMVar keyvar (Just tm_key) self <- whoami (Just addr) - mres <- do sendMessage net addr (wrapQuery meth tid self addr q) - return $ Just () - `catchIOError` (\e -> return Nothing) - return (tid,now,expiry) - -asyncQuery :: Show meth => Client err meth tid addr x - -> MethodSerializer tid addr x meth a b - -> a - -> addr - -> (Result b -> IO ()) - -> IO () -asyncQuery client meth q addr withResponse0 = do - tm <- getSystemTimerManager - tidvar <- newEmptyMVar - timedout <- registerTimeout tm 1000000 $ do - dput XMisc $ "async TIMEDOUT " ++ show (method meth) - withResponse0 TimedOut - tid <- takeMVar tidvar - dput XMisc $ "async TIMEDOUT mvar " ++ show (method meth) - case client of - Client { clientDispatcher = d, clientPending = pending } -> do - atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending - (tid,now,expiry) <- asyncQuery_ client meth q addr $ \x -> do - unregisterTimeout tm timedout - withResponse0 x - putMVar tidvar tid - updateTimeout tm timedout expiry - dput XMisc $ "FIN asyncQuery "++show (method meth)++" TIMEOUT="++show expiry + afterward <- newTVarIO $ return () -- Will be overridden by cancelation handler + do sendMessage net addr (wrapQuery meth qid self addr q) + return () + `catchIOError` \e -> atomically $ cancelQuery c (writeTVar afterward) qid + join $ atomically $ readTVar afterward + return qid + +cancelQuery :: ClientA err meth qid addr x y -> (IO () -> STM ()) -> qid -> STM () +cancelQuery c@(Client net d err pending whoami _) runIO qid = do + tbl <- readTVar pending + (tbl', io) <- dispatchResponse (tableMethods d) qid Canceled tbl + writeTVar pending tbl' + runIO io -- | Send a query to a remote peer. Note that this function will always time -- out if 'forkListener' was never invoked to spawn a thread to receive and -- dispatch the response. -sendQuery :: - forall err a b tbl x meth tid addr. +sendQuery :: Show meth => Client err meth tid addr x -- ^ A query/response implementation. -> MethodSerializer tid addr x meth a b -- ^ Information for marshaling the query. -> a -- ^ The outbound query. -> addr -- ^ Destination address of query. -> IO (Result b) -- ^ The response or failure condition. -sendQuery c@(Client net d err pending whoami _) meth q addr0 = do - mvar <- newEmptyMVar - (tid,now,expiry) <- asyncQuery_ c meth q addr0 $ mapM_ (putMVar mvar) - mres <- timeout expiry $ takeMVar mvar - case mres of - Just b -> return $ Success b - Nothing -> do - atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending - return TimedOut +sendQuery c meth q addr0 = do + got <- newEmptyMVar + tid <- asyncQuery c meth q addr0 $ \qid r -> putMVar got r + takeMVar got contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x contramapAddr f (MethodHandler p s a) @@ -508,11 +499,11 @@ dispatchQuery (NoReply unwrapQ f) tid self x addr = -- table of pending transactions. This also enables multiple 'Client's to -- share a single transaction table. transactionMethods' :: - ((Result x -> IO ()) -> a) -- ^ store MVar into table entry - -> (a -> Result x -> IO void) -- ^ load MVar from table entry - -> TableMethods t tid -- ^ Table methods to lookup values by /tid/. - -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. - -> TransactionMethods (g,t a) tid addr x + ((qid -> Result x -> IO ()) -> a) -- ^ store MVar into table entry + -> (a -> qid -> Result x -> IO void) -- ^ load MVar from table entry + -> TableMethods t qid -- ^ Table methods to lookup values by /tid/. + -> (g -> (qid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. + -> TransactionMethods (g,t a) qid addr x transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods { dispatchCancel = \tid (g,t) -> return (g, delete tid t) , dispatchRegister = \nowPlusExpiry v a (g,t) -> do @@ -522,16 +513,16 @@ transactionMethods' store load (TableMethods insert delete lookup) generate = Tr , dispatchResponse = \tid x (g,t) -> case lookup tid t of Just v -> let t' = delete tid t - in return ((g,t'),void $ load v $ Success x) + in return ((g,t'),void $ load v tid x) Nothing -> return ((g,t), return ()) } -- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a -- function for generating unique transaction ids. transactionMethods :: - TableMethods t tid -- ^ Table methods to lookup values by /tid/. - -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. - -> TransactionMethods (g,t (Result x -> IO ())) tid addr x + TableMethods t qid -- ^ Table methods to lookup values by /tid/. + -> (g -> (qid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. + -> TransactionMethods (g,t (qid -> Result x -> IO ())) qid addr x transactionMethods methods generate = transactionMethods' id id methods generate -- | Handle a single inbound packet and then invoke the given continuation. @@ -566,7 +557,7 @@ handleMessage (Client net d err pending whoami responseID) addr plain = do IsResponse tid -> do action <- atomically $ do ts0 <- readTVar pending - (ts, action) <- dispatchResponse (tableMethods d) tid plain ts0 + (ts, action) <- dispatchResponse (tableMethods d) tid (Success plain) ts0 writeTVar pending ts return action action -- cgit v1.2.3