From 53d384bd0028cbb54053e11b49fe0673257b7c45 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Fri, 20 Dec 2013 00:03:39 +0400 Subject: Handle transactions properly --- src/Network/KRPC/Manager.hs | 179 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 179 insertions(+) create mode 100644 src/Network/KRPC/Manager.hs (limited to 'src/Network/KRPC/Manager.hs') diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs new file mode 100644 index 00000000..9aa1bea7 --- /dev/null +++ b/src/Network/KRPC/Manager.hs @@ -0,0 +1,179 @@ +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE ScopedTypeVariables #-} +module Network.KRPC.Manager + ( MonadKRPC (..) + , newManager + , query + , handler + ) where + +import Control.Applicative +import Control.Arrow +import Control.Concurrent +--import Control.Exception hiding (Handler) +import Control.Exception.Lifted as Lifted hiding (Handler) +import Control.Monad +import Control.Monad.Trans.Control +import Control.Monad.IO.Class +import Data.BEncode as BE +import Data.ByteString.Char8 as BC +import Data.ByteString.Lazy as BL +import Data.IORef +import Data.List as L +import Data.Map as M +import Network.KRPC.Message +import Network.KRPC.Method +import Network.Socket +import Network.Socket.ByteString as BS + + +type KResult = Either KError KResponse + +type TransactionCounter = IORef Int +type CallId = (TransactionId, SockAddr) +type CallRes = MVar KResult +type PendingCalls = IORef (Map CallId CallRes) + +type HandlerBody m = SockAddr -> BValue -> m (BE.Result BValue) +type Handler m = (MethodName, HandlerBody m) + +data Manager m = Manager + { sock :: !Socket + , transactionCounter :: {-# UNPACK #-} !TransactionCounter + , pendingCalls :: {-# UNPACK #-} !PendingCalls + , handlers :: [Handler m] + } + +class (MonadBaseControl IO m, MonadIO m) => MonadKRPC m where + getManager :: m (Manager a) + +sockAddrFamily :: SockAddr -> Family +sockAddrFamily (SockAddrInet _ _ ) = AF_INET +sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6 +sockAddrFamily (SockAddrUnix _ ) = AF_UNIX + +seedTransaction :: Int +seedTransaction = 0 + +newManager :: SockAddr -> IO (Manager a) +newManager servAddr = do + sock <- bindServ + tran <- newIORef seedTransaction + calls <- newIORef M.empty + return $ Manager sock tran calls [] + where + bindServ = do + let family = sockAddrFamily servAddr + sock <- socket family Datagram defaultProtocol + when (family == AF_INET6) $ do + setSocketOption sock IPv6Only 0 + bindSocket sock servAddr + return sock + +sendMessage :: MonadIO m => BEncode a => Socket -> SockAddr -> a -> m () +sendMessage sock addr a = + liftIO $ sendManyTo sock (BL.toChunks (BE.encode a)) addr + +{----------------------------------------------------------------------- +-- Client +-----------------------------------------------------------------------} + +genTransactionId :: TransactionCounter -> IO TransactionId +genTransactionId ref = do + cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur) + return $ BC.pack (show cur) + +registerQuery :: CallId -> PendingCalls -> IO CallRes +registerQuery cid ref = do + ares <- newEmptyMVar + atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ()) + return ares + +unregisterQuery :: CallId -> PendingCalls -> IO () +unregisterQuery cid ref = do + atomicModifyIORef' ref $ \ m -> (M.delete cid m, ()) + +queryResponse :: BEncode a => CallRes -> IO a +queryResponse ares = do + res <- readMVar ares + case res of + Left e -> throwIO e + Right (KResponse {..}) -> + case fromBEncode respVals of + Left e -> throwIO (KError ProtocolError (BC.pack e) respId) + Right a -> return a + +query :: forall m a b. (MonadKRPC m, KRPC a b) => SockAddr -> a -> m b +query addr params = do + Manager {..} <- getManager + liftIO $ do + tid <- genTransactionId transactionCounter + let Method name = method :: Method a b + let q = KQuery (toBEncode params) name tid + ares <- registerQuery (tid, addr) pendingCalls + sendMessage sock addr q + `onException` unregisterQuery (tid, addr) pendingCalls + queryResponse ares + +{----------------------------------------------------------------------- +-- Handlers +-----------------------------------------------------------------------} + +handler :: forall m a b. (KRPC a b, MonadKRPC m) + => (SockAddr -> a -> m b) -> Handler m +handler body = (name, wrapper) + where + Method name = method :: Method a b + wrapper addr args = + case fromBEncode args of + Left e -> return $ Left e + Right a -> (Right . toBEncode) <$> body addr a + +runHandler :: MonadKRPC m => HandlerBody m -> SockAddr -> KQuery -> m KResult +runHandler handler addr KQuery {..} = wrapper `Lifted.catch` failback + where + wrapper = ((`decodeError` queryId) +++ (`KResponse` queryId)) + <$> handler addr queryArgs + failback e = return $ Left $ serverError e queryId + +dispatchHandler :: MonadKRPC m => KQuery -> SockAddr -> m KResult +dispatchHandler q @ KQuery {..} addr = do + Manager {..} <- getManager + case L.lookup queryMethod handlers of + Nothing -> return $ Left $ unknownMethod queryMethod queryId + Just handler -> runHandler handler addr q + +{----------------------------------------------------------------------- +-- Listener +-----------------------------------------------------------------------} + +handleQuery :: MonadKRPC m => KQuery -> SockAddr -> m () +handleQuery q addr = do + Manager {..} <- getManager + res <- dispatchHandler q addr + sendMessage sock addr $ either toBEncode toBEncode res + +handleResponse :: MonadKRPC m => KResult -> SockAddr -> m () +handleResponse result addr = do + Manager {..} <- getManager + mcall <- undefined (addr, respId) pendingCalls + case mcall of + Nothing -> return () + Just ares -> liftIO $ putMVar ares result + +handleMessage :: MonadKRPC m => KMessage -> SockAddr -> m () +handleMessage (Q q) = handleQuery q +handleMessage (R r) = handleResponse (Right r) +handleMessage (E e) = handleResponse (Left e) + +maxMsgSize :: Int +maxMsgSize = 64 * 1024 + +listener :: MonadKRPC m => m () +listener = do + Manager {..} <- getManager + forever $ do + (bs, addr) <- liftIO $ BS.recvFrom sock maxMsgSize + case BE.decode bs of + Left e -> liftIO $ sendMessage sock addr $ unknownMessage e + Right m -> handleMessage m addr -- cgit v1.2.3 From 3cbfda28704a6963baf8bcd919826b0ae67b2a5a Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Sat, 21 Dec 2013 01:25:33 +0400 Subject: Separate KRPC monad from Handler monad --- krpc.cabal | 1 + src/Network/KRPC.hs | 14 ++---- src/Network/KRPC/Manager.hs | 105 +++++++++++++++++++++++++++++--------------- 3 files changed, 74 insertions(+), 46 deletions(-) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/krpc.cabal b/krpc.cabal index bccdd6c3..f80b462c 100644 --- a/krpc.cabal +++ b/krpc.cabal @@ -45,6 +45,7 @@ library , bytestring >= 0.10 , lifted-base >= 0.1.1 , transformers >= 0.2 + , mtl , monad-control >= 0.3 , bencoding >= 0.4.3 , network >= 2.3 diff --git a/src/Network/KRPC.hs b/src/Network/KRPC.hs index 09d1c5b2..6809a330 100644 --- a/src/Network/KRPC.hs +++ b/src/Network/KRPC.hs @@ -86,16 +86,6 @@ -- -- For protocol details see 'Remote.KRPC.Protocol' module. -- -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE ViewPatterns #-} -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE DeriveDataTypeable #-} -{-# LANGUAGE ExplicitForAll #-} -{-# LANGUAGE KindSignatures #-} -{-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE MultiParamTypeClasses #-} -{-# LANGUAGE GeneralizedNewtypeDeriving #-} -{-# LANGUAGE FunctionalDependencies #-} module Network.KRPC ( -- * Methods Method @@ -103,10 +93,12 @@ module Network.KRPC -- * RPC , handler + , listen , query -- * Manager , MonadKRPC (..) + , Manager , newManager -- , closeManager @@ -116,4 +108,4 @@ module Network.KRPC import Network.KRPC.Message import Network.KRPC.Method -import Network.KRPC.Manager \ No newline at end of file +import Network.KRPC.Manager diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index 9aa1bea7..64b0dd62 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -1,29 +1,40 @@ {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE DefaultSignatures #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE FunctionalDependencies #-} +{-# LANGUAGE FlexibleInstances #-} module Network.KRPC.Manager ( MonadKRPC (..) + , Manager , newManager + , closeManager , query + , handler + , listener + , listen ) where import Control.Applicative import Control.Arrow import Control.Concurrent ---import Control.Exception hiding (Handler) -import Control.Exception.Lifted as Lifted hiding (Handler) +import Control.Concurrent.Lifted (fork) +import Control.Exception hiding (Handler) +import Control.Exception.Lifted as Lifted (catch) import Control.Monad +import Control.Monad.Reader import Control.Monad.Trans.Control -import Control.Monad.IO.Class import Data.BEncode as BE import Data.ByteString.Char8 as BC import Data.ByteString.Lazy as BL import Data.IORef import Data.List as L import Data.Map as M +import Data.Tuple import Network.KRPC.Message import Network.KRPC.Method -import Network.Socket +import Network.Socket hiding (listen) import Network.Socket.ByteString as BS @@ -34,18 +45,27 @@ type CallId = (TransactionId, SockAddr) type CallRes = MVar KResult type PendingCalls = IORef (Map CallId CallRes) -type HandlerBody m = SockAddr -> BValue -> m (BE.Result BValue) -type Handler m = (MethodName, HandlerBody m) +type HandlerBody h = SockAddr -> BValue -> h (BE.Result BValue) +type Handler h = (MethodName, HandlerBody h) -data Manager m = Manager +data Manager h = Manager { sock :: !Socket , transactionCounter :: {-# UNPACK #-} !TransactionCounter , pendingCalls :: {-# UNPACK #-} !PendingCalls - , handlers :: [Handler m] + , handlers :: [Handler h] } -class (MonadBaseControl IO m, MonadIO m) => MonadKRPC m where - getManager :: m (Manager a) +class (MonadBaseControl IO m, MonadIO m) => MonadKRPC h m | m -> h where + getManager :: m (Manager h) + + default getManager :: MonadReader (Manager h) m => m (Manager h) + getManager = ask + + liftHandler :: h a -> m a + +instance (MonadBaseControl IO h, MonadIO h) + => MonadKRPC h (ReaderT (Manager h) h) where + liftHandler = lift sockAddrFamily :: SockAddr -> Family sockAddrFamily (SockAddrInet _ _ ) = AF_INET @@ -55,12 +75,12 @@ sockAddrFamily (SockAddrUnix _ ) = AF_UNIX seedTransaction :: Int seedTransaction = 0 -newManager :: SockAddr -> IO (Manager a) -newManager servAddr = do +newManager :: SockAddr -> [Handler h] -> IO (Manager h) +newManager servAddr handlers = do sock <- bindServ tran <- newIORef seedTransaction calls <- newIORef M.empty - return $ Manager sock tran calls [] + return $ Manager sock tran calls handlers where bindServ = do let family = sockAddrFamily servAddr @@ -70,8 +90,14 @@ newManager servAddr = do bindSocket sock servAddr return sock +-- | Unblock all pending calls and close socket. +closeManager :: Manager m -> IO () +closeManager Manager {..} = do + -- TODO unblock calls + close sock + sendMessage :: MonadIO m => BEncode a => Socket -> SockAddr -> a -> m () -sendMessage sock addr a = +sendMessage sock addr a = do liftIO $ sendManyTo sock (BL.toChunks (BE.encode a)) addr {----------------------------------------------------------------------- @@ -89,9 +115,10 @@ registerQuery cid ref = do atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ()) return ares -unregisterQuery :: CallId -> PendingCalls -> IO () +unregisterQuery :: CallId -> PendingCalls -> IO (Maybe CallRes) unregisterQuery cid ref = do - atomicModifyIORef' ref $ \ m -> (M.delete cid m, ()) + atomicModifyIORef' ref $ swap . + M.updateLookupWithKey (const (const Nothing)) cid queryResponse :: BEncode a => CallRes -> IO a queryResponse ares = do @@ -103,7 +130,7 @@ queryResponse ares = do Left e -> throwIO (KError ProtocolError (BC.pack e) respId) Right a -> return a -query :: forall m a b. (MonadKRPC m, KRPC a b) => SockAddr -> a -> m b +query :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m b query addr params = do Manager {..} <- getManager liftIO $ do @@ -113,55 +140,60 @@ query addr params = do ares <- registerQuery (tid, addr) pendingCalls sendMessage sock addr q `onException` unregisterQuery (tid, addr) pendingCalls - queryResponse ares + res <- queryResponse ares + return res {----------------------------------------------------------------------- -- Handlers -----------------------------------------------------------------------} -handler :: forall m a b. (KRPC a b, MonadKRPC m) - => (SockAddr -> a -> m b) -> Handler m +handler :: forall h a b. (KRPC a b, Monad h) + => (SockAddr -> a -> h b) -> Handler h handler body = (name, wrapper) where Method name = method :: Method a b wrapper addr args = case fromBEncode args of Left e -> return $ Left e - Right a -> (Right . toBEncode) <$> body addr a + Right a -> do + r <- body addr a + return $ Right $ toBEncode r -runHandler :: MonadKRPC m => HandlerBody m -> SockAddr -> KQuery -> m KResult -runHandler handler addr KQuery {..} = wrapper `Lifted.catch` failback +runHandler :: MonadKRPC h m => HandlerBody h -> SockAddr -> KQuery -> m KResult +runHandler h addr KQuery {..} = wrapper `Lifted.catch` failback where wrapper = ((`decodeError` queryId) +++ (`KResponse` queryId)) - <$> handler addr queryArgs + <$> liftHandler (h addr queryArgs) failback e = return $ Left $ serverError e queryId -dispatchHandler :: MonadKRPC m => KQuery -> SockAddr -> m KResult +dispatchHandler :: MonadKRPC h m => KQuery -> SockAddr -> m KResult dispatchHandler q @ KQuery {..} addr = do Manager {..} <- getManager case L.lookup queryMethod handlers of - Nothing -> return $ Left $ unknownMethod queryMethod queryId - Just handler -> runHandler handler addr q + Nothing -> return $ Left $ unknownMethod queryMethod queryId + Just h -> runHandler h addr q {----------------------------------------------------------------------- -- Listener -----------------------------------------------------------------------} -handleQuery :: MonadKRPC m => KQuery -> SockAddr -> m () +handleQuery :: MonadKRPC h m => KQuery -> SockAddr -> m () handleQuery q addr = do Manager {..} <- getManager res <- dispatchHandler q addr sendMessage sock addr $ either toBEncode toBEncode res -handleResponse :: MonadKRPC m => KResult -> SockAddr -> m () +handleResponse :: MonadKRPC h m => KResult -> SockAddr -> m () handleResponse result addr = do Manager {..} <- getManager - mcall <- undefined (addr, respId) pendingCalls - case mcall of - Nothing -> return () - Just ares -> liftIO $ putMVar ares result + liftIO $ do + let resultId = either errorId respId result + mcall <- unregisterQuery (resultId, addr) pendingCalls + case mcall of + Nothing -> return () + Just ares -> putMVar ares result -handleMessage :: MonadKRPC m => KMessage -> SockAddr -> m () +handleMessage :: MonadKRPC h m => KMessage -> SockAddr -> m () handleMessage (Q q) = handleQuery q handleMessage (R r) = handleResponse (Right r) handleMessage (E e) = handleResponse (Left e) @@ -169,7 +201,7 @@ handleMessage (E e) = handleResponse (Left e) maxMsgSize :: Int maxMsgSize = 64 * 1024 -listener :: MonadKRPC m => m () +listener :: MonadKRPC h m => m () listener = do Manager {..} <- getManager forever $ do @@ -177,3 +209,6 @@ listener = do case BE.decode bs of Left e -> liftIO $ sendMessage sock addr $ unknownMessage e Right m -> handleMessage m addr + +listen :: MonadKRPC h m => m ThreadId +listen = fork $ listener -- cgit v1.2.3 From a6358e332e8b2820541fd75a426d92428c27c58f Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Sun, 22 Dec 2013 03:09:01 +0400 Subject: Expose Handler type synonym --- src/Network/KRPC.hs | 3 ++- src/Network/KRPC/Manager.hs | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/src/Network/KRPC.hs b/src/Network/KRPC.hs index 6809a330..e10fcb58 100644 --- a/src/Network/KRPC.hs +++ b/src/Network/KRPC.hs @@ -92,6 +92,7 @@ module Network.KRPC , KRPC (..) -- * RPC + , Handler , handler , listen , query @@ -100,7 +101,7 @@ module Network.KRPC , MonadKRPC (..) , Manager , newManager --- , closeManager + , closeManager -- * Exceptions , KError (..) diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index 64b0dd62..4a3dc93f 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -11,6 +11,7 @@ module Network.KRPC.Manager , closeManager , query + , Handler , handler , listener , listen -- cgit v1.2.3 From c61a5412e2ca22f6da783182261fbb0d7e8d9217 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Sun, 22 Dec 2013 04:51:06 +0400 Subject: Throw exception if query takes too long --- src/Network/KRPC/Manager.hs | 31 +++++++++++++++++++++---------- src/Network/KRPC/Message.hs | 14 +++++++++++++- 2 files changed, 34 insertions(+), 11 deletions(-) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index 4a3dc93f..0b090e6b 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -37,6 +37,7 @@ import Network.KRPC.Message import Network.KRPC.Method import Network.Socket hiding (listen) import Network.Socket.ByteString as BS +import System.Timeout type KResult = Either KError KResponse @@ -51,6 +52,7 @@ type Handler h = (MethodName, HandlerBody h) data Manager h = Manager { sock :: !Socket + , queryTimeout :: !Int -- ^ in seconds , transactionCounter :: {-# UNPACK #-} !TransactionCounter , pendingCalls :: {-# UNPACK #-} !PendingCalls , handlers :: [Handler h] @@ -76,12 +78,15 @@ sockAddrFamily (SockAddrUnix _ ) = AF_UNIX seedTransaction :: Int seedTransaction = 0 +defaultQueryTimeout :: Int +defaultQueryTimeout = 10 + newManager :: SockAddr -> [Handler h] -> IO (Manager h) newManager servAddr handlers = do sock <- bindServ tran <- newIORef seedTransaction calls <- newIORef M.empty - return $ Manager sock tran calls handlers + return $ Manager sock defaultQueryTimeout tran calls handlers where bindServ = do let family = sockAddrFamily servAddr @@ -116,6 +121,8 @@ registerQuery cid ref = do atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ()) return ares +-- simultaneous M.lookup and M.delete guarantees that we never get two +-- or more responses to the same query unregisterQuery :: CallId -> PendingCalls -> IO (Maybe CallRes) unregisterQuery cid ref = do atomicModifyIORef' ref $ swap . @@ -123,13 +130,11 @@ unregisterQuery cid ref = do queryResponse :: BEncode a => CallRes -> IO a queryResponse ares = do - res <- readMVar ares - case res of - Left e -> throwIO e - Right (KResponse {..}) -> - case fromBEncode respVals of - Left e -> throwIO (KError ProtocolError (BC.pack e) respId) - Right a -> return a + res <- readMVar ares + KResponse {..} <- either throwIO pure res + case fromBEncode respVals of + Right r -> pure r + Left e -> throwIO $ decodeError e respId query :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m b query addr params = do @@ -138,11 +143,17 @@ query addr params = do tid <- genTransactionId transactionCounter let Method name = method :: Method a b let q = KQuery (toBEncode params) name tid + ares <- registerQuery (tid, addr) pendingCalls sendMessage sock addr q `onException` unregisterQuery (tid, addr) pendingCalls - res <- queryResponse ares - return res + + mres <- timeout (queryTimeout * 10 ^ 6) $ queryResponse ares + case mres of + Just res -> return res + Nothing -> do + unregisterQuery (tid, addr) pendingCalls + throwIO $ timeoutExpired tid {----------------------------------------------------------------------- -- Handlers diff --git a/src/Network/KRPC/Message.hs b/src/Network/KRPC/Message.hs index 3bbfb1db..0bd34400 100644 --- a/src/Network/KRPC/Message.hs +++ b/src/Network/KRPC/Message.hs @@ -30,6 +30,7 @@ module Network.KRPC.Message , decodeError , unknownMethod , unknownMessage + , timeoutExpired -- * Query , KQuery(..) @@ -130,17 +131,28 @@ instance BEncode KError where instance Exception KError +-- | Happen when some handler fail. serverError :: SomeException -> TransactionId -> KError serverError e = KError ServerError (BC.pack (show e)) +-- | Received 'queryArgs' or 'respVals' can not be decoded. decodeError :: String -> TransactionId -> KError decodeError msg = KError ProtocolError (BC.pack msg) +-- | If /remote/ node send query /this/ node doesn't know about then +-- this error message should be sent in response. unknownMethod :: MethodName -> TransactionId -> KError unknownMethod = KError MethodUnknown +-- | A remote node has send some 'KMessage' this node is unable to +-- decode. unknownMessage :: String -> KError -unknownMessage msg = KError ProtocolError (BC.pack msg) "" +unknownMessage msg = KError ProtocolError (BC.pack msg) unknownTransaction + +-- | A /remote/ node is not responding to the /our/ request the for +-- specified period of time. +timeoutExpired :: TransactionId -> KError +timeoutExpired = KError GenericError "timeout expired" {----------------------------------------------------------------------- -- Query messages -- cgit v1.2.3 From ca59e5cfac34d8a59203e91fdd9dd432f537c346 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Sun, 22 Dec 2013 04:56:48 +0400 Subject: Fix warnings --- src/Network/KRPC/Manager.hs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index 0b090e6b..c63967d0 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -148,11 +148,13 @@ query addr params = do sendMessage sock addr q `onException` unregisterQuery (tid, addr) pendingCalls - mres <- timeout (queryTimeout * 10 ^ 6) $ queryResponse ares + mres <- timeout (queryTimeout * 10 ^ (6 :: Int)) $ do + queryResponse ares + case mres of Just res -> return res Nothing -> do - unregisterQuery (tid, addr) pendingCalls + _ <- unregisterQuery (tid, addr) pendingCalls throwIO $ timeoutExpired tid {----------------------------------------------------------------------- -- cgit v1.2.3 From ce8a1546bdcfbbb7c45407e3811cafc99d667ee1 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Sun, 22 Dec 2013 05:08:51 +0400 Subject: Run each handler in separate thread. This is needed because handler can call query too. The minimal example: * listener received KQuery(1); * listener dispatch corresponding handler; * handler send KQuery(2); * handler blocked waiting for response; * listener is unable to receive KQuery(2) because it is blocked on handler. So we should run each handler in separated thread otherwise dead lock can happen. --- src/Network/KRPC/Manager.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index c63967d0..084a8d8d 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -192,7 +192,7 @@ dispatchHandler q @ KQuery {..} addr = do -----------------------------------------------------------------------} handleQuery :: MonadKRPC h m => KQuery -> SockAddr -> m () -handleQuery q addr = do +handleQuery q addr = void $ fork $ do Manager {..} <- getManager res <- dispatchHandler q addr sendMessage sock addr $ either toBEncode toBEncode res -- cgit v1.2.3 From c0377edb380e49be5bd2d1cdb3c5a7dc612b57b5 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Sun, 22 Dec 2013 05:16:05 +0400 Subject: More permissive default query timeout --- src/Network/KRPC/Manager.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index 084a8d8d..304f43f2 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -79,7 +79,7 @@ seedTransaction :: Int seedTransaction = 0 defaultQueryTimeout :: Int -defaultQueryTimeout = 10 +defaultQueryTimeout = 120 newManager :: SockAddr -> [Handler h] -> IO (Manager h) newManager servAddr handlers = do -- cgit v1.2.3 From 46b6ba10202b73ba413d18bd21a284e3897c12b0 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Tue, 24 Dec 2013 23:50:23 +0400 Subject: Update tests --- krpc.cabal | 54 +++++++-------------------- src/Network/KRPC.hs | 4 +- src/Network/KRPC/Manager.hs | 12 ++++++ tests/Client.hs | 80 ---------------------------------------- tests/Network/KRPC/MethodSpec.hs | 52 ++++++++++++++++++++++++++ tests/Network/KRPCSpec.hs | 33 +++++++++++++++++ tests/Server.hs | 20 ---------- tests/Shared.hs | 39 -------------------- 8 files changed, 114 insertions(+), 180 deletions(-) delete mode 100644 tests/Client.hs create mode 100644 tests/Network/KRPC/MethodSpec.hs create mode 100644 tests/Network/KRPCSpec.hs delete mode 100644 tests/Server.hs delete mode 100644 tests/Shared.hs (limited to 'src/Network/KRPC/Manager.hs') diff --git a/krpc.cabal b/krpc.cabal index fb7b01fe..c4c0ae10 100644 --- a/krpc.cabal +++ b/krpc.cabal @@ -64,8 +64,13 @@ test-suite spec default-language: Haskell2010 hs-source-dirs: tests main-is: Spec.hs + other-modules: Network.KRPCSpec + Network.KRPC.MethodSpec + Network.KRPC.MessageSpec build-depends: base == 4.* , bytestring + , network + , mtl , hspec , QuickCheck @@ -74,55 +79,24 @@ test-suite spec , bencoding , krpc ---test-suite test-client --- type: exitcode-stdio-1.0 --- default-language: Haskell2010 --- hs-source-dirs: tests --- main-is: Client.hs --- other-modules: Shared --- build-depends: base == 4.* --- , bytestring --- , process --- , filepath --- --- , bencoding --- , krpc --- , network --- --- , HUnit --- , test-framework --- , test-framework-hunit - - ---executable test-server +--executable bench-server -- default-language: Haskell2010 --- hs-source-dirs: tests +-- hs-source-dirs: bench -- main-is: Server.hs --- other-modules: Shared -- build-depends: base == 4.* -- , bytestring --- , bencoding -- , krpc -- , network +-- ghc-options: -fforce-recomp ---executable bench-server +--benchmark bench-client +-- type: exitcode-stdio-1.0 -- default-language: Haskell2010 -- hs-source-dirs: bench --- main-is: Server.hs --- build-depends: base == 4.* +-- main-is: Main.hs +-- build-depends: base == 4.* -- , bytestring +-- , criterion -- , krpc -- , network --- ghc-options: -fforce-recomp - -benchmark bench-client - type: exitcode-stdio-1.0 - default-language: Haskell2010 - hs-source-dirs: bench - main-is: Main.hs - build-depends: base == 4.* - , bytestring - , criterion - , krpc - , network - ghc-options: -O2 -fforce-recomp \ No newline at end of file +-- ghc-options: -O2 -fforce-recomp \ No newline at end of file diff --git a/src/Network/KRPC.hs b/src/Network/KRPC.hs index e10fcb58..10d2eb55 100644 --- a/src/Network/KRPC.hs +++ b/src/Network/KRPC.hs @@ -94,7 +94,6 @@ module Network.KRPC -- * RPC , Handler , handler - , listen , query -- * Manager @@ -102,9 +101,12 @@ module Network.KRPC , Manager , newManager , closeManager + , withManager + , listen -- * Exceptions , KError (..) + , ErrorCode (..) ) where import Network.KRPC.Message diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index 304f43f2..9d8688d3 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -9,6 +9,8 @@ module Network.KRPC.Manager , Manager , newManager , closeManager + , withManager + , query , Handler @@ -102,6 +104,9 @@ closeManager Manager {..} = do -- TODO unblock calls close sock +withManager :: SockAddr -> [Handler h] -> (Manager h -> IO a) -> IO a +withManager addr hs = bracket (newManager addr hs) closeManager + sendMessage :: MonadIO m => BEncode a => Socket -> SockAddr -> a -> m () sendMessage sock addr a = do liftIO $ sendManyTo sock (BL.toChunks (BE.encode a)) addr @@ -136,6 +141,11 @@ queryResponse ares = do Right r -> pure r Left e -> throwIO $ decodeError e respId +-- | +-- +-- This function will throw exception if quered node respond with +-- @error@ message or timeout expires. +-- query :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m b query addr params = do Manager {..} <- getManager @@ -161,6 +171,8 @@ query addr params = do -- Handlers -----------------------------------------------------------------------} +-- | Any thrown exception will be supressed and send over wire back to +-- the quering node. handler :: forall h a b. (KRPC a b, Monad h) => (SockAddr -> a -> h b) -> Handler h handler body = (name, wrapper) diff --git a/tests/Client.hs b/tests/Client.hs deleted file mode 100644 index 2b49bd82..00000000 --- a/tests/Client.hs +++ /dev/null @@ -1,80 +0,0 @@ -{-# LANGUAGE OverloadedStrings #-} -module Main (main) where - -import Control.Concurrent -import Control.Exception -import qualified Data.ByteString as B -import Data.BEncode as BE -import Data.BEncode.BDict as BE -import System.Process -import System.FilePath - -import Test.HUnit hiding (Test) -import Test.Framework -import Test.Framework.Providers.HUnit - -import Network.KRPC -import Network.Socket -import Shared - - -addr :: SockAddr -addr = SockAddrInet 6000 0 - -withServ :: FilePath -> IO () -> IO () -withServ serv_path = bracket up terminateProcess . const - where - up = do - (_, _, _, h) <- createProcess (proc serv_path []) - threadDelay 1000000 - return h - -main :: IO () -main = do - let serv_path = "dist" "build" "test-server" "test-server" - withServ serv_path $ - defaultMain tests - - -(==?) :: (Eq a, Show a) => a -> IO a -> Assertion -expected ==? action = do - actual <- action - expected @=? actual - -tests :: [Test] -tests = - [ testCase "unit" $ - () ==? call addr unitM () - - , testCase "echo int" $ - 1234 ==? call addr echoM 1234 - - , testCase "reverse 1..100" $ - reverse [1..100] ==? call addr reverseM [1..100] - - , testCase "reverse empty list" $ - reverse [] ==? call addr reverseM [] - - , testCase "reverse singleton list" $ - reverse [1] ==? call addr reverseM [1] - - , testCase "swap pair" $ - (1, 0) ==? call addr swapM (0, 1) - - , testCase "shift triple" $ - ([2..10], (), 1) ==? call addr shiftR ((), 1, [2..10]) - - , testCase "echo bytestring" $ - let bs = B.replicate 400 0 in - bs ==? call addr echoBytes bs - - , testCase "raw method" $ - BInteger 10 ==? call addr rawM (BInteger 10) - - , testCase "raw dict" $ - let dict = BDict $ BE.fromAscList - [ ("some_int", BInteger 100) - , ("some_list", BList [BInteger 10]) - ] - in dict ==? call addr rawDictM dict - ] diff --git a/tests/Network/KRPC/MethodSpec.hs b/tests/Network/KRPC/MethodSpec.hs new file mode 100644 index 00000000..c1c58282 --- /dev/null +++ b/tests/Network/KRPC/MethodSpec.hs @@ -0,0 +1,52 @@ +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE DeriveDataTypeable #-} +{-# OPTIONS_GHC -fno-warn-orphans #-} +module Network.KRPC.MethodSpec where +import Control.Applicative +import Data.BEncode +import Data.ByteString as BS +import Data.Typeable +import Network.KRPC +import Test.Hspec + + +data Ping = Ping + deriving (Show, Eq, Typeable) + +instance BEncode Ping where + toBEncode Ping = toBEncode () + fromBEncode b = Ping <$ (fromBEncode b :: Result ()) + +instance KRPC Ping Ping + +ping :: Monad h => Handler h +ping = handler $ \ _ Ping -> return Ping + +newtype Echo a = Echo a + deriving (Show, Eq, BEncode, Typeable) + +echo :: Monad h => Handler h +echo = handler $ \ _ (Echo a) -> return (Echo (a :: ByteString)) + +instance (Typeable a, BEncode a) => KRPC (Echo a) (Echo a) + +spec :: Spec +spec = do + describe "ping method" $ do + it "name is ping" $ do + (method :: Method Ping Ping) `shouldBe` "ping" + + it "has pretty Show instance" $ do + show (method :: Method Ping Ping) `shouldBe` "ping :: Ping -> Ping" + + describe "echo method" $ do + it "is overloadable" $ do + (method :: Method (Echo Int ) (Echo Int )) `shouldBe` "echo int" + (method :: Method (Echo Bool) (Echo Bool)) `shouldBe` "echo bool" + + it "has pretty Show instance" $ do + show (method :: Method (Echo Int) (Echo Int)) + `shouldBe` "echo int :: Echo Int -> Echo Int" \ No newline at end of file diff --git a/tests/Network/KRPCSpec.hs b/tests/Network/KRPCSpec.hs new file mode 100644 index 00000000..27148682 --- /dev/null +++ b/tests/Network/KRPCSpec.hs @@ -0,0 +1,33 @@ +{-# LANGUAGE OverloadedStrings #-} +module Network.KRPCSpec (spec) where +import Control.Monad.Reader +import Network.Socket (SockAddr (..)) +import Network.KRPC +import Network.KRPC.MethodSpec hiding (spec) +import Test.Hspec + +servAddr :: SockAddr +servAddr = SockAddrInet 6000 (256 * 256 * 256 + 127) + +handlers :: [Handler IO] +handlers = + [ handler $ \ _ Ping -> return Ping + , handler $ \ _ (Echo a) -> return (Echo (a :: Bool)) + , handler $ \ _ (Echo a) -> return (Echo (a :: Int)) + ] + +spec :: Spec +spec = do + describe "query" $ do + it "run handlers" $ do + let int = 0xabcd :: Int + (withManager servAddr handlers $ runReaderT $ do + listen + query servAddr (Echo int)) + `shouldReturn` Echo int + + it "throw timeout exception" $ do + (withManager servAddr handlers $ runReaderT $ do + query servAddr (Echo (0xabcd :: Int)) + ) + `shouldThrow` (== KError GenericError "timeout expired" "0") diff --git a/tests/Server.hs b/tests/Server.hs deleted file mode 100644 index b4b34891..00000000 --- a/tests/Server.hs +++ /dev/null @@ -1,20 +0,0 @@ -{-# LANGUAGE IncoherentInstances #-} -module Main (main) where - -import Data.BEncode -import Network.KRPC -import Network.Socket -import Shared - - -main :: IO () -main = server (SockAddrInet 6000 0) - [ unitM ==> return - , echoM ==> return - , echoBytes ==> return - , swapM ==> \(a, b) -> return (b, a) - , reverseM ==> return . reverse - , shiftR ==> \(a, b, c) -> return (c, a, b) - , rawM ==> return - , rawDictM ==> return - ] diff --git a/tests/Shared.hs b/tests/Shared.hs deleted file mode 100644 index 16547644..00000000 --- a/tests/Shared.hs +++ /dev/null @@ -1,39 +0,0 @@ -{-# LANGUAGE OverloadedStrings #-} -module Shared - ( echoM - , echoBytes - , unitM - , swapM - , reverseM - , shiftR - , rawM - , rawDictM - ) where - -import Data.ByteString (ByteString) -import Data.BEncode -import Network.KRPC - -unitM :: Method () () -unitM = method "unit" [] [] - -echoM :: Method Int Int -echoM = method "echo" ["x"] ["x"] - -echoBytes :: Method ByteString ByteString -echoBytes = method "echoBytes" ["x"] ["x"] - -reverseM :: Method [Int] [Int] -reverseM = method "reverse" ["xs"] ["ys"] - -swapM :: Method (Int, Int) (Int, Int) -swapM = method "swap" ["x", "y"] ["b", "a"] - -shiftR :: Method ((), Int, [Int]) ([Int], (), Int) -shiftR = method "shiftR" ["x", "y", "z"] ["a", "b", "c"] - -rawM :: Method BValue BValue -rawM = method "rawM" [""] [""] - -rawDictM :: Method BValue BValue -rawDictM = method "m" [] [] \ No newline at end of file -- cgit v1.2.3 From d59901591644413e8ff298c83242bd7d8b15d3e9 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Wed, 25 Dec 2013 01:44:41 +0400 Subject: Kill listener thread at exit --- src/Network/KRPC/Manager.hs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index 9d8688d3..a8c90b33 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -55,6 +55,7 @@ type Handler h = (MethodName, HandlerBody h) data Manager h = Manager { sock :: !Socket , queryTimeout :: !Int -- ^ in seconds + , listenerThread :: !(MVar ThreadId) , transactionCounter :: {-# UNPACK #-} !TransactionCounter , pendingCalls :: {-# UNPACK #-} !PendingCalls , handlers :: [Handler h] @@ -68,6 +69,9 @@ class (MonadBaseControl IO m, MonadIO m) => MonadKRPC h m | m -> h where liftHandler :: h a -> m a + default liftHandler :: m a -> m a + liftHandler = id + instance (MonadBaseControl IO h, MonadIO h) => MonadKRPC h (ReaderT (Manager h) h) where liftHandler = lift @@ -86,9 +90,10 @@ defaultQueryTimeout = 120 newManager :: SockAddr -> [Handler h] -> IO (Manager h) newManager servAddr handlers = do sock <- bindServ + tref <- newEmptyMVar tran <- newIORef seedTransaction calls <- newIORef M.empty - return $ Manager sock defaultQueryTimeout tran calls handlers + return $ Manager sock defaultQueryTimeout tref tran calls handlers where bindServ = do let family = sockAddrFamily servAddr @@ -101,9 +106,11 @@ newManager servAddr handlers = do -- | Unblock all pending calls and close socket. closeManager :: Manager m -> IO () closeManager Manager {..} = do + maybe (return ()) killThread =<< tryTakeMVar listenerThread -- TODO unblock calls close sock +-- | Normally you should use Control.Monad.Trans.allocate function. withManager :: SockAddr -> [Handler h] -> (Manager h -> IO a) -> IO a withManager addr hs = bracket (newManager addr hs) closeManager @@ -236,5 +243,10 @@ listener = do Left e -> liftIO $ sendMessage sock addr $ unknownMessage e Right m -> handleMessage m addr -listen :: MonadKRPC h m => m ThreadId -listen = fork $ listener +-- | Should be run before any 'query', otherwise they will never +-- succeed. +listen :: MonadKRPC h m => m () +listen = do + Manager {..} <- getManager + tid <- fork $ listener + liftIO $ putMVar listenerThread tid -- cgit v1.2.3 From f0a5eec0f054f3c7e26b76a74c50a7c1f79c1e97 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Wed, 25 Dec 2013 04:23:56 +0400 Subject: Update documentation --- src/Network/KRPC.hs | 52 +++++++-------------------------------------- src/Network/KRPC/Manager.hs | 31 +++++++++++++++++++-------- 2 files changed, 30 insertions(+), 53 deletions(-) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/src/Network/KRPC.hs b/src/Network/KRPC.hs index 286c063e..a1767161 100644 --- a/src/Network/KRPC.hs +++ b/src/Network/KRPC.hs @@ -6,8 +6,8 @@ -- Portability : portable -- -- This module provides safe remote procedure call. One important --- point is exceptions and errors, so be able handle them properly --- we need to investigate a bit about how this all works. +-- point is exceptions and errors, so to be able handle them +-- properly we need to investigate a bit about how this all works. -- Internally, in order to make method invokation KRPC makes the -- following steps: -- @@ -41,50 +41,14 @@ -- * Caller extracts results and finally return results of the -- procedure call as ordinary haskell values. -- --- If every other error occurred caller get the 'GenericError'. All --- errors returned by callee are throwed as ordinary haskell --- exceptions at caller side. Make sure that both callee and caller --- uses the same method signatures and everything should be ok: this --- KRPC implementation provides some level of safety through --- types. Also note that both caller and callee use plain UDP, so --- KRPC is unreliable. +-- If every other error occurred then caller get the +-- 'GenericError'. All errors returned by callee are throwed as +-- ordinary haskell exceptions at caller side. Also note that both +-- caller and callee use plain UDP, so KRPC is unreliable. -- --- Consider one tiny example. From now @caller = client@ and --- @callee = server or remote@. +-- For async 'query' use @async@ package. -- --- Somewhere we have to define all procedure signatures. Imagine --- that this is a library shared between client and server: --- --- > factorialMethod :: Method Int Int --- > factorialMethod = method "factorial" ["x"] ["y"] --- --- Otherwise you can define this code in both client and server of --- course. But in this case you might get into troubles: you can get --- 'MethodUnknown' or 'ProtocolError' if name or type of method --- will mismatch after not synced changes in client or server code. --- --- Now let's define our client-side: --- --- > main = withRemote $ \remote -> do --- > result <- call remote (0, 6000) factorialMethod 4 --- > assert (result == 24) $ print "Success!" --- --- It basically open socket with 'withRemote' and make all the other --- steps in 'call' as describe above. And finally our server-side: --- --- > factorialImpl :: Int -> Int --- > factorialImpl n = product [1..n] --- > --- > main = runServer [factorialMethod $ return . factorialImpl] --- --- Here we implement method signature from that shared lib and run --- server with runServer by passing method table in. --- --- For async API use /async/ package, old API have been removed. --- --- For more examples see @exsamples@ or @tests@ directories. --- --- For protocol details see 'Remote.KRPC.Protocol' module. +-- For protocol details see "Network.KRPC.Message" module. -- module Network.KRPC ( -- * Methods diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index a8c90b33..a883a34a 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -50,8 +50,13 @@ type CallRes = MVar KResult type PendingCalls = IORef (Map CallId CallRes) type HandlerBody h = SockAddr -> BValue -> h (BE.Result BValue) + +-- | Handler is a function which will be invoked then some /remote/ +-- node querying /this/ node. type Handler h = (MethodName, HandlerBody h) +-- | Keep track pending queries made by /this/ node and handle queries +-- made by /remote/ nodes. data Manager h = Manager { sock :: !Socket , queryTimeout :: !Int -- ^ in seconds @@ -61,12 +66,15 @@ data Manager h = Manager , handlers :: [Handler h] } +-- | A monad which can perform or handle queries. class (MonadBaseControl IO m, MonadIO m) => MonadKRPC h m | m -> h where + -- | Ask for manager. getManager :: m (Manager h) default getManager :: MonadReader (Manager h) m => m (Manager h) getManager = ask + -- | Can be used to add logging for instance. liftHandler :: h a -> m a default liftHandler :: m a -> m a @@ -87,7 +95,11 @@ seedTransaction = 0 defaultQueryTimeout :: Int defaultQueryTimeout = 120 -newManager :: SockAddr -> [Handler h] -> IO (Manager h) +-- | Bind socket to the specified address. To enable query handling +-- run 'listen'. +newManager :: SockAddr -- ^ address to listen on; + -> [Handler h] -- ^ handlers to run on incoming queries. + -> IO (Manager h) -- ^ new manager. newManager servAddr handlers = do sock <- bindServ tref <- newEmptyMVar @@ -110,18 +122,19 @@ closeManager Manager {..} = do -- TODO unblock calls close sock --- | Normally you should use Control.Monad.Trans.allocate function. +-- | Normally you should use Control.Monad.Trans.Resource.allocate +-- function. withManager :: SockAddr -> [Handler h] -> (Manager h -> IO a) -> IO a withManager addr hs = bracket (newManager addr hs) closeManager -sendMessage :: MonadIO m => BEncode a => Socket -> SockAddr -> a -> m () -sendMessage sock addr a = do - liftIO $ sendManyTo sock (BL.toChunks (BE.encode a)) addr - {----------------------------------------------------------------------- -- Client -----------------------------------------------------------------------} +sendMessage :: MonadIO m => BEncode a => Socket -> SockAddr -> a -> m () +sendMessage sock addr a = do + liftIO $ sendManyTo sock (BL.toChunks (BE.encode a)) addr + genTransactionId :: TransactionCounter -> IO TransactionId genTransactionId ref = do cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur) @@ -148,7 +161,7 @@ queryResponse ares = do Right r -> pure r Left e -> throwIO $ decodeError e respId --- | +-- | Enqueue query to the given node. -- -- This function will throw exception if quered node respond with -- @error@ message or timeout expires. @@ -178,8 +191,8 @@ query addr params = do -- Handlers -----------------------------------------------------------------------} --- | Any thrown exception will be supressed and send over wire back to --- the quering node. +-- | Make handler from handler function. Any thrown exception will be +-- supressed and send over the wire back to the querying node. handler :: forall h a b. (KRPC a b, Monad h) => (SockAddr -> a -> h b) -> Handler h handler body = (name, wrapper) -- cgit v1.2.3 From 9a9a7d5750e24ee0810006f3dd2a7e7879b521e2 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Wed, 25 Dec 2013 05:13:38 +0400 Subject: Prettify documentation a bit --- src/Network/KRPC/Manager.hs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index a883a34a..6bc448c6 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -1,3 +1,12 @@ +-- | +-- Copyright : (c) Sam Truzjan 2013 +-- License : BSD3 +-- Maintainer : pxqr.sta@gmail.com +-- Stability : experimental +-- Portability : portable +-- +-- Normally, you don't need to import this module. +-- {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE DefaultSignatures #-} @@ -5,18 +14,18 @@ {-# LANGUAGE FunctionalDependencies #-} {-# LANGUAGE FlexibleInstances #-} module Network.KRPC.Manager - ( MonadKRPC (..) + ( -- * Manager + MonadKRPC (..) , Manager , newManager , closeManager , withManager + , listen + -- * Queries , query - , Handler , handler - , listener - , listen ) where import Control.Applicative -- cgit v1.2.3 From 3a6bedc8da60ff422e0603552d9ab1cd7abb0f9f Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Mon, 6 Jan 2014 23:41:59 +0400 Subject: Add logging to query function --- krpc.cabal | 8 ++++--- src/Network/KRPC/Manager.hs | 55 ++++++++++++++++++++++++++++++--------------- src/Network/KRPC/Method.hs | 6 +++-- 3 files changed, 46 insertions(+), 23 deletions(-) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/krpc.cabal b/krpc.cabal index e902c9c3..7b0cafa2 100644 --- a/krpc.cabal +++ b/krpc.cabal @@ -44,13 +44,15 @@ library Network.KRPC.Method Network.KRPC.Manager build-depends: base == 4.* - , bytestring >= 0.10 + , bytestring >= 0.10 + , text >= 0.11 , lifted-base >= 0.1.1 , transformers >= 0.2 , mtl , monad-control >= 0.3 - , bencoding >= 0.4.3 - , network >= 2.3 + , monad-logger >= 0.3 + , bencoding >= 0.4.3 + , network >= 2.3 , containers if impl(ghc < 7.6) build-depends: ghc-prim diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index 6bc448c6..cc2e383e 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -7,12 +7,14 @@ -- -- Normally, you don't need to import this module. -- -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE DefaultSignatures #-} -{-# LANGUAGE MultiParamTypeClasses #-} -{-# LANGUAGE FunctionalDependencies #-} +{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE DefaultSignatures #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE FunctionalDependencies #-} +{-# LANGUAGE TemplateHaskell #-} module Network.KRPC.Manager ( -- * Manager MonadKRPC (..) @@ -35,6 +37,7 @@ import Control.Concurrent.Lifted (fork) import Control.Exception hiding (Handler) import Control.Exception.Lifted as Lifted (catch) import Control.Monad +import Control.Monad.Logger import Control.Monad.Reader import Control.Monad.Trans.Control import Data.BEncode as BE @@ -43,6 +46,9 @@ import Data.ByteString.Lazy as BL import Data.IORef import Data.List as L import Data.Map as M +import Data.Monoid +import Data.Text as T +import Data.Text.Encoding as T import Data.Tuple import Network.KRPC.Message import Network.KRPC.Method @@ -76,7 +82,9 @@ data Manager h = Manager } -- | A monad which can perform or handle queries. -class (MonadBaseControl IO m, MonadIO m) => MonadKRPC h m | m -> h where +class (MonadBaseControl IO m, MonadLogger m, MonadIO m) + => MonadKRPC h m | m -> h where + -- | Ask for manager. getManager :: m (Manager h) @@ -89,8 +97,9 @@ class (MonadBaseControl IO m, MonadIO m) => MonadKRPC h m | m -> h where default liftHandler :: m a -> m a liftHandler = id -instance (MonadBaseControl IO h, MonadIO h) +instance (MonadBaseControl IO h, MonadLogger h, MonadIO h) => MonadKRPC h (ReaderT (Manager h) h) where + liftHandler = lift sockAddrFamily :: SockAddr -> Family @@ -178,23 +187,33 @@ queryResponse ares = do query :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m b query addr params = do Manager {..} <- getManager - liftIO $ do - tid <- genTransactionId transactionCounter - let Method name = method :: Method a b - let q = KQuery (toBEncode params) name tid - + tid <- liftIO $ genTransactionId transactionCounter + let queryMethod = method :: Method a b + let signature = T.pack (show queryMethod) + <> " @" <> T.pack (show addr) + <> " #" <> T.decodeUtf8 tid + $(logDebugS) "query.sending" signature + + mres <- liftIO $ do ares <- registerQuery (tid, addr) pendingCalls + + let q = KQuery (toBEncode params) (methodName queryMethod) tid sendMessage sock addr q `onException` unregisterQuery (tid, addr) pendingCalls - mres <- timeout (queryTimeout * 10 ^ (6 :: Int)) $ do + timeout (queryTimeout * 10 ^ (6 :: Int)) $ do queryResponse ares - case mres of - Just res -> return res - Nothing -> do - _ <- unregisterQuery (tid, addr) pendingCalls - throwIO $ timeoutExpired tid + case mres of + Just res -> do + $(logDebugS) "query.responded" $ signature + return res + + Nothing -> do + _ <- liftIO $ unregisterQuery (tid, addr) pendingCalls + $(logWarnS) "query.not_responding" $ signature + <> " for " <> T.pack (show queryTimeout) <> " seconds" + throw $ timeoutExpired tid {----------------------------------------------------------------------- -- Handlers diff --git a/src/Network/KRPC/Method.hs b/src/Network/KRPC/Method.hs index f70923f5..68f1fa4e 100644 --- a/src/Network/KRPC/Method.hs +++ b/src/Network/KRPC/Method.hs @@ -38,7 +38,7 @@ import Network.KRPC.Message -- -- * result: Type of return value of the method. -- -newtype Method param result = Method MethodName +newtype Method param result = Method { methodName :: MethodName } deriving (Eq, Ord, IsString, BEncode) -- | Example: @@ -74,7 +74,9 @@ showsMethod (Method name) = -- method = \"ping\" -- @ -- -class (BEncode req, BEncode resp) => KRPC req resp | req -> resp where +class (Typeable req, BEncode req, Typeable resp, BEncode resp) + => KRPC req resp | req -> resp where + -- | Method name. Default implementation uses lowercased @req@ -- datatype name. -- -- cgit v1.2.3 From 4e1a833637bf613a4674c7c35d4f12c811e9bf7b Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Tue, 7 Jan 2014 00:34:42 +0400 Subject: Add logging at handlers --- src/Network/KRPC/Manager.hs | 40 ++++++++++++++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 8 deletions(-) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index cc2e383e..ee336a4d 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -31,7 +31,6 @@ module Network.KRPC.Manager ) where import Control.Applicative -import Control.Arrow import Control.Concurrent import Control.Concurrent.Lifted (fork) import Control.Exception hiding (Handler) @@ -145,6 +144,17 @@ closeManager Manager {..} = do withManager :: SockAddr -> [Handler h] -> (Manager h -> IO a) -> IO a withManager addr hs = bracket (newManager addr hs) closeManager +{----------------------------------------------------------------------- +-- Logging +-----------------------------------------------------------------------} + +querySignature :: MethodName -> TransactionId -> SockAddr -> Text +querySignature name transaction addr = T.concat + [ "&", T.decodeUtf8 name + , " #", T.decodeUtf8 transaction + , " @", T.pack (show addr) + ] + {----------------------------------------------------------------------- -- Client -----------------------------------------------------------------------} @@ -189,9 +199,7 @@ query addr params = do Manager {..} <- getManager tid <- liftIO $ genTransactionId transactionCounter let queryMethod = method :: Method a b - let signature = T.pack (show queryMethod) - <> " @" <> T.pack (show addr) - <> " #" <> T.decodeUtf8 tid + let signature = querySignature (methodName queryMethod) tid addr $(logDebugS) "query.sending" signature mres <- liftIO $ do @@ -233,12 +241,28 @@ handler body = (name, wrapper) r <- body addr a return $ Right $ toBEncode r -runHandler :: MonadKRPC h m => HandlerBody h -> SockAddr -> KQuery -> m KResult +runHandler :: MonadKRPC h m + => HandlerBody h -> SockAddr -> KQuery -> m KResult runHandler h addr KQuery {..} = wrapper `Lifted.catch` failback where - wrapper = ((`decodeError` queryId) +++ (`KResponse` queryId)) - <$> liftHandler (h addr queryArgs) - failback e = return $ Left $ serverError e queryId + signature = querySignature queryMethod queryId addr + + wrapper = do + $(logDebugS) "handler.quered" signature + result <- liftHandler (h addr queryArgs) + + case result of + Left msg -> do + $(logDebugS) "handler.failed" $ signature <> " !" <> T.pack msg + return $ Left $ decodeError msg queryId + + Right a -> do + $(logDebugS) "handler.success" signature + return $ Right $ a `KResponse` queryId + + failback e = do + $(logDebugS) "handler.errored" signature + return $ Left $ serverError e queryId dispatchHandler :: MonadKRPC h m => KQuery -> SockAddr -> m KResult dispatchHandler q @ KQuery {..} addr = do -- cgit v1.2.3 From 2812bdadb55e1ca7a1e5685f3fb2dafe19259970 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Tue, 7 Jan 2014 02:33:46 +0400 Subject: Ignore EOF exception at recvFrom call --- src/Network/KRPC/Manager.hs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index ee336a4d..4d1cfb69 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -40,6 +40,7 @@ import Control.Monad.Logger import Control.Monad.Reader import Control.Monad.Trans.Control import Data.BEncode as BE +import Data.ByteString as BS import Data.ByteString.Char8 as BC import Data.ByteString.Lazy as BL import Data.IORef @@ -53,6 +54,7 @@ import Network.KRPC.Message import Network.KRPC.Method import Network.Socket hiding (listen) import Network.Socket.ByteString as BS +import System.IO.Error import System.Timeout @@ -303,10 +305,17 @@ listener :: MonadKRPC h m => m () listener = do Manager {..} <- getManager forever $ do - (bs, addr) <- liftIO $ BS.recvFrom sock maxMsgSize + (bs, addr) <- liftIO $ handle exceptions $ BS.recvFrom sock maxMsgSize case BE.decode bs of - Left e -> liftIO $ sendMessage sock addr $ unknownMessage e + -- TODO ignore unknown messages at all? + Left e -> liftIO $ sendMessage sock addr $ unknownMessage e Right m -> handleMessage m addr + where + exceptions :: IOError -> IO (BS.ByteString, SockAddr) + exceptions e + -- packets with empty payload may trigger eof exception + | isEOFError e = return ("", SockAddrInet 0 0) + | otherwise = throwIO e -- | Should be run before any 'query', otherwise they will never -- succeed. -- cgit v1.2.3 From 6e77e14e2c011760eccc9d6989cd229420bdc741 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Tue, 7 Jan 2014 03:53:05 +0400 Subject: Allow to pass options from outside --- bench/Main.hs | 2 +- krpc.cabal | 1 + src/Network/KRPC.hs | 3 +++ src/Network/KRPC/Manager.hs | 60 +++++++++++++++++++++++++++++++++++---------- tests/Network/KRPCSpec.hs | 7 ++++-- 5 files changed, 57 insertions(+), 16 deletions(-) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/bench/Main.hs b/bench/Main.hs index 13727ff9..8466f4a3 100644 --- a/bench/Main.hs +++ b/bench/Main.hs @@ -22,7 +22,7 @@ addr :: SockAddr addr = SockAddrInet 6000 (256 * 256 * 256 + 127) main :: IO () -main = withManager addr [echo] $ \ m -> (`runReaderT` m) $ do +main = withManager def addr [echo] $ \ m -> (`runReaderT` m) $ do listen liftIO $ defaultMain (benchmarks m) where diff --git a/krpc.cabal b/krpc.cabal index b5004026..be19775f 100644 --- a/krpc.cabal +++ b/krpc.cabal @@ -46,6 +46,7 @@ library build-depends: base == 4.* , bytestring >= 0.10 , text >= 0.11 + , data-default-class , lifted-base >= 0.1.1 , transformers >= 0.2 , mtl diff --git a/src/Network/KRPC.hs b/src/Network/KRPC.hs index a1767161..7c02702c 100644 --- a/src/Network/KRPC.hs +++ b/src/Network/KRPC.hs @@ -62,6 +62,8 @@ module Network.KRPC -- * Manager , MonadKRPC (..) + , Options (..) + , def , Manager , newManager , closeManager @@ -76,6 +78,7 @@ module Network.KRPC , SockAddr (..) ) where +import Data.Default.Class import Network.KRPC.Message import Network.KRPC.Method import Network.KRPC.Manager diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index 4d1cfb69..7edcf72d 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -18,6 +18,7 @@ module Network.KRPC.Manager ( -- * Manager MonadKRPC (..) + , Options (..) , Manager , newManager , closeManager @@ -43,6 +44,7 @@ import Data.BEncode as BE import Data.ByteString as BS import Data.ByteString.Char8 as BC import Data.ByteString.Lazy as BL +import Data.Default.Class import Data.IORef import Data.List as L import Data.Map as M @@ -58,6 +60,41 @@ import System.IO.Error import System.Timeout +{----------------------------------------------------------------------- +-- Options +-----------------------------------------------------------------------} + +-- | RPC manager options. +data Options = Options + { -- | Initial 'TransactionId' incremented with each 'query'; + optSeedTransaction :: Int + + -- | Time to wait for response from remote node, in seconds. + , optQueryTimeout :: Int + } deriving (Show, Eq) + +defaultSeedTransaction :: Int +defaultSeedTransaction = 0 + +defaultQueryTimeout :: Int +defaultQueryTimeout = 120 + +-- | Permissive defaults. +instance Default Options where + def = Options + { optSeedTransaction = defaultSeedTransaction + , optQueryTimeout = defaultQueryTimeout + } + +validateOptions :: Options -> IO () +validateOptions Options {..} + | optQueryTimeout < 1 = throwIO (userError "non-positive query timeout") + | otherwise = return () + +{----------------------------------------------------------------------- +-- Options +-----------------------------------------------------------------------} + type KResult = Either KError KResponse type TransactionCounter = IORef Int @@ -108,23 +145,19 @@ sockAddrFamily (SockAddrInet _ _ ) = AF_INET sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6 sockAddrFamily (SockAddrUnix _ ) = AF_UNIX -seedTransaction :: Int -seedTransaction = 0 - -defaultQueryTimeout :: Int -defaultQueryTimeout = 120 - -- | Bind socket to the specified address. To enable query handling -- run 'listen'. -newManager :: SockAddr -- ^ address to listen on; +newManager :: Options -- ^ various protocol options; + -> SockAddr -- ^ address to listen on; -> [Handler h] -- ^ handlers to run on incoming queries. - -> IO (Manager h) -- ^ new manager. -newManager servAddr handlers = do + -> IO (Manager h) -- ^ new rpc manager. +newManager opts @ Options {..} servAddr handlers = do + validateOptions opts sock <- bindServ tref <- newEmptyMVar - tran <- newIORef seedTransaction + tran <- newIORef optSeedTransaction calls <- newIORef M.empty - return $ Manager sock defaultQueryTimeout tref tran calls handlers + return $ Manager sock optQueryTimeout tref tran calls handlers where bindServ = do let family = sockAddrFamily servAddr @@ -143,8 +176,9 @@ closeManager Manager {..} = do -- | Normally you should use Control.Monad.Trans.Resource.allocate -- function. -withManager :: SockAddr -> [Handler h] -> (Manager h -> IO a) -> IO a -withManager addr hs = bracket (newManager addr hs) closeManager +withManager :: Options -> SockAddr -> [Handler h] + -> (Manager h -> IO a) -> IO a +withManager opts addr hs = bracket (newManager opts addr hs) closeManager {----------------------------------------------------------------------- -- Logging diff --git a/tests/Network/KRPCSpec.hs b/tests/Network/KRPCSpec.hs index 7f5b2794..e73b1ec0 100644 --- a/tests/Network/KRPCSpec.hs +++ b/tests/Network/KRPCSpec.hs @@ -20,18 +20,21 @@ handlers = instance MonadLogger IO where monadLoggerLog _ _ _ _ = return () +opts :: Options +opts = def { optQueryTimeout = 1 } + spec :: Spec spec = do describe "query" $ do it "run handlers" $ do let int = 0xabcd :: Int - (withManager servAddr handlers $ runReaderT $ do + (withManager opts servAddr handlers $ runReaderT $ do listen query servAddr (Echo int)) `shouldReturn` Echo int it "throw timeout exception" $ do - (withManager servAddr handlers $ runReaderT $ do + (withManager opts servAddr handlers $ runReaderT $ do query servAddr (Echo (0xabcd :: Int)) ) `shouldThrow` (== KError GenericError "timeout expired" "0") -- cgit v1.2.3 From 018afe46b911c14472cf1a8cf315912e5c687e04 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Tue, 7 Jan 2014 04:18:50 +0400 Subject: Fix listenerThread mvar state tracking --- src/Network/KRPC/Manager.hs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index 7edcf72d..d561d7b1 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -35,7 +35,7 @@ import Control.Applicative import Control.Concurrent import Control.Concurrent.Lifted (fork) import Control.Exception hiding (Handler) -import Control.Exception.Lifted as Lifted (catch) +import Control.Exception.Lifted as Lifted (catch, finally) import Control.Monad import Control.Monad.Logger import Control.Monad.Reader @@ -332,6 +332,7 @@ handleMessage (Q q) = handleQuery q handleMessage (R r) = handleResponse (Right r) handleMessage (E e) = handleResponse (Left e) +-- TODO to options maxMsgSize :: Int maxMsgSize = 64 * 1024 @@ -356,5 +357,7 @@ listener = do listen :: MonadKRPC h m => m () listen = do Manager {..} <- getManager - tid <- fork $ listener + tid <- fork $ do + listener `Lifted.finally` + liftIO (takeMVar listenerThread) liftIO $ putMVar listenerThread tid -- cgit v1.2.3 From a9a0be92f7db16e1d7afe3422e56b7d7d2a63ec9 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Tue, 7 Jan 2014 04:38:02 +0400 Subject: Allow to configure max buffer size --- src/Network/KRPC/Manager.hs | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index d561d7b1..bf142738 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -67,10 +67,13 @@ import System.Timeout -- | RPC manager options. data Options = Options { -- | Initial 'TransactionId' incremented with each 'query'; - optSeedTransaction :: Int + optSeedTransaction :: {-# UNPACK #-} !Int -- | Time to wait for response from remote node, in seconds. - , optQueryTimeout :: Int + , optQueryTimeout :: {-# UNPACK #-} !Int + + -- | Maximum number of bytes to receive. + , optMaxMsgSize :: {-# UNPACK #-} !Int } deriving (Show, Eq) defaultSeedTransaction :: Int @@ -79,16 +82,23 @@ defaultSeedTransaction = 0 defaultQueryTimeout :: Int defaultQueryTimeout = 120 +defaultMaxMsgSize :: Int +defaultMaxMsgSize = 64 * 1024 + -- | Permissive defaults. instance Default Options where def = Options { optSeedTransaction = defaultSeedTransaction , optQueryTimeout = defaultQueryTimeout + , optMaxMsgSize = defaultMaxMsgSize } validateOptions :: Options -> IO () validateOptions Options {..} - | optQueryTimeout < 1 = throwIO (userError "non-positive query timeout") + | optQueryTimeout < 1 + = throwIO (userError "krpc: non-positive query timeout") + | optMaxMsgSize < 1 + = throwIO (userError "krpc: non-positive buffer size") | otherwise = return () {----------------------------------------------------------------------- @@ -112,7 +122,7 @@ type Handler h = (MethodName, HandlerBody h) -- made by /remote/ nodes. data Manager h = Manager { sock :: !Socket - , queryTimeout :: !Int -- ^ in seconds + , options :: !Options , listenerThread :: !(MVar ThreadId) , transactionCounter :: {-# UNPACK #-} !TransactionCounter , pendingCalls :: {-# UNPACK #-} !PendingCalls @@ -157,7 +167,7 @@ newManager opts @ Options {..} servAddr handlers = do tref <- newEmptyMVar tran <- newIORef optSeedTransaction calls <- newIORef M.empty - return $ Manager sock optQueryTimeout tref tran calls handlers + return $ Manager sock opts tref tran calls handlers where bindServ = do let family = sockAddrFamily servAddr @@ -245,7 +255,7 @@ query addr params = do sendMessage sock addr q `onException` unregisterQuery (tid, addr) pendingCalls - timeout (queryTimeout * 10 ^ (6 :: Int)) $ do + timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do queryResponse ares case mres of @@ -255,8 +265,8 @@ query addr params = do Nothing -> do _ <- liftIO $ unregisterQuery (tid, addr) pendingCalls - $(logWarnS) "query.not_responding" $ signature - <> " for " <> T.pack (show queryTimeout) <> " seconds" + $(logWarnS) "query.not_responding" $ signature <> " for " <> + T.pack (show (optQueryTimeout options)) <> " seconds" throw $ timeoutExpired tid {----------------------------------------------------------------------- @@ -332,15 +342,13 @@ handleMessage (Q q) = handleQuery q handleMessage (R r) = handleResponse (Right r) handleMessage (E e) = handleResponse (Left e) --- TODO to options -maxMsgSize :: Int -maxMsgSize = 64 * 1024 - listener :: MonadKRPC h m => m () listener = do Manager {..} <- getManager forever $ do - (bs, addr) <- liftIO $ handle exceptions $ BS.recvFrom sock maxMsgSize + (bs, addr) <- liftIO $ do + handle exceptions $ BS.recvFrom sock (optMaxMsgSize options) + case BE.decode bs of -- TODO ignore unknown messages at all? Left e -> liftIO $ sendMessage sock addr $ unknownMessage e -- cgit v1.2.3 From fe87b6cec9504114dafca26166b51f6c48250106 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Wed, 8 Jan 2014 02:46:32 +0400 Subject: Introduce QueryFailure exceptions --- src/Network/KRPC.hs | 3 +++ src/Network/KRPC/Manager.hs | 30 +++++++++++++++++++++++------- 2 files changed, 26 insertions(+), 7 deletions(-) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/src/Network/KRPC.hs b/src/Network/KRPC.hs index 7c02702c..96971803 100644 --- a/src/Network/KRPC.hs +++ b/src/Network/KRPC.hs @@ -58,6 +58,9 @@ module Network.KRPC -- * RPC , Handler , handler + + -- ** Query + , QueryFailure (..) , query -- * Manager diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index bf142738..6799277f 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -14,6 +14,7 @@ {-# LANGUAGE DefaultSignatures #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE FunctionalDependencies #-} +{-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE TemplateHaskell #-} module Network.KRPC.Manager ( -- * Manager @@ -26,7 +27,10 @@ module Network.KRPC.Manager , listen -- * Queries + , QueryFailure (..) , query + + -- * Handlers , Handler , handler ) where @@ -52,6 +56,7 @@ import Data.Monoid import Data.Text as T import Data.Text.Encoding as T import Data.Tuple +import Data.Typeable import Network.KRPC.Message import Network.KRPC.Method import Network.Socket hiding (listen) @@ -204,6 +209,15 @@ querySignature name transaction addr = T.concat {----------------------------------------------------------------------- -- Client -----------------------------------------------------------------------} +-- we don't need to know about TransactionId while performing query, +-- so we introduce QueryFailure exceptions + +data QueryFailure + = QueryFailed ErrorCode Text + | TimeoutExpired + deriving (Show, Eq, Typeable) + +instance Exception QueryFailure sendMessage :: MonadIO m => BEncode a => Socket -> SockAddr -> a -> m () sendMessage sock addr a = do @@ -230,15 +244,17 @@ unregisterQuery cid ref = do queryResponse :: BEncode a => CallRes -> IO a queryResponse ares = do res <- readMVar ares - KResponse {..} <- either throwIO pure res - case fromBEncode respVals of - Right r -> pure r - Left e -> throwIO $ decodeError e respId + case res of + Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m) + Right (KResponse {..}) -> + case fromBEncode respVals of + Right r -> pure r + Left e -> throwIO $ QueryFailed ProtocolError (T.pack e) -- | Enqueue query to the given node. -- --- This function will throw exception if quered node respond with --- @error@ message or timeout expires. +-- This function should throw 'QueryFailure' exception if quered node +-- respond with @error@ message or the query timeout expires. -- query :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m b query addr params = do @@ -267,7 +283,7 @@ query addr params = do _ <- liftIO $ unregisterQuery (tid, addr) pendingCalls $(logWarnS) "query.not_responding" $ signature <> " for " <> T.pack (show (optQueryTimeout options)) <> " seconds" - throw $ timeoutExpired tid + throw $ TimeoutExpired {----------------------------------------------------------------------- -- Handlers -- cgit v1.2.3 From 6f909c0d81d04b997f8c81ec1ac05e94d7d1e5b6 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Wed, 8 Jan 2014 06:26:35 +0400 Subject: Add HandlerFailure exceptions --- src/Network/KRPC.hs | 13 +++++----- src/Network/KRPC/Manager.hs | 60 ++++++++++++++++++++++++++++++++++++++------- src/Network/KRPC/Message.hs | 17 ------------- tests/Network/KRPCSpec.hs | 2 +- 4 files changed, 58 insertions(+), 34 deletions(-) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/src/Network/KRPC.hs b/src/Network/KRPC.hs index 96971803..69a4efca 100644 --- a/src/Network/KRPC.hs +++ b/src/Network/KRPC.hs @@ -56,13 +56,15 @@ module Network.KRPC , KRPC (..) -- * RPC - , Handler - , handler - -- ** Query , QueryFailure (..) , query + -- ** Handler + , HandlerFailure (..) + , Handler + , handler + -- * Manager , MonadKRPC (..) , Options (..) @@ -73,11 +75,8 @@ module Network.KRPC , withManager , listen - -- * Exceptions - , KError (..) + -- * Re-expor , ErrorCode (..) - - -- * Re-export , SockAddr (..) ) where diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index 6799277f..222b961a 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -31,6 +31,7 @@ module Network.KRPC.Manager , query -- * Handlers + , HandlerFailure (..) , Handler , handler ) where @@ -39,7 +40,8 @@ import Control.Applicative import Control.Concurrent import Control.Concurrent.Lifted (fork) import Control.Exception hiding (Handler) -import Control.Exception.Lifted as Lifted (catch, finally) +import qualified Control.Exception.Lifted as E (Handler (..)) +import Control.Exception.Lifted as Lifted (catches, finally) import Control.Monad import Control.Monad.Logger import Control.Monad.Reader @@ -288,9 +290,38 @@ query addr params = do {----------------------------------------------------------------------- -- Handlers -----------------------------------------------------------------------} +-- we already throw: +-- +-- * ErrorCode(MethodUnknown) in the 'dispatchHandler'; +-- +-- * ErrorCode(ServerError) in the 'runHandler'; (those can be +-- async exception too) +-- +-- * ErrorCode(GenericError) on + +-- | Used to signal protocol errors. +data HandlerFailure + = BadAddress -- ^ for e.g.: node calls herself; + | InvalidParameter Text -- ^ for e.g.: bad session token. + deriving (Show, Eq, Typeable) + +instance Exception HandlerFailure + +prettyHF :: HandlerFailure -> BS.ByteString +prettyHF BadAddress = T.encodeUtf8 "bad address" +prettyHF (InvalidParameter reason) = T.encodeUtf8 $ + "invalid parameter: " <> reason + +prettyQF :: QueryFailure -> BS.ByteString +prettyQF e = T.encodeUtf8 $ "handler fail while performing query: " + <> T.pack (show e) -- | Make handler from handler function. Any thrown exception will be -- supressed and send over the wire back to the querying node. +-- +-- If the handler make some 'query' normally it /should/ handle +-- corresponding 'QueryFailure's. +-- handler :: forall h a b. (KRPC a b, Monad h) => (SockAddr -> a -> h b) -> Handler h handler body = (name, wrapper) @@ -305,7 +336,7 @@ handler body = (name, wrapper) runHandler :: MonadKRPC h m => HandlerBody h -> SockAddr -> KQuery -> m KResult -runHandler h addr KQuery {..} = wrapper `Lifted.catch` failback +runHandler h addr KQuery {..} = Lifted.catches wrapper failbacks where signature = querySignature queryMethod queryId addr @@ -315,22 +346,33 @@ runHandler h addr KQuery {..} = wrapper `Lifted.catch` failback case result of Left msg -> do - $(logDebugS) "handler.failed" $ signature <> " !" <> T.pack msg - return $ Left $ decodeError msg queryId + $(logDebugS) "handler.bad_query" $ signature <> " !" <> T.pack msg + return $ Left $ KError ProtocolError (BC.pack msg) queryId Right a -> do $(logDebugS) "handler.success" signature - return $ Right $ a `KResponse` queryId + return $ Right $ KResponse a queryId + + failbacks = + [ E.Handler $ \ (e :: HandlerFailure) -> do + $(logDebugS) "handler.failed" signature + return $ Left $ KError ProtocolError (prettyHF e) queryId + + -- may happen if handler makes query and fail + , E.Handler $ \ (e :: QueryFailure) -> do + return $ Left $ KError ServerError (prettyQF e) queryId - failback e = do - $(logDebugS) "handler.errored" signature - return $ Left $ serverError e queryId + -- since handler thread exit after sendMessage we can safely + -- suppress async exception here + , E.Handler $ \ (e :: SomeException) -> do + return $ Left $ KError GenericError (BC.pack (show e)) queryId + ] dispatchHandler :: MonadKRPC h m => KQuery -> SockAddr -> m KResult dispatchHandler q @ KQuery {..} addr = do Manager {..} <- getManager case L.lookup queryMethod handlers of - Nothing -> return $ Left $ unknownMethod queryMethod queryId + Nothing -> return $ Left $ KError MethodUnknown queryMethod queryId Just h -> runHandler h addr q {----------------------------------------------------------------------- diff --git a/src/Network/KRPC/Message.hs b/src/Network/KRPC/Message.hs index d6279f11..96945843 100644 --- a/src/Network/KRPC/Message.hs +++ b/src/Network/KRPC/Message.hs @@ -26,11 +26,8 @@ module Network.KRPC.Message -- * Error , ErrorCode (..) , KError(..) - , serverError , decodeError - , unknownMethod , unknownMessage - , timeoutExpired -- * Query , KQuery(..) @@ -143,29 +140,15 @@ instance BEncode KError where instance Exception KError --- | Happen when some query handler fail. -serverError :: SomeException -> TransactionId -> KError -serverError e = KError ServerError (BC.pack (show e)) - -- | Received 'queryArgs' or 'respVals' can not be decoded. decodeError :: String -> TransactionId -> KError decodeError msg = KError ProtocolError (BC.pack msg) --- | If /remote/ node send query /this/ node doesn't know about then --- this error message should be sent in response. -unknownMethod :: MethodName -> TransactionId -> KError -unknownMethod = KError MethodUnknown - -- | A remote node has send some 'KMessage' this node is unable to -- decode. unknownMessage :: String -> KError unknownMessage msg = KError ProtocolError (BC.pack msg) unknownTransaction --- | A /remote/ node is not responding to the /our/ request the for --- specified period of time. -timeoutExpired :: TransactionId -> KError -timeoutExpired = KError GenericError "timeout expired" - {----------------------------------------------------------------------- -- Query messages -----------------------------------------------------------------------} diff --git a/tests/Network/KRPCSpec.hs b/tests/Network/KRPCSpec.hs index e73b1ec0..756c6855 100644 --- a/tests/Network/KRPCSpec.hs +++ b/tests/Network/KRPCSpec.hs @@ -37,4 +37,4 @@ spec = do (withManager opts servAddr handlers $ runReaderT $ do query servAddr (Echo (0xabcd :: Int)) ) - `shouldThrow` (== KError GenericError "timeout expired" "0") + `shouldThrow` (== TimeoutExpired) -- cgit v1.2.3 From e26ef0001157a1ff6b3a1ec809e5c53c37472161 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Wed, 8 Jan 2014 06:48:34 +0400 Subject: Handle sendmsg failures --- src/Network/KRPC/Manager.hs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index 222b961a..468744c1 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -214,9 +214,11 @@ querySignature name transaction addr = T.concat -- we don't need to know about TransactionId while performing query, -- so we introduce QueryFailure exceptions +-- | Used to signal 'query' errors. data QueryFailure - = QueryFailed ErrorCode Text - | TimeoutExpired + = SendFailed -- ^ unable to send query; + | QueryFailed ErrorCode Text -- ^ remote node return error; + | TimeoutExpired -- ^ remote node not responding. deriving (Show, Eq, Typeable) instance Exception QueryFailure @@ -253,6 +255,13 @@ queryResponse ares = do Right r -> pure r Left e -> throwIO $ QueryFailed ProtocolError (T.pack e) +-- (sendmsg EINVAL) +sendQuery :: BEncode a => Socket -> SockAddr -> a -> IO () +sendQuery sock addr q = handle sockError $ sendMessage sock addr q + where + sockError :: IOError -> IO () + sockError _ = throwIO SendFailed + -- | Enqueue query to the given node. -- -- This function should throw 'QueryFailure' exception if quered node @@ -270,7 +279,7 @@ query addr params = do ares <- registerQuery (tid, addr) pendingCalls let q = KQuery (toBEncode params) (methodName queryMethod) tid - sendMessage sock addr q + sendQuery sock addr q `onException` unregisterQuery (tid, addr) pendingCalls timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do -- cgit v1.2.3 From 1fb619d9d5edc1c352e2b72cbf5dfcf5c64d05ff Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Wed, 8 Jan 2014 06:56:28 +0400 Subject: Allow to ask for query count --- src/Network/KRPC.hs | 1 + src/Network/KRPC/Manager.hs | 8 ++++++++ tests/Network/KRPCSpec.hs | 9 +++++++++ 3 files changed, 18 insertions(+) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/src/Network/KRPC.hs b/src/Network/KRPC.hs index 69a4efca..3b722ac2 100644 --- a/src/Network/KRPC.hs +++ b/src/Network/KRPC.hs @@ -59,6 +59,7 @@ module Network.KRPC -- ** Query , QueryFailure (..) , query + , getQueryCount -- ** Handler , HandlerFailure (..) diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index 468744c1..e2b60b6a 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -29,6 +29,7 @@ module Network.KRPC.Manager -- * Queries , QueryFailure (..) , query + , getQueryCount -- * Handlers , HandlerFailure (..) @@ -232,6 +233,13 @@ genTransactionId ref = do cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur) return $ BC.pack (show cur) +-- | How many times 'query' call have been performed. +getQueryCount :: MonadKRPC h m => m Int +getQueryCount = do + Manager {..} <- getManager + curTrans <- liftIO $ readIORef transactionCounter + return $ curTrans - optSeedTransaction options + registerQuery :: CallId -> PendingCalls -> IO CallRes registerQuery cid ref = do ares <- newEmptyMVar diff --git a/tests/Network/KRPCSpec.hs b/tests/Network/KRPCSpec.hs index 756c6855..e695a646 100644 --- a/tests/Network/KRPCSpec.hs +++ b/tests/Network/KRPCSpec.hs @@ -33,6 +33,15 @@ spec = do query servAddr (Echo int)) `shouldReturn` Echo int + it "count transactions properly" $ do + (withManager opts servAddr handlers $ runReaderT $ do + listen + _ <- query servAddr (Echo (0xabcd :: Int)) + _ <- query servAddr (Echo (0xabcd :: Int)) + getQueryCount + ) + `shouldReturn` 2 + it "throw timeout exception" $ do (withManager opts servAddr handlers $ runReaderT $ do query servAddr (Echo (0xabcd :: Int)) -- cgit v1.2.3 From 73ce8f14a938326975050691042b93ad5eedca66 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Wed, 8 Jan 2014 07:01:19 +0400 Subject: Add some options TODOs --- src/Network/KRPC/Manager.hs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index e2b60b6a..22bfe477 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -202,6 +202,7 @@ withManager opts addr hs = bracket (newManager opts addr hs) closeManager -- Logging -----------------------------------------------------------------------} +-- TODO prettify log messages querySignature :: MethodName -> TransactionId -> SockAddr -> Text querySignature name transaction addr = T.concat [ "&", T.decodeUtf8 name @@ -311,10 +312,12 @@ query addr params = do -- -- * ErrorCode(MethodUnknown) in the 'dispatchHandler'; -- --- * ErrorCode(ServerError) in the 'runHandler'; (those can be +-- * ErrorCode(ServerError) in the 'runHandler'; +-- +-- * ErrorCode(GenericError) in the 'runHandler' (those can be -- async exception too) -- --- * ErrorCode(GenericError) on +-- so HandlerFailure should cover *only* 'ProtocolError's. -- | Used to signal protocol errors. data HandlerFailure @@ -396,6 +399,13 @@ dispatchHandler q @ KQuery {..} addr = do -- Listener -----------------------------------------------------------------------} +-- TODO bound amount of parallel handler *threads*: +-- +-- peer A flooding with find_node +-- peer B trying to ping peer C +-- peer B fork too many threads +-- ... space leak +-- handleQuery :: MonadKRPC h m => KQuery -> SockAddr -> m () handleQuery q addr = void $ fork $ do Manager {..} <- getManager -- cgit v1.2.3 From 41ca2fc6ece3e24542703035c4249f409eca3906 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Wed, 8 Jan 2014 07:11:37 +0400 Subject: Bump version number --- ChangeLog | 11 +++++++++++ krpc.cabal | 6 +++--- src/Network/KRPC.hs | 2 +- src/Network/KRPC/Manager.hs | 2 +- src/Network/KRPC/Message.hs | 2 +- src/Network/KRPC/Method.hs | 2 +- 6 files changed, 18 insertions(+), 7 deletions(-) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/ChangeLog b/ChangeLog index 48a67416..c65825a3 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,14 @@ +2014-01-08 Sam Truzjan + + 0.6.0.0: Logging + exceptions. + + API changes: + + * MonadLogger is superclass of MonadKRPC; + * KError hidden from Network.KRPC; + * HandlerFailure added; + * QueryFailure and getQueryCount added. + 2013-12-25 Sam Truzjan 0.5.0.0: Major API changes. diff --git a/krpc.cabal b/krpc.cabal index be19775f..fc350367 100644 --- a/krpc.cabal +++ b/krpc.cabal @@ -1,10 +1,10 @@ name: krpc -version: 0.5.0.0 +version: 0.6.0.0 license: BSD3 license-file: LICENSE author: Sam Truzjan maintainer: Sam Truzjan -copyright: (c) 2013, Sam Truzjan +copyright: (c) 2013-2014 Sam Truzjan category: Network build-type: Simple cabal-version: >= 1.10 @@ -32,7 +32,7 @@ source-repository this type: git location: git://github.com/cobit/krpc.git branch: master - tag: v0.5.0.0 + tag: v0.6.0.0 library default-language: Haskell2010 diff --git a/src/Network/KRPC.hs b/src/Network/KRPC.hs index bb7f7127..f7b8378a 100644 --- a/src/Network/KRPC.hs +++ b/src/Network/KRPC.hs @@ -1,5 +1,5 @@ -- | --- Copyright : (c) Sam Truzjan 2013 +-- Copyright : (c) Sam Truzjan 2013, 2014 -- License : BSD3 -- Maintainer : pxqr.sta@gmail.com -- Stability : experimental diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index 22bfe477..e0ea9618 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -1,5 +1,5 @@ -- | --- Copyright : (c) Sam Truzjan 2013 +-- Copyright : (c) Sam Truzjan 2013, 2014 -- License : BSD3 -- Maintainer : pxqr.sta@gmail.com -- Stability : experimental diff --git a/src/Network/KRPC/Message.hs b/src/Network/KRPC/Message.hs index 96945843..ebf5573e 100644 --- a/src/Network/KRPC/Message.hs +++ b/src/Network/KRPC/Message.hs @@ -1,5 +1,5 @@ -- | --- Copyright : (c) Sam Truzjan 2013 +-- Copyright : (c) Sam Truzjan 2013, 2014 -- License : BSD3 -- Maintainer : pxqr.sta@gmail.com -- Stability : experimental diff --git a/src/Network/KRPC/Method.hs b/src/Network/KRPC/Method.hs index 68f1fa4e..10f988b8 100644 --- a/src/Network/KRPC/Method.hs +++ b/src/Network/KRPC/Method.hs @@ -1,5 +1,5 @@ -- | --- Copyright : (c) Sam Truzjan 2013 +-- Copyright : (c) Sam Truzjan 2013, 2014 -- License : BSD3 -- Maintainer : pxqr.sta@gmail.com -- Stability : experimental -- cgit v1.2.3 From 9dda7109e1877821612488602cbea3014a3e8566 Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Wed, 19 Feb 2014 05:16:09 +0400 Subject: Add function isActive --- src/Network/KRPC.hs | 1 + src/Network/KRPC/Manager.hs | 7 +++++++ 2 files changed, 8 insertions(+) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/src/Network/KRPC.hs b/src/Network/KRPC.hs index f7b8378a..b15927cf 100644 --- a/src/Network/KRPC.hs +++ b/src/Network/KRPC.hs @@ -74,6 +74,7 @@ module Network.KRPC , newManager , closeManager , withManager + , isActive , listen -- * Re-exports diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index e0ea9618..4436a9ba 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -24,6 +24,7 @@ module Network.KRPC.Manager , newManager , closeManager , withManager + , isActive , listen -- * Queries @@ -192,6 +193,12 @@ closeManager Manager {..} = do -- TODO unblock calls close sock +-- | Check if the manager is still active. Manager becomes active +-- until 'closeManager' called. +isActive :: Manager m -> IO Bool +isActive Manager {..} = liftIO $ isBound sock +{-# INLINE isActive #-} + -- | Normally you should use Control.Monad.Trans.Resource.allocate -- function. withManager :: Options -> SockAddr -> [Handler h] -- cgit v1.2.3 From 5d0791e6ed2e500c08e7dadda39a254c8340cef5 Mon Sep 17 00:00:00 2001 From: joe Date: Tue, 17 Jan 2017 18:42:09 -0500 Subject: Handle reflected IP addresses (see bep 42). --- krpc.cabal | 14 +++++++-- src/Network/KRPC.hs | 4 ++- src/Network/KRPC/Manager.hs | 61 ++++++++++++++++++++++++--------------- src/Network/KRPC/Message.hs | 45 ++++++++++++++++++++++++++--- src/Network/KRPC/Method.hs | 3 +- tests/Network/KRPC/MessageSpec.hs | 7 +++-- 6 files changed, 99 insertions(+), 35 deletions(-) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/krpc.cabal b/krpc.cabal index c565bd2a..66c08ccb 100644 --- a/krpc.cabal +++ b/krpc.cabal @@ -34,6 +34,11 @@ source-repository this branch: master tag: v0.6.1.0 +flag builder + description: Use older bytestring package and bytestring-builder. + default: False + + library default-language: Haskell2010 default-extensions: PatternGuards @@ -44,7 +49,6 @@ library Network.KRPC.Method Network.KRPC.Manager build-depends: base == 4.* - , bytestring >= 0.10 , text >= 0.11 , data-default-class , lifted-base >= 0.1.1 @@ -54,7 +58,13 @@ library , monad-logger >= 0.3 , bencoding >= 0.4.3 , network >= 2.3 + , cereal , containers + if flag(builder) + build-depends: bytestring >= 0.9, bytestring-builder + else + build-depends: bytestring >= 0.10 + if impl(ghc < 7.6) build-depends: ghc-prim ghc-options: -Wall @@ -89,4 +99,4 @@ benchmark bench , monad-logger , criterion , krpc - ghc-options: -O2 -fforce-recomp \ No newline at end of file + ghc-options: -O2 -fforce-recomp diff --git a/src/Network/KRPC.hs b/src/Network/KRPC.hs index b15927cf..d185fb4c 100644 --- a/src/Network/KRPC.hs +++ b/src/Network/KRPC.hs @@ -59,6 +59,8 @@ module Network.KRPC -- ** Query , QueryFailure (..) , query + , query' + , queryRaw , getQueryCount -- ** Handler @@ -86,4 +88,4 @@ import Data.Default.Class import Network.KRPC.Message import Network.KRPC.Method import Network.KRPC.Manager -import Network.Socket (SockAddr (..)) \ No newline at end of file +import Network.Socket (SockAddr (..)) diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index 4436a9ba..9477d23c 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -30,6 +30,8 @@ module Network.KRPC.Manager -- * Queries , QueryFailure (..) , query + , query' + , queryRaw , getQueryCount -- * Handlers @@ -49,6 +51,7 @@ import Control.Monad.Logger import Control.Monad.Reader import Control.Monad.Trans.Control import Data.BEncode as BE +import Data.BEncode.Internal as BE import Data.ByteString as BS import Data.ByteString.Char8 as BC import Data.ByteString.Lazy as BL @@ -118,7 +121,7 @@ type KResult = Either KError KResponse type TransactionCounter = IORef Int type CallId = (TransactionId, SockAddr) -type CallRes = MVar KResult +type CallRes = MVar (BValue, KResult) type PendingCalls = IORef (Map CallId CallRes) type HandlerBody h = SockAddr -> BValue -> h (BE.Result BValue) @@ -163,6 +166,7 @@ sockAddrFamily :: SockAddr -> Family sockAddrFamily (SockAddrInet _ _ ) = AF_INET sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6 sockAddrFamily (SockAddrUnix _ ) = AF_UNIX +sockAddrFamily (SockAddrCan _ ) = AF_CAN -- | Bind socket to the specified address. To enable query handling -- run 'listen'. @@ -261,15 +265,6 @@ unregisterQuery cid ref = do atomicModifyIORef' ref $ swap . M.updateLookupWithKey (const (const Nothing)) cid -queryResponse :: BEncode a => CallRes -> IO a -queryResponse ares = do - res <- readMVar ares - case res of - Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m) - Right (KResponse {..}) -> - case fromBEncode respVals of - Right r -> pure r - Left e -> throwIO $ QueryFailed ProtocolError (T.pack e) -- (sendmsg EINVAL) sendQuery :: BEncode a => Socket -> SockAddr -> a -> IO () @@ -284,7 +279,21 @@ sendQuery sock addr q = handle sockError $ sendMessage sock addr q -- respond with @error@ message or the query timeout expires. -- query :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m b -query addr params = do +query addr params = queryK addr params (\_ x _ -> x) + +-- | Like 'query' but possibly returns your externally routable IP address. +query' :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, Maybe ReflectedIP) +query' addr params = queryK addr params (const (,)) + +-- | Enqueue a query, but give us the complete BEncoded content sent by the +-- remote Node. This is useful for handling extensions that this library does +-- not otherwise support. +queryRaw :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, BValue) +queryRaw addr params = queryK addr params (\raw x _ -> (x,raw)) + +queryK :: forall h m a b x. (MonadKRPC h m, KRPC a b) => + SockAddr -> a -> (BValue -> b -> Maybe ReflectedIP -> x) -> m x +queryK addr params kont = do Manager {..} <- getManager tid <- liftIO $ genTransactionId transactionCounter let queryMethod = method :: Method a b @@ -299,7 +308,13 @@ query addr params = do `onException` unregisterQuery (tid, addr) pendingCalls timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do - queryResponse ares + (raw,res) <- readMVar ares + case res of + Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m) + Right (KResponse {..}) -> + case fromBEncode respVals of + Right r -> pure $ kont raw r respIP + Left e -> throwIO $ QueryFailed ProtocolError (T.pack e) case mres of Just res -> do @@ -378,7 +393,7 @@ runHandler h addr KQuery {..} = Lifted.catches wrapper failbacks Right a -> do $(logDebugS) "handler.success" signature - return $ Right $ KResponse a queryId + return $ Right $ KResponse a queryId (Just $ ReflectedIP addr) failbacks = [ E.Handler $ \ (e :: HandlerFailure) -> do @@ -419,20 +434,20 @@ handleQuery q addr = void $ fork $ do res <- dispatchHandler q addr sendMessage sock addr $ either toBEncode toBEncode res -handleResponse :: MonadKRPC h m => KResult -> SockAddr -> m () -handleResponse result addr = do +handleResponse :: MonadKRPC h m => BValue -> KResult -> SockAddr -> m () +handleResponse raw result addr = do Manager {..} <- getManager liftIO $ do let resultId = either errorId respId result mcall <- unregisterQuery (resultId, addr) pendingCalls case mcall of Nothing -> return () - Just ares -> putMVar ares result + Just ares -> putMVar ares (raw,result) -handleMessage :: MonadKRPC h m => KMessage -> SockAddr -> m () -handleMessage (Q q) = handleQuery q -handleMessage (R r) = handleResponse (Right r) -handleMessage (E e) = handleResponse (Left e) +handleMessage :: MonadKRPC h m => BValue -> KMessage -> SockAddr -> m () +handleMessage _ (Q q) = handleQuery q +handleMessage raw (R r) = handleResponse raw (Right r) +handleMessage raw (E e) = handleResponse raw (Left e) listener :: MonadKRPC h m => m () listener = do @@ -441,10 +456,10 @@ listener = do (bs, addr) <- liftIO $ do handle exceptions $ BS.recvFrom sock (optMaxMsgSize options) - case BE.decode bs of + case BE.parse bs >>= \r -> (,) r <$> BE.decode bs of -- TODO ignore unknown messages at all? - Left e -> liftIO $ sendMessage sock addr $ unknownMessage e - Right m -> handleMessage m addr + Left e -> liftIO $ sendMessage sock addr $ unknownMessage e + Right (raw,m) -> handleMessage raw m addr where exceptions :: IOError -> IO (BS.ByteString, SockAddr) exceptions e diff --git a/src/Network/KRPC/Message.hs b/src/Network/KRPC/Message.hs index ebf5573e..6f4ae620 100644 --- a/src/Network/KRPC/Message.hs +++ b/src/Network/KRPC/Message.hs @@ -35,17 +35,22 @@ module Network.KRPC.Message -- * Response , KResponse(..) + , ReflectedIP(..) -- * Message , KMessage (..) ) where import Control.Applicative +import Control.Arrow import Control.Exception.Lifted as Lifted import Data.BEncode as BE import Data.ByteString as B import Data.ByteString.Char8 as BC +import qualified Data.Serialize as S +import Data.Word import Data.Typeable +import Network.Socket (SockAddr (..),PortNumber,HostAddress) -- | This transaction ID is generated by the querying node and is @@ -188,6 +193,35 @@ instance BEncode KQuery where KQuery <$>! "a" <*>! "q" <*>! "t" {-# INLINE fromBEncode #-} +newtype ReflectedIP = ReflectedIP SockAddr + deriving (Eq, Ord, Show) + +instance BEncode ReflectedIP where + toBEncode (ReflectedIP addr) = BString (encodeAddr addr) + fromBEncode (BString bs) = ReflectedIP <$> decodeAddr bs + fromBEncode _ = Left "ReflectedIP should be a bencoded string" + +port16 :: Word16 -> PortNumber +port16 = fromIntegral + +decodeAddr :: ByteString -> Either String SockAddr +decodeAddr bs | B.length bs == 6 + = ( \(a,p) -> SockAddrInet <$> fmap port16 p <*> a ) + $ (S.runGet S.getWord32host *** S.decode ) + $ B.splitAt 4 bs +decodeAddr bs | B.length bs == 18 + = ( \(a,p) -> flip SockAddrInet6 0 <$> fmap port16 p <*> a <*> pure 0 ) + $ (S.decode *** S.decode ) + $ B.splitAt 16 bs +decodeAddr _ = Left "incorrectly sized address and port" + +encodeAddr :: SockAddr -> ByteString +encodeAddr (SockAddrInet port addr) + = S.runPut (S.putWord32host addr >> S.put (fromIntegral port :: Word16)) +encodeAddr (SockAddrInet6 port _ addr _) + = S.runPut (S.put addr >> S.put (fromIntegral port :: Word16)) +encodeAddr _ = B.empty + {----------------------------------------------------------------------- -- Response messages -----------------------------------------------------------------------} @@ -206,7 +240,8 @@ instance BEncode KQuery where data KResponse = KResponse { respVals :: BValue -- ^ 'BDict' containing return values; , respId :: TransactionId -- ^ match to the corresponding 'queryId'. - } deriving (Show, Read, Eq, Ord, Typeable) + , respIP :: Maybe ReflectedIP + } deriving (Show, Eq, Ord, Typeable) -- | Responses, or KRPC message dictionaries with a \"y\" value of -- \"r\", contain one additional key \"r\". The value of \"r\" is a @@ -218,7 +253,8 @@ data KResponse = KResponse -- instance BEncode KResponse where toBEncode KResponse {..} = toDict $ - "r" .=! respVals + "ip" .=? respIP + .: "r" .=! respVals .: "t" .=! respId .: "y" .=! ("r" :: ByteString) .: endDict @@ -226,7 +262,8 @@ instance BEncode KResponse where fromBEncode = fromDict $ do lookAhead $ match "y" (BString "r") - KResponse <$>! "r" <*>! "t" + addr <- optional (field (req "ip")) + (\r t -> KResponse r t addr) <$>! "r" <*>! "t" {-# INLINE fromBEncode #-} {----------------------------------------------------------------------- @@ -249,4 +286,4 @@ instance BEncode KMessage where Q <$> fromBEncode b <|> R <$> fromBEncode b <|> E <$> fromBEncode b - <|> decodingError "KMessage: unknown message or message tag" \ No newline at end of file + <|> decodingError "KMessage: unknown message or message tag" diff --git a/src/Network/KRPC/Method.hs b/src/Network/KRPC/Method.hs index ea9da958..916b38a8 100644 --- a/src/Network/KRPC/Method.hs +++ b/src/Network/KRPC/Method.hs @@ -47,8 +47,7 @@ newtype Method param result = Method { methodName :: MethodName } instance (Typeable a, Typeable b) => Show (Method a b) where showsPrec _ = showsMethod -showsMethod :: forall a. forall b. Typeable a => Typeable b - => Method a b -> ShowS +showsMethod :: forall a b. ( Typeable a , Typeable b ) => Method a b -> ShowS showsMethod (Method name) = showString (BC.unpack name) <> showString " :: " <> diff --git a/tests/Network/KRPC/MessageSpec.hs b/tests/Network/KRPC/MessageSpec.hs index 7aca4489..498ef679 100644 --- a/tests/Network/KRPC/MessageSpec.hs +++ b/tests/Network/KRPC/MessageSpec.hs @@ -20,7 +20,8 @@ instance Arbitrary KQuery where arbitrary = KQuery <$> pure (BInteger 0) <*> arbitrary <*> arbitrary instance Arbitrary KResponse where - arbitrary = KResponse <$> pure (BList []) <*> arbitrary + -- TODO: Abitrary instance for ReflectedIP + arbitrary = KResponse <$> pure (BList []) <*> arbitrary <*> pure Nothing instance Arbitrary KMessage where arbitrary = frequency @@ -64,8 +65,8 @@ spec = do it "properly bencoded" $ do BE.decode "d1:rle1:t2:aa1:y1:re" `shouldBe` - Right (KResponse (BList []) "aa") + Right (KResponse (BList []) "aa" Nothing) describe "generic message" $ do it "properly bencoded (iso)" $ property $ \ km -> - BE.decode (BL.toStrict (BE.encode km)) `shouldBe` Right (km :: KMessage) \ No newline at end of file + BE.decode (BL.toStrict (BE.encode km)) `shouldBe` Right (km :: KMessage) -- cgit v1.2.3 From a8498921ddf37e864968a3865e3e254352b5d285 Mon Sep 17 00:00:00 2001 From: joe Date: Wed, 18 Jan 2017 20:11:36 -0500 Subject: Aeson-based pretty-printing of server requests. --- krpc.cabal | 7 +++++ src/Data/BEncode/Pretty.hs | 75 +++++++++++++++++++++++++++++++++++++++++++++ src/Network/KRPC/Manager.hs | 15 ++++++--- 3 files changed, 93 insertions(+), 4 deletions(-) create mode 100644 src/Data/BEncode/Pretty.hs (limited to 'src/Network/KRPC/Manager.hs') diff --git a/krpc.cabal b/krpc.cabal index 66c08ccb..452f1132 100644 --- a/krpc.cabal +++ b/krpc.cabal @@ -38,6 +38,9 @@ flag builder description: Use older bytestring package and bytestring-builder. default: False +flag aeson + description: Use aeson for pretty-printing bencoded data. + default: True library default-language: Haskell2010 @@ -48,6 +51,7 @@ library Network.KRPC.Message Network.KRPC.Method Network.KRPC.Manager + Data.BEncode.Pretty build-depends: base == 4.* , text >= 0.11 , data-default-class @@ -60,6 +64,9 @@ library , network >= 2.3 , cereal , containers + if flag(aeson) + build-depends: aeson, aeson-pretty, unordered-containers, vector + ghc-options: -DBENCODE_AESON if flag(builder) build-depends: bytestring >= 0.9, bytestring-builder else diff --git a/src/Data/BEncode/Pretty.hs b/src/Data/BEncode/Pretty.hs new file mode 100644 index 00000000..7b0d46a0 --- /dev/null +++ b/src/Data/BEncode/Pretty.hs @@ -0,0 +1,75 @@ +{-# LANGUAGE CPP #-} +module Data.BEncode.Pretty where -- (showBEncode) where + +import Data.BEncode.Types +import qualified Data.ByteString as BS +import qualified Data.ByteString.Lazy as BL +import qualified Data.ByteString.Lazy.Char8 as BL8 +import Data.Text (Text) +import qualified Data.Text as T +#ifdef BENCODE_AESON +import Data.BEncode.BDict hiding (map) +import Data.Aeson.Types hiding (parse) +import Data.Aeson.Encode.Pretty +import qualified Data.HashMap.Strict as HashMap +import qualified Data.Vector as Vector +import Data.Foldable as Foldable +import Data.Text.Encoding +import Text.Printf +#endif + +#ifdef BENCODE_AESON + +unhex :: Text -> BS.ByteString +unhex t = BS.pack $ map unhex1 [0 .. BS.length nibs `div` 2] + where + nibs = encodeUtf8 t + unhex1 i = unnib (BS.index nibs (i * 2)) * 0x10 + + unnib (BS.index nibs (i * 2 + 1)) + unnib a | a <= 0x39 = a - 0x30 + | otherwise = a - (0x41 - 10) + +hex :: BS.ByteString -> Text +hex bs = T.concat $ map (T.pack . printf "%02X") $ BS.unpack bs + +quote_chr :: Char +quote_chr = ' ' + +quote :: Text -> Text +quote t = quote_chr `T.cons` t `T.snoc` quote_chr + + +instance ToJSON BValue where + toJSON (BInteger x) = Number $ fromIntegral x + toJSON (BString s) = String $ either (const $ hex s) quote $ decodeUtf8' s + toJSON (BList xs) = Array $ Vector.fromList $ map toJSON xs + toJSON (BDict d) = toJSON d + +instance ToJSON a => ToJSON (BDictMap a) where + toJSON d = Object $ HashMap.fromList $ map convert $ toAscList d + where + convert (k,v) = (decodeUtf8 k,toJSON v) + +instance FromJSON BValue where + parseJSON (Number x) = pure $ BInteger (truncate x) + parseJSON (Bool x) = pure $ BInteger $ if x then 1 else 0 + parseJSON (String s) + | T.head s==quote_chr = pure $ BString $ encodeUtf8 (T.takeWhile (/=quote_chr) $ T.drop 1 s) + | otherwise = pure $ BString $ unhex s + parseJSON (Array v) = BList <$> traverse parseJSON (Foldable.toList v) + parseJSON (Object d) = BDict <$> parseJSON (Object d) + parseJSON (Null) = pure $ BDict Nil + +instance FromJSON v => FromJSON (BDictMap v) where + parseJSON (Object d) = fromAscList <$> traverse convert (HashMap.toList d) + where + convert (k,v) = (,) (encodeUtf8 k) <$> parseJSON v + parseJSON _ = fail "Not a BDict" +#endif + +showBEncode :: BValue -> BL.ByteString +#ifdef BENCODE_AESON +showBEncode b = encodePretty $ toJSON b +#else +showBEncode b = BL8.pack (show b) +#endif diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index 9477d23c..c90c92f9 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -52,6 +52,7 @@ import Control.Monad.Reader import Control.Monad.Trans.Control import Data.BEncode as BE import Data.BEncode.Internal as BE +import Data.BEncode.Pretty (showBEncode) import Data.ByteString as BS import Data.ByteString.Char8 as BC import Data.ByteString.Lazy as BL @@ -428,11 +429,17 @@ dispatchHandler q @ KQuery {..} addr = do -- peer B fork too many threads -- ... space leak -- -handleQuery :: MonadKRPC h m => KQuery -> SockAddr -> m () -handleQuery q addr = void $ fork $ do +handleQuery :: MonadKRPC h m => BValue -> KQuery -> SockAddr -> m () +handleQuery raw q addr = void $ fork $ do Manager {..} <- getManager res <- dispatchHandler q addr - sendMessage sock addr $ either toBEncode toBEncode res + let resbe = either toBEncode toBEncode res + $(logOther "q") $ T.unlines + [ either (const "") id $ T.decodeUtf8' (BL.toStrict $ showBEncode raw) + , "==>" + , either (const "") id $ T.decodeUtf8' (BL.toStrict $ showBEncode resbe) + ] + sendMessage sock addr resbe handleResponse :: MonadKRPC h m => BValue -> KResult -> SockAddr -> m () handleResponse raw result addr = do @@ -445,7 +452,7 @@ handleResponse raw result addr = do Just ares -> putMVar ares (raw,result) handleMessage :: MonadKRPC h m => BValue -> KMessage -> SockAddr -> m () -handleMessage _ (Q q) = handleQuery q +handleMessage raw (Q q) = handleQuery raw q handleMessage raw (R r) = handleResponse raw (Right r) handleMessage raw (E e) = handleResponse raw (Left e) -- cgit v1.2.3