-- | This module can implement any query\/response protocol. It was written -- with Kademlia implementations in mind. {-# LANGUAGE CPP #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE PartialTypeSignatures #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TupleSections #-} module Network.QueryResponse where #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument #else import Control.Concurrent import GHC.Conc (labelThread) #endif import Control.Concurrent.STM import Control.Exception import Control.Monad import qualified Data.ByteString as B ;import Data.ByteString (ByteString) import Data.Function import Data.Functor.Contravariant import qualified Data.IntMap.Strict as IntMap ;import Data.IntMap.Strict (IntMap) import qualified Data.Map.Strict as Map ;import Data.Map.Strict (Map) import qualified Data.Word64Map as W64Map ;import Data.Word64Map (Word64Map) import Data.Word import Data.Maybe import Network.Socket import Network.Socket.ByteString as B import System.Endian import System.IO import System.IO.Error import System.Timeout import DPut -- | Three methods are required to implement a datagram based query\/response protocol. data Transport err addr x = Transport { -- | Blocks until an inbound packet is available. Returns 'Nothing' when -- no more packets are expected due to a shutdown or close event. -- Otherwise, the packet will be parsed as type /x/ and an origin address -- /addr/. Parse failure is indicated by the type 'err'. awaitMessage :: forall a. (Maybe (Either err (x, addr)) -> IO a) -> IO a -- | Send an /x/ packet to the given destination /addr/. , sendMessage :: addr -> x -> IO () -- | Shutdown and clean up any state related to this 'Transport'. , closeTransport :: IO () } -- | This function modifies a 'Transport' to use higher-level addresses and -- packet representations. It could be used to change UDP 'ByteString's into -- bencoded syntax trees or to add an encryption layer in which addresses have -- associated public keys. layerTransportM :: (x -> addr -> IO (Either err (x', addr'))) -- ^ Function that attempts to transform a low-level address/packet -- pair into a higher level representation. -> (x' -> addr' -> IO (x, addr)) -- ^ Function to encode a high-level address/packet into a lower level -- representation. -> Transport err addr x -- ^ The low-level transport to be transformed. -> Transport err addr' x' layerTransportM parse encode tr = tr { awaitMessage = \kont -> awaitMessage tr $ \m -> mapM (mapM $ uncurry parse) m >>= kont . fmap join , sendMessage = \addr' msg' -> do (msg,addr) <- encode msg' addr' sendMessage tr addr msg } -- | This function modifies a 'Transport' to use higher-level addresses and -- packet representations. It could be used to change UDP 'ByteString's into -- bencoded syntax trees or to add an encryption layer in which addresses have -- associated public keys. layerTransport :: (x -> addr -> Either err (x', addr')) -- ^ Function that attempts to transform a low-level address/packet -- pair into a higher level representation. -> (x' -> addr' -> (x, addr)) -- ^ Function to encode a high-level address/packet into a lower level -- representation. -> Transport err addr x -- ^ The low-level transport to be transformed. -> Transport err addr' x' layerTransport parse encode tr = layerTransportM (\x addr -> return $ parse x addr) (\x' addr' -> return $ encode x' addr') tr -- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar' -- is used to share the same underlying socket, so be sure to fork a thread for -- both returned 'Transport's to avoid hanging. partitionTransport :: ((b,a) -> Either (x,xaddr) (b,a)) -> ((x,xaddr) -> Maybe (b,a)) -> Transport err a b -> IO (Transport err xaddr x, Transport err a b) partitionTransport parse encodex tr = partitionTransportM (return . parse) (return . encodex) tr -- | Paritions a 'Transport' into two higher-level transports. Note: An 'MVar' -- is used to share the same underlying socket, so be sure to fork a thread for -- both returned 'Transport's to avoid hanging. partitionTransportM :: ((b,a) -> IO (Either (x,xaddr) (b,a))) -> ((x,xaddr) -> IO (Maybe (b,a))) -> Transport err a b -> IO (Transport err xaddr x, Transport err a b) partitionTransportM parse encodex tr = do mvar <- newEmptyMVar let xtr = tr { awaitMessage = \kont -> fix $ \again -> do awaitMessage tr $ \m -> case m of Just (Right msg) -> parse msg >>= either (kont . Just . Right) (\y -> putMVar mvar y >> again) Just (Left e) -> kont $ Just (Left e) Nothing -> kont Nothing , sendMessage = \addr' msg' -> do msg_addr <- encodex (msg',addr') mapM_ (uncurry . flip $ sendMessage tr) msg_addr } ytr = Transport { awaitMessage = \kont -> takeMVar mvar >>= kont . Just . Right , sendMessage = sendMessage tr , closeTransport = return () } return (xtr, ytr) -- | -- * f add x --> Nothing, consume x -- --> Just id, leave x to a different handler -- --> Just g, apply g to x and leave that to a different handler addHandler :: (err -> IO ()) -> (addr -> x -> IO (Maybe (x -> x))) -> Transport err addr x -> Transport err addr x addHandler onParseError f tr = tr { awaitMessage = \kont -> fix $ \eat -> awaitMessage tr $ \m -> do case m of Just (Right (x, addr)) -> f addr x >>= maybe eat (kont . Just . Right . (, addr) . ($ x)) Just (Left e ) -> onParseError e >> kont (Just $ Left e) Nothing -> kont $ Nothing } -- | Modify a 'Transport' to invoke an action upon every received packet. onInbound :: (addr -> x -> IO ()) -> Transport err addr x -> Transport err addr x onInbound f tr = addHandler (const $ return ()) (\addr x -> f addr x >> return (Just id)) tr -- * Using a query\/response client. -- | Fork a thread that handles inbound packets. The returned action may be used -- to terminate the thread and clean up any related state. -- -- Example usage: -- -- > -- Start client. -- > quitServer <- forkListener "listener" (clientNet client) -- > -- Send a query q, recieve a response r. -- > r <- sendQuery client method q -- > -- Quit client. -- > quitServer forkListener :: String -> Transport err addr x -> IO (IO ()) forkListener name client = do thread_id <- forkIO $ do myThreadId >>= flip labelThread ("listener."++name) fix $ awaitMessage client . const return $ do closeTransport client killThread thread_id -- | Send a query to a remote peer. Note that this function will always time -- out if 'forkListener' was never invoked to spawn a thread to receive and -- dispatch the response. sendQuery :: forall err a b tbl x meth tid addr. Client err meth tid addr x -- ^ A query/response implementation. -> MethodSerializer tid addr x meth a b -- ^ Information for marshaling the query. -> a -- ^ The outbound query. -> addr -- ^ Destination address of query. -> IO (Maybe b) -- ^ The response, or 'Nothing' if it timed out. sendQuery (Client net d err pending whoami _ enterQuery leaveQuery) meth q addr0 = do mvar <- newEmptyMVar (tid,addr,expiry) <- atomically $ do tbl <- readTVar pending (tid, tbl') <- dispatchRegister (tableMethods d) mvar addr0 tbl (addr,expiry) <- methodTimeout meth tid addr0 writeTVar pending tbl' return (tid,addr,expiry) self <- whoami (Just addr) enterQuery tid mres <- do sendMessage net addr (wrapQuery meth tid self addr q) timeout expiry $ takeMVar mvar `catchIOError` (\e -> return Nothing) leaveQuery tid (isJust mres) case mres of Just x -> return $ Just $ unwrapResponse meth x Nothing -> do atomically $ readTVar pending >>= dispatchCancel (tableMethods d) tid >>= writeTVar pending reportTimeout err (method meth) tid addr return Nothing -- * Implementing a query\/response 'Client'. -- | All inputs required to implement a query\/response client. data Client err meth tid addr x = forall tbl. Client { -- | The 'Transport' used to dispatch and receive packets. clientNet :: Transport err addr x -- | Methods for handling inbound packets. , clientDispatcher :: DispatchMethods tbl err meth tid addr x -- | Methods for reporting various conditions. , clientErrorReporter :: ErrorReporter addr x meth tid err -- | State necessary for routing inbound responses and assigning unique -- /tid/ values for outgoing queries. , clientPending :: TVar tbl -- | An action yielding this client\'s own address. It is invoked once -- on each outbound and inbound packet. It is valid for this to always -- return the same value. -- -- The argument, if supplied, is the remote address for the transaction. -- This can be used to maintain consistent aliases for specific peers. , clientAddress :: Maybe addr -> IO addr -- | Transform a query /tid/ value to an appropriate response /tid/ -- value. Normally, this would be the identity transformation, but if -- /tid/ includes a unique cryptographic nonce, then it should be -- generated here. , clientResponseId :: tid -> IO tid -- | The enter/leave methods are no-ops by default. They are useful for -- serializing all queries for debugging purposes. , clientEnterQuery :: tid -> IO () , clientLeaveQuery :: tid -> Bool -> IO () } -- | An incoming message can be classified into three cases. data MessageClass err meth tid addr x = IsQuery meth tid -- ^ An unsolicited query is handled based on it's /meth/ value. Any response -- should include the provided /tid/ value. | IsResponse tid -- ^ A response to a outgoing query we associated with a /tid/ value. | IsUnsolicited (addr -> addr -> IO (Maybe (x -> x))) -- ^ Transactionless informative packet. The io action will be invoked -- with the source and destination address of a message. If it handles the -- message, it should return Nothing. Otherwise, it should return a transform -- (usually /id/) to apply before the next handler examines it. | IsUnknown err -- ^ None of the above. -- | Handler for an inbound query of type /x/ from an address of type _addr_. data MethodHandler err tid addr x = forall a b. MethodHandler { -- | Parse the query into a more specific type for this method. methodParse :: x -> Either err a -- | Serialize the response for transmission, given a context /ctx/ and the origin -- and destination addresses. , methodSerialize :: tid -> addr -> addr -> b -> x -- | Fully typed action to perform upon the query. The remote origin -- address of the query is provided to the handler. , methodAction :: addr -> a -> IO b } -- | See also 'IsUnsolicited' which likely makes this constructor unnecessary. | forall a. NoReply { -- | Parse the query into a more specific type for this method. methodParse :: x -> Either err a -- | Fully typed action to perform upon the query. The remote origin -- address of the query is provided to the handler. , noreplyAction :: addr -> a -> IO () } contramapAddr :: (a -> b) -> MethodHandler err tid b x -> MethodHandler err tid a x contramapAddr f (MethodHandler p s a) = MethodHandler p (\tid src dst result -> s tid (f src) (f dst) result) (\addr arg -> a (f addr) arg) contramapAddr f (NoReply p a) = NoReply p (\addr arg -> a (f addr) arg) -- | Attempt to invoke a 'MethodHandler' upon a given inbound query. If the -- parse is successful, the returned IO action will construct our reply if -- there is one. Otherwise, a parse err is returned. dispatchQuery :: MethodHandler err tid addr x -- ^ Handler to invoke. -> tid -- ^ The transaction id for this query\/response session. -> addr -- ^ Our own address, to which the query was sent. -> x -- ^ The query packet. -> addr -- ^ The origin address of the query. -> Either err (IO (Maybe x)) dispatchQuery (MethodHandler unwrapQ wrapR f) tid self x addr = fmap (\a -> Just . wrapR tid self addr <$> f addr a) $ unwrapQ x dispatchQuery (NoReply unwrapQ f) tid self x addr = fmap (\a -> f addr a >> return Nothing) $ unwrapQ x -- | These four parameters are required to implement an outgoing query. A -- peer-to-peer algorithm will define a 'MethodSerializer' for every 'MethodHandler' that -- might be returned by 'lookupHandler'. data MethodSerializer tid addr x meth a b = MethodSerializer { -- | Returns the microseconds to wait for a response to this query being -- sent to the given address. The /addr/ may also be modified to add -- routing information. methodTimeout :: tid -> addr -> STM (addr,Int) -- | A method identifier used for error reporting. This needn't be the -- same as the /meth/ argument to 'MethodHandler', but it is suggested. , method :: meth -- | Serialize the outgoing query /a/ into a transmittable packet /x/. -- The /addr/ arguments are, respectively, our own origin address and the -- destination of the request. The /tid/ argument is useful for attaching -- auxiliary notations on all outgoing packets. , wrapQuery :: tid -> addr -> addr -> a -> x -- | Parse an inbound packet /x/ into a response /b/ for this query. , unwrapResponse :: x -> b } -- | To dispatch responses to our outbound queries, we require three -- primitives. See the 'transactionMethods' function to create these -- primitives out of a lookup table and a generator for transaction ids. -- -- The type variable /d/ is used to represent the current state of the -- transaction generator and the table of pending transactions. data TransactionMethods d tid addr x = TransactionMethods { -- | Before a query is sent, this function stores an 'MVar' to which the -- response will be written too. The returned /tid/ is a transaction id -- that can be used to forget the 'MVar' if the remote peer is not -- responding. dispatchRegister :: MVar x -> addr -> d -> STM (tid, d) -- | This method is invoked when an incoming packet /x/ indicates it is -- a response to the transaction with id /tid/. The returned IO action -- will write the packet to the correct 'MVar' thus completing the -- dispatch. , dispatchResponse :: tid -> x -> d -> STM (d, IO ()) -- | When a timeout interval elapses, this method is called to remove the -- transaction from the table. , dispatchCancel :: tid -> d -> STM d } -- | The standard lookup table methods for use as input to 'transactionMethods' -- in lieu of directly implementing 'TransactionMethods'. data TableMethods t tid = TableMethods { -- | Insert a new /tid/ entry into the transaction table. tblInsert :: forall a. tid -> a -> t a -> t a -- | Delete transaction /tid/ from the transaction table. , tblDelete :: forall a. tid -> t a -> t a -- | Lookup the value associated with transaction /tid/. , tblLookup :: forall a. tid -> t a -> Maybe a } -- | Methods for using 'Data.IntMap'. intMapMethods :: TableMethods IntMap Int intMapMethods = TableMethods IntMap.insert IntMap.delete IntMap.lookup -- | Methods for using 'Data.Word64Map'. w64MapMethods :: TableMethods Word64Map Word64 w64MapMethods = TableMethods W64Map.insert W64Map.delete W64Map.lookup -- | Methods for using 'Data.Map' mapMethods :: Ord tid => TableMethods (Map tid) tid mapMethods = TableMethods Map.insert Map.delete Map.lookup -- | Change the key type for a lookup table implementation. -- -- This can be used with 'intMapMethods' or 'mapMethods' to restrict lookups to -- only a part of the generated /tid/ value. This is useful for /tid/ types -- that are especially large due their use for other purposes, such as secure -- nonces for encryption. instance Contravariant (TableMethods t) where -- contramap :: (tid -> t1) -> TableMethods t t1 -> TableMethods t tid contramap f (TableMethods ins del lookup) = TableMethods (\k v t -> ins (f k) v t) (\k t -> del (f k) t) (\k t -> lookup (f k) t) -- | Construct 'TransactionMethods' methods out of 3 lookup table primitives and a -- function for generating unique transaction ids. transactionMethods :: TableMethods t tid -- ^ Table methods to lookup values by /tid/. -> (g -> (tid,g)) -- ^ Generate a new unique /tid/ value and update the generator state /g/. -> TransactionMethods (g,t (MVar x)) tid addr x transactionMethods (TableMethods insert delete lookup) generate = TransactionMethods { dispatchCancel = \tid (g,t) -> return (g, delete tid t) , dispatchRegister = \v _ (g,t) -> let (tid,g') = generate g t' = insert tid v t in return ( tid, (g',t') ) , dispatchResponse = \tid x (g,t) -> case lookup tid t of Just v -> let t' = delete tid t in return ((g,t'),void $ tryPutMVar v x) Nothing -> return ((g,t), return ()) } -- | A set of methods necessary for dispatching incoming packets. data DispatchMethods tbl err meth tid addr x = DispatchMethods { -- | Classify an inbound packet as a query or response. classifyInbound :: x -> MessageClass err meth tid addr x -- | Lookup the handler for a inbound query. , lookupHandler :: meth -> Maybe (MethodHandler err tid addr x) -- | Methods for handling incoming responses. , tableMethods :: TransactionMethods tbl tid addr x } -- | These methods indicate what should be done upon various conditions. Write -- to a log file, make debug prints, or simply ignore them. -- -- [ /addr/ ] Address of remote peer. -- -- [ /x/ ] Incoming or outgoing packet. -- -- [ /meth/ ] Method id of incoming or outgoing request. -- -- [ /tid/ ] Transaction id for outgoing packet. -- -- [ /err/ ] Error information, typically a 'String'. data ErrorReporter addr x meth tid err = ErrorReporter { -- | Incoming: failed to parse packet. reportParseError :: err -> IO () -- | Incoming: no handler for request. , reportMissingHandler :: meth -> addr -> x -> IO () -- | Incoming: unable to identify request. , reportUnknown :: addr -> x -> err -> IO () -- | Outgoing: remote peer is not responding. , reportTimeout :: meth -> tid -> addr -> IO () } ignoreErrors :: ErrorReporter addr x meth tid err ignoreErrors = ErrorReporter { reportParseError = \_ -> return () , reportMissingHandler = \_ _ _ -> return () , reportUnknown = \_ _ _ -> return () , reportTimeout = \_ _ _ -> return () } logErrors :: ( Show addr , Show meth ) => ErrorReporter addr x meth tid String logErrors = ErrorReporter { reportParseError = \err -> dput XMisc err , reportMissingHandler = \meth addr x -> dput XMisc $ show addr ++ " --> Missing handler ("++show meth++")" , reportUnknown = \addr x err -> dput XMisc $ show addr ++ " --> " ++ err , reportTimeout = \meth tid addr -> dput XMisc $ show addr ++ " --> Timeout ("++show meth++")" } printErrors :: ( Show addr , Show meth ) => Handle -> ErrorReporter addr x meth tid String printErrors h = ErrorReporter { reportParseError = \err -> hPutStrLn h err , reportMissingHandler = \meth addr x -> hPutStrLn h $ show addr ++ " --> Missing handler ("++show meth++")" , reportUnknown = \addr x err -> hPutStrLn h $ show addr ++ " --> " ++ err , reportTimeout = \meth tid addr -> hPutStrLn h $ show addr ++ " --> Timeout ("++show meth++")" } -- Change the /err/ type for an 'ErrorReporter'. instance Contravariant (ErrorReporter addr x meth tid) where -- contramap :: (t5 -> t4) -> ErrorReporter t3 t2 t1 t t4 -> ErrorReporter t3 t2 t1 t t5 contramap f (ErrorReporter pe mh unk tim) = ErrorReporter (\e -> pe (f e)) mh (\addr x e -> unk addr x (f e)) tim -- | Handle a single inbound packet and then invoke the given continuation. -- The 'forkListener' function is implemented by passing this function to 'fix' -- in a forked thread that loops until 'awaitMessage' returns 'Nothing' or -- throws an exception. handleMessage :: Client err meth tid addr x -> addr -> x -> IO (Maybe (x -> x)) handleMessage (Client net d err pending whoami responseID _ _) addr plain = do -- Just (Left e) -> do reportParseError err e -- return $! Just id -- Just (Right (plain, addr)) -> do case classifyInbound d plain of IsQuery meth tid -> case lookupHandler d meth of Nothing -> do reportMissingHandler err meth addr plain return $! Just id Just m -> do self <- whoami (Just addr) tid' <- responseID tid either (\e -> do reportParseError err e return $! Just id) (>>= \m -> do mapM_ (sendMessage net addr) m return $! Nothing) (dispatchQuery m tid' self plain addr) IsUnsolicited action -> do self <- whoami (Just addr) action self addr return Nothing IsResponse tid -> do action <- atomically $ do ts0 <- readTVar pending (ts, action) <- dispatchResponse (tableMethods d) tid plain ts0 writeTVar pending ts return action action return $! Nothing IsUnknown e -> do reportUnknown err addr plain e return $! Just id -- Nothing -> return $! id -- * UDP Datagrams. -- | Access the address family of a given 'SockAddr'. This convenient accessor -- is missing from 'Network.Socket', so I implemented it here. sockAddrFamily :: SockAddr -> Family sockAddrFamily (SockAddrInet _ _ ) = AF_INET sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6 sockAddrFamily (SockAddrUnix _ ) = AF_UNIX sockAddrFamily (SockAddrCan _ ) = AF_CAN -- | Packets with an empty payload may trigger EOF exception. -- 'udpTransport' uses this function to avoid throwing in that -- case. ignoreEOF :: a -> IOError -> IO a ignoreEOF def e | isEOFError e = pure def | otherwise = throwIO e -- | Hard-coded maximum packet size for incoming UDP Packets received via -- 'udpTransport'. udpBufferSize :: Int udpBufferSize = 65536 -- | Wrapper around 'B.sendTo' that silently ignores DoesNotExistError. saferSendTo :: Socket -> ByteString -> SockAddr -> IO () saferSendTo sock bs saddr = void (B.sendTo sock bs saddr) `catch` \e -> -- sendTo: does not exist (Network is unreachable) -- Occurs when IPv6 or IPv4 network is not available. -- Currently, we require -threaded to prevent a forever-hang in this case. if isDoesNotExistError e then return () else throw e -- | A 'udpTransport' uses a UDP socket to send and receive 'ByteString's. The -- argument is the listen-address for incoming packets. This is a useful -- low-level 'Transport' that can be transformed for higher-level protocols -- using 'layerTransport'. udpTransport :: SockAddr -> IO (Transport err SockAddr ByteString) udpTransport bind_address = fst <$> udpTransport' bind_address -- | Like 'udpTransport' except also returns the raw socket (for broadcast use). udpTransport' :: SockAddr -> IO (Transport err SockAddr ByteString, Socket) udpTransport' bind_address = do let family = sockAddrFamily bind_address sock <- socket family Datagram defaultProtocol when (family == AF_INET6) $ do setSocketOption sock IPv6Only 0 setSocketOption sock Broadcast 1 bind sock bind_address let tr = Transport { awaitMessage = \kont -> do r <- handle (ignoreEOF $ Just $ Right (B.empty, SockAddrInet 0 0)) $ do Just . Right <$!> B.recvFrom sock udpBufferSize kont $! r , sendMessage = case family of AF_INET6 -> \case (SockAddrInet port addr) -> \bs -> -- Change IPv4 to 4mapped6 address. saferSendTo sock bs $ SockAddrInet6 port 0 (0,0,0x0000ffff,fromBE32 addr) 0 addr6 -> \bs -> saferSendTo sock bs addr6 AF_INET -> \case (SockAddrInet6 port 0 (0,0,0x0000ffff,raw4) 0) -> \bs -> do let host4 = toBE32 raw4 -- Change 4mapped6 to ordinary IPv4. -- dput XMisc $ "4mapped6 -> "++show (SockAddrInet port host4) saferSendTo sock bs (SockAddrInet port host4) addr@(SockAddrInet6 {}) -> \bs -> dput XMisc ("Discarding packet to "++show addr) addr4 -> \bs -> saferSendTo sock bs addr4 _ -> \addr bs -> saferSendTo sock bs addr , closeTransport = close sock } return (tr, sock) serializeClient :: Client err meth tid addr x -> IO (Client err meth tid addr x) serializeClient c = do mvar <- newMVar () return $ c { clientEnterQuery = \tid -> takeMVar mvar , clientLeaveQuery = \tid didRespond -> putMVar mvar () } retardSend :: Int -> Client err meth tid addr x -> IO (Client err meth tid addr x) retardSend micros client = do mvar <- newMVar () :: IO (MVar ()) return client { clientEnterQuery = \tid -> do takeMVar mvar threadDelay micros putMVar mvar () }