-- |
--   Copyright   :  (c) Sam Truzjan 2013, 2014
--   License     :  BSD3
--   Maintainer  :  pxqr.sta@gmail.com
--   Stability   :  experimental
--   Portability :  portable
--
--   This module provides safe remote procedure call. One important
--   point is exceptions and errors, so to be able to handle them
--   properly we need to investigate a bit about how this all works.
--   Internally, in order to make a method invocation KRPC makes the
--   following steps:
--
--     * Caller serializes arguments to bencoded bytestrings;
--
--     * Caller sends bytestring data over UDP to the callee;
--
--     * Callee receives and decodes arguments to the method and method
--     name. If it can't decode then it sends 'ProtocolError' back to the
--     caller;
--
--     * Callee searches for the @method name@ in the method table.
--     If it is not present in the table then callee sends 'MethodUnknown'
--     back to the caller;
--
--     * Callee checks if argument names match. If not it sends
--     'ProtocolError' back;
--
--     * Callee makes the actual call to the plain old haskell
--     function. If the function throws an exception then callee sends
--     'ServerError' back.
--
--     * Callee serializes result of the function to bencoded bytestring.
--
--     * Callee encodes result to bencoded bytestring and sends it back
--     to the caller.
--
--     * Caller checks if return value names match with the signature
--     it called in the first step.
--
--     * Caller extracts results and finally returns results of the
--     procedure call as ordinary haskell values.
--
--   If any other error occurred then caller gets the
--   'GenericError'. All errors returned by callee are thrown as
--   ordinary haskell exceptions at caller side. Also note that both
--   caller and callee use plain UDP, so KRPC is unreliable.
--
--   For async 'query' use @async@ package.
--
--   For protocol details see "Network.DatagramServer.Mainline" module.
-- {-# LANGUAGE CPP #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE DefaultSignatures #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE FunctionalDependencies #-} {-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE KindSignatures #-} module Network.DatagramServer ( -- ** Query QueryFailure (..) , query , query' , queryRaw , getQueryCount -- ** Handler , HandlerFailure (..) , Handler , handler -- * Manager , MonadKRPC (..) , Options (..) , def , Manager , newManager , closeManager , withManager , isActive , listen , Protocol(..) -- * Re-exports , ErrorCode (..) , SockAddr (..) ) where import Data.Default.Class import Network.Socket (SockAddr (..)) import Control.Applicative #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument #else import GHC.Conc (labelThread) import Control.Concurrent.Lifted #endif import Control.Exception hiding (Handler) import qualified Control.Exception.Lifted as E (Handler (..)) import Control.Exception.Lifted as Lifted (catches, finally) import Control.Monad import Control.Monad.Logger import Control.Monad.Reader import Control.Monad.Trans.Control #ifdef VERSION_bencoding #else -- import qualified Network.DatagramServer.Tox as Tox #endif import qualified Data.ByteString.Base16 as Base16 import Data.ByteString as BS import Data.ByteString.Char8 as BC import Data.ByteString.Lazy as BL import Data.Default.Class import Data.IORef import Data.List as L import Data.Map as M import Data.Monoid import Data.Serialize as S import Data.Text as T import Data.Text.Encoding as T import Data.Tuple import Data.Typeable import Network.DatagramServer.Types import Network.Socket hiding (listen) import Network.Socket.ByteString as BS import System.IO.Error import System.Timeout {----------------------------------------------------------------------- -- Options 
-----------------------------------------------------------------------}

-- | RPC manager options.
data Options = Options
  { -- | Initial 'TransactionId' incremented with each 'query';
    optSeedTransaction :: {-# UNPACK #-} !Int

    -- | Time to wait for response from remote node, in seconds.
  , optQueryTimeout    :: {-# UNPACK #-} !Int

    -- | Maximum number of bytes to receive.
  , optMaxMsgSize      :: {-# UNPACK #-} !Int
  } deriving (Show, Eq)

-- | Default value of 'optSeedTransaction'.
defaultSeedTransaction :: Int
defaultSeedTransaction = 0

-- | Default value of 'optQueryTimeout', in seconds.
defaultQueryTimeout :: Int
defaultQueryTimeout = 120

-- | Default value of 'optMaxMsgSize', in bytes.
defaultMaxMsgSize :: Int
defaultMaxMsgSize = 64 * 1024

-- | Permissive defaults.
instance Default Options where
  def = Options
    { optSeedTransaction = defaultSeedTransaction
    , optQueryTimeout    = defaultQueryTimeout
    , optMaxMsgSize      = defaultMaxMsgSize
    }

-- | Reject option values the manager cannot work with by throwing a
-- 'userError'.
--
-- The query timeout is later converted to microseconds (multiplied by
-- @10 ^ 6@) before being handed to 'timeout'. A value which overflows
-- 'Int' in that conversion is rejected here, because a wrapped-around
-- (possibly negative) timeout would silently change the waiting
-- behaviour of every query.
validateOptions :: Options -> IO ()
validateOptions Options {..}
  | optQueryTimeout < 1
  = throwIO (userError "krpc: non-positive query timeout")
  | optQueryTimeout > maxBound `div` 1000000
  = throwIO (userError "krpc: query timeout too large")
  | optMaxMsgSize < 1
  = throwIO (userError "krpc: non-positive buffer size")
  | otherwise = return ()

{-----------------------------------------------------------------------
--  Manager
-----------------------------------------------------------------------}

-- | Outcome of a remote call: either an error message or a response
-- envelope.
type KResult msg raw = Either (KError (TransactionID msg)) (msg raw)-- Response

-- | Counter the transaction ids of outgoing queries are derived from.
type TransactionCounter = IORef Int

-- | A pending call is identified by its transaction id together with
-- the address of the queried node.
type CallId msg = (TransactionID msg, SockAddr)

-- | Slot the listener fills once the reply to a pending call arrives.
type CallRes msg raw = MVar (raw, KResult msg raw) -- (raw response, decoded response)

-- | All calls which are still waiting for a reply.
type PendingCalls msg raw = IORef (Map (CallId msg) (CallRes msg raw))

-- | The function run for an incoming query: it gets the address of the
-- querying node and the query envelope and yields either a failure
-- description or the reply envelope.
type HandlerBody h msg v = SockAddr -> msg v -> h (Either String (msg v))

-- | Handler is a function which will be invoked when some /remote/
-- node queries /this/ node.
type Handler h msg v = (QueryMethod msg, HandlerBody h msg v)

-- | Keeps track of pending queries made by /this/ node and handles
-- queries made by /remote/ nodes.
data Manager h raw msg = Manager
  { sock               :: !Socket
    -- ^ datagram socket bound by 'newManager';
  , options            :: !Options
    -- ^ options the manager has been created with;
  , listenerThread     :: !(MVar ThreadId)
    -- ^ created empty by 'newManager'; holds the id of the listener
    -- thread, if any;
  , transactionCounter :: {-# UNPACK #-} !TransactionCounter
    -- ^ starts at 'optSeedTransaction';
  , pendingCalls       :: {-# UNPACK #-} !(PendingCalls msg raw)
    -- ^ queries still waiting for a reply; starts out empty;
  , handlers           :: [Handler h msg raw]
    -- ^ handlers for incoming queries.
  }

-- | A monad which can perform or handle queries.
class (MonadBaseControl IO m, MonadLogger m, MonadIO m)
  => MonadKRPC h m raw msg | m -> h, m -> raw, m -> msg where

  -- | Ask for manager.
  getManager :: m (Manager h raw msg)

  default getManager :: MonadReader (Manager h raw msg) m => m (Manager h raw msg)
  getManager = ask

  -- | Can be used to add logging for instance.
  liftHandler :: h a -> m a

  default liftHandler :: m a -> m a
  liftHandler = id

instance (MonadBaseControl IO h, MonadLogger h, MonadIO h)
  => MonadKRPC h (ReaderT (Manager h raw msg) h) raw msg where
  liftHandler = lift

-- | Address family matching the constructor of a socket address.
sockAddrFamily :: SockAddr -> Family
sockAddrFamily (SockAddrInet  _ _    ) = AF_INET
sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6
sockAddrFamily (SockAddrUnix  _      ) = AF_UNIX
sockAddrFamily (SockAddrCan   _      ) = AF_CAN

-- | Bind socket to the specified address. To enable query handling
-- run 'listen'.
--
-- Throws a 'userError' if the options are rejected by
-- 'validateOptions'. If configuring or binding the socket fails, the
-- freshly created socket is closed before the exception propagates.
newManager :: Options              -- ^ various protocol options;
           -> SockAddr             -- ^ address to listen on;
           -> [Handler h msg raw]  -- ^ handlers to run on incoming queries.
           -> IO (Manager h raw msg) -- ^ new rpc manager.
newManager opts@Options {..} servAddr handlers = do
    validateOptions opts
    sock  <- bindServ
    tref  <- newEmptyMVar
    tran  <- newIORef optSeedTransaction
    calls <- newIORef M.empty
    return $ Manager sock opts tref tran calls handlers
  where
    family = sockAddrFamily servAddr

    -- 'bracketOnError' releases the descriptor if either
    -- 'setSocketOption' or 'bindSocket' throws; on success the socket
    -- is handed over to the manager and closed by 'closeManager'.
    bindServ =
      bracketOnError (socket family Datagram defaultProtocol) close $ \ s -> do
        when (family == AF_INET6) $ do
          setSocketOption s IPv6Only 0
        bindSocket s servAddr
        return s

-- | Kill the listener thread (if one is registered) and close the
-- socket. Note that pending calls are currently /not/ unblocked.
closeManager :: Manager m raw msg -> IO ()
closeManager Manager {..} = do
  maybe (return ()) killThread =<< tryTakeMVar listenerThread
  -- TODO unblock calls
  close sock

-- | Check if the manager is still active, i.e. whether its socket is
-- still bound. The manager stays active until 'closeManager' is
-- called.
isActive :: Manager m raw msg -> IO Bool
isActive Manager {..} = isBound sock
{-# INLINE isActive #-}

-- | Run an action with a freshly created manager and close the
-- manager afterwards, whether the action returns or throws.
--
-- Normally you should use Control.Monad.Trans.Resource.allocate
-- function.
withManager :: Options -> SockAddr -> [Handler h msg raw]
            -> (Manager h raw msg -> IO a) -> IO a
withManager opts addr hs = bracket (newManager opts addr hs) closeManager

{-----------------------------------------------------------------------
--  Logging
-----------------------------------------------------------------------}

-- TODO prettify log messages
-- | Text identifying a query in log messages: method name, hex-encoded
-- transaction id and remote address.
querySignature :: ( Show ( QueryMethod msg )
                  , Serialize ( TransactionID msg ) )
               => QueryMethod msg -> TransactionID msg -> SockAddr -> Text
querySignature name transaction addr = T.concat
  [ "&", T.pack (show name)
    -- Base16 output is plain ASCII, so decoding it cannot fail.
  , " #", T.decodeUtf8 (Base16.encode $ S.encode transaction)
  , " @", T.pack (show addr)
  ]

{-----------------------------------------------------------------------
--  Client
-----------------------------------------------------------------------}
-- we don't need to know about TransactionId while performing query,
-- so we introduce QueryFailure exceptions

-- | Used to signal 'query' errors.
data QueryFailure
  = SendFailed                 -- ^ unable to send query;
  | QueryFailed ErrorCode Text -- ^ remote node return error;
  | TimeoutExpired             -- ^ remote node not responding.
    deriving (Show, Eq, Typeable)

instance Exception QueryFailure

-- | Send one datagram with the given payload to the given address.
sendMessage :: MonadIO m => Socket -> SockAddr -> BC.ByteString -> m ()
sendMessage sock addr a = do
  liftIO $ sendManyTo sock [a] addr

-- | Atomically bump the counter and turn its previous value into a
-- transaction id.
genTransactionId :: Envelope msg => TransactionCounter -> IO (TransactionID msg)
genTransactionId ref = do
  cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur)
  uniqueTransactionId cur

-- | How many times 'query' call have been performed.
getQueryCount :: MonadKRPC h m raw msg => m Int
getQueryCount = do
  Manager {..} <- getManager
  curTrans <- liftIO $ readIORef transactionCounter
  return $ curTrans - optSeedTransaction options

-- | Add a call to the table of pending calls and return the (still
-- empty) slot its reply will be delivered to.
registerQuery :: Ord (TransactionID msg)
              => CallId msg -> PendingCalls msg raw -> IO (CallRes msg raw)
registerQuery cid ref = do
  ares <- newEmptyMVar
  atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ())
  return ares

-- simultaneous M.lookup and M.delete guarantees that we never get two
-- or more responses to the same query
-- | Remove a call from the table of pending calls, returning its reply
-- slot if the call was still registered.
unregisterQuery :: Ord (TransactionID msg)
                => CallId msg -> PendingCalls msg raw -> IO (Maybe (CallRes msg raw))
unregisterQuery cid ref = do
  atomicModifyIORef' ref $ swap .
    M.updateLookupWithKey (const (const Nothing)) cid

-- (sendmsg EINVAL)
-- | Like 'sendMessage', but any 'IOError' is reported as 'SendFailed'.
sendQuery :: Socket -> SockAddr -> BC.ByteString -> IO ()
sendQuery sock addr q = handle sockError $ sendMessage sock addr q
  where
    sockError :: IOError -> IO ()
    sockError _ = throwIO SendFailed

-- | Enqueue query to the given node.
--
--  This function throws a 'QueryFailure' exception if the queried node
--  responds with an @error@ message or the query timeout expires.
--
query :: forall h m a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, MonadKRPC h m raw msg, WireFormat raw msg)
      => QueryMethod msg -> SockAddr -> a -> m b
query meth addr params = queryK meth addr params (\_ x _ -> x)

-- | Like 'query' but possibly returns your externally routable IP address.
query' :: forall h m a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, MonadKRPC h m raw msg, WireFormat raw msg)
       => QueryMethod msg -> SockAddr -> a -> m (b, Maybe ReflectedIP)
query' meth addr params = queryK meth addr params (const (,))

-- | Enqueue a query, but give us the complete BEncoded content sent by the
-- remote Node.  This is useful for handling extensions that this library does
-- not otherwise support.
queryRaw :: forall h m a b raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, MonadKRPC h m raw msg, WireFormat raw msg)
         => QueryMethod msg -> SockAddr -> a -> m (b, raw)
queryRaw meth addr params = queryK meth addr params (\raw x _ -> (x,raw))

-- | Decode an error description received from a remote node.
--
-- The bytes come straight off the wire, so they need not be valid
-- UTF-8. Malformed input is decoded byte-per-character instead of
-- throwing, so that the caller always gets a 'QueryFailed' rather
-- than a decoding exception.
decodeErrorText :: BS.ByteString -> Text
decodeErrorText bytes =
  either (const (T.pack (BC.unpack bytes))) id (T.decodeUtf8' bytes)

-- | Worker behind 'query', 'query'' and 'queryRaw': perform the call
-- and pass the raw reply, the decoded payload and the reflected
-- address (if any) to the continuation.
--
-- Throws 'SendFailed' if the datagram cannot be sent, 'QueryFailed' if
-- the remote node answers with an error or with a reply which cannot
-- be decoded, and 'TimeoutExpired' if no reply arrives within
-- 'optQueryTimeout' seconds.
queryK :: forall h m a b x raw msg. (SerializableTo raw b, Show (QueryMethod msg), Ord (TransactionID msg), Serialize (TransactionID msg), SerializableTo raw a, MonadKRPC h m raw msg, WireFormat raw msg)
       => QueryMethod msg -> SockAddr -> a -> (raw -> b -> Maybe ReflectedIP -> x) -> m x
queryK meth addr params kont = do
  Manager {..} <- getManager
  tid <- liftIO $ genTransactionId transactionCounter
  -- let queryMethod = method :: Method a b
  let signature = querySignature meth tid addr
  $(logDebugS) "query.sending" signature

  mres <- liftIO $ do
    ares <- registerQuery (tid, addr) pendingCalls

    let cli = error "TODO TOX client node id"
        ctx = error "TODO TOX ToxCipherContext or () for Mainline"
    q <- buildQuery cli addr meth tid params
    let qb  = encodePayload (q :: msg a) :: msg raw
        qbs = encodeHeaders ctx qb
#ifdef VERSION_bencoding
#else
    let q   = Tox.Message (methodName queryMethod) cli tid params
        cli = error "TODO TOX client node id"
        ctx = error "TODO TOX ToxCipherContext"
        qb  = encodePayload q :: Tox.Message BC.ByteString
        qbs = encodeHeaders ctx qb :: BC.ByteString
#endif
    -- Drop the registration on *any* exception, not only on a failed
    -- send: otherwise a caller which is cancelled while waiting for
    -- the reply would leave its entry in the pending calls forever.
    flip onException (unregisterQuery (tid, addr) pendingCalls) $ do
      sendQuery sock addr qbs

      timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do
        (raw,res) <- readMVar ares -- MVar (KQueryArgs, KResult)
        case res of
          Left (KError c m _) -> throwIO $ QueryFailed c (decodeErrorText m)
          Right m -> case decodePayload m of
            Right r -> case envelopeClass (r :: msg b) of
              Response reflectedAddr
                -> pure $ kont raw (envelopePayload r) reflectedAddr
              Error (KError c errMsg _)
                -> throwIO $ QueryFailed c (decodeErrorText errMsg) -- XXX neccessary?
              Query _ -> throwIO $ QueryFailed ProtocolError "BUG!! UNREACHABLE"
            Left e -> throwIO $ QueryFailed ProtocolError (T.pack e)

  case mres of
    Just res -> do
      $(logDebugS) "query.responded" $ signature
      return res

    Nothing -> do
      _ <- liftIO $ unregisterQuery (tid, addr) pendingCalls
      $(logWarnS) "query.not_responding" $ signature <> " for " <>
                  T.pack (show (optQueryTimeout options)) <> " seconds"
      -- 'throwIO' (rather than 'throw') raises the exception exactly
      -- at this point of the monadic sequence.
      liftIO $ throwIO TimeoutExpired

{-----------------------------------------------------------------------
--  Handlers
-----------------------------------------------------------------------}
-- we already throw:
--
--   * ErrorCode(MethodUnknown) in the 'dispatchHandler';
--
--   * ErrorCode(ServerError) in the 'runHandler';
--
--   * ErrorCode(GenericError) in the 'runHandler' (those can be
--   async exception too)
--
-- so HandlerFailure should cover *only* 'ProtocolError's.

-- | Used to signal protocol errors.
data HandlerFailure
  = BadAddress            -- ^ for e.g.: node calls herself;
  | InvalidParameter Text -- ^ for e.g.: bad session token.
    deriving (Show, Eq, Typeable)

instance Exception HandlerFailure

-- | UTF-8 encoded description of a handler failure.
prettyHF :: HandlerFailure -> BS.ByteString
prettyHF BadAddress                = T.encodeUtf8 "bad address"
prettyHF (InvalidParameter reason) = T.encodeUtf8 $
    "invalid parameter: " <> reason

-- | UTF-8 encoded description of a query failure which happened while
-- a handler was running.
prettyQF :: QueryFailure -> BS.ByteString
prettyQF e = T.encodeUtf8 $ "handler fail while performing query: "
                        <> T.pack (show e)

-- | Make handler from handler function. Any thrown exception will be
-- suppressed and sent over the wire back to the querying node.
--
-- If the handler makes some 'query' normally it /should/ handle
-- corresponding 'QueryFailure's.
--
handler :: forall h a b msg raw. (Applicative h, Functor msg, WireFormat raw msg, SerializableTo raw a, SerializableTo raw b)
        => QueryMethod msg -> (SockAddr -> a -> h b) -> Handler h msg raw
handler name body = (name, wrapper)
  where
    -- Decode the arguments, run the body and wrap its result into a
    -- reply envelope; a decoding failure is returned as 'Left'.
    wrapper :: SockAddr -> msg raw -> h (Either String (msg raw))
    wrapper addr args =
      case decodePayload args of
        Left  e -> pure $ Left e
        Right a -> Right . encodePayload . buildReply (error "self node-id") addr args
                     <$> body addr (envelopePayload a)

-- | Encode an error description for the wire.
--
-- UTF-8 is used, like in 'prettyHF' and 'prettyQF'. Packing the
-- 'String' with @Data.ByteString.Char8.pack@ instead would truncate
-- every character to 8 bits and could produce bytes which are not
-- valid UTF-8.
encodeErrorText :: String -> BS.ByteString
encodeErrorText = T.encodeUtf8 . T.pack

-- | Run a handler body on an incoming query and turn every failure
-- into a 'KError' carrying the transaction id of the query:
--
--   * a 'Left' returned by the body and a 'HandlerFailure' exception
--   become 'ProtocolError';
--
--   * a 'QueryFailure' exception becomes 'ServerError';
--
--   * any other exception becomes 'GenericError'.
runHandler :: ( MonadKRPC h m raw msg
              , Envelope msg
              , Show (QueryMethod msg)
              , Serialize (TransactionID msg))
           => QueryMethod msg -> HandlerBody h msg raw -> SockAddr -> msg raw -> m (KResult msg raw)
runHandler meth h addr m = Lifted.catches wrapper failbacks
  where
    signature = querySignature meth (envelopeTransaction m) addr

    wrapper = do
      $(logDebugS) "handler.quered" signature
      result <- liftHandler (h addr m)

      case result of
        Left msg -> do
          $(logDebugS) "handler.bad_query" $ signature <> " !" <> T.pack msg
          return $ Left $ KError ProtocolError (encodeErrorText msg) (envelopeTransaction m)

        Right a -> do -- KQueryArgs
          $(logDebugS) "handler.success" signature
          return $ Right a

    failbacks =
      [ E.Handler $ \ (e :: HandlerFailure) -> do
          $(logDebugS) "handler.failed" signature
          return $ Left $ KError ProtocolError (prettyHF e) (envelopeTransaction m)

        -- may happen if handler makes query and fail
      , E.Handler $ \ (e :: QueryFailure) -> do
          return $ Left $ KError ServerError (prettyQF e) (envelopeTransaction m)

        -- since handler thread exit after sendMessage we can safely
        -- suppress async exception here
      , E.Handler $ \ (e :: SomeException) -> do
          return $ Left $ KError GenericError (encodeErrorText (show e)) (envelopeTransaction m)
      ]

-- | Look the method up among the registered handlers and run the
-- matching one; an unknown method yields a 'MethodUnknown' error.
dispatchHandler :: ( MonadKRPC h m raw msg
                   , Eq (QueryMethod msg)
                   , Show (QueryMethod msg)
                   , Serialize (TransactionID msg)
                   , Envelope msg
                   )
                => QueryMethod msg -> msg raw -> SockAddr -> m (KResult msg raw)
dispatchHandler meth q addr = do
  Manager {..} <- getManager
  case L.lookup meth handlers of
    Nothing -> return $ Left $ KError MethodUnknown
                 (encodeErrorText ("Unknown method " ++ show meth))
                 (envelopeTransaction q)
    Just h  -> runHandler meth h addr q

{-----------------------------------------------------------------------
--  Listener
-----------------------------------------------------------------------}

-- TODO bound amount of parallel handler *threads*:
--
-- peer A flooding with find_node
-- peer B trying to ping peer C
-- peer B fork too many threads
-- ... space leak
--
-- | Serve one incoming query in a freshly forked thread: dispatch it
-- to its handler and send the encoded result, if there is anything to
-- send, back to the querying node.
handleQuery :: ( MonadKRPC h m raw msg
               , WireFormat raw msg
               , Eq (QueryMethod msg)
               , Show (QueryMethod msg)
               , Serialize (TransactionID msg)
               ) => QueryMethod msg -> raw -> msg raw -> SockAddr -> m ()
handleQuery meth raw q addr = void $ fork $ do
    myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery"
    Manager {..} <- getManager
    res <- dispatchHandler meth q addr
    let res'  = either buildError Just res
        ctx   = error "TODO TOX ToxCipherContext 2 or () for Mainline"
        resbs = fmap (encodeHeaders ctx) res' :: Maybe BS.ByteString
    -- TODO: Generalize this debug print.
    -- resbe = either toBEncode toBEncode res
    -- $(logOther "q") $ T.unlines
    --     [ either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode raw)
    --     , "==>"
    --     , either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode resbe)
    --     ]
    maybe (return ()) (sendMessage sock addr) resbs

-- | Deliver a reply (or an error) to the pending call it belongs to.
-- Replies which match no pending call are dropped.
handleResponse :: ( MonadKRPC h m raw msg
                  , Ord (TransactionID msg)
                  , Envelope msg
                  ) => raw -> KResult msg raw -> SockAddr -> m ()
handleResponse raw result addr = do
  Manager {..} <- getManager
  liftIO $ do
    let resultId = either errorId envelopeTransaction  result
    mcall <- unregisterQuery (resultId, addr) pendingCalls
    case mcall of
      Nothing   -> return ()
      Just ares -> putMVar ares (raw,result)

-- | Pair of proxies selecting the raw representation and the message
-- envelope type the listener should work with.
data Protocol raw (msg :: * -> *) = Protocol { rawProxy :: !(Proxy raw)
                                             , msgProxy :: !(Proxy msg)
                                             }

-- | Receive loop: read a datagram, parse it and hand it to
-- 'handleQuery' or 'handleResponse' depending on its class. Packets
-- which cannot be parsed are ignored. The loop only ends with an
-- exception.
listener :: forall h m raw msg.
            ( MonadKRPC h m raw msg
            , WireFormat raw msg
            , Ord (TransactionID msg)
            , Eq (QueryMethod msg)
            , Show (QueryMethod msg)
            , Serialize (TransactionID msg)
            ) => Protocol raw msg -> m ()
listener p = do
  Manager {..} <- getManager
  fix $ \again -> do
    let ctx = error "TODO TOX ToxCipherContext or () for Mainline"
    (bs, addr) <- liftIO $ do
      handle exceptions $ BS.recvFrom sock (optMaxMsgSize options)
    case parsePacket (msgProxy p) bs >>= \r -> (,) r <$> decodeHeaders ctx r of
      Left e -> -- XXX: Send parse failure message?
                -- liftIO $ sendMessage sock addr $ encodeHeaders ctx (unknownMessage e)
                return () -- Without transaction id, error message isn't very useful.
      Right (raw,m) ->
        case envelopeClass m of
          Query meth -> handleQuery meth (raw::raw) m addr
          Response _ -> handleResponse raw (Right m) addr
          Error e    -> handleResponse raw (Left e) addr
    again
  where
    exceptions :: IOError -> IO (BS.ByteString, SockAddr)
    exceptions e
        -- packets with empty payload may trigger eof exception
      | isEOFError e = return ("", SockAddrInet 0 0)
      | otherwise    = throwIO e

-- | Should be run before any 'query', otherwise they will never
-- succeed.
--
-- Forks the listener thread and registers its id in the manager, so
-- that 'closeManager' can kill it. When the listener terminates it
-- clears the registration again.
listen :: ( MonadKRPC h m raw msg
          , WireFormat raw msg
          , Ord (TransactionID msg)
          , Eq (QueryMethod msg)
          , Show (QueryMethod msg)
          , Serialize (TransactionID msg)
          ) => Protocol raw msg -> m ()
listen p = do
  Manager {..} <- getManager
  registered <- liftIO newEmptyMVar
  tid <- fork $ do
    myThreadId >>= liftIO . flip labelThread "KRPC.listen"
    -- Do not start (and hence never finish) before the thread id has
    -- been registered below, so the cleanup cannot run too early.
    liftIO $ readMVar registered
    -- The cleanup must not block: 'closeManager' empties
    -- 'listenerThread' *before* killing this thread, so a blocking
    -- 'takeMVar' here would hang the killed thread forever.
    listener p `Lifted.finally`
      liftIO (void $ tryTakeMVar listenerThread)
  liftIO $ do
    putMVar listenerThread tid
    putMVar registered ()