-- |
--   Copyright   :  (c) Sam Truzjan 2013, 2014
--   License     :  BSD3
--   Maintainer  :  pxqr.sta@gmail.com
--   Stability   :  experimental
--   Portability :  portable
--
--   This module provides safe remote procedure calls. One important
--   point is exceptions and errors, so to be able to handle them
--   properly we need to investigate a bit how this all works.
--   Internally, in order to make a method invocation KRPC takes the
--   following steps:
--
--     * Caller serializes arguments to bencoded bytestrings;
--
--     * Caller sends bytestring data over UDP to the callee;
--
--     * Callee receives and decodes arguments to the method and method
--     name. If it can't decode them then it sends 'ProtocolError' back
--     to the caller;
--
--     * Callee searches for the @method name@ in the method table.
--     If it is not present in the table then callee sends
--     'MethodUnknown' back to the caller;
--
--     * Callee checks if argument names match. If not it sends
--     'ProtocolError' back;
--
--     * Callee makes the actual call to the plain old haskell
--     function. If the function throws an exception then callee sends
--     'ServerError' back.
--
--     * Callee serializes result of the function to bencoded bytestring.
--
--     * Callee encodes result to bencoded bytestring and sends it back
--     to the caller.
--
--     * Caller checks if return value names match with the signature
--     it called in the first step.
--
--     * Caller extracts results and finally returns results of the
--     procedure call as ordinary haskell values.
--
--   If any other error occurs then the caller gets the
--   'GenericError'. All errors returned by callee are thrown as
--   ordinary haskell exceptions at caller side. Also note that both
--   caller and callee use plain UDP, so KRPC is unreliable.
--
--   For async 'query' use @async@ package.
--
--   For protocol details see "Network.DatagramServer.Mainline" module.
-- {-# LANGUAGE CPP #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE DefaultSignatures #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE FunctionalDependencies #-} {-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE KindSignatures #-} module Network.DatagramServer ( -- ** Query QueryFailure (..) , query , query' , queryRaw , getQueryCount -- ** Handler , HandlerFailure (..) , Handler , handler -- * Manager , Options (..) , def , Manager , newManager , closeManager -- , withManager , isActive , listen , Protocol(..) -- * Re-exports , ErrorCode (..) , SockAddr (..) ) where import Data.Default.Class import Network.Socket (SockAddr (..)) import Control.Applicative #ifdef THREAD_DEBUG import Control.Concurrent.Lifted.Instrument #else import GHC.Conc (labelThread) import Control.Concurrent.Lifted #endif import Control.Exception hiding (Handler) import qualified Control.Exception.Lifted as E (Handler (..)) import Control.Exception.Lifted as Lifted (catches, finally) import Control.Monad import Control.Monad.Logger import Control.Monad.Reader import Control.Monad.Trans.Control import qualified Data.ByteString.Base16 as Base16 import Data.ByteString as BS import Data.ByteString.Char8 as BC import Data.ByteString.Lazy as BL import Data.Default.Class import Data.IORef import Data.List as L import Data.Map as M import Data.Monoid import Data.Serialize as S import Data.Text as T import Data.Text.Encoding as T import Data.Tuple import Data.Typeable import Network.DatagramServer.Types import Network.Socket hiding (listen) import Network.Socket.ByteString as BS import System.IO.Error import System.Timeout import Network.KRPC.Method {----------------------------------------------------------------------- -- Options -----------------------------------------------------------------------} -- | RPC manager options. 
data Options = Options
  { -- | Seed for the 'TransactionId' counter; the counter is bumped by
    -- every 'query'.
    optSeedTransaction :: {-# UNPACK #-} !Int

    -- | How long to wait for a remote node to answer, in seconds.
  , optQueryTimeout    :: {-# UNPACK #-} !Int

    -- | Upper bound on the number of bytes received per datagram.
  , optMaxMsgSize      :: {-# UNPACK #-} !Int
  } deriving (Show, Eq)

-- | Default value of 'optSeedTransaction'.
defaultSeedTransaction :: Int
defaultSeedTransaction = 0

-- | Default value of 'optQueryTimeout' (seconds).
defaultQueryTimeout :: Int
defaultQueryTimeout = 120

-- | Default value of 'optMaxMsgSize' (bytes).
defaultMaxMsgSize :: Int
defaultMaxMsgSize = 64 * 1024

-- | Permissive defaults.
instance Default Options where
  def = Options defaultSeedTransaction defaultQueryTimeout defaultMaxMsgSize

-- | Reject option sets with a non-positive timeout or buffer size by
-- throwing a 'userError'. The timeout is checked first.
validateOptions :: Options -> IO ()
validateOptions opts = do
  when (optQueryTimeout opts < 1) $
    throwIO (userError "krpc: non-positive query timeout")
  when (optMaxMsgSize opts < 1) $
    throwIO (userError "krpc: non-positive buffer size")

{-----------------------------------------------------------------------
--  Types
-----------------------------------------------------------------------}

-- | Outcome of a call: either an error carrying the transaction id or a
-- response message.
type KResult msg raw = Either (KError (TransactionID msg)) (msg raw)

-- | Mutable counter the transaction ids are generated from.
type TransactionCounter = IORef Int

-- | A call is identified by its transaction id and the remote address.
type CallId msg = (TransactionID msg, SockAddr)

-- | Slot a response is delivered into: (raw response, decoded response).
type CallRes msg raw = MVar (raw, KResult msg raw)

-- | Table of calls which are still waiting for a response.
type PendingCalls msg raw = IORef (Map (CallId msg) (CallRes msg raw))

-- | Body of a handler: takes the sender address and the query message and
-- yields either an error description or the reply message.
type HandlerBody h msg v = SockAddr -> msg v -> h (Either String (msg v))

-- | Handler is a function which will be invoked when some /remote/
-- node is querying /this/ node.
type Handler h msg v = (QueryMethod msg, HandlerBody h msg v)

-- | Keeps track of pending queries made by /this/ node and handles
-- queries made by /remote/ nodes.
data Manager raw msg = Manager
  { -- | UDP socket bound by 'newManager'.
    sock               :: !Socket
    -- | Options the manager was created with.
  , options            :: !Options
    -- | Id of the listener thread; filled by 'listen', emptied by
    -- 'closeManager'.
  , listenerThread     :: !(MVar ThreadId)
    -- | Source of transaction ids, see 'genTransactionId'.
  , transactionCounter :: {-# UNPACK #-} !TransactionCounter
    -- | Queries which are still waiting for a response.
  , pendingCalls       :: {-# UNPACK #-} !(PendingCalls msg raw)
  -- , handlers           :: [Handler h msg raw] -- TODO delete this, it's not used
    -- | Logging callback; in this module it is called with a level
    -- character (@\'D\'@ or @\'W\'@), a tag and the message text.
  , logMsg             :: Char -> String -> T.Text -> IO ()
  }

-- | Address family matching the given socket address.
sockAddrFamily :: SockAddr -> Family
sockAddrFamily (SockAddrInet  _ _    ) = AF_INET
sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6
sockAddrFamily (SockAddrUnix  _      ) = AF_UNIX
sockAddrFamily (SockAddrCan   _      ) = AF_CAN

-- | Bind socket to the specified address. To enable query handling
-- run 'listen'.
--
-- Throws a 'userError' if the options do not pass 'validateOptions'.
-- If configuring or binding the socket fails, the socket is closed
-- before the exception propagates.
newManager :: Options                              -- ^ various protocol options;
           -> (Char -> String -> T.Text -> IO ()) -- ^ logging function;
           -> SockAddr                             -- ^ address to listen on;
           -> [Handler h msg raw]                  -- ^ currently ignored; handlers are passed to 'listen'.
           -> IO (Manager raw msg)                 -- ^ new rpc manager.
newManager opts@Options {..} logmsg servAddr _handlers = do
    validateOptions opts
    s     <- bindServ
    tref  <- newEmptyMVar
    tran  <- newIORef optSeedTransaction
    calls <- newIORef M.empty
    return $ Manager s opts tref tran calls logmsg
  where
    family = sockAddrFamily servAddr

    -- 'bracketOnError' guarantees that a half-initialised socket does not
    -- leak when 'setSocketOption' or 'bindSocket' throws.
    bindServ =
      bracketOnError (socket family Datagram defaultProtocol) close $ \s -> do
        when (family == AF_INET6) $ do
          setSocketOption s IPv6Only 0
        bindSocket s servAddr
        return s

-- | Kill the listener thread (if one was registered) and close the
-- socket.
--
-- Pending calls are /not/ unblocked yet (see the TODO below); they only
-- end when their own query timeout expires.
closeManager :: Manager raw msg -> IO ()
closeManager Manager {..} = do
  maybe (return ()) killThread =<< tryTakeMVar listenerThread
  -- TODO unblock calls
  close sock

-- | Check if the manager is still active. Manager remains active
-- until 'closeManager' is called.
isActive :: Manager raw msg -> IO Bool
isActive Manager {..} = isBound sock
{-# INLINE isActive #-}

#if 0
-- | Normally you should use Control.Monad.Trans.Resource.allocate
-- function.
--
-- NOTE: disabled code; it still calls 'newManager' without the logging
-- function argument.
withManager :: Options -> SockAddr -> [Handler h msg raw]
            -> (Manager raw msg -> IO a) -> IO a
withManager opts addr hs = bracket (newManager opts addr hs) closeManager
#endif

{-----------------------------------------------------------------------
--  Logging
-----------------------------------------------------------------------}

-- TODO prettify log messages
-- | Text used to identify a call in log messages: method name,
-- hex-encoded transaction id and remote address.
querySignature :: ( Show ( QueryMethod msg )
                  , Serialize ( TransactionID msg ) )
               => QueryMethod msg -> TransactionID msg -> SockAddr -> Text
querySignature name transaction addr = T.concat
  [  "&", T.pack (show name)
  , " #", T.decodeUtf8 (Base16.encode $ S.encode transaction)
  , " @", T.pack (show addr)
  ]

{-----------------------------------------------------------------------
--  Client
-----------------------------------------------------------------------}
-- we don't need to know about TransactionId while performing query,
-- so we introduce QueryFailure exceptions

-- | Used to signal 'query' errors.
data QueryFailure
  = SendFailed                 -- ^ unable to send query;
  | QueryFailed ErrorCode Text -- ^ remote node return error;
  | TimeoutExpired             -- ^ remote node not responding.
    deriving (Show, Eq, Typeable)

instance Exception QueryFailure

-- | Send one datagram with the given payload to the given address.
sendMessage :: MonadIO m => Socket -> SockAddr -> BC.ByteString -> m ()
sendMessage sock addr a = do
  liftIO $ sendManyTo sock [a] addr

-- | Atomically bump the counter and turn its previous value into a
-- transaction id.
genTransactionId :: Envelope msg => TransactionCounter -> IO (TransactionID msg)
genTransactionId ref = do
  cur <- atomicModifyIORef' ref $ \ n -> (succ n, n)
  uniqueTransactionId cur

-- | How many times 'query' call have been performed.
getQueryCount :: Manager raw msg -> IO Int
getQueryCount Manager{..} =
  subtract (optSeedTransaction options) <$> readIORef transactionCounter

-- | Allocate an empty result slot for the call and record it in the
-- table of pending calls.
registerQuery :: Ord (TransactionID msg) => CallId msg -> PendingCalls msg raw -> IO (CallRes msg raw)
registerQuery cid ref = do
  slot <- newEmptyMVar
  atomicModifyIORef' ref (\ pending -> (M.insert cid slot pending, ()))
  return slot

-- | Remove the call from the table of pending calls, returning its
-- result slot if it was still registered.
--
-- Lookup and removal happen in one atomic step, which guarantees that
-- we never get two or more responses to the same query.
unregisterQuery :: Ord (TransactionID msg) => CallId msg -> PendingCalls msg raw -> IO (Maybe (CallRes msg raw))
unregisterQuery cid ref = atomicModifyIORef' ref removeEntry
  where
    removeEntry pending =
      case M.updateLookupWithKey (\ _ _ -> Nothing) cid pending of
        (found, remaining) -> (remaining, found)

-- | Send an encoded query; any 'IOError' raised while sending
-- (e.g. sendmsg EINVAL) is reported as 'SendFailed'.
sendQuery :: Socket -> SockAddr -> BC.ByteString -> IO ()
sendQuery sock addr q =
  handle (\ (_ :: IOError) -> throwIO SendFailed) (sendMessage sock addr q)

-- | Enqueue query to the given node.
--
--  This function should throw 'QueryFailure' exception if the queried
--  node responds with an @error@ message or the query timeout expires.
--
query :: forall h a b raw msg.
         ( SerializableTo raw b
         , Show (QueryMethod msg)
         , Ord (TransactionID msg)
         , Serialize (TransactionID msg)
         , SerializableTo raw a
         , WireFormat raw msg
         , KRPC msg a b
         ) => Manager raw msg -> PacketDestination msg -> a -> IO b
query mgr addr params = queryK mgr addr params payloadOnly
  where
    payloadOnly _raw reply _nid _ip = reply

-- | Like 'query' but possibly returns your externally routable IP address.
query' :: forall h a b raw msg.
          ( SerializableTo raw b
          , Show (QueryMethod msg)
          , Ord (TransactionID msg)
          , Serialize (TransactionID msg)
          , SerializableTo raw a
          , WireFormat raw msg
          , KRPC msg a b
          ) => Manager raw msg -> PacketDestination msg -> a -> IO (b , NodeId msg, Maybe ReflectedIP)
query' mgr addr params = queryK mgr addr params withOrigin
  where
    withOrigin _raw reply nid ip = (reply, nid, ip)

-- | Enqueue a query, but give us the complete BEncoded content sent by the
-- remote Node.
-- This is useful for handling extensions that this library does
-- not otherwise support.
queryRaw :: forall h a b raw msg.
            ( SerializableTo raw b
            , Show (QueryMethod msg)
            , Ord (TransactionID msg)
            , Serialize (TransactionID msg)
            , SerializableTo raw a
            , WireFormat raw msg
            , KRPC msg a b
            ) => Manager raw msg -> PacketDestination msg -> a -> IO (b , raw)
queryRaw mgr addr params = queryK mgr addr params (\raw x _ _ -> (x,raw))

-- | Worker behind 'query', 'query'' and 'queryRaw': performs one call
-- and hands the raw response, the decoded payload, the responder id and
-- the reflected address to the continuation.
--
-- Throws 'SendFailed' if the query can not be sent, 'QueryFailed' if the
-- remote node answers with an error or the response can not be decoded,
-- and 'TimeoutExpired' if no acceptable response arrives within
-- 'optQueryTimeout' seconds.
queryK :: forall h a b x raw msg.
          ( SerializableTo raw b
          , SerializableTo raw a
          , WireFormat raw msg
          , Show (QueryMethod msg)
          , Ord (TransactionID msg)
          , Serialize (TransactionID msg)
          , KRPC msg a b
          ) =>
   Manager raw msg -> PacketDestination msg -> a -> (raw -> b -> NodeId msg -> Maybe ReflectedIP -> x) -> IO x
queryK mgr@Manager{..} dest params kont = do
  tid <- liftIO $ genTransactionId transactionCounter
  let addr        = toSockAddr dest
      Method meth = method :: Method msg a b
      signature   = querySignature meth tid addr
      forget      = unregisterQuery (tid, addr) pendingCalls
  logMsg 'D' "query.sending" signature

  -- The pending entry is dropped on every exceptional exit (send
  -- failure, error response, interruption of the calling thread), not
  -- only when sending fails.
  mres <- liftIO $ flip onException forget $ do
    ares <- registerQuery (tid, addr) pendingCalls

    let cli = error "TODO TOX client node id"
        ctx = error "TODO TOX ToxCipherContext or () for Mainline"
    q <- buildQuery cli addr meth tid params
    let qb  = encodePayload (q :: msg a) :: msg raw
        qbs = encodeHeaders ctx qb dest
    sendQuery sock addr qbs

    timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do
      fix $ \loop -> do
        -- 'takeMVar' (rather than 'readMVar') empties the slot, so that
        -- a rejected response is not read over and over again.
        (raw,res) <- takeMVar ares -- MVar (KQueryArgs, KResult)
        case res of
          Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m)
          Right m -> case decodePayload m of
            Right r -> case envelopeClass (r :: msg b) of
                Response reflectedAddr
                    | validateExchange q r -> do
                        let remoteId = messageResponder (Proxy :: Proxy a) r
                        return $ kont raw (envelopePayload r) remoteId reflectedAddr
                    | otherwise -> do
                        -- 'handleResponse' removed the call from the
                        -- table when it delivered this response; put
                        -- the (now empty) slot back so that a later
                        -- response can still be delivered, then wait
                        -- for it.
                        atomicModifyIORef' pendingCalls $ \ calls ->
                          (M.insert (tid, addr) ares calls, ())
                        loop
                Error (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m) -- XXX neccessary?
                Query _ -> throwIO $ QueryFailed ProtocolError "BUG!! UNREACHABLE"
            Left e -> throwIO $ QueryFailed ProtocolError (T.pack e)

  case mres of
    Just res -> do
      logMsg 'D' "query.responded" $ signature
      return res

    Nothing -> do
      _ <- liftIO forget
      logMsg 'W' "query.not_responding" $ signature <> " for " <> T.pack (show (optQueryTimeout options)) <> " seconds"
      throwIO TimeoutExpired

{-----------------------------------------------------------------------
--  Handlers
-----------------------------------------------------------------------}
-- we already throw:
--
--   * ErrorCode(MethodUnknown) in the 'dispatchHandler';
--
--   * ErrorCode(ServerError) in the 'runHandler';
--
--   * ErrorCode(GenericError) in the 'runHandler' (those can be
--   async exception too)
--
-- so HandlerFailure should cover *only* 'ProtocolError's.

-- | Used to signal protocol errors.
data HandlerFailure
  = BadAddress            -- ^ for e.g.: node calls herself;
  | InvalidParameter Text -- ^ for e.g.: bad session token.
    deriving (Show, Eq, Typeable)

instance Exception HandlerFailure

-- | Error message sent back to the querying node for a 'HandlerFailure'.
prettyHF :: HandlerFailure -> BS.ByteString
prettyHF  BadAddress               = T.encodeUtf8 "bad address"
prettyHF (InvalidParameter reason) = T.encodeUtf8 $
    "invalid parameter: " <> reason

-- | Error message sent back to the querying node when the handler failed
-- with a 'QueryFailure'.
prettyQF :: QueryFailure -> BS.ByteString
prettyQF e = T.encodeUtf8 $ "handler fail while performing query: "
                        <> T.pack (show e)

-- | Make handler from handler function. Any thrown exception will be
-- suppressed and sent over the wire back to the querying node.
--
-- If the handler makes some 'query' normally it /should/ handle
-- corresponding 'QueryFailure's.
--
handler :: forall h a b msg raw. (Applicative h, Functor msg, WireFormat raw msg, SerializableTo raw a, SerializableTo raw b)
        => QueryMethod msg -> (SockAddr -> msg a -> h b) -> Handler h msg raw
handler name body = (name, wrapper)
  where
    -- Decode the arguments, run the body and encode its result as a
    -- reply; a decoding failure is returned as 'Left'.
    wrapper :: SockAddr -> msg raw -> h (Either String (msg raw))
    wrapper addr args =
      case decodePayload args of
        Left  e -> pure $ Left e
        Right a -> Right . encodePayload . buildReply (error "self node-id") addr args <$> body addr a

-- | Run a handler body on a query, logging the outcome and converting
-- both a 'Left' result and any thrown exception into a 'KError' carrying
-- the transaction id of the query.
runHandler :: ( Envelope msg
              , Show (QueryMethod msg)
              , Serialize (TransactionID msg))
           => Manager raw msg -> QueryMethod msg -> HandlerBody IO msg raw -> SockAddr -> msg raw -> IO (KResult msg raw)
runHandler mgr@Manager{..} meth h addr m = Lifted.catches wrapper failbacks
  where
    signature = querySignature meth (envelopeTransaction m) addr

    wrapper = do
      logMsg 'D' "handler.quered" signature
      result <- h addr m

      case result of
        Left msg -> do
          logMsg 'D' "handler.bad_query" $ signature <> " !" <> T.pack msg
          return $ Left $ KError ProtocolError (BC.pack msg) (envelopeTransaction m)

        Right a -> do -- KQueryArgs
          logMsg 'D' "handler.success" signature
          return $ Right a

    failbacks =
      [ E.Handler $ \ (e :: HandlerFailure) -> do
          logMsg 'D' "handler.HandlerFailure" signature
          return $ Left $ KError ProtocolError (prettyHF e) (envelopeTransaction m)

        -- may happen if handler makes query and fail
      , E.Handler $ \ (e :: QueryFailure) -> do
          logMsg 'D' "handler.QueryFailure" signature
          return $ Left $ KError ServerError (prettyQF e) (envelopeTransaction m)

        -- since handler thread exit after sendMessage we can safely
        -- suppress async exception here
      , E.Handler $ \ (e :: SomeException) -> do
          logMsg 'D' "handler.SomeException" (signature <> T.pack (" "++show e))
          return $ Left $ KError GenericError (BC.pack (show e)) (envelopeTransaction m)
      ]

-- | Look the method up in the handler list and run the matching handler;
-- an unknown method yields a 'MethodUnknown' error.
dispatchHandler :: ( Eq (QueryMethod msg)
                   , Show (QueryMethod msg)
                   , Serialize (TransactionID msg)
                   , Envelope msg
                   ) => Manager raw msg -> [Handler IO msg raw] -> QueryMethod msg -> msg raw -> SockAddr -> IO (KResult msg raw)
dispatchHandler mgr handlers meth q addr = do
  case L.lookup meth handlers of
    Nothing -> return $ Left $ KError MethodUnknown ("Unknown method " <> BC.pack (show meth)) (envelopeTransaction q)
    Just h  -> runHandler mgr meth h addr q

{-----------------------------------------------------------------------
--  Listener
-----------------------------------------------------------------------}

-- TODO bound amount of parallel handler *threads*:
--
-- peer A flooding with find_node
-- peer B trying to ping peer C
-- peer B fork too many threads
-- ... space leak
--
-- | Handle an incoming query in a freshly forked thread: dispatch it to
-- the matching handler and send the encoded result back to the sender.
-- Nothing is sent when 'buildError' yields 'Nothing' for a failed call.
handleQuery :: ( WireFormat raw msg
               , Eq (QueryMethod msg)
               , Show (QueryMethod msg)
               , Serialize (TransactionID msg)
               ) => Manager raw msg -> [Handler IO msg raw] -> QueryMethod msg -> raw -> msg raw -> SockAddr -> IO ()
handleQuery mgr@Manager{..} hs meth raw q addr = void $ fork $ do
    myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery"
    res <- dispatchHandler mgr hs meth q addr
    let res'  = either buildError Just res
        ctx   = error "TODO TOX ToxCipherContext 2 or () for Mainline"
        dest  = makeAddress (Right q) addr
        resbs = fmap (\reply -> encodeHeaders ctx reply dest) res' :: Maybe BS.ByteString
    -- TODO: Generalize this debug print.
    -- resbe = either toBEncode toBEncode res
    -- .(logOther "q") \$ T.unlines
    --     [ either (const "") id \$ T.decodeUtf8' (BL.toStrict $ showBEncode raw)
    --     , "==>"
    --     , either (const "") id \$ T.decodeUtf8' (BL.toStrict $ showBEncode resbe)
    --     ]
    maybe (return ()) (sendMessage sock addr) resbs

-- | Deliver a response (or an error) to the pending call with the same
-- transaction id and remote address. Responses which do not match any
-- pending call are dropped.
handleResponse :: ( Ord (TransactionID msg)
                  , Envelope msg
                  ) => Manager raw msg -> raw -> KResult msg raw -> SockAddr -> IO ()
handleResponse mgr@Manager{..} raw result addr = do
  liftIO $ do
    let resultId = either errorId envelopeTransaction result
    mcall <- unregisterQuery (resultId, addr) pendingCalls
    case mcall of
      Nothing   -> return ()
      Just ares -> putMVar ares (raw,result)

-- | Pair of proxies selecting the raw representation and the message
-- type used by the listener.
data Protocol raw (msg :: * -> *) = Protocol { rawProxy :: !(Proxy raw)
                                             , msgProxy :: !(Proxy msg)
                                             }

-- | Receive loop: read a datagram, parse it and pass it either to
-- 'handleQuery' or to 'handleResponse'. Datagrams which fail to parse are
-- silently ignored. Runs until an exception is thrown.
listener :: forall raw msg.
            ( WireFormat raw msg
            , Ord (TransactionID msg)
            , Eq (QueryMethod msg)
            , Show (QueryMethod msg)
            , Serialize (TransactionID msg)
            ) => Manager raw msg -> [Handler IO msg raw] -> Protocol raw msg -> IO ()
listener mgr@Manager{..} hs p = do
  fix $ \again -> do
    let ctx = error "TODO TOX ToxCipherContext or () for Mainline"
    (bs, addr) <- liftIO $ do
      handle exceptions $ BS.recvFrom sock (optMaxMsgSize options)
    case parsePacket (msgProxy p) bs >>= \r -> (,) r <$> decodeHeaders ctx r of
      Left e -> -- XXX: Send parse failure message?
                -- liftIO \$ sendMessage sock addr $ encodeHeaders ctx (unknownMessage e)
                return () -- Without transaction id, error message isn't very useful.
      Right (raw,m) ->
        case envelopeClass m of
          Query meth -> handleQuery mgr hs meth (raw::raw) m addr
          Response _ -> handleResponse mgr raw (Right m) addr
          Error e    -> handleResponse mgr raw (Left e) addr
    again
  where
    exceptions :: IOError -> IO (BS.ByteString, SockAddr)
    exceptions e
        -- packets with empty payload may trigger eof exception
      | isEOFError e = return ("", SockAddrInet 0 0)
      | otherwise    = throwIO e

-- | Should be run before any 'query', otherwise they will never
-- succeed.
--
-- Forks the listener thread and publishes its id in 'listenerThread'.
-- When the listener terminates it clears 'listenerThread' again.
listen :: ( WireFormat raw msg
          , Ord (TransactionID msg)
          , Eq (QueryMethod msg)
          , Show (QueryMethod msg)
          , Serialize (TransactionID msg)
          ) => Manager raw msg -> [Handler IO msg raw] -> Protocol raw msg -> IO ()
listen mgr@Manager{..} hs p = do
  published <- newEmptyMVar
  tid <- fork $ do
    myThreadId >>= liftIO . flip labelThread "KRPC.listen"
    -- Wait until our thread id has been published, so that the cleanup
    -- below can never run before 'listenerThread' is filled.
    takeMVar published
    -- The cleanup must not block: 'closeManager' empties 'listenerThread'
    -- before killing this thread, and a blocking 'takeMVar' would then
    -- leave the killed thread stuck in its finalizer forever.
    listener mgr hs p `Lifted.finally`
      liftIO (tryTakeMVar listenerThread)
  liftIO $ putMVar listenerThread tid
  putMVar published ()