From 1f1bcd70f5c0b7d3c1a135fa8b53a03b507442c4 Mon Sep 17 00:00:00 2001 From: joe Date: Fri, 4 Aug 2017 18:34:18 -0400 Subject: Switched awaitMessage to continuation-passing style. --- Mainline.hs | 9 ++- Tox.hs | 9 +-- examples/dhtd.hs | 4 +- src/Network/QueryResponse.hs | 143 +++++++++++++++++++++++-------------------- 4 files changed, 86 insertions(+), 79 deletions(-) diff --git a/Mainline.hs b/Mainline.hs index 291a196f..860372dc 100644 --- a/Mainline.hs +++ b/Mainline.hs @@ -419,11 +419,10 @@ showPacket f addr flow bs = L8.unpack $ L8.unlines es -- Add detailed printouts for every packet. addVerbosity tr = - tr { awaitMessage = do - m <- awaitMessage tr + tr { awaitMessage = \kont -> awaitMessage tr $ \m -> do forM_ m $ mapM_ $ \(msg,addr) -> do hPutStrLn stderr (showPacket id addr " --> " msg) - return m + kont m , sendMessage = \addr msg -> do hPutStrLn stderr (showPacket id addr " <-- " msg) sendMessage tr addr msg @@ -566,7 +565,7 @@ newClient addr = do -- recursive since 'updateRouting' does not invoke 'awaitMessage' which -- which was modified by 'onInbound'. However, I'm going to avoid the -- mutual reference just to be safe. - outgoingClient = client { clientNet = net { awaitMessage = return Nothing } } + outgoingClient = client { clientNet = net { awaitMessage = ($ Nothing) } } dispatch = DispatchMethods { classifyInbound = classify -- :: x -> MessageClass err meth tid @@ -587,7 +586,7 @@ newClient addr = do gen cnt = (TransactionId $ S.encode cnt, cnt+1) client = Client - { clientNet = net + { clientNet = addHandler (handleMessage client) net , clientDispatcher = dispatch , clientErrorReporter = ignoreErrors -- printErrors stderr , clientPending = map_var diff --git a/Tox.hs b/Tox.hs index 253c83e7..bd5ebbc2 100644 --- a/Tox.hs +++ b/Tox.hs @@ -597,7 +597,7 @@ newClient addr = do -- recursive since 'updateRouting' does not invoke 'awaitMessage' which -- which was modified by 'onInbound'. However, I'm going to avoid the -- mutual reference just to be safe. - outgoingClient = client { clientNet = net { awaitMessage = return Nothing } } + outgoingClient = client { clientNet = net { awaitMessage = ($ Nothing) } } dispatch tbl var = DispatchMethods { classifyInbound = classify @@ -608,6 +608,7 @@ newClient addr = do -- handlers :: TVar -> Method -> Maybe Handler handlers var PingType = handler PongType pingH handlers var GetNodesType = handler SendNodesType $ getNodesH routing + {- handlers var OnionRequest0 = noreply OnionRequest0 $ onionSend0H (symmetricCipher (return symkey) (fst <$> readTVar var) @@ -616,6 +617,7 @@ newClient addr = do handlers var OnionResponse1 = noreply OnionResponse1 $ onionResponse1H (symmetricDecipher (return symkey)) udp + -} handlers var _ = Nothing -- TODO DHTRequest public key (onion) -- TODO DHTRequest NAT ping @@ -690,12 +692,11 @@ dropEnd8 bs = B.take (B.length bs - 8) bs -- Add detailed printouts for every packet. addVerbosity tr = - tr { awaitMessage = do - m <- awaitMessage tr + tr { awaitMessage = \kont -> awaitMessage tr $ \m -> do forM_ m $ mapM_ $ \(msg,addr) -> do hPutStrLn stderr ( (show addr) ++ " --> " ++ show (msgType msg)) - return m + kont m , sendMessage = \addr msg -> do hPutStrLn stderr ( (show addr) ++ " <-- " ++ show (msgType msg)) diff --git a/examples/dhtd.hs b/examples/dhtd.hs index f0a48bb2..8e8d47a2 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs @@ -469,13 +469,13 @@ main = do (atomically . writeTVar (Mainline.contactInfo swarms)) (peerdb >>= S.decodeLazy) - quitBt <- forkListener bt + quitBt <- forkListener (clientNet bt) let toxport = succ $ fromMaybe 33445 (fromIntegral <$> sockAddrPort addr) addrTox <- getBindAddress (show toxport) True (tox,toxR) <- Tox.newClient addrTox - quitTox <- forkListener tox + quitTox <- forkListener (clientNet tox) mainlineSearches <- atomically $ newTVar Map.empty toxSearches <- atomically $ newTVar Map.empty diff --git a/src/Network/QueryResponse.hs b/src/Network/QueryResponse.hs index c8a6fa80..190cc73e 100644 --- a/src/Network/QueryResponse.hs +++ b/src/Network/QueryResponse.hs @@ -7,6 +7,7 @@ {-# LANGUAGE PartialTypeSignatures #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TupleSections #-} module Network.QueryResponse where #ifdef THREAD_DEBUG @@ -33,7 +34,55 @@ import System.IO import System.IO.Error import System.Timeout --- * Using a query\/response 'Client'. +-- | Three methods are required to implement a datagram based query\/response protocol. +data Transport err addr x = Transport + { -- | Blocks until an inbound packet is available. Returns 'Nothing' when + -- no more packets are expected due to a shutdown or close event. + -- Otherwise, the packet will be parsed as type /x/ and an origin address + -- /addr/. Parse failure is indicated by the type 'err'. + awaitMessage :: forall a. (Maybe (Either err (x, addr)) -> IO a) -> IO a + -- | Send an /x/ packet to the given destination /addr/. + , sendMessage :: addr -> x -> IO () + -- | Shutdown and clean up any state related to this 'Transport'. + , closeTransport :: IO () + } + +-- | This function modifies a 'Transport' to use higher-level addresses and +-- packet representations. It could be used to change UDP 'ByteString's into +-- bencoded syntax trees or to add an encryption layer in which addresses have +-- associated public keys. +layerTransport :: + (x -> addr -> Either err (x', addr')) + -- ^ Function that attempts to transform a low-level address/packet + -- pair into a higher level representation. + -> (x' -> addr' -> (x, addr)) + -- ^ Function to encode a high-level address/packet into a lower level + -- representation. + -> Transport err addr x + -- ^ The low-level transport to be transformed. + -> Transport err addr' x' +layerTransport parse encode tr = + tr { awaitMessage = \kont -> + awaitMessage tr $ \m -> kont $ fmap (>>= uncurry parse) m + , sendMessage = \addr' msg' -> do + let (msg,addr) = encode msg' addr' + sendMessage tr addr msg + } + +addHandler :: (addr -> x -> IO (Maybe (x -> x))) -> Transport err addr x -> Transport err addr x +addHandler f tr = tr + { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \m -> do + case m of + Just (Right (x, addr)) -> f addr x >>= maybe eat (kont . Just . Right . (, addr) . ($ x)) + Just (Left e ) -> kont $ Just (Left e) + Nothing -> kont $ Nothing + } + +-- | Modify a 'Transport' to invoke an action upon every received packet. +onInbound :: (addr -> x -> IO ()) -> Transport err addr x -> Transport err addr x +onInbound f tr = addHandler (\addr x -> f addr x >> return (Just id)) tr + +-- * Using a query\/response client. -- | Fork a thread that handles inbound packets. The returned action may be used -- to terminate the thread and clean up any related state. @@ -41,18 +90,18 @@ import System.Timeout -- Example usage: -- -- > -- Start client. --- > quitServer <- forkListener client +-- > quitServer <- forkListener (clientNet client) -- > -- Send a query q, recieve a response r. -- > r <- sendQuery client method q -- > -- Quit client. -- > quitServer -forkListener :: Client err meth tid addr x -> IO (IO ()) +forkListener :: Transport err addr x -> IO (IO ()) forkListener client = do thread_id <- forkIO $ do myThreadId >>= flip labelThread "listener" - fix $ handleMessage client + fix $ awaitMessage client . const return $ do - closeTransport (clientNet client) + closeTransport client killThread thread_id -- | Send a query to a remote peer. Note that this funciton will always time @@ -165,53 +214,6 @@ data MethodSerializer tid addr x meth a b = MethodSerializer } --- | Three methods are required to implement a datagram based query\/response protocol. -data Transport err addr x = Transport - { -- | Blocks until an inbound packet is available. Returns 'Nothing' when - -- no more packets are expected due to a shutdown or close event. - -- Otherwise, the packet will be parsed as type /x/ and an origin address - -- /addr/. Parse failure is indicated by the type 'err'. - awaitMessage :: IO (Maybe (Either err (x, addr))) - -- | Send an /x/ packet to the given destination /addr/. - , sendMessage :: addr -> x -> IO () - -- | Shutdown and clean up any state related to this 'Transport'. - , closeTransport :: IO () - } - --- | This function modifies a 'Transport' to use higher-level addresses and --- packet representations. It could be used to change UDP 'ByteString's into --- bencoded syntax trees or to add an encryption layer in which addresses have --- associated public keys. -layerTransport :: - (x -> addr -> Either err (x', addr')) - -- ^ Function that attempts to transform a low-level address/packet - -- pair into a higher level representation. - -> (x' -> addr' -> (x, addr)) - -- ^ Function to encode a high-level address/packet into a lower level - -- representation. - -> Transport err addr x - -- ^ The low-level transport to be transformed. - -> Transport err addr' x' -layerTransport parse encode tr = - tr { awaitMessage = do - m <- awaitMessage tr - return $ fmap (>>= uncurry parse) m - , sendMessage = \addr' msg' -> do - let (msg,addr) = encode msg' addr' - sendMessage tr addr msg - } - --- | Modify a 'Transport' to invoke an action upon every received packet. -onInbound :: (addr -> x -> IO ()) -> Transport err addr x -> Transport err addr x -onInbound f tr = tr - { awaitMessage = do - m <- awaitMessage tr - case m of - Just (Right (x, addr)) -> f addr x - _ -> return () - return m - } - -- | To dipatch responses to our outbound queries, we require three primitives. -- See the 'transactionMethods' function to create these primitives out of a -- lookup table and a generator for transaction ids. @@ -360,21 +362,24 @@ contramapE f (ErrorReporter pe mh unk tim) -- or throws an exception. handleMessage :: Client err meth tid addr x - -> IO () - -> IO () -handleMessage (Client net d err pending whoami responseID) again = do - awaitMessage net >>= \case - Just (Left e) -> do reportParseError err e - again - Just (Right (plain, addr)) -> do + -> addr + -> x + -> IO (Maybe (x -> x)) +handleMessage (Client net d err pending whoami responseID) addr plain = do + -- Just (Left e) -> do reportParseError err e + -- return $! Just id + -- Just (Right (plain, addr)) -> do case classifyInbound d plain of IsQuery meth tid -> case lookupHandler d meth of - Nothing -> reportMissingHandler err meth addr plain + Nothing -> do reportMissingHandler err meth addr plain + return $! Just id Just m -> do self <- whoami (Just addr) tid' <- responseID tid - either (reportParseError err) - (>>= mapM_ (sendMessage net addr)) + either (\e -> do reportParseError err e + return $! Just id) + (>>= \m -> do mapM_ (sendMessage net addr) m + return $! Nothing) (dispatchQuery m tid' self plain addr) IsResponse tid -> do action <- atomically $ do @@ -383,9 +388,10 @@ handleMessage (Client net d err pending whoami responseID) again = do writeTVar pending ts return action action - IsUnknown e -> reportUnknown err addr plain e - again - Nothing -> return () + return $! Nothing + IsUnknown e -> do reportUnknown err addr plain e + return $! Just id + -- Nothing -> return $! id -- * UDP Datagrams. @@ -420,9 +426,10 @@ udpTransport bind_address = do setSocketOption sock IPv6Only 0 bind sock bind_address return Transport - { awaitMessage = handle (ignoreEOF $ Just $ Right (B.empty, SockAddrInet 0 0)) $ do - r <- B.recvFrom sock udpBufferSize - return $ Just $ Right r + { awaitMessage = \kont -> do + r <- handle (ignoreEOF $ Just $ Right (B.empty, SockAddrInet 0 0)) $ do + Just . Right <$!> B.recvFrom sock udpBufferSize + kont $! r , sendMessage = \addr bs -> void $ B.sendTo sock bs addr -- TODO: sendTo: does not exist (Network is unreachable) -- Occurs when IPv6 network is not available. -- cgit v1.2.3