From b5df06bf0fed5a30a9b16e1032037e6cea378464 Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Wed, 16 Jan 2019 21:50:19 -0500 Subject: Queries table: Switched MVar with callback. --- src/Network/BitTorrent/MainlineDHT.hs | 3 +- src/Network/QueryResponse.hs | 160 +++++++++++++++------------------- src/Network/Tox.hs | 18 ++-- src/Network/Tox/TCP.hs | 9 +- 4 files changed, 82 insertions(+), 108 deletions(-) (limited to 'src/Network') diff --git a/src/Network/BitTorrent/MainlineDHT.hs b/src/Network/BitTorrent/MainlineDHT.hs index 6f47e38f..180ae82d 100644 --- a/src/Network/BitTorrent/MainlineDHT.hs +++ b/src/Network/BitTorrent/MainlineDHT.hs @@ -82,6 +82,7 @@ import qualified Data.Aeson as JSON import Text.Read import System.Global6 import Control.TriadCommittee +import Data.TableMethods import DPut import DebugTag @@ -630,8 +631,6 @@ newClient swarms addr = do _ -> routing4 routing R.thisNode <$> readTVar var , clientResponseId = return - , clientEnterQuery = \_ -> return () - , clientLeaveQuery = \_ _ -> return () } -- TODO: Provide some means of shutting down these five auxillary threads: diff --git a/src/Network/QueryResponse.hs b/src/Network/QueryResponse.hs index 13160d31..01981cc8 100644 --- a/src/Network/QueryResponse.hs +++ b/src/Network/QueryResponse.hs @@ -27,10 +27,12 @@ import qualified Data.IntMap.Strict as IntMap ;import Data.IntMap.Strict (IntMap) import qualified Data.Map.Strict as Map ;import Data.Map.Strict (Map) +import Data.Time.Clock.POSIX import qualified Data.Word64Map as W64Map ;import Data.Word64Map (Word64Map) import Data.Word import Data.Maybe +import GHC.Event import Network.Socket import Network.Socket.ByteString as B import System.Endian @@ -39,6 +41,7 @@ import System.IO.Error import System.Timeout import DPut import DebugTag +import Data.TableMethods -- | Three methods are required to implement a datagram based query\/response protocol. data TransportA err addr x y = Transport @@ -203,6 +206,52 @@ forkListener name client = do closeTransport client killThread thread_id +asyncQuery_ :: Client err meth tid addr x + -> MethodSerializer tid addr x meth a b + -> a + -> addr + -> (Maybe b -> IO ()) + -> IO (tid,POSIXTime,Int) +asyncQuery_ (Client net d err pending whoami _) meth q addr0 withResponse = do + now <- getPOSIXTime + (tid,addr,expiry) <- atomically $ do + tbl <- readTVar pending + ((tid,addr,expiry), tbl') <- dispatchRegister (tableMethods d) + (methodTimeout meth) + now + (withResponse . fmap (unwrapResponse meth)) + addr0 + tbl + -- (addr,expiry) <- methodTimeout meth tid addr0 + writeTVar pending tbl' + return (tid,addr,expiry) + self <- whoami (Just addr) + mres <- do sendMessage net addr (wrapQuery meth tid self addr q) + return $ Just () + `catchIOError` (\e -> return Nothing) + return (tid,now,expiry) + +asyncQuery :: Client err meth tid addr x + -> MethodSerializer tid addr x meth a b + -> a + -> addr + -> (Maybe b -> IO ()) + -> IO () +asyncQuery client meth q addr withResponse0 = do + tm <- getSystemTimerManager + tidvar <- newEmptyMVar + timedout <- registerTimeout tm 300000000 $ do + withResponse0 Nothing + tid <- takeMVar tidvar + case client of + Client { clientDispatcher = d, clientPending = pending } -> do + atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending + (tid,now,expiry) <- asyncQuery_ client meth q addr $ \x -> do + unregisterTimeout tm timedout + withResponse0 x + putMVar tidvar tid + updateTimeout tm timedout expiry + -- | Send a query to a remote peer. Note that this function will always time -- out if 'forkListener' was never invoked to spawn a thread to receive and -- dispatch the response. @@ -213,25 +262,14 @@ sendQuery :: -> a -- ^ The outbound query. -> addr -- ^ Destination address of query. -> IO (Maybe b) -- ^ The response, or 'Nothing' if it timed out. -sendQuery (Client net d err pending whoami _ enterQuery leaveQuery) meth q addr0 = do +sendQuery c@(Client net d err pending whoami _) meth q addr0 = do mvar <- newEmptyMVar - (tid,addr,expiry) <- atomically $ do - tbl <- readTVar pending - (tid, tbl') <- dispatchRegister (tableMethods d) mvar addr0 tbl - (addr,expiry) <- methodTimeout meth tid addr0 - writeTVar pending tbl' - return (tid,addr,expiry) - self <- whoami (Just addr) - enterQuery tid - mres <- do sendMessage net addr (wrapQuery meth tid self addr q) - timeout expiry $ takeMVar mvar - `catchIOError` (\e -> return Nothing) - leaveQuery tid (isJust mres) + (tid,now,expiry) <- asyncQuery_ c meth q addr0 $ mapM_ (putMVar mvar) + mres <- timeout expiry $ takeMVar mvar case mres of - Just x -> return $ Just $ unwrapResponse meth x + Just b -> return $ Just b Nothing -> do atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending - reportTimeout err (method meth) tid addr return Nothing -- * Implementing a query\/response 'Client'. @@ -259,10 +297,6 @@ data Client err meth tid addr x = forall tbl. Client -- /tid/ includes a unique cryptographic nonce, then it should be -- generated here. , clientResponseId :: tid -> IO tid - -- | The enter/leave methods are no-ops by default. They are useful for - -- serializing all queries for debugging purposes. - , clientEnterQuery :: tid -> IO () - , clientLeaveQuery :: tid -> Bool -> IO () } -- | An incoming message can be classified into three cases. @@ -353,7 +387,7 @@ data TransactionMethods d tid addr x = TransactionMethods -- response will be written too. The returned /tid/ is a transaction id -- that can be used to forget the 'MVar' if the remote peer is not -- responding. - dispatchRegister :: MVar x -> addr -> d -> STM (tid, d) + dispatchRegister :: (tid -> addr -> STM (addr,Int)) -> POSIXTime -> (Maybe x -> IO ()) -> addr -> d -> STM ((tid,addr,Int), d) -- | This method is invoked when an incoming packet /x/ indicates it is -- a response to the transaction with id /tid/. The returned IO action -- will write the packet to the correct 'MVar' thus completing the @@ -364,69 +398,37 @@ data TransactionMethods d tid addr x = TransactionMethods , dispatchCancel :: tid -> d -> STM d } --- | The standard lookup table methods for use as input to 'transactionMethods' --- in lieu of directly implementing 'TransactionMethods'. -data TableMethods t tid = TableMethods - { -- | Insert a new /tid/ entry into the transaction table. - tblInsert :: forall a. tid -> a -> t a -> t a - -- | Delete transaction /tid/ from the transaction table. - , tblDelete :: forall a. tid -> t a -> t a - -- | Lookup the value associated with transaction /tid/. - , tblLookup :: forall a. tid -> t a -> Maybe a - } - --- | Methods for using 'Data.IntMap'. -intMapMethods :: TableMethods IntMap Int -intMapMethods = TableMethods IntMap.insert IntMap.delete IntMap.lookup - --- | Methods for using 'Data.Word64Map'. -w64MapMethods :: TableMethods Word64Map Word64 -w64MapMethods = TableMethods W64Map.insert W64Map.delete W64Map.lookup - --- | Methods for using 'Data.Map' -mapMethods :: Ord tid => TableMethods (Map tid) tid -mapMethods = TableMethods Map.insert Map.delete Map.lookup - --- | Change the key type for a lookup table implementation. --- --- This can be used with 'intMapMethods' or 'mapMethods' to restrict lookups to --- only a part of the generated /tid/ value. This is useful for /tid/ types --- that are especially large due their use for other purposes, such as secure --- nonces for encryption. -instance Contravariant (TableMethods t) where - -- contramap :: (tid -> t1) -> TableMethods t t1 -> TableMethods t tid - contramap f (TableMethods ins del lookup) = - TableMethods (\k v t -> ins (f k) v t) - (\k t -> del (f k) t) - (\k t -> lookup (f k) t) - -- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a -- function for generating unique transaction ids. transactionMethods :: TableMethods t tid -- ^ Table methods to lookup values by /tid/. -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. - -> TransactionMethods (g,t (MVar x)) tid addr x -transactionMethods methods generate = transactionMethods' id tryPutMVar methods generate + -> TransactionMethods (g,t (Maybe x -> IO ())) tid addr x +transactionMethods methods generate = transactionMethods' id id methods generate + +microsecondsDiff :: Int -> POSIXTime +microsecondsDiff us = fromIntegral us / 1000000 -- | Like 'transactionMethods' but allows extra information to be stored in the -- table of pending transactions. This also enables multiple 'Client's to -- share a single transaction table. transactionMethods' :: - (MVar x -> a) -- ^ store MVar into table entry - -> (a -> x -> IO void) -- ^ load MVar from table entry - -> TableMethods t tid -- ^ Table methods to lookup values by /tid/. - -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. + ((Maybe x -> IO ()) -> a) -- ^ store MVar into table entry + -> (a -> Maybe x -> IO void) -- ^ load MVar from table entry + -> TableMethods t tid -- ^ Table methods to lookup values by /tid/. + -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. -> TransactionMethods (g,t a) tid addr x transactionMethods' store load (TableMethods insert delete lookup) generate = TransactionMethods { dispatchCancel = \tid (g,t) -> return (g, delete tid t) - , dispatchRegister = \v _ (g,t) -> + , dispatchRegister = \getTimeout now v a0 (g,t) -> do let (tid,g') = generate g - t' = insert tid (store v) t - in return ( tid, (g',t') ) + (a,expiry) <- getTimeout tid a0 + let t' = insert tid (store v) (now + microsecondsDiff expiry) t + return ( (tid,a,expiry), (g',t') ) , dispatchResponse = \tid x (g,t) -> case lookup tid t of Just v -> let t' = delete tid t - in return ((g,t'),void $ load v x) + in return ((g,t'),void $ load v $ Just x) Nothing -> return ((g,t), return ()) } @@ -459,8 +461,6 @@ data ErrorReporter addr x meth tid err = ErrorReporter , reportMissingHandler :: meth -> addr -> x -> IO () -- | Incoming: unable to identify request. , reportUnknown :: addr -> x -> err -> IO () - -- | Outgoing: remote peer is not responding. - , reportTimeout :: meth -> tid -> addr -> IO () } ignoreErrors :: ErrorReporter addr x meth tid err @@ -468,7 +468,6 @@ ignoreErrors = ErrorReporter { reportParseError = \_ -> return () , reportMissingHandler = \_ _ _ -> return () , reportUnknown = \_ _ _ -> return () - , reportTimeout = \_ _ _ -> return () } logErrors :: ( Show addr @@ -478,7 +477,6 @@ logErrors = ErrorReporter { reportParseError = \err -> dput XMisc err , reportMissingHandler = \meth addr x -> dput XMisc $ show addr ++ " --> Missing handler ("++show meth++")" , reportUnknown = \addr x err -> dput XMisc $ show addr ++ " --> " ++ err - , reportTimeout = \meth tid addr -> dput XMisc $ show addr ++ " --> Timeout ("++show meth++")" } printErrors :: ( Show addr @@ -488,17 +486,15 @@ printErrors h = ErrorReporter { reportParseError = \err -> hPutStrLn h err , reportMissingHandler = \meth addr x -> hPutStrLn h $ show addr ++ " --> Missing handler ("++show meth++")" , reportUnknown = \addr x err -> hPutStrLn h $ show addr ++ " --> " ++ err - , reportTimeout = \meth tid addr -> hPutStrLn h $ show addr ++ " --> Timeout ("++show meth++")" } -- Change the /err/ type for an 'ErrorReporter'. instance Contravariant (ErrorReporter addr x meth tid) where -- contramap :: (t5 -> t4) -> ErrorReporter t3 t2 t1 t t4 -> ErrorReporter t3 t2 t1 t t5 - contramap f (ErrorReporter pe mh unk tim) + contramap f (ErrorReporter pe mh unk) = ErrorReporter (\e -> pe (f e)) mh (\addr x e -> unk addr x (f e)) - tim -- | Handle a single inbound packet and then invoke the given continuation. -- The 'forkListener' function is implemented by passing this function to 'fix' @@ -509,7 +505,7 @@ handleMessage :: -> addr -> x -> IO (Maybe (x -> x)) -handleMessage (Client net d err pending whoami responseID _ _) addr plain = do +handleMessage (Client net d err pending whoami responseID) addr plain = do -- Just (Left e) -> do reportParseError err e -- return $! Just id -- Just (Right (plain, addr)) -> do @@ -637,19 +633,3 @@ testPairTransport = do b = SockAddrInet 2 2 return ( chanTransport (const bchan) a achan aclosed , chanTransport (const achan) b bchan bclosed ) - -serializeClient :: Client err meth tid addr x -> IO (Client err meth tid addr x) -serializeClient c = do - mvar <- newMVar () - return $ c { clientEnterQuery = \tid -> takeMVar mvar - , clientLeaveQuery = \tid didRespond -> putMVar mvar () - } - -retardSend :: Int -> Client err meth tid addr x -> IO (Client err meth tid addr x) -retardSend micros client = do - mvar <- newMVar () :: IO (MVar ()) - return client { clientEnterQuery = \tid -> do - takeMVar mvar - threadDelay micros - putMVar mvar () - } diff --git a/src/Network/Tox.hs b/src/Network/Tox.hs index c14339e4..98c03b80 100644 --- a/src/Network/Tox.hs +++ b/src/Network/Tox.hs @@ -44,6 +44,7 @@ import Network.Socket import System.Endian import System.IO.Error +import Data.TableMethods import qualified Data.Word64Map import Network.BitTorrent.DHT.Token as Token import qualified Data.Wrapper.PSQ as PSQ @@ -159,12 +160,10 @@ newClient drg net classify selfAddr handlers modifytbl modifynet = do let client = Client { clientNet = addHandler (reportParseError eprinter) (handleMessage client) $ modifynet client net , clientDispatcher = dispatch tbl var (handlers client) client - , clientErrorReporter = eprinter { reportTimeout = reportTimeout ignoreErrors } + , clientErrorReporter = eprinter , clientPending = var , clientAddress = selfAddr , clientResponseId = genNonce24 var - , clientEnterQuery = \_ -> return () - , clientLeaveQuery = \_ _ -> return () } in client return $ either mkclient mkclient tblvar handlers @@ -250,8 +249,8 @@ newOnionClient :: DRG g => -> TVar Onion.AnnouncedKeys -> OnionRouter -> TVar (g, Data.Word64Map.Word64Map a) - -> (MVar Onion.Message -> a) - -> (a -> Onion.Message -> IO void) + -> ((Maybe Onion.Message -> IO ()) -> a) + -> (a -> Maybe Onion.Message -> IO void) -> Client String DHT.PacketKind DHT.TransactionId @@ -268,12 +267,10 @@ newOnionClient crypto net r toks keydb orouter map_var store load = c , tableMethods = hookQueries orouter DHT.transactionKey $ transactionMethods' store load (contramap w64Key w64MapMethods) gen } - , clientErrorReporter = eprinter { reportTimeout = reportTimeout ignoreErrors } + , clientErrorReporter = eprinter , clientPending = map_var , clientAddress = getOnionAlias crypto $ R.thisNode <$> readTVar (DHT.routing4 r) , clientResponseId = genNonce24 map_var - , clientEnterQuery = \_ -> return () - , clientLeaveQuery = \_ _ -> return () } newTox :: TVar Onion.AnnouncedKeys -- ^ Store of announced keys we are a rendezvous for. @@ -359,10 +356,9 @@ newToxOverTransport keydb addr onNewSession suppliedDHTKey udp tcp = do let onionnet = layerTransportM (Onion.decrypt crypto) (Onion.encrypt crypto) onioncrypt let onionclient = newOnionClient crypto onionnet (mkrouting dhtclient) toks keydb orouter' otbl Right $ \case - Right v -> tryPutMVar v - Left v -> \_ -> do + Right v -> v + Left v -> \_ -> dput XUnexpected "TCP-sent onion query got response over UDP?" - return False return Tox { toxDHT = dhtclient diff --git a/src/Network/Tox/TCP.hs b/src/Network/Tox/TCP.hs index adb42514..9c1ffe48 100644 --- a/src/Network/Tox/TCP.hs +++ b/src/Network/Tox/TCP.hs @@ -34,6 +34,7 @@ import System.Timeout import ControlMaybe import Crypto.Tox import Data.ByteString (hPut,hGet,ByteString,length) +import Data.TableMethods import Data.Tox.Relay import qualified Data.Word64Map import DebugTag @@ -269,7 +270,7 @@ type RelayClient = Client String PacketNumber Nonce8 NodeInfo (Bool,RelayPacket) -- defaults are 'id' and 'tryPutMVar'. The resulting customized table state -- will be returned to the caller along with the new client. newClient :: TransportCrypto - -> (MVar (Bool,RelayPacket) -> a) -- ^ store mvar for query + -> ((Maybe (Bool,RelayPacket) -> IO ()) -> a) -- ^ store mvar for query -> (a -> RelayPacket -> IO void) -- ^ load mvar for query -> IO ( ( TVar (ChaChaDRG, Data.Word64Map.Word64Map a) , TCPCache (SessionProtocol RelayPacket RelayPacket) ) @@ -299,16 +300,14 @@ newClient crypto store load = do { methodParse = \x -> Left "tcp-lookuphandler?" -- :: x -> Either err a , noreplyAction = \addr a -> dput XTCP $ "tcp-lookupHandler: "++show w } - , tableMethods = transactionMethods' store (\x -> load x . snd) (contramap (\(Nonce8 w64) -> w64) w64MapMethods) + , tableMethods = transactionMethods' store (\x -> mapM_ (load x . snd)) (contramap (\(Nonce8 w64) -> w64) w64MapMethods) $ first (either error Nonce8 . decode) . randomBytesGenerate 8 } - , clientErrorReporter = logErrors { reportTimeout = reportTimeout ignoreErrors } + , clientErrorReporter = logErrors , clientPending = map_var , clientAddress = \_ -> return $ NodeInfo { udpNodeInfo = either error id $ UDP.nodeInfo (UDP.key2id $ transportPublic crypto) (SockAddrInet 0 0) , tcpPort = 0 } , clientResponseId = return - , clientEnterQuery = \_ -> return () - , clientLeaveQuery = \_ _ -> return () } -- cgit v1.2.3