-- |
--   Copyright   :  (c) Sam Truzjan 2013, 2014
--   License     :  BSD3
--   Maintainer  :  pxqr.sta@gmail.com
--   Stability   :  experimental
--   Portability :  portable
--
--   Normally, you don't need to import this module.
--
{-# LANGUAGE CPP                    #-}
{-# LANGUAGE OverloadedStrings      #-}
{-# LANGUAGE FlexibleInstances      #-}
{-# LANGUAGE FlexibleContexts       #-}
{-# LANGUAGE ScopedTypeVariables    #-}
{-# LANGUAGE DefaultSignatures      #-}
{-# LANGUAGE MultiParamTypeClasses  #-}
{-# LANGUAGE FunctionalDependencies #-}
{-# LANGUAGE DeriveDataTypeable     #-}
{-# LANGUAGE TemplateHaskell        #-}
module Network.KRPC.Manager
       ( -- * Manager
         MonadKRPC (..)
       , Options (..)
       , Manager
       , newManager
       , closeManager
       , withManager
       , isActive
       , listen

         -- * Queries
       , QueryFailure (..)
       , query
       , query'
       , queryRaw
       , getQueryCount

         -- * Handlers
       , HandlerFailure (..)
       , Handler
       , handler
       ) where

import Control.Applicative
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import GHC.Conc (labelThread)
import Control.Concurrent.Lifted
#endif
import Control.Exception hiding (Handler)
import qualified Control.Exception.Lifted as E (Handler (..))
import Control.Exception.Lifted as Lifted (catches, finally)
import Control.Monad
import Control.Monad.Logger
import Control.Monad.Reader
import Control.Monad.Trans.Control
#ifdef VERSION_bencoding
import Data.BEncode as BE
import Data.BEncode.Internal as BE
import Data.BEncode.Pretty (showBEncode)
#else
import qualified Data.Tox as Tox
#endif
import qualified Data.ByteString.Base16 as Base16
import Data.ByteString as BS
import Data.ByteString.Char8 as BC
import Data.ByteString.Lazy as BL
import Data.Default.Class
import Data.IORef
import Data.List as L
import Data.Map as M
import Data.Monoid
import Data.Serialize as S
import Data.Text as T
import Data.Text.Encoding as T
import Data.Tuple
import Data.Typeable
import Network.RPC
import Network.KRPC.Message
import Network.KRPC.Method hiding (Envelope)
import qualified Network.KRPC.Method as KRPC (Envelope)
import Network.Socket hiding (listen)
import Network.Socket.ByteString as BS
import System.IO.Error
import System.Timeout
import Network.DHT.Mainline

{-----------------------------------------------------------------------
--  Options
-----------------------------------------------------------------------}

-- | RPC manager options.
data Options = Options
  { -- | Initial 'TransactionId' incremented with each 'query';
    optSeedTransaction :: {-# UNPACK #-} !Int

    -- | Time to wait for response from remote node, in seconds.
  , optQueryTimeout    :: {-# UNPACK #-} !Int

    -- | Maximum number of bytes to receive.
  , optMaxMsgSize      :: {-# UNPACK #-} !Int
  } deriving (Show, Eq)

-- | Transaction counter seed used by the 'Default' instance.
defaultSeedTransaction :: Int
defaultSeedTransaction = 0

-- | Query timeout (seconds) used by the 'Default' instance.
defaultQueryTimeout :: Int
defaultQueryTimeout = 120

-- | Receive buffer size (bytes) used by the 'Default' instance.
defaultMaxMsgSize :: Int
defaultMaxMsgSize = 64 * 1024

-- | Permissive defaults.
instance Default Options where
  -- field order: seed transaction, query timeout, max message size
  def = Options defaultSeedTransaction defaultQueryTimeout defaultMaxMsgSize

-- | Reject options with a query timeout or a message size below one.
--   The timeout is checked first; the first failing check throws a
--   'userError' and nothing else is evaluated.
validateOptions :: Options -> IO ()
validateOptions Options {..} = do
  when (optQueryTimeout < 1) $
    throwIO (userError "krpc: non-positive query timeout")
  when (optMaxMsgSize < 1) $
    throwIO (userError "krpc: non-positive buffer size")

{-----------------------------------------------------------------------
--  Manager state types
-----------------------------------------------------------------------}

-- | Outcome of a call: either an error or a (response) message.
type KResult = Either KError KMessage -- Response

-- | Source of fresh transaction ids.
type TransactionCounter = IORef Int

-- | A pending call is identified by its transaction id together with
--   the address of the queried node.
type CallId             = (TransactionId, SockAddr)

-- | Slot the response is delivered into.
type CallRes            = MVar (KQueryArgs, KResult) -- (raw response, decoded response)

-- | All calls which are still waiting for a response.
type PendingCalls       = IORef (Map CallId CallRes)

-- | Body of a handler: takes the address of the querying node and the
--   incoming message; yields either an error description or the reply.
type HandlerBody h msg v = SockAddr -> msg v -> h (Either String (msg v))

-- | Handler is a function which will be invoked when some /remote/
--   node queries /this/ node.
type Handler h msg v = (MethodName, HandlerBody h msg v)

-- | Keeps track of pending queries made by /this/ node and handles
--   queries made by /remote/ nodes.
data Manager h = Manager
  { sock               :: !Socket
    -- ^ datagram socket bound by 'newManager';
  , options            :: !Options
    -- ^ options the manager was created with;
  , listenerThread     :: !(MVar ThreadId)
    -- ^ created empty by 'newManager'; filled by 'listen' with the id
    --   of the listener thread;
  , transactionCounter :: {-# UNPACK #-} !TransactionCounter
    -- ^ seeded with 'optSeedTransaction' and bumped for every query;
  , pendingCalls       :: {-# UNPACK #-} !PendingCalls
    -- ^ queries still waiting for their response;
#ifdef VERSION_bencoding
  , handlers           :: [Handler h KMessageOf BValue]
#else
  , handlers           :: [Handler h KMessageOf BC.ByteString]
#endif
    -- ^ handlers looked up by method name for incoming queries.
  }

-- | A monad which can perform or handle queries.
class (MonadBaseControl IO m, MonadLogger m, MonadIO m)
  => MonadKRPC h m | m -> h where

  -- | Ask for manager.
  getManager :: m (Manager h)

  default getManager :: MonadReader (Manager h) m => m (Manager h)
  getManager = ask

  -- | Run a handler action in the manager monad. Can be used to add
  --   logging for instance.
  liftHandler :: h a -> m a

  default liftHandler :: m a -> m a
  liftHandler = id

-- | Handlers run in the base monad @h@; the manager is carried by the
--   reader layer, so 'getManager' falls back to its 'ask' default.
instance (MonadBaseControl IO h, MonadLogger h, MonadIO h)
  => MonadKRPC h (ReaderT (Manager h) h) where

  liftHandler = lift

-- | Address family matching the constructor of the given address.
sockAddrFamily :: SockAddr -> Family
sockAddrFamily (SockAddrInet  _ _    ) = AF_INET
sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6
sockAddrFamily (SockAddrUnix  _      ) = AF_UNIX
sockAddrFamily (SockAddrCan   _      ) = AF_CAN

-- | Bind socket to the specified address. To enable query handling
--   run 'listen'.
--
--   The options are validated first, so invalid options make this
--   function throw before any socket is created. If configuring or
--   binding the socket fails, the socket is closed before the
--   exception is rethrown.
newManager :: Options              -- ^ various protocol options;
           -> SockAddr             -- ^ address to listen on;
#ifdef VERSION_bencoding
           -> [Handler h KMessageOf BValue]        -- ^ handlers to run on incoming queries.
#else
           -> [Handler h KMessageOf BC.ByteString] -- ^ handlers to run on incoming queries.
#endif
           -> IO (Manager h)       -- ^ new rpc manager.
-- NB: no whitespace around '@' -- GHC >= 9.0 only accepts the tight form
-- as an as-pattern.
newManager opts@Options {..} servAddr handlers = do
    validateOptions opts
    sock  <- bindServ
    tref  <- newEmptyMVar
    tran  <- newIORef optSeedTransaction
    calls <- newIORef M.empty
    return $ Manager sock opts tref tran calls handlers
  where
    bindServ = do
      let family = sockAddrFamily servAddr
      -- 'bracketOnError' releases the descriptor when setup fails, but
      -- leaves the socket open on success.
      bracketOnError (socket family Datagram defaultProtocol) close $ \s -> do
        when (family == AF_INET6) $ do
          setSocketOption s IPv6Only 0
        bindSocket s servAddr
        return s

-- | Kill the listener thread (if one was registered by 'listen') and
--   close the socket. Calls which are still pending are /not/
--   unblocked yet; this is a known TODO.
closeManager :: Manager m -> IO ()
closeManager Manager {..} = do
  -- the variable is only filled while a listener is registered
  registered <- tryTakeMVar listenerThread
  case registered of
    Just tid -> killThread tid
    Nothing  -> return ()
  -- TODO unblock calls
  close sock

-- | Check if the manager is still active. Manager remains active
--   until 'closeManager' is called.
isActive :: Manager m -> IO Bool
isActive manager = isBound (sock manager)
{-# INLINE isActive #-}

-- | Create a manager, run the action with it and close the manager
--   afterwards, also when the action throws.
--
--   Normally you should use Control.Monad.Trans.Resource.allocate
--   function.
#ifdef VERSION_bencoding
withManager :: Options -> SockAddr -> [Handler h KMessageOf BValue]
#else
withManager :: Options -> SockAddr -> [Handler h KMessageOf BC.ByteString]
#endif
            -> (Manager h -> IO a) -> IO a
withManager opts addr hs action = bracket acquire closeManager action
  where
    acquire = newManager opts addr hs

{-----------------------------------------------------------------------
--  Logging
-----------------------------------------------------------------------}

-- TODO prettify log messages
-- | Text used in log messages to identify a query: method name,
--   hex-encoded transaction id and the peer address.
querySignature :: MethodName -> TransactionId -> SockAddr -> Text
querySignature name transaction addr =
    "&" <> methodText <> " #" <> transactionText <> " @" <> T.pack (show addr)
  where
#ifdef VERSION_bencoding
    methodText      = T.decodeUtf8 name
    transactionText = T.decodeUtf8 (Base16.encode transaction)
#else
    methodText      = T.pack (show name)
    transactionText = T.decodeUtf8 (Base16.encode $ S.encode transaction)
#endif

{-----------------------------------------------------------------------
--  Client
-----------------------------------------------------------------------}
-- we don't need to know about TransactionId while performing query,
-- so we introduce QueryFailure exceptions

-- | Used to signal 'query' errors.
data QueryFailure
  = SendFailed                 -- ^ unable to send query;
  | QueryFailed ErrorCode Text -- ^ remote node returned an error;
  | TimeoutExpired             -- ^ remote node not responding.
    deriving (Show, Eq, Typeable)

instance Exception QueryFailure

-- | Send a single message to the given address over the given socket.
#ifdef VERSION_bencoding
sendMessage :: MonadIO m => BEncode a => Socket -> SockAddr -> a -> m ()
sendMessage sock addr a = liftIO (sendManyTo sock chunks addr)
  where
    chunks = BL.toChunks (BE.encode a)
#else
sendMessage :: MonadIO m => Socket -> SockAddr -> BC.ByteString -> m ()
sendMessage sock addr a = liftIO (sendManyTo sock [a] addr)
#endif

-- | Atomically bump the counter and turn its /previous/ value into a
--   transaction id.
genTransactionId :: TransactionCounter -> IO TransactionId
genTransactionId ref = render <$> atomicModifyIORef' ref (\ n -> (succ n, n))
  where
    render :: Int -> TransactionId
#ifdef VERSION_bencoding
    render n = BC.pack (show n)
#else
    -- decimal digits, space-padded and cut to exactly 24 characters
    render n = either (error "failed to create TransactionId") id $
      S.decode $ BC.pack (L.take 24 $ show n ++ L.repeat ' ')
#endif

-- | How many times 'query' call have been performed.
getQueryCount :: MonadKRPC h m => m Int
getQueryCount = do
  manager <- getManager
  current <- liftIO (readIORef (transactionCounter manager))
  return (current - optSeedTransaction (options manager))

-- | Add a fresh, empty result slot for the call and return the slot.
registerQuery :: CallId -> PendingCalls -> IO CallRes
registerQuery cid ref = do
  slot <- newEmptyMVar
  atomicModifyIORef' ref (\ calls -> (M.insert cid slot calls, slot))

-- | Remove the call from the pending set and return its slot, if the
--   call was still registered.
--
-- removing and looking up in one atomic step guarantees that we never
-- get two or more responses to the same query
unregisterQuery :: CallId -> PendingCalls -> IO (Maybe CallRes)
unregisterQuery cid ref =
  atomicModifyIORef' ref (\ calls -> (M.delete cid calls, M.lookup cid calls))

-- | Send a query, reporting any 'IOError' raised while sending as
--   'SendFailed'.
--
-- (sendmsg EINVAL)
#ifdef VERSION_bencoding
sendQuery :: BEncode a => Socket -> SockAddr -> a -> IO ()
sendQuery sock addr q =
  handle (\ (_ :: IOError) -> throwIO SendFailed) $
    sendMessage sock addr q
#else
sendQuery :: Serialize a => Socket -> SockAddr -> a -> IO ()
sendQuery sock addr q =
  handle (\ (_ :: IOError) -> throwIO SendFailed) $
    sendMessage sock addr (S.encode q)
#endif

-- | Enqueue query to the given node.
--
--  This function should throw 'QueryFailure' exception if the queried
--  node responds with an @error@ message or the query timeout expires.
--
query :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m b
query addr params = queryK addr params (\_ x _ -> x)

-- | Like 'query' but possibly returns your externally routable IP address.
query' :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, Maybe ReflectedIP)
query' addr params = queryK addr params (const (,))

-- | Enqueue a query, but give us the complete BEncoded content sent by the
-- remote Node.  This is useful for handling extensions that this library does
-- not otherwise support.
queryRaw :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, KQueryArgs)
queryRaw addr params = queryK addr params (\raw x _ -> (x,raw))

-- | Worker shared by 'query', 'query'' and 'queryRaw'.
--
--   Generates a transaction id, registers a pending call, sends the
--   query and waits up to 'optQueryTimeout' seconds for the listener
--   to deliver the response. The continuation receives the raw
--   response, the decoded result and the reflected IP (if any).
--
--   Throws 'SendFailed' if sending fails, 'QueryFailed' if the remote
--   node answers with an error or with a payload that cannot be
--   decoded, and 'TimeoutExpired' if no response arrives in time.
queryK :: forall h m a b x. (MonadKRPC h m, KRPC a b) =>
          SockAddr -> a -> (KQueryArgs -> b -> Maybe ReflectedIP -> x) -> m x
queryK addr params kont = do
  Manager {..} <- getManager
  tid <- liftIO $ genTransactionId transactionCounter
  let queryMethod = method :: Method a b
  let signature = querySignature (methodName queryMethod) tid addr
  $(logDebugS) "query.sending" signature

  mres <- liftIO $ do
    ares <- registerQuery (tid, addr) pendingCalls

#ifdef VERSION_bencoding
    let q = KQuery (toBEncode params) (methodName queryMethod) tid
#else
    let q = Tox.Message (methodName queryMethod) cli tid params
        cli = error "TODO TOX client node id"
#endif
    -- do not leave a stale pending call behind if the send fails
    sendQuery sock addr q
      `onException` unregisterQuery (tid, addr) pendingCalls

    -- 'timeout' takes microseconds, the option is in seconds
    timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do
      (raw,res) <- readMVar ares -- MVar (KQueryArgs, KResult)
      case res of
#ifdef VERSION_bencoding
        Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m)
        Right (R (KResponse {..})) ->
          case fromBEncode respVals of
            Right r -> pure $ kont raw r respIP
#else
        Left _ -> throwIO $ QueryFailed GenericError "TODO: TOX ERROR"
        Right (Tox.Message {..}) ->
          case S.decode msgPayload of
            Right r -> pure $ kont raw r Nothing
#endif
            Left  e -> throwIO $ QueryFailed ProtocolError (T.pack e)

  case mres of
    Just res -> do
      $(logDebugS) "query.responded" $ signature
      return res

    Nothing -> do
      _ <- liftIO $ unregisterQuery (tid, addr) pendingCalls
      $(logWarnS) "query.not_responding" $ signature <> " for " <>
                  T.pack (show (optQueryTimeout options)) <> " seconds"
      -- 'throwIO' (not pure 'throw') so the exception is raised exactly
      -- when this action is executed
      liftIO $ throwIO TimeoutExpired

{-----------------------------------------------------------------------
--  Handlers
-----------------------------------------------------------------------}
-- we already throw:
--
--   * ErrorCode(MethodUnknown) in the 'dispatchHandler';
--
--   * ErrorCode(ServerError) in the 'runHandler';
--
--   * ErrorCode(GenericError) in the 'runHandler' (those can be
--   async exception too)
--
-- so HandlerFailure should cover *only* 'ProtocolError's.

-- | Used to signal protocol errors.
data HandlerFailure
  = BadAddress            -- ^ for e.g.: node calls herself;
  | InvalidParameter Text -- ^ for e.g.: bad session token.
    deriving (Show, Eq, Typeable)

instance Exception HandlerFailure

-- | Wire text for a 'HandlerFailure' (UTF-8 encoded).
prettyHF :: HandlerFailure -> BS.ByteString
prettyHF  BadAddress               = T.encodeUtf8 "bad address"
prettyHF (InvalidParameter reason) = T.encodeUtf8 $
    "invalid parameter: " <> reason

-- | Wire text for a 'QueryFailure' raised inside a handler (UTF-8
--   encoded).
prettyQF :: QueryFailure -> BS.ByteString
prettyQF e = T.encodeUtf8 $ "handler fail while performing query: "
                         <> T.pack (show e)

-- | Make handler from handler function. Any thrown exception will be
--   suppressed and sent over the wire back to the querying node.
--
--   If the handler makes some 'query' normally it /should/ handle
--   corresponding 'QueryFailure's.
--
handler :: forall h a b msg raw. (KRPC a b, Applicative h, Functor msg, WireFormat raw msg, SerializableTo raw a, SerializableTo raw b)
        => (SockAddr -> a -> h b) -> Handler h msg raw
handler body = (name, wrapper)
  where
    Method name = method :: Method a b

    -- decode the arguments, run the body and encode its result as a
    -- reply; a decoding failure is returned as 'Left'
    wrapper :: SockAddr -> msg raw -> h (Either String (msg raw))
    wrapper addr args =
      case decodePayload args of
        Left  e -> pure $ Left e
        Right a -> Right . encodePayload . buildReply (error "self node-id") addr args <$> body addr (envelopePayload a)

-- | Run a handler body for an incoming query and turn its outcome
--   into a 'KResult'. A 'Left' returned by the body and every
--   exception thrown while running it are converted into an error
--   result (see @failbacks@).
runHandler :: MonadKRPC h m
#ifdef VERSION_bencoding
           => HandlerBody h KMessageOf BValue -> SockAddr -> KQuery -> m KResult
#else
           => HandlerBody h KMessageOf BC.ByteString -> SockAddr -> KQuery -> m KResult
#endif
runHandler h addr m = Lifted.catches wrapper failbacks
  where
    signature = querySignature (queryMethod m) (queryId m) addr

    wrapper = do
      $(logDebugS) "handler.quered" signature

#ifdef VERSION_bencoding
      result <- liftHandler (h addr (Q m))
#else
      result <- liftHandler (h addr m)
#endif

      case result of
        Left msg -> do
          $(logDebugS) "handler.bad_query" $ signature <> " !" <> T.pack msg
#ifdef VERSION_bencoding
          return $ Left $ KError ProtocolError (BC.pack msg) (queryId m)
#else
          return $ Left $ decodeError "TODO TOX ProtocolError" (queryId m)
#endif

        Right a -> do -- KQueryArgs
          $(logDebugS) "handler.success" signature
#ifdef VERSION_bencoding
          return $ Right a
#else
          let cli = error "TODO TOX client node id"
              messageid = error "TODO TOX message response id"
          -- TODO: ReflectedIP addr ??
          return $ Right $ Tox.Message messageid cli (queryId m) a
#endif

    failbacks =
      [ E.Handler $ \ (e :: HandlerFailure) -> do
          $(logDebugS) "handler.failed" signature
#ifdef VERSION_bencoding
          return $ Left $ KError ProtocolError (prettyHF e) (queryId m)
#else
          return $ Left $ decodeError "TODO TOX ProtocolError 2" (queryId m)
#endif

        -- may happen if handler makes query and fail
      , E.Handler $ \ (e :: QueryFailure) -> do
#ifdef VERSION_bencoding
          return $ Left $ KError ServerError (prettyQF e) (queryId m)
#else
          return $ Left $ decodeError "TODO TOX ServerError" (queryId m)
#endif

        -- since handler thread exit after sendMessage we can safely
        -- suppress async exception here
      , E.Handler $ \ (e :: SomeException) -> do
#ifdef VERSION_bencoding
          return $ Left $ KError GenericError (BC.pack (show e)) (queryId m)
#else
          return $ Left $ decodeError "TODO TOX GenericError" (queryId m)
#endif
      ]

-- | Look up the handler registered for the query's method and run it;
--   a query for an unregistered method yields an error result.
dispatchHandler :: MonadKRPC h m => KQuery -> SockAddr -> m KResult
dispatchHandler q addr = do
  Manager {..} <- getManager
  case L.lookup (queryMethod q) handlers of
#ifdef VERSION_bencoding
    Nothing -> return $ Left $ KError MethodUnknown (queryMethod q) (queryId q)
#else
    Nothing -> return $ Left $ decodeError "TODO TOX Error MethodUnknown" (queryId q)
#endif
    Just h  -> runHandler h addr q

{-----------------------------------------------------------------------
--  Listener
-----------------------------------------------------------------------}

-- TODO bound amount of parallel handler *threads*:
--
-- peer A flooding with find_node
-- peer B trying to ping peer C
-- peer B fork too many threads
-- ... space leak
--
-- | Serve an incoming query in a freshly forked thread: dispatch it to
--   its handler and send the result back to the querying node.
handleQuery :: MonadKRPC h m => KQueryArgs -> KQuery -> SockAddr -> m ()
handleQuery raw q addr = void $ fork $ do
    myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery"
    Manager {..} <- getManager
    res <- dispatchHandler q addr
#ifdef VERSION_bencoding
    let resbe = either toBEncode toBEncode res
    -- log the raw query next to the reply; text which is not valid
    -- UTF-8 is logged as an empty string
    $(logOther "q") $ T.unlines
        [ either (const "") id $ T.decodeUtf8' (BL.toStrict $ showBEncode raw)
        , "==>"
        , either (const "") id $ T.decodeUtf8' (BL.toStrict $ showBEncode resbe)
        ]
    sendMessage sock addr resbe
#else
    -- Errors not sent for Tox.
    either (const $ return ()) (sendMessage sock addr . S.encode) res
#endif

-- | Deliver a response (or error) to the pending call with the same
--   transaction id and address. Results which match no pending call
--   are dropped.
handleResponse :: MonadKRPC h m => KQueryArgs -> KResult -> SockAddr -> m ()
handleResponse raw result addr = do
  Manager {..} <- getManager
  liftIO $ do
#ifdef VERSION_bencoding
    let resultId = either errorId envelopeTransaction result
#else
    let resultId = either Tox.msgNonce Tox.msgNonce result
#endif
    mcall <- unregisterQuery (resultId, addr) pendingCalls
    case mcall of
      Nothing   -> return ()
      Just ares -> putMVar ares (raw,result)

-- | Route a decoded message: queries go to 'handleQuery', responses
--   and errors go to 'handleResponse'.
#ifdef VERSION_bencoding
handleMessage :: MonadKRPC h m => KQueryArgs -> KMessage -> SockAddr -> m ()
handleMessage raw (Q q) = handleQuery    raw q
handleMessage raw (R r) = handleResponse raw (Right (R r))
handleMessage raw (E e) = handleResponse raw (Left  e)
#else
handleMessage :: MonadKRPC h m => KQueryArgs -> Tox.Message BC.ByteString -> SockAddr -> m ()
handleMessage raw q | Tox.isQuery    q = handleQuery    raw q
handleMessage raw r | Tox.isResponse r = handleResponse raw (Right r)
handleMessage raw e | Tox.isError    e = handleResponse raw (Left  e)
#endif

-- | Receive loop: read a datagram of at most 'optMaxMsgSize' bytes,
--   decode it and hand it over to 'handleMessage'; repeat forever.
--   Any 'IOError' other than EOF raised by the receive call is
--   rethrown and terminates the loop.
listener :: MonadKRPC h m => m ()
listener = do
  Manager {..} <- getManager
  fix $ \again -> do
    mpacket <- liftIO $
      handle exceptions $ Just <$> BS.recvFrom sock (optMaxMsgSize options)
    case mpacket of
      -- Nothing was received, so there is no peer to talk to. (Replying
      -- to a made-up 0.0.0.0:0 address could fail and take the whole
      -- listener down.)
      Nothing -> return ()
      Just (bs, addr) ->
#ifdef VERSION_bencoding
        case BE.parse bs >>= \r -> (,) r <$> BE.decode bs of
#else
        case return bs >>= \r -> (,) r <$> decode bs of
#endif
          -- TODO ignore unknown messages at all?
#ifdef VERSION_bencoding
          Left  e -> liftIO $ sendMessage sock addr $ unknownMessage e
#else
          Left  _ -> return () -- TODO TOX send unknownMessage error
#endif
          Right (raw,m) -> handleMessage raw m addr
    again
  where
    exceptions :: IOError -> IO (Maybe (BS.ByteString, SockAddr))
    exceptions e
        -- packets with empty payload may trigger eof exception
      | isEOFError e = return Nothing
      | otherwise    = throwIO e

-- | Should be run before any 'query', otherwise they will never
--   succeed.
--
--   Forks the listener thread and stores its id in the manager so
--   that 'closeManager' can kill it.
listen :: MonadKRPC h m => m ()
listen = do
  Manager {..} <- getManager
  tid <- fork $ do
    myThreadId >>= liftIO . flip labelThread "KRPC.listen"
    -- Non-blocking on purpose: 'closeManager' empties 'listenerThread'
    -- /before/ killing this thread, so a blocking 'takeMVar' here would
    -- wait forever.
    listener `Lifted.finally`
      liftIO (void (tryTakeMVar listenerThread))
  liftIO $ putMVar listenerThread tid