From 58db745cc60428ee7a959599b2e83ff7504d5b57 Mon Sep 17 00:00:00 2001 From: joe Date: Wed, 12 Jul 2017 19:54:44 -0400 Subject: Polymorphic implementation of a query/response protocol. --- src/Network/QueryResponse.hs | 332 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 332 insertions(+) create mode 100644 src/Network/QueryResponse.hs (limited to 'src/Network') diff --git a/src/Network/QueryResponse.hs b/src/Network/QueryResponse.hs new file mode 100644 index 00000000..5b8bcda4 --- /dev/null +++ b/src/Network/QueryResponse.hs @@ -0,0 +1,332 @@ +-- | This module can implement any query\/response protocol. It was written +-- with Kademlia implementations in mind. + +{-# LANGUAGE CPP #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE PartialTypeSignatures #-} +{-# LANGUAGE GADTs #-} +module Network.QueryResponse where + +#ifdef THREAD_DEBUG +import Control.Concurrent.Lifted.Instrument +#else +import GHC.Conc (labelThread) +import Control.Concurrent +#endif +import Control.Concurrent.STM +import System.Timeout +import Data.Function +import Control.Exception +import Control.Monad +import qualified Data.ByteString as B + ;import Data.ByteString (ByteString) +import Network.Socket +import Network.Socket.ByteString as B +import System.IO.Error +import Data.Maybe + +-- * Using a query\/response 'Client'. + +-- | Fork a thread that handles inbound packets. The returned action may be used +-- to terminate the thread and clean up any related state. +-- +-- Example usage: +-- +-- > -- Start client. +-- > quitServer <- forkListener client +-- > -- Send a query q, recieve a response r. +-- > r <- sendQuery client method q +-- > -- Quit client. +-- > quitServer +forkListener :: Client err tbl meth tid addr x -> IO (IO ()) +forkListener client = do + thread_id <- fork $ do + myThreadId >>= flip labelThread "listener" + fix $ handleMessage client + return $ do + closeTransport (clientNet client) + killThread thread_id + +-- | Send a query to a remote peer. Note that this funciton will always time +-- out if 'forkListener' was never invoked to spawn a thread receive and +-- dispatch the response. +sendQuery :: + forall err a b tbl x meth tid addr. + Client err tbl meth tid addr x -- ^ A query/response implementation. + -> Method addr x meth a b -- ^ Information for marshalling the query. + -> a -- ^ The outbound query. + -> addr -- ^ Destination address of query. + -> IO (Maybe b) -- ^ The response, or 'Nothing' if it timed out. +sendQuery (Client net d err pending whoami) meth q addr = do + mvar <- newEmptyMVar + tid <- atomically $ do + tbl <- readTVar pending + let (tid, tbl') = dispatchRegister (tableMethods d) mvar tbl + writeTVar pending tbl' + return tid + self <- whoami + sendMessage net addr (wrapQuery meth self addr q) + mres <- timeout (methodTimeout meth) $ takeMVar mvar + case mres of + Just x -> return $ Just $ unwrapResponse meth x + Nothing -> do + atomically $ modifyTVar' pending (dispatchCancel (tableMethods d) tid) + reportTimeout err (method meth) tid addr + return Nothing + +-- * Implementing a query\/response 'Client'. + +-- | All inputs required to implement a query\/response client. +data Client err tbl meth tid addr x = Client + { -- | The 'Transport' used to dispatch and receive packets. + clientNet :: Transport err addr x + -- | Methods for handling inbound packets. + , clientDispatcher :: DispatchMethods tbl err meth tid addr x + -- | Methods for reporting various conditions. + , clientErrorReporter :: ErrorReporter addr x meth tid err + -- | State necessary for routing inbound responses and assigning unique + -- /tid/ values for outgoing queries. + , clientPending :: TVar tbl + -- | An action yielding this client\'s own address. It is invoked once on + -- each outbound and inbound packet. It is valid for this to always + -- return the same value. + , clientMyAddress :: IO addr + } + +-- | An incomming message can be classified into three cases. +data MessageClass err meth tid + = IsQuery meth -- ^ An unsolicited query is handled based on it's /meth/ value. + | IsResponse tid -- ^ A response to a outgoing query we associated with a /tid/ value. + | IsUnknown err -- ^ None of the above. + +-- | Handler for an inbound query of type _x_ from an address of type _addr_. +data MethodHandler err addr x = forall a b. MethodHandler + { -- | Parse the query into a more specific type for this method. + methodParse :: x -> Either err a + -- | Serialize the response type for transmission. Origin and destination + -- addresses for the packet are supplied in case they are required. + , methodSerialize :: addr -> addr -> b -> x + -- | Fully typed action to perform upon the query. The remote origin + -- address of the query is provided to the handler. + , methodAction :: addr -> a -> IO b + } + +-- | Attempt to invoke a 'MethodHandler' upon a given inbound query. If the +-- parse is successful, the returned IO action will construct our reply. +-- Otherwise, a parse err is returned. +dispatchQuery :: MethodHandler err addr x -- ^ Handler to invoke. + -> addr -- ^ Our own address, to which the query was sent. + -> x -- ^ The query packet. + -> addr -- ^ The origin address of the query. + -> Either err (IO x) +dispatchQuery (MethodHandler unwrapQ wrapR f) self x addr = + fmap (\a -> wrapR self addr <$> f addr a) $ unwrapQ x + +-- | These four parameters are required to implement an ougoing query. A +-- peer-to-peer algorithm will define a 'Method' for every 'MethodHandler' that +-- might be returned by 'lookupHandler'. +data Method addr x meth a b = Method + { -- | Seconds to wait for a response. + methodTimeout :: Int + -- | A method identifier used for error reporting. This needn't be the + -- same as the /meth/ argument to 'MethodHandler', but it is suggested. + , method :: meth + -- | Serialize the outgoing query /a/ into a transmitable packet /x/. + -- The /addr/ arguments are, respectively, our own origin address and the + -- destination of the request. + , wrapQuery :: addr -> addr -> a -> x + -- | Parse an inbound packet /x/ into a response /b/ for this query. + , unwrapResponse :: x -> b + } + + +-- | Three methods are required to implement a datagram based query\/response protocol. +data Transport err addr x = Transport + { -- | Blocks until an inbound packet is available. Returns 'Nothing' when + -- no more packets are expected due to a shutdown or close event. + -- Otherwise, the packet will be parsed as type /x/ and an origin address + -- /addr/. Parse failure is indicated by the type 'err'. + awaitMessage :: IO (Maybe (Either err (x, addr))) + -- | Send an /x/ packet to the given destination /addr/. + , sendMessage :: addr -> x -> IO () + -- | Shutdown and clean up any state related to this 'Transport'. + , closeTransport :: IO () + } + +-- | This function modifies a 'Transport' to use higher-level addresses and +-- packet representations. It could be used to change UDP 'ByteString's into +-- bencoded syntax trees or to add an encryption layer in which addresses have +-- associated public keys. +layerTransport :: + (x -> addr -> Either err (x', addr')) + -- ^ Function that attempts to transform a low-level address/packet + -- pair into a higher level representation. + -> (x' -> addr' -> (x, addr)) + -- ^ Function to encode a high-level address/packet into a lower level + -- representation. + -> Transport err addr x + -- ^ The low-level transport to be transformed. + -> Transport err addr' x' +layerTransport parse encode tr = + tr { awaitMessage = do + m <- awaitMessage tr + return $ fmap (>>= uncurry parse) m + , sendMessage = \addr' msg' -> do + let (msg,addr) = encode msg' addr' + sendMessage tr addr msg + } + + +-- | To dipatch responses to our outbound queries, we require three primitives. +-- See the 'transactionTableMethods' function to create these primitives out of a +-- lookup table and a generator for transaction ids. +-- +-- The type variable /d/ is used to represent the current state of the +-- transaction generator and the table of pending transactions. +data TableMethods d tid x = TableMethods + { + -- | Before a query is sent, this function stores an 'MVar' to which the + -- response will be written too. The returned _tid_ is a transaction id + -- that can be used to forget the 'MVar' if the remote peer is not + -- responding. + dispatchRegister :: MVar x -> d -> (tid, d) + -- | This method is invoked when an incomming packet _x_ indicates it is + -- a response to the transaction with id _tid_. The returned IO action + -- is will write the packet to the correct 'MVar' thus completing the + -- dispatch. + , dispatchResponse :: tid -> x -> d -> (d, IO ()) + -- | When a timeout interval elapses, this method is called to remove the + -- transaction from the table. + , dispatchCancel :: tid -> d -> d + } + +-- | Construct 'TableMethods' methods out of 3 lookup table primitives and a +-- function for generating unique transaction ids. +transactionTableMethods :: + (forall a. tid -> a -> t a -> t a) + -- ^ Insert a new _tid_ entry into the transaction table. + -> (forall a. tid -> t a -> t a) + -- ^ Delete transaction _tid_ from the transaction table. + -> (forall a. tid -> t a -> Maybe a) + -- ^ Lookup the value associated with transaction _tid_. + -> (g -> (tid,g)) + -- ^ Generate a new unique _tid_ value and update the generator state _g_. + -> TableMethods (g,t (MVar x)) tid x +transactionTableMethods insert delete lookup generate = TableMethods + { dispatchCancel = \tid (g,t) -> (g, delete tid t) + , dispatchRegister = \v (g,t) -> + let (tid,g') = generate g + t' = insert tid v t + in ( tid, (g',t') ) + , dispatchResponse = \tid x (g,t) -> + case lookup tid t of + Just v -> let t' = delete tid t + in ((g,t'),void $ tryPutMVar v x) + Nothing -> ((g,t), return ()) + } + +-- | A set of methods neccessary for dispatching incomming packets. +data DispatchMethods tbl err meth tid addr x = DispatchMethods + { -- | Clasify an inbound packet as a query or response. + classifyInbound :: x -> MessageClass err meth tid + -- | Lookup the handler for a inbound query. + , lookupHandler :: meth -> Maybe (MethodHandler err addr x) + -- | Methods for handling incomming responses. + , tableMethods :: TableMethods tbl tid x + } + +-- | These methods indicate what should be done upon various conditions. Write +-- to a log file, make debug prints, or simply ignore them. +-- +-- [ /addr/ ] Address of remote peer. +-- +-- [ /x/ ] Incomming or outgoing packet. +-- +-- [ /meth/ ] Method id of incomming or outgoing request. +-- +-- [ /tid/ ] Transaction id for outgoing packet. +-- +-- [ /err/ ] Error information, typically a 'String'. +data ErrorReporter addr x meth tid err = ErrorReporter + { -- | Incomming: failed to parse packet. + reportParseError :: err -> IO () + -- | Incomming: no handler for request. + , reportMissingHandler :: meth -> addr -> x -> IO () + -- | Incomming: unable to identify request. + , reportUnknown :: addr -> x -> err -> IO () + -- | Outgoing: remote peer is not responding. + , reportTimeout :: meth -> tid -> addr -> IO () + } + +-- | Handle a single inbound packet and then invoke the given continuation. +-- The 'forkListener' function is implemeneted by passing this function to +-- 'fix' in a forked thread that loops until 'awaitMessage' returns 'Nothing' +-- or throws an exception. +handleMessage :: + Client err tbl meth tid addr x + -> IO () + -> IO () +handleMessage (Client net d err pending whoami) again = do + awaitMessage net >>= \case + Just (Left e) -> do reportParseError err e + again + Just (Right (plain, addr)) -> do + case classifyInbound d plain of + IsQuery meth -> case lookupHandler d meth of + Nothing -> reportMissingHandler err meth addr plain + Just m -> do + self <- whoami + either (reportParseError err) + (>>= sendMessage net addr) + (dispatchQuery m self plain addr) + IsResponse tid -> do + action <- atomically $ do + ts0 <- readTVar pending + let (ts, action) = dispatchResponse (tableMethods d) tid plain ts0 + writeTVar pending ts + return action + action + IsUnknown e -> reportUnknown err addr plain e + again + Nothing -> return () + +-- * UDP Datagrams. + +-- | Access the address family of a given 'SockAddr'. This convenient accessor +-- is missing from 'Network.Socket', so I implemented it here. +sockAddrFamily :: SockAddr -> Family +sockAddrFamily (SockAddrInet _ _ ) = AF_INET +sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6 +sockAddrFamily (SockAddrUnix _ ) = AF_UNIX +sockAddrFamily (SockAddrCan _ ) = AF_CAN + +-- | Packets with an empty payload may trigger eof exception. +-- 'udpTransport' uses this function to avoid throwing in that +-- case. +ignoreEOF def e | isEOFError e = pure def + | otherwise = throwIO e + +-- | Hardcoded maximum packet size for incomming udp packets received via +-- 'udpTransport'. +udpBufferSize :: Int +udpBufferSize = 65536 + +-- | A 'udpTransport' uses a UDP socket to send and receive 'ByteString's. The +-- argument is the listen-address for incomming packets. This is a useful +-- low-level 'Transport' that can be transformed for higher-level protocols +-- using 'layerTransport'. +udpTransport :: SockAddr -> IO (Transport err SockAddr ByteString) +udpTransport bind_address = do + let family = sockAddrFamily bind_address + sock <- socket family Datagram defaultProtocol + when (family == AF_INET6) $ do + setSocketOption sock IPv6Only 0 + bind sock bind_address + return Transport + { awaitMessage = handle (ignoreEOF $ Just $ Right (B.empty, SockAddrInet 0 0)) $ do + r <- B.recvFrom sock udpBufferSize + return $ Just $ Right r + , sendMessage = \addr bs -> void $ B.sendTo sock bs addr + , closeTransport = close sock + } -- cgit v1.2.3