summaryrefslogtreecommitdiff
path: root/src/Network/KRPC/Manager.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/KRPC/Manager.hs')
-rw-r--r--src/Network/KRPC/Manager.hs105
1 files changed, 70 insertions, 35 deletions
diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs
index 9aa1bea7..64b0dd62 100644
--- a/src/Network/KRPC/Manager.hs
+++ b/src/Network/KRPC/Manager.hs
@@ -1,29 +1,40 @@
1{-# LANGUAGE FlexibleContexts #-} 1{-# LANGUAGE FlexibleContexts #-}
2{-# LANGUAGE ScopedTypeVariables #-} 2{-# LANGUAGE ScopedTypeVariables #-}
3{-# LANGUAGE DefaultSignatures #-}
4{-# LANGUAGE MultiParamTypeClasses #-}
5{-# LANGUAGE FunctionalDependencies #-}
6{-# LANGUAGE FlexibleInstances #-}
3module Network.KRPC.Manager 7module Network.KRPC.Manager
4 ( MonadKRPC (..) 8 ( MonadKRPC (..)
9 , Manager
5 , newManager 10 , newManager
11 , closeManager
6 , query 12 , query
13
7 , handler 14 , handler
15 , listener
16 , listen
8 ) where 17 ) where
9 18
10import Control.Applicative 19import Control.Applicative
11import Control.Arrow 20import Control.Arrow
12import Control.Concurrent 21import Control.Concurrent
13--import Control.Exception hiding (Handler) 22import Control.Concurrent.Lifted (fork)
14import Control.Exception.Lifted as Lifted hiding (Handler) 23import Control.Exception hiding (Handler)
24import Control.Exception.Lifted as Lifted (catch)
15import Control.Monad 25import Control.Monad
26import Control.Monad.Reader
16import Control.Monad.Trans.Control 27import Control.Monad.Trans.Control
17import Control.Monad.IO.Class
18import Data.BEncode as BE 28import Data.BEncode as BE
19import Data.ByteString.Char8 as BC 29import Data.ByteString.Char8 as BC
20import Data.ByteString.Lazy as BL 30import Data.ByteString.Lazy as BL
21import Data.IORef 31import Data.IORef
22import Data.List as L 32import Data.List as L
23import Data.Map as M 33import Data.Map as M
34import Data.Tuple
24import Network.KRPC.Message 35import Network.KRPC.Message
25import Network.KRPC.Method 36import Network.KRPC.Method
26import Network.Socket 37import Network.Socket hiding (listen)
27import Network.Socket.ByteString as BS 38import Network.Socket.ByteString as BS
28 39
29 40
@@ -34,18 +45,27 @@ type CallId = (TransactionId, SockAddr)
34type CallRes = MVar KResult 45type CallRes = MVar KResult
35type PendingCalls = IORef (Map CallId CallRes) 46type PendingCalls = IORef (Map CallId CallRes)
36 47
37type HandlerBody m = SockAddr -> BValue -> m (BE.Result BValue) 48type HandlerBody h = SockAddr -> BValue -> h (BE.Result BValue)
38type Handler m = (MethodName, HandlerBody m) 49type Handler h = (MethodName, HandlerBody h)
39 50
40data Manager m = Manager 51data Manager h = Manager
41 { sock :: !Socket 52 { sock :: !Socket
42 , transactionCounter :: {-# UNPACK #-} !TransactionCounter 53 , transactionCounter :: {-# UNPACK #-} !TransactionCounter
43 , pendingCalls :: {-# UNPACK #-} !PendingCalls 54 , pendingCalls :: {-# UNPACK #-} !PendingCalls
44 , handlers :: [Handler m] 55 , handlers :: [Handler h]
45 } 56 }
46 57
47class (MonadBaseControl IO m, MonadIO m) => MonadKRPC m where 58class (MonadBaseControl IO m, MonadIO m) => MonadKRPC h m | m -> h where
48 getManager :: m (Manager a) 59 getManager :: m (Manager h)
60
61 default getManager :: MonadReader (Manager h) m => m (Manager h)
62 getManager = ask
63
64 liftHandler :: h a -> m a
65
66instance (MonadBaseControl IO h, MonadIO h)
67 => MonadKRPC h (ReaderT (Manager h) h) where
68 liftHandler = lift
49 69
50sockAddrFamily :: SockAddr -> Family 70sockAddrFamily :: SockAddr -> Family
51sockAddrFamily (SockAddrInet _ _ ) = AF_INET 71sockAddrFamily (SockAddrInet _ _ ) = AF_INET
@@ -55,12 +75,12 @@ sockAddrFamily (SockAddrUnix _ ) = AF_UNIX
55seedTransaction :: Int 75seedTransaction :: Int
56seedTransaction = 0 76seedTransaction = 0
57 77
58newManager :: SockAddr -> IO (Manager a) 78newManager :: SockAddr -> [Handler h] -> IO (Manager h)
59newManager servAddr = do 79newManager servAddr handlers = do
60 sock <- bindServ 80 sock <- bindServ
61 tran <- newIORef seedTransaction 81 tran <- newIORef seedTransaction
62 calls <- newIORef M.empty 82 calls <- newIORef M.empty
63 return $ Manager sock tran calls [] 83 return $ Manager sock tran calls handlers
64 where 84 where
65 bindServ = do 85 bindServ = do
66 let family = sockAddrFamily servAddr 86 let family = sockAddrFamily servAddr
@@ -70,8 +90,14 @@ newManager servAddr = do
70 bindSocket sock servAddr 90 bindSocket sock servAddr
71 return sock 91 return sock
72 92
93-- | Unblock all pending calls and close socket.
94closeManager :: Manager m -> IO ()
95closeManager Manager {..} = do
96 -- TODO unblock calls
97 close sock
98
73sendMessage :: MonadIO m => BEncode a => Socket -> SockAddr -> a -> m () 99sendMessage :: MonadIO m => BEncode a => Socket -> SockAddr -> a -> m ()
74sendMessage sock addr a = 100sendMessage sock addr a = do
75 liftIO $ sendManyTo sock (BL.toChunks (BE.encode a)) addr 101 liftIO $ sendManyTo sock (BL.toChunks (BE.encode a)) addr
76 102
77{----------------------------------------------------------------------- 103{-----------------------------------------------------------------------
@@ -89,9 +115,10 @@ registerQuery cid ref = do
89 atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ()) 115 atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ())
90 return ares 116 return ares
91 117
92unregisterQuery :: CallId -> PendingCalls -> IO () 118unregisterQuery :: CallId -> PendingCalls -> IO (Maybe CallRes)
93unregisterQuery cid ref = do 119unregisterQuery cid ref = do
94 atomicModifyIORef' ref $ \ m -> (M.delete cid m, ()) 120 atomicModifyIORef' ref $ swap .
121 M.updateLookupWithKey (const (const Nothing)) cid
95 122
96queryResponse :: BEncode a => CallRes -> IO a 123queryResponse :: BEncode a => CallRes -> IO a
97queryResponse ares = do 124queryResponse ares = do
@@ -103,7 +130,7 @@ queryResponse ares = do
103 Left e -> throwIO (KError ProtocolError (BC.pack e) respId) 130 Left e -> throwIO (KError ProtocolError (BC.pack e) respId)
104 Right a -> return a 131 Right a -> return a
105 132
106query :: forall m a b. (MonadKRPC m, KRPC a b) => SockAddr -> a -> m b 133query :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m b
107query addr params = do 134query addr params = do
108 Manager {..} <- getManager 135 Manager {..} <- getManager
109 liftIO $ do 136 liftIO $ do
@@ -113,55 +140,60 @@ query addr params = do
113 ares <- registerQuery (tid, addr) pendingCalls 140 ares <- registerQuery (tid, addr) pendingCalls
114 sendMessage sock addr q 141 sendMessage sock addr q
115 `onException` unregisterQuery (tid, addr) pendingCalls 142 `onException` unregisterQuery (tid, addr) pendingCalls
116 queryResponse ares 143 res <- queryResponse ares
144 return res
117 145
118{----------------------------------------------------------------------- 146{-----------------------------------------------------------------------
119-- Handlers 147-- Handlers
120-----------------------------------------------------------------------} 148-----------------------------------------------------------------------}
121 149
122handler :: forall m a b. (KRPC a b, MonadKRPC m) 150handler :: forall h a b. (KRPC a b, Monad h)
123 => (SockAddr -> a -> m b) -> Handler m 151 => (SockAddr -> a -> h b) -> Handler h
124handler body = (name, wrapper) 152handler body = (name, wrapper)
125 where 153 where
126 Method name = method :: Method a b 154 Method name = method :: Method a b
127 wrapper addr args = 155 wrapper addr args =
128 case fromBEncode args of 156 case fromBEncode args of
129 Left e -> return $ Left e 157 Left e -> return $ Left e
130 Right a -> (Right . toBEncode) <$> body addr a 158 Right a -> do
159 r <- body addr a
160 return $ Right $ toBEncode r
131 161
132runHandler :: MonadKRPC m => HandlerBody m -> SockAddr -> KQuery -> m KResult 162runHandler :: MonadKRPC h m => HandlerBody h -> SockAddr -> KQuery -> m KResult
133runHandler handler addr KQuery {..} = wrapper `Lifted.catch` failback 163runHandler h addr KQuery {..} = wrapper `Lifted.catch` failback
134 where 164 where
135 wrapper = ((`decodeError` queryId) +++ (`KResponse` queryId)) 165 wrapper = ((`decodeError` queryId) +++ (`KResponse` queryId))
136 <$> handler addr queryArgs 166 <$> liftHandler (h addr queryArgs)
137 failback e = return $ Left $ serverError e queryId 167 failback e = return $ Left $ serverError e queryId
138 168
139dispatchHandler :: MonadKRPC m => KQuery -> SockAddr -> m KResult 169dispatchHandler :: MonadKRPC h m => KQuery -> SockAddr -> m KResult
140dispatchHandler q @ KQuery {..} addr = do 170dispatchHandler q @ KQuery {..} addr = do
141 Manager {..} <- getManager 171 Manager {..} <- getManager
142 case L.lookup queryMethod handlers of 172 case L.lookup queryMethod handlers of
143 Nothing -> return $ Left $ unknownMethod queryMethod queryId 173 Nothing -> return $ Left $ unknownMethod queryMethod queryId
144 Just handler -> runHandler handler addr q 174 Just h -> runHandler h addr q
145 175
146{----------------------------------------------------------------------- 176{-----------------------------------------------------------------------
147-- Listener 177-- Listener
148-----------------------------------------------------------------------} 178-----------------------------------------------------------------------}
149 179
150handleQuery :: MonadKRPC m => KQuery -> SockAddr -> m () 180handleQuery :: MonadKRPC h m => KQuery -> SockAddr -> m ()
151handleQuery q addr = do 181handleQuery q addr = do
152 Manager {..} <- getManager 182 Manager {..} <- getManager
153 res <- dispatchHandler q addr 183 res <- dispatchHandler q addr
154 sendMessage sock addr $ either toBEncode toBEncode res 184 sendMessage sock addr $ either toBEncode toBEncode res
155 185
156handleResponse :: MonadKRPC m => KResult -> SockAddr -> m () 186handleResponse :: MonadKRPC h m => KResult -> SockAddr -> m ()
157handleResponse result addr = do 187handleResponse result addr = do
158 Manager {..} <- getManager 188 Manager {..} <- getManager
159 mcall <- undefined (addr, respId) pendingCalls 189 liftIO $ do
160 case mcall of 190 let resultId = either errorId respId result
161 Nothing -> return () 191 mcall <- unregisterQuery (resultId, addr) pendingCalls
162 Just ares -> liftIO $ putMVar ares result 192 case mcall of
193 Nothing -> return ()
194 Just ares -> putMVar ares result
163 195
164handleMessage :: MonadKRPC m => KMessage -> SockAddr -> m () 196handleMessage :: MonadKRPC h m => KMessage -> SockAddr -> m ()
165handleMessage (Q q) = handleQuery q 197handleMessage (Q q) = handleQuery q
166handleMessage (R r) = handleResponse (Right r) 198handleMessage (R r) = handleResponse (Right r)
167handleMessage (E e) = handleResponse (Left e) 199handleMessage (E e) = handleResponse (Left e)
@@ -169,7 +201,7 @@ handleMessage (E e) = handleResponse (Left e)
169maxMsgSize :: Int 201maxMsgSize :: Int
170maxMsgSize = 64 * 1024 202maxMsgSize = 64 * 1024
171 203
172listener :: MonadKRPC m => m () 204listener :: MonadKRPC h m => m ()
173listener = do 205listener = do
174 Manager {..} <- getManager 206 Manager {..} <- getManager
175 forever $ do 207 forever $ do
@@ -177,3 +209,6 @@ listener = do
177 case BE.decode bs of 209 case BE.decode bs of
178 Left e -> liftIO $ sendMessage sock addr $ unknownMessage e 210 Left e -> liftIO $ sendMessage sock addr $ unknownMessage e
179 Right m -> handleMessage m addr 211 Right m -> handleMessage m addr
212
213listen :: MonadKRPC h m => m ThreadId
214listen = fork $ listener