From 3cbfda28704a6963baf8bcd919826b0ae67b2a5a Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Sat, 21 Dec 2013 01:25:33 +0400 Subject: Separate KRPC monad from Handler monad --- src/Network/KRPC.hs | 14 ++---- src/Network/KRPC/Manager.hs | 105 +++++++++++++++++++++++++++++--------------- 2 files changed, 73 insertions(+), 46 deletions(-) (limited to 'src') diff --git a/src/Network/KRPC.hs b/src/Network/KRPC.hs index 09d1c5b2..6809a330 100644 --- a/src/Network/KRPC.hs +++ b/src/Network/KRPC.hs @@ -86,16 +86,6 @@ -- -- For protocol details see 'Remote.KRPC.Protocol' module. -- -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE ViewPatterns #-} -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE DeriveDataTypeable #-} -{-# LANGUAGE ExplicitForAll #-} -{-# LANGUAGE KindSignatures #-} -{-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE MultiParamTypeClasses #-} -{-# LANGUAGE GeneralizedNewtypeDeriving #-} -{-# LANGUAGE FunctionalDependencies #-} module Network.KRPC ( -- * Methods Method @@ -103,10 +93,12 @@ module Network.KRPC -- * RPC , handler + , listen , query -- * Manager , MonadKRPC (..) + , Manager , newManager -- , closeManager @@ -116,4 +108,4 @@ module Network.KRPC import Network.KRPC.Message import Network.KRPC.Method -import Network.KRPC.Manager \ No newline at end of file +import Network.KRPC.Manager diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index 9aa1bea7..64b0dd62 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -1,29 +1,40 @@ {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE DefaultSignatures #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE FunctionalDependencies #-} +{-# LANGUAGE FlexibleInstances #-} module Network.KRPC.Manager ( MonadKRPC (..) + , Manager , newManager + , closeManager , query + , handler + , listener + , listen ) where import Control.Applicative import Control.Arrow import Control.Concurrent ---import Control.Exception hiding (Handler) -import Control.Exception.Lifted as Lifted hiding (Handler) +import Control.Concurrent.Lifted (fork) +import Control.Exception hiding (Handler) +import Control.Exception.Lifted as Lifted (catch) import Control.Monad +import Control.Monad.Reader import Control.Monad.Trans.Control -import Control.Monad.IO.Class import Data.BEncode as BE import Data.ByteString.Char8 as BC import Data.ByteString.Lazy as BL import Data.IORef import Data.List as L import Data.Map as M +import Data.Tuple import Network.KRPC.Message import Network.KRPC.Method -import Network.Socket +import Network.Socket hiding (listen) import Network.Socket.ByteString as BS @@ -34,18 +45,27 @@ type CallId = (TransactionId, SockAddr) type CallRes = MVar KResult type PendingCalls = IORef (Map CallId CallRes) -type HandlerBody m = SockAddr -> BValue -> m (BE.Result BValue) -type Handler m = (MethodName, HandlerBody m) +type HandlerBody h = SockAddr -> BValue -> h (BE.Result BValue) +type Handler h = (MethodName, HandlerBody h) -data Manager m = Manager +data Manager h = Manager { sock :: !Socket , transactionCounter :: {-# UNPACK #-} !TransactionCounter , pendingCalls :: {-# UNPACK #-} !PendingCalls - , handlers :: [Handler m] + , handlers :: [Handler h] } -class (MonadBaseControl IO m, MonadIO m) => MonadKRPC m where - getManager :: m (Manager a) +class (MonadBaseControl IO m, MonadIO m) => MonadKRPC h m | m -> h where + getManager :: m (Manager h) + + default getManager :: MonadReader (Manager h) m => m (Manager h) + getManager = ask + + liftHandler :: h a -> m a + +instance (MonadBaseControl IO h, MonadIO h) + => MonadKRPC h (ReaderT (Manager h) h) where + liftHandler = lift sockAddrFamily :: SockAddr -> Family sockAddrFamily (SockAddrInet _ _ ) = AF_INET @@ -55,12 +75,12 @@ sockAddrFamily (SockAddrUnix _ ) = AF_UNIX seedTransaction :: Int seedTransaction = 0 -newManager :: SockAddr -> IO (Manager a) -newManager servAddr = do +newManager :: SockAddr -> [Handler h] -> IO (Manager h) +newManager servAddr handlers = do sock <- bindServ tran <- newIORef seedTransaction calls <- newIORef M.empty - return $ Manager sock tran calls [] + return $ Manager sock tran calls handlers where bindServ = do let family = sockAddrFamily servAddr @@ -70,8 +90,14 @@ newManager servAddr = do bindSocket sock servAddr return sock +-- | Unblock all pending calls and close socket. +closeManager :: Manager m -> IO () +closeManager Manager {..} = do + -- TODO unblock calls + close sock + sendMessage :: MonadIO m => BEncode a => Socket -> SockAddr -> a -> m () -sendMessage sock addr a = +sendMessage sock addr a = do liftIO $ sendManyTo sock (BL.toChunks (BE.encode a)) addr {----------------------------------------------------------------------- @@ -89,9 +115,10 @@ registerQuery cid ref = do atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ()) return ares -unregisterQuery :: CallId -> PendingCalls -> IO () +unregisterQuery :: CallId -> PendingCalls -> IO (Maybe CallRes) unregisterQuery cid ref = do - atomicModifyIORef' ref $ \ m -> (M.delete cid m, ()) + atomicModifyIORef' ref $ swap . + M.updateLookupWithKey (const (const Nothing)) cid queryResponse :: BEncode a => CallRes -> IO a queryResponse ares = do @@ -103,7 +130,7 @@ queryResponse ares = do Left e -> throwIO (KError ProtocolError (BC.pack e) respId) Right a -> return a -query :: forall m a b. (MonadKRPC m, KRPC a b) => SockAddr -> a -> m b +query :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m b query addr params = do Manager {..} <- getManager liftIO $ do @@ -113,55 +140,60 @@ query addr params = do ares <- registerQuery (tid, addr) pendingCalls sendMessage sock addr q `onException` unregisterQuery (tid, addr) pendingCalls - queryResponse ares + res <- queryResponse ares + return res {----------------------------------------------------------------------- -- Handlers -----------------------------------------------------------------------} -handler :: forall m a b. (KRPC a b, MonadKRPC m) - => (SockAddr -> a -> m b) -> Handler m +handler :: forall h a b. (KRPC a b, Monad h) + => (SockAddr -> a -> h b) -> Handler h handler body = (name, wrapper) where Method name = method :: Method a b wrapper addr args = case fromBEncode args of Left e -> return $ Left e - Right a -> (Right . toBEncode) <$> body addr a + Right a -> do + r <- body addr a + return $ Right $ toBEncode r -runHandler :: MonadKRPC m => HandlerBody m -> SockAddr -> KQuery -> m KResult -runHandler handler addr KQuery {..} = wrapper `Lifted.catch` failback +runHandler :: MonadKRPC h m => HandlerBody h -> SockAddr -> KQuery -> m KResult +runHandler h addr KQuery {..} = wrapper `Lifted.catch` failback where wrapper = ((`decodeError` queryId) +++ (`KResponse` queryId)) - <$> handler addr queryArgs + <$> liftHandler (h addr queryArgs) failback e = return $ Left $ serverError e queryId -dispatchHandler :: MonadKRPC m => KQuery -> SockAddr -> m KResult +dispatchHandler :: MonadKRPC h m => KQuery -> SockAddr -> m KResult dispatchHandler q @ KQuery {..} addr = do Manager {..} <- getManager case L.lookup queryMethod handlers of - Nothing -> return $ Left $ unknownMethod queryMethod queryId - Just handler -> runHandler handler addr q + Nothing -> return $ Left $ unknownMethod queryMethod queryId + Just h -> runHandler h addr q {----------------------------------------------------------------------- -- Listener -----------------------------------------------------------------------} -handleQuery :: MonadKRPC m => KQuery -> SockAddr -> m () +handleQuery :: MonadKRPC h m => KQuery -> SockAddr -> m () handleQuery q addr = do Manager {..} <- getManager res <- dispatchHandler q addr sendMessage sock addr $ either toBEncode toBEncode res -handleResponse :: MonadKRPC m => KResult -> SockAddr -> m () +handleResponse :: MonadKRPC h m => KResult -> SockAddr -> m () handleResponse result addr = do Manager {..} <- getManager - mcall <- undefined (addr, respId) pendingCalls - case mcall of - Nothing -> return () - Just ares -> liftIO $ putMVar ares result + liftIO $ do + let resultId = either errorId respId result + mcall <- unregisterQuery (resultId, addr) pendingCalls + case mcall of + Nothing -> return () + Just ares -> putMVar ares result -handleMessage :: MonadKRPC m => KMessage -> SockAddr -> m () +handleMessage :: MonadKRPC h m => KMessage -> SockAddr -> m () handleMessage (Q q) = handleQuery q handleMessage (R r) = handleResponse (Right r) handleMessage (E e) = handleResponse (Left e) @@ -169,7 +201,7 @@ handleMessage (E e) = handleResponse (Left e) maxMsgSize :: Int maxMsgSize = 64 * 1024 -listener :: MonadKRPC m => m () +listener :: MonadKRPC h m => m () listener = do Manager {..} <- getManager forever $ do @@ -177,3 +209,6 @@ listener = do case BE.decode bs of Left e -> liftIO $ sendMessage sock addr $ unknownMessage e Right m -> handleMessage m addr + +listen :: MonadKRPC h m => m ThreadId +listen = fork $ listener -- cgit v1.2.3