diff options
author | Joe Crayne <joe@jerkface.net> | 2020-01-03 23:08:58 -0500 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-07 13:24:59 -0500 |
commit | b411ab66ceee7386e4829e2337c735a08fb3d54d (patch) | |
tree | b4eaa02216188f627371154ff4e1b0a26be29623 | |
parent | b2a3bbf92c67b87e491e3d8187577110e7de57fb (diff) |
Support for async queries.
-rw-r--r-- | dht/src/Network/Tox/Onion/Routes.hs | 2 | ||||
-rw-r--r-- | dht/src/Network/Tox/TCP.hs | 6 | ||||
-rw-r--r-- | server/src/Network/QueryResponse.hs | 115 |
3 files changed, 57 insertions, 66 deletions
diff --git a/dht/src/Network/Tox/Onion/Routes.hs b/dht/src/Network/Tox/Onion/Routes.hs index 7c11227a..9ce4e316 100644 --- a/dht/src/Network/Tox/Onion/Routes.hs +++ b/dht/src/Network/Tox/Onion/Routes.hs | |||
@@ -171,7 +171,7 @@ newOnionRouter crypto perror tcp_enabled = do | |||
171 | ((tbl,(tcptbl,tcpcons,relaynet,onionnet)),tcp) <- do | 171 | ((tbl,(tcptbl,tcpcons,relaynet,onionnet)),tcp) <- do |
172 | (tcptbl, client) <- TCP.newClient crypto | 172 | (tcptbl, client) <- TCP.newClient crypto |
173 | id | 173 | id |
174 | (. (Success . (,) False)) | 174 | (\x -> \qid -> x qid . Success . (,) False) |
175 | (lookupSender' pq rlog) | 175 | (lookupSender' pq rlog) |
176 | (\_ (RouteId rid) -> atomically $ fmap storedRoute <$> readArray rm rid) | 176 | (\_ (RouteId rid) -> atomically $ fmap storedRoute <$> readArray rm rid) |
177 | 177 | ||
diff --git a/dht/src/Network/Tox/TCP.hs b/dht/src/Network/Tox/TCP.hs index 0850ce51..385da35b 100644 --- a/dht/src/Network/Tox/TCP.hs +++ b/dht/src/Network/Tox/TCP.hs | |||
@@ -319,8 +319,8 @@ type RelayCache = TCPCache (SessionProtocol (SessionData,RelayPacket) RelayPacke | |||
319 | -- defaults are 'id' and 'tryPutMVar'. The resulting customized table state | 319 | -- defaults are 'id' and 'tryPutMVar'. The resulting customized table state |
320 | -- will be returned to the caller along with the new client. | 320 | -- will be returned to the caller along with the new client. |
321 | newClient :: TransportCrypto | 321 | newClient :: TransportCrypto |
322 | -> ((QR.Result (Bool,RelayPacket) -> IO ()) -> a) -- ^ store mvar for relay query | 322 | -> ((Nonce8 -> QR.Result (Bool,RelayPacket) -> IO ()) -> a) -- ^ store mvar for relay query |
323 | -> (a -> RelayPacket -> IO void) -- ^ load mvar for relay query | 323 | -> (a -> Nonce8 -> RelayPacket -> IO void) -- ^ load mvar for relay query |
324 | -> (SockAddr -> Nonce8 -> IO (Maybe (OnionDestination RouteId))) -- ^ lookup sender of onion query | 324 | -> (SockAddr -> Nonce8 -> IO (Maybe (OnionDestination RouteId))) -- ^ lookup sender of onion query |
325 | -> (UDP.NodeInfo -> RouteId -> IO (Maybe OnionRoute)) -- ^ lookup OnionRoute by id | 325 | -> (UDP.NodeInfo -> RouteId -> IO (Maybe OnionRoute)) -- ^ lookup OnionRoute by id |
326 | -> IO ( ( TVar (ChaChaDRG, Data.Word64Map.Word64Map a) | 326 | -> IO ( ( TVar (ChaChaDRG, Data.Word64Map.Word64Map a) |
@@ -361,7 +361,7 @@ newClient crypto store load lookupSender getRoute = do | |||
361 | } | 361 | } |
362 | , tableMethods = transactionMethods' | 362 | , tableMethods = transactionMethods' |
363 | store | 363 | store |
364 | (\x -> mapM_ (load x . snd)) | 364 | (\qid x -> mapM_ (load qid x . snd)) |
365 | (contramap (\(Nonce8 w64) -> w64) w64MapMethods) | 365 | (contramap (\(Nonce8 w64) -> w64) w64MapMethods) |
366 | $ first (either error Nonce8 . decode) . randomBytesGenerate 8 | 366 | $ first (either error Nonce8 . decode) . randomBytesGenerate 8 |
367 | } | 367 | } |
diff --git a/server/src/Network/QueryResponse.hs b/server/src/Network/QueryResponse.hs index cb65eb47..4f14ea3c 100644 --- a/server/src/Network/QueryResponse.hs +++ b/server/src/Network/QueryResponse.hs | |||
@@ -323,7 +323,7 @@ data TransactionMethods d qid addr x = TransactionMethods | |||
323 | -- that can be used to forget the 'MVar' if the remote peer is not | 323 | -- that can be used to forget the 'MVar' if the remote peer is not |
324 | -- responding. | 324 | -- responding. |
325 | dispatchRegister :: POSIXTime -- time of expiry | 325 | dispatchRegister :: POSIXTime -- time of expiry |
326 | -> (Result x -> IO ()) -- callback upon response (or timeout) | 326 | -> (qid -> Result x -> IO ()) -- callback upon response (or timeout) |
327 | -> addr | 327 | -> addr |
328 | -> d | 328 | -> d |
329 | -> STM (qid, d) | 329 | -> STM (qid, d) |
@@ -331,7 +331,7 @@ data TransactionMethods d qid addr x = TransactionMethods | |||
331 | -- a response to the transaction with id /qid/. The returned IO action | 331 | -- a response to the transaction with id /qid/. The returned IO action |
332 | -- will write the packet to the correct 'MVar' thus completing the | 332 | -- will write the packet to the correct 'MVar' thus completing the |
333 | -- dispatch. | 333 | -- dispatch. |
334 | , dispatchResponse :: qid -> x -> d -> STM (d, IO ()) | 334 | , dispatchResponse :: qid -> Result x -> d -> STM (d, IO ()) |
335 | -- | When a timeout interval elapses, this method is called to remove the | 335 | -- | When a timeout interval elapses, this method is called to remove the |
336 | -- transaction from the table. | 336 | -- transaction from the table. |
337 | , dispatchCancel :: qid -> d -> STM d | 337 | , dispatchCancel :: qid -> d -> STM d |
@@ -403,74 +403,65 @@ type MethodSerializer tid addr x meth a b = MethodSerializerA tid addr x x meth | |||
403 | microsecondsDiff :: Int -> POSIXTime | 403 | microsecondsDiff :: Int -> POSIXTime |
404 | microsecondsDiff us = fromIntegral us / 1000000 | 404 | microsecondsDiff us = fromIntegral us / 1000000 |
405 | 405 | ||
406 | asyncQuery_ :: Client err meth tid addr x | 406 | asyncQuery :: Show meth => Client err meth qid addr x |
407 | -> MethodSerializer tid addr x meth a b | 407 | -> MethodSerializer qid addr x meth a b |
408 | -> a | 408 | -> a |
409 | -> addr | 409 | -> addr |
410 | -> (Result b -> IO ()) | 410 | -> (qid -> Result b -> IO ()) |
411 | -> IO (tid,POSIXTime,Int) | 411 | -> IO qid |
412 | asyncQuery_ (Client net d err pending whoami _) meth q addr0 withResponse = do | 412 | asyncQuery c@(Client net d err pending whoami _) meth q addr0 withResponse = do |
413 | tm <- getSystemTimerManager | ||
413 | now <- getPOSIXTime | 414 | now <- getPOSIXTime |
414 | (tid,addr,expiry) <- atomically $ do | 415 | keyvar <- newEmptyMVar |
416 | (qid,addr,expiry) <- atomically $ do | ||
415 | tbl <- readTVar pending | 417 | tbl <- readTVar pending |
416 | (addr,expiry) <- methodTimeout meth addr0 | 418 | (addr,expiry) <- methodTimeout meth addr0 |
417 | (tid, tbl') <- dispatchRegister (tableMethods d) | 419 | (qid, tbl') <- dispatchRegister (tableMethods d) |
418 | (now + microsecondsDiff expiry) | 420 | (now + microsecondsDiff expiry) |
419 | (withResponse . fmap (unwrapResponse meth)) | 421 | (\qid result -> do |
420 | addr -- XXX: Should be addr0 or addr? | 422 | tm_key <- swapMVar keyvar Nothing |
423 | mapM_ (unregisterTimeout tm) tm_key `catch` (\(SomeException _) -> return ()) | ||
424 | withResponse qid $ fmap (unwrapResponse meth) result) | ||
425 | addr | ||
421 | tbl | 426 | tbl |
422 | -- (addr,expiry) <- methodTimeout meth tid addr0 | ||
423 | writeTVar pending tbl' | 427 | writeTVar pending tbl' |
424 | return (tid,addr,expiry) | 428 | return (qid,addr,expiry) |
429 | tm_key <- registerTimeout tm expiry $ do | ||
430 | atomically $ do | ||
431 | tbl <- readTVar pending | ||
432 | v <- dispatchCancel (tableMethods d) qid tbl | ||
433 | writeTVar pending v | ||
434 | m <- takeMVar keyvar | ||
435 | forM_ m $ \_ -> withResponse qid TimedOut | ||
436 | putMVar keyvar (Just tm_key) | ||
425 | self <- whoami (Just addr) | 437 | self <- whoami (Just addr) |
426 | mres <- do sendMessage net addr (wrapQuery meth tid self addr q) | 438 | afterward <- newTVarIO $ return () -- Will be overridden by cancelation handler |
427 | return $ Just () | 439 | do sendMessage net addr (wrapQuery meth qid self addr q) |
428 | `catchIOError` (\e -> return Nothing) | 440 | return () |
429 | return (tid,now,expiry) | 441 | `catchIOError` \e -> atomically $ cancelQuery c (writeTVar afterward) qid |
430 | 442 | join $ atomically $ readTVar afterward | |
431 | asyncQuery :: Show meth => Client err meth tid addr x | 443 | return qid |
432 | -> MethodSerializer tid addr x meth a b | 444 | |
433 | -> a | 445 | cancelQuery :: ClientA err meth qid addr x y -> (IO () -> STM ()) -> qid -> STM () |
434 | -> addr | 446 | cancelQuery c@(Client net d err pending whoami _) runIO qid = do |
435 | -> (Result b -> IO ()) | 447 | tbl <- readTVar pending |
436 | -> IO () | 448 | (tbl', io) <- dispatchResponse (tableMethods d) qid Canceled tbl |
437 | asyncQuery client meth q addr withResponse0 = do | 449 | writeTVar pending tbl' |
438 | tm <- getSystemTimerManager | 450 | runIO io |
439 | tidvar <- newEmptyMVar | ||
440 | timedout <- registerTimeout tm 1000000 $ do | ||
441 | dput XMisc $ "async TIMEDOUT " ++ show (method meth) | ||
442 | withResponse0 TimedOut | ||
443 | tid <- takeMVar tidvar | ||
444 | dput XMisc $ "async TIMEDOUT mvar " ++ show (method meth) | ||
445 | case client of | ||
446 | Client { clientDispatcher = d, clientPending = pending } -> do | ||
447 | atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending | ||
448 | (tid,now,expiry) <- asyncQuery_ client meth q addr $ \x -> do | ||
449 | unregisterTimeout tm timedout | ||
450 | withResponse0 x | ||
451 | putMVar tidvar tid | ||
452 | updateTimeout tm timedout expiry | ||
453 | dput XMisc $ "FIN asyncQuery "++show (method meth)++" TIMEOUT="++show expiry | ||
454 | 451 | ||
455 | -- | Send a query to a remote peer. Note that this function will always time | 452 | -- | Send a query to a remote peer. Note that this function will always time |
456 | -- out if 'forkListener' was never invoked to spawn a thread to receive and | 453 | -- out if 'forkListener' was never invoked to spawn a thread to receive and |
457 | -- dispatch the response. | 454 | -- dispatch the response. |
458 | sendQuery :: | 455 | sendQuery :: Show meth => |
459 | forall err a b tbl x meth tid addr. | ||
460 | Client err meth tid addr x -- ^ A query/response implementation. | 456 | Client err meth tid addr x -- ^ A query/response implementation. |
461 | -> MethodSerializer tid addr x meth a b -- ^ Information for marshaling the query. | 457 | -> MethodSerializer tid addr x meth a b -- ^ Information for marshaling the query. |
462 | -> a -- ^ The outbound query. | 458 | -> a -- ^ The outbound query. |
463 | -> addr -- ^ Destination address of query. | 459 | -> addr -- ^ Destination address of query. |
464 | -> IO (Result b) -- ^ The response or failure condition. | 460 | -> IO (Result b) -- ^ The response or failure condition. |
465 | sendQuery c@(Client net d err pending whoami _) meth q addr0 = do | 461 | sendQuery c meth q addr0 = do |
466 | mvar <- newEmptyMVar | 462 | got <- newEmptyMVar |
467 | (tid,now,expiry) <- asyncQuery_ c meth q addr0 $ mapM_ (putMVar mvar) | 463 | tid <- asyncQuery c meth q addr0 $ \qid r -> putMVar got r |
468 | mres <- timeout expiry $ takeMVar mvar | 464 | takeMVar got |
469 | case mres of | ||
470 | Just b -> return $ Success b | ||
471 | Nothing -> do | ||
472 | atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending | ||
473 | return TimedOut | ||
474 | 465 | ||
475 | contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x | 466 | contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x |
476 | contramapAddr f (MethodHandler p s a) | 467 | contramapAddr f (MethodHandler p s a) |
@@ -508,11 +499,11 @@ dispatchQuery (NoReply unwrapQ f) tid self x addr = | |||
508 | -- table of pending transactions. This also enables multiple 'Client's to | 499 | -- table of pending transactions. This also enables multiple 'Client's to |
509 | -- share a single transaction table. | 500 | -- share a single transaction table. |
510 | transactionMethods' :: | 501 | transactionMethods' :: |
511 | ((Result x -> IO ()) -> a) -- ^ store MVar into table entry | 502 | ((qid -> Result x -> IO ()) -> a) -- ^ store MVar into table entry |
512 | -> (a -> Result x -> IO void) -- ^ load MVar from table entry | 503 | -> (a -> qid -> Result x -> IO void) -- ^ load MVar from table entry |
513 | -> TableMethods t tid -- ^ Table methods to lookup values by /tid/. | 504 | -> TableMethods t qid -- ^ Table methods to lookup values by /tid/. |
514 | -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. | 505 | -> (g -> (qid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. |
515 | -> TransactionMethods (g,t a) tid addr x | 506 | -> TransactionMethods (g,t a) qid addr x |
516 | transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods | 507 | transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods |
517 | { dispatchCancel = \tid (g,t) -> return (g, delete tid t) | 508 | { dispatchCancel = \tid (g,t) -> return (g, delete tid t) |
518 | , dispatchRegister = \nowPlusExpiry v a (g,t) -> do | 509 | , dispatchRegister = \nowPlusExpiry v a (g,t) -> do |
@@ -522,16 +513,16 @@ transactionMethods' store load (TableMethods insert delete lookup) generate = Tr | |||
522 | , dispatchResponse = \tid x (g,t) -> | 513 | , dispatchResponse = \tid x (g,t) -> |
523 | case lookup tid t of | 514 | case lookup tid t of |
524 | Just v -> let t' = delete tid t | 515 | Just v -> let t' = delete tid t |
525 | in return ((g,t'),void $ load v $ Success x) | 516 | in return ((g,t'),void $ load v tid x) |
526 | Nothing -> return ((g,t), return ()) | 517 | Nothing -> return ((g,t), return ()) |
527 | } | 518 | } |
528 | 519 | ||
529 | -- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a | 520 | -- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a |
530 | -- function for generating unique transaction ids. | 521 | -- function for generating unique transaction ids. |
531 | transactionMethods :: | 522 | transactionMethods :: |
532 | TableMethods t tid -- ^ Table methods to lookup values by /tid/. | 523 | TableMethods t qid -- ^ Table methods to lookup values by /tid/. |
533 | -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. | 524 | -> (g -> (qid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. |
534 | -> TransactionMethods (g,t (Result x -> IO ())) tid addr x | 525 | -> TransactionMethods (g,t (qid -> Result x -> IO ())) qid addr x |
535 | transactionMethods methods generate = transactionMethods' id id methods generate | 526 | transactionMethods methods generate = transactionMethods' id id methods generate |
536 | 527 | ||
537 | -- | Handle a single inbound packet and then invoke the given continuation. | 528 | -- | Handle a single inbound packet and then invoke the given continuation. |
@@ -566,7 +557,7 @@ handleMessage (Client net d err pending whoami responseID) addr plain = do | |||
566 | IsResponse tid -> do | 557 | IsResponse tid -> do |
567 | action <- atomically $ do | 558 | action <- atomically $ do |
568 | ts0 <- readTVar pending | 559 | ts0 <- readTVar pending |
569 | (ts, action) <- dispatchResponse (tableMethods d) tid plain ts0 | 560 | (ts, action) <- dispatchResponse (tableMethods d) tid (Success plain) ts0 |
570 | writeTVar pending ts | 561 | writeTVar pending ts |
571 | return action | 562 | return action |
572 | action | 563 | action |