summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2020-01-03 23:08:58 -0500
committerJoe Crayne <joe@jerkface.net>2020-01-07 13:24:59 -0500
commitb411ab66ceee7386e4829e2337c735a08fb3d54d (patch)
treeb4eaa02216188f627371154ff4e1b0a26be29623
parentb2a3bbf92c67b87e491e3d8187577110e7de57fb (diff)
Support for async queries.
-rw-r--r--dht/src/Network/Tox/Onion/Routes.hs2
-rw-r--r--dht/src/Network/Tox/TCP.hs6
-rw-r--r--server/src/Network/QueryResponse.hs115
3 files changed, 57 insertions, 66 deletions
diff --git a/dht/src/Network/Tox/Onion/Routes.hs b/dht/src/Network/Tox/Onion/Routes.hs
index 7c11227a..9ce4e316 100644
--- a/dht/src/Network/Tox/Onion/Routes.hs
+++ b/dht/src/Network/Tox/Onion/Routes.hs
@@ -171,7 +171,7 @@ newOnionRouter crypto perror tcp_enabled = do
171 ((tbl,(tcptbl,tcpcons,relaynet,onionnet)),tcp) <- do 171 ((tbl,(tcptbl,tcpcons,relaynet,onionnet)),tcp) <- do
172 (tcptbl, client) <- TCP.newClient crypto 172 (tcptbl, client) <- TCP.newClient crypto
173 id 173 id
174 (. (Success . (,) False)) 174 (\x -> \qid -> x qid . Success . (,) False)
175 (lookupSender' pq rlog) 175 (lookupSender' pq rlog)
176 (\_ (RouteId rid) -> atomically $ fmap storedRoute <$> readArray rm rid) 176 (\_ (RouteId rid) -> atomically $ fmap storedRoute <$> readArray rm rid)
177 177
diff --git a/dht/src/Network/Tox/TCP.hs b/dht/src/Network/Tox/TCP.hs
index 0850ce51..385da35b 100644
--- a/dht/src/Network/Tox/TCP.hs
+++ b/dht/src/Network/Tox/TCP.hs
@@ -319,8 +319,8 @@ type RelayCache = TCPCache (SessionProtocol (SessionData,RelayPacket) RelayPacke
319-- defaults are 'id' and 'tryPutMVar'. The resulting customized table state 319-- defaults are 'id' and 'tryPutMVar'. The resulting customized table state
320-- will be returned to the caller along with the new client. 320-- will be returned to the caller along with the new client.
321newClient :: TransportCrypto 321newClient :: TransportCrypto
322 -> ((QR.Result (Bool,RelayPacket) -> IO ()) -> a) -- ^ store mvar for relay query 322 -> ((Nonce8 -> QR.Result (Bool,RelayPacket) -> IO ()) -> a) -- ^ store mvar for relay query
323 -> (a -> RelayPacket -> IO void) -- ^ load mvar for relay query 323 -> (a -> Nonce8 -> RelayPacket -> IO void) -- ^ load mvar for relay query
324 -> (SockAddr -> Nonce8 -> IO (Maybe (OnionDestination RouteId))) -- ^ lookup sender of onion query 324 -> (SockAddr -> Nonce8 -> IO (Maybe (OnionDestination RouteId))) -- ^ lookup sender of onion query
325 -> (UDP.NodeInfo -> RouteId -> IO (Maybe OnionRoute)) -- ^ lookup OnionRoute by id 325 -> (UDP.NodeInfo -> RouteId -> IO (Maybe OnionRoute)) -- ^ lookup OnionRoute by id
326 -> IO ( ( TVar (ChaChaDRG, Data.Word64Map.Word64Map a) 326 -> IO ( ( TVar (ChaChaDRG, Data.Word64Map.Word64Map a)
@@ -361,7 +361,7 @@ newClient crypto store load lookupSender getRoute = do
361 } 361 }
362 , tableMethods = transactionMethods' 362 , tableMethods = transactionMethods'
363 store 363 store
364 (\x -> mapM_ (load x . snd)) 364 (\qid x -> mapM_ (load qid x . snd))
365 (contramap (\(Nonce8 w64) -> w64) w64MapMethods) 365 (contramap (\(Nonce8 w64) -> w64) w64MapMethods)
366 $ first (either error Nonce8 . decode) . randomBytesGenerate 8 366 $ first (either error Nonce8 . decode) . randomBytesGenerate 8
367 } 367 }
diff --git a/server/src/Network/QueryResponse.hs b/server/src/Network/QueryResponse.hs
index cb65eb47..4f14ea3c 100644
--- a/server/src/Network/QueryResponse.hs
+++ b/server/src/Network/QueryResponse.hs
@@ -323,7 +323,7 @@ data TransactionMethods d qid addr x = TransactionMethods
323 -- that can be used to forget the 'MVar' if the remote peer is not 323 -- that can be used to forget the 'MVar' if the remote peer is not
324 -- responding. 324 -- responding.
325 dispatchRegister :: POSIXTime -- time of expiry 325 dispatchRegister :: POSIXTime -- time of expiry
326 -> (Result x -> IO ()) -- callback upon response (or timeout) 326 -> (qid -> Result x -> IO ()) -- callback upon response (or timeout)
327 -> addr 327 -> addr
328 -> d 328 -> d
329 -> STM (qid, d) 329 -> STM (qid, d)
@@ -331,7 +331,7 @@ data TransactionMethods d qid addr x = TransactionMethods
331 -- a response to the transaction with id /qid/. The returned IO action 331 -- a response to the transaction with id /qid/. The returned IO action
332 -- will write the packet to the correct 'MVar' thus completing the 332 -- will write the packet to the correct 'MVar' thus completing the
333 -- dispatch. 333 -- dispatch.
334 , dispatchResponse :: qid -> x -> d -> STM (d, IO ()) 334 , dispatchResponse :: qid -> Result x -> d -> STM (d, IO ())
335 -- | When a timeout interval elapses, this method is called to remove the 335 -- | When a timeout interval elapses, this method is called to remove the
336 -- transaction from the table. 336 -- transaction from the table.
337 , dispatchCancel :: qid -> d -> STM d 337 , dispatchCancel :: qid -> d -> STM d
@@ -403,74 +403,65 @@ type MethodSerializer tid addr x meth a b = MethodSerializerA tid addr x x meth
403microsecondsDiff :: Int -> POSIXTime 403microsecondsDiff :: Int -> POSIXTime
404microsecondsDiff us = fromIntegral us / 1000000 404microsecondsDiff us = fromIntegral us / 1000000
405 405
406asyncQuery_ :: Client err meth tid addr x 406asyncQuery :: Show meth => Client err meth qid addr x
407 -> MethodSerializer tid addr x meth a b 407 -> MethodSerializer qid addr x meth a b
408 -> a 408 -> a
409 -> addr 409 -> addr
410 -> (Result b -> IO ()) 410 -> (qid -> Result b -> IO ())
411 -> IO (tid,POSIXTime,Int) 411 -> IO qid
412asyncQuery_ (Client net d err pending whoami _) meth q addr0 withResponse = do 412asyncQuery c@(Client net d err pending whoami _) meth q addr0 withResponse = do
413 tm <- getSystemTimerManager
413 now <- getPOSIXTime 414 now <- getPOSIXTime
414 (tid,addr,expiry) <- atomically $ do 415 keyvar <- newEmptyMVar
416 (qid,addr,expiry) <- atomically $ do
415 tbl <- readTVar pending 417 tbl <- readTVar pending
416 (addr,expiry) <- methodTimeout meth addr0 418 (addr,expiry) <- methodTimeout meth addr0
417 (tid, tbl') <- dispatchRegister (tableMethods d) 419 (qid, tbl') <- dispatchRegister (tableMethods d)
418 (now + microsecondsDiff expiry) 420 (now + microsecondsDiff expiry)
419 (withResponse . fmap (unwrapResponse meth)) 421 (\qid result -> do
420 addr -- XXX: Should be addr0 or addr? 422 tm_key <- swapMVar keyvar Nothing
423 mapM_ (unregisterTimeout tm) tm_key `catch` (\(SomeException _) -> return ())
424 withResponse qid $ fmap (unwrapResponse meth) result)
425 addr
421 tbl 426 tbl
422 -- (addr,expiry) <- methodTimeout meth tid addr0
423 writeTVar pending tbl' 427 writeTVar pending tbl'
424 return (tid,addr,expiry) 428 return (qid,addr,expiry)
429 tm_key <- registerTimeout tm expiry $ do
430 atomically $ do
431 tbl <- readTVar pending
432 v <- dispatchCancel (tableMethods d) qid tbl
433 writeTVar pending v
434 m <- takeMVar keyvar
435 forM_ m $ \_ -> withResponse qid TimedOut
436 putMVar keyvar (Just tm_key)
425 self <- whoami (Just addr) 437 self <- whoami (Just addr)
426 mres <- do sendMessage net addr (wrapQuery meth tid self addr q) 438 afterward <- newTVarIO $ return () -- Will be overridden by cancelation handler
427 return $ Just () 439 do sendMessage net addr (wrapQuery meth qid self addr q)
428 `catchIOError` (\e -> return Nothing) 440 return ()
429 return (tid,now,expiry) 441 `catchIOError` \e -> atomically $ cancelQuery c (writeTVar afterward) qid
430 442 join $ atomically $ readTVar afterward
431asyncQuery :: Show meth => Client err meth tid addr x 443 return qid
432 -> MethodSerializer tid addr x meth a b 444
433 -> a 445cancelQuery :: ClientA err meth qid addr x y -> (IO () -> STM ()) -> qid -> STM ()
434 -> addr 446cancelQuery c@(Client net d err pending whoami _) runIO qid = do
435 -> (Result b -> IO ()) 447 tbl <- readTVar pending
436 -> IO () 448 (tbl', io) <- dispatchResponse (tableMethods d) qid Canceled tbl
437asyncQuery client meth q addr withResponse0 = do 449 writeTVar pending tbl'
438 tm <- getSystemTimerManager 450 runIO io
439 tidvar <- newEmptyMVar
440 timedout <- registerTimeout tm 1000000 $ do
441 dput XMisc $ "async TIMEDOUT " ++ show (method meth)
442 withResponse0 TimedOut
443 tid <- takeMVar tidvar
444 dput XMisc $ "async TIMEDOUT mvar " ++ show (method meth)
445 case client of
446 Client { clientDispatcher = d, clientPending = pending } -> do
447 atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending
448 (tid,now,expiry) <- asyncQuery_ client meth q addr $ \x -> do
449 unregisterTimeout tm timedout
450 withResponse0 x
451 putMVar tidvar tid
452 updateTimeout tm timedout expiry
453 dput XMisc $ "FIN asyncQuery "++show (method meth)++" TIMEOUT="++show expiry
454 451
455-- | Send a query to a remote peer. Note that this function will always time 452-- | Send a query to a remote peer. Note that this function will always time
456-- out if 'forkListener' was never invoked to spawn a thread to receive and 453-- out if 'forkListener' was never invoked to spawn a thread to receive and
457-- dispatch the response. 454-- dispatch the response.
458sendQuery :: 455sendQuery :: Show meth =>
459 forall err a b tbl x meth tid addr.
460 Client err meth tid addr x -- ^ A query/response implementation. 456 Client err meth tid addr x -- ^ A query/response implementation.
461 -> MethodSerializer tid addr x meth a b -- ^ Information for marshaling the query. 457 -> MethodSerializer tid addr x meth a b -- ^ Information for marshaling the query.
462 -> a -- ^ The outbound query. 458 -> a -- ^ The outbound query.
463 -> addr -- ^ Destination address of query. 459 -> addr -- ^ Destination address of query.
464 -> IO (Result b) -- ^ The response or failure condition. 460 -> IO (Result b) -- ^ The response or failure condition.
465sendQuery c@(Client net d err pending whoami _) meth q addr0 = do 461sendQuery c meth q addr0 = do
466 mvar <- newEmptyMVar 462 got <- newEmptyMVar
467 (tid,now,expiry) <- asyncQuery_ c meth q addr0 $ mapM_ (putMVar mvar) 463 tid <- asyncQuery c meth q addr0 $ \qid r -> putMVar got r
468 mres <- timeout expiry $ takeMVar mvar 464 takeMVar got
469 case mres of
470 Just b -> return $ Success b
471 Nothing -> do
472 atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending
473 return TimedOut
474 465
475contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x 466contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x
476contramapAddr f (MethodHandler p s a) 467contramapAddr f (MethodHandler p s a)
@@ -508,11 +499,11 @@ dispatchQuery (NoReply unwrapQ f) tid self x addr =
508-- table of pending transactions. This also enables multiple 'Client's to 499-- table of pending transactions. This also enables multiple 'Client's to
509-- share a single transaction table. 500-- share a single transaction table.
510transactionMethods' :: 501transactionMethods' ::
511 ((Result x -> IO ()) -> a) -- ^ store MVar into table entry 502 ((qid -> Result x -> IO ()) -> a) -- ^ store MVar into table entry
512 -> (a -> Result x -> IO void) -- ^ load MVar from table entry 503 -> (a -> qid -> Result x -> IO void) -- ^ load MVar from table entry
513 -> TableMethods t tid -- ^ Table methods to lookup values by /tid/. 504 -> TableMethods t qid -- ^ Table methods to lookup values by /tid/.
514 -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. 505 -> (g -> (qid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/.
515 -> TransactionMethods (g,t a) tid addr x 506 -> TransactionMethods (g,t a) qid addr x
516transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods 507transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods
517 { dispatchCancel = \tid (g,t) -> return (g, delete tid t) 508 { dispatchCancel = \tid (g,t) -> return (g, delete tid t)
518 , dispatchRegister = \nowPlusExpiry v a (g,t) -> do 509 , dispatchRegister = \nowPlusExpiry v a (g,t) -> do
@@ -522,16 +513,16 @@ transactionMethods' store load (TableMethods insert delete lookup) generate = Tr
522 , dispatchResponse = \tid x (g,t) -> 513 , dispatchResponse = \tid x (g,t) ->
523 case lookup tid t of 514 case lookup tid t of
524 Just v -> let t' = delete tid t 515 Just v -> let t' = delete tid t
525 in return ((g,t'),void $ load v $ Success x) 516 in return ((g,t'),void $ load v tid x)
526 Nothing -> return ((g,t), return ()) 517 Nothing -> return ((g,t), return ())
527 } 518 }
528 519
529-- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a 520-- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a
530-- function for generating unique transaction ids. 521-- function for generating unique transaction ids.
531transactionMethods :: 522transactionMethods ::
532 TableMethods t tid -- ^ Table methods to lookup values by /tid/. 523 TableMethods t qid -- ^ Table methods to lookup values by /tid/.
533 -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. 524 -> (g -> (qid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/.
534 -> TransactionMethods (g,t (Result x -> IO ())) tid addr x 525 -> TransactionMethods (g,t (qid -> Result x -> IO ())) qid addr x
535transactionMethods methods generate = transactionMethods' id id methods generate 526transactionMethods methods generate = transactionMethods' id id methods generate
536 527
537-- | Handle a single inbound packet and then invoke the given continuation. 528-- | Handle a single inbound packet and then invoke the given continuation.
@@ -566,7 +557,7 @@ handleMessage (Client net d err pending whoami responseID) addr plain = do
566 IsResponse tid -> do 557 IsResponse tid -> do
567 action <- atomically $ do 558 action <- atomically $ do
568 ts0 <- readTVar pending 559 ts0 <- readTVar pending
569 (ts, action) <- dispatchResponse (tableMethods d) tid plain ts0 560 (ts, action) <- dispatchResponse (tableMethods d) tid (Success plain) ts0
570 writeTVar pending ts 561 writeTVar pending ts
571 return action 562 return action
572 action 563 action