diff options
Diffstat (limited to 'src/Network/KRPC')
-rw-r--r-- | src/Network/KRPC/Manager.hs | 31 | ||||
-rw-r--r-- | src/Network/KRPC/Message.hs | 14 |
2 files changed, 34 insertions, 11 deletions
diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index 4a3dc93f..0b090e6b 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs | |||
@@ -37,6 +37,7 @@ import Network.KRPC.Message | |||
37 | import Network.KRPC.Method | 37 | import Network.KRPC.Method |
38 | import Network.Socket hiding (listen) | 38 | import Network.Socket hiding (listen) |
39 | import Network.Socket.ByteString as BS | 39 | import Network.Socket.ByteString as BS |
40 | import System.Timeout | ||
40 | 41 | ||
41 | 42 | ||
42 | type KResult = Either KError KResponse | 43 | type KResult = Either KError KResponse |
@@ -51,6 +52,7 @@ type Handler h = (MethodName, HandlerBody h) | |||
51 | 52 | ||
52 | data Manager h = Manager | 53 | data Manager h = Manager |
53 | { sock :: !Socket | 54 | { sock :: !Socket |
55 | , queryTimeout :: !Int -- ^ in seconds | ||
54 | , transactionCounter :: {-# UNPACK #-} !TransactionCounter | 56 | , transactionCounter :: {-# UNPACK #-} !TransactionCounter |
55 | , pendingCalls :: {-# UNPACK #-} !PendingCalls | 57 | , pendingCalls :: {-# UNPACK #-} !PendingCalls |
56 | , handlers :: [Handler h] | 58 | , handlers :: [Handler h] |
@@ -76,12 +78,15 @@ sockAddrFamily (SockAddrUnix _ ) = AF_UNIX | |||
76 | seedTransaction :: Int | 78 | seedTransaction :: Int |
77 | seedTransaction = 0 | 79 | seedTransaction = 0 |
78 | 80 | ||
81 | defaultQueryTimeout :: Int | ||
82 | defaultQueryTimeout = 10 | ||
83 | |||
79 | newManager :: SockAddr -> [Handler h] -> IO (Manager h) | 84 | newManager :: SockAddr -> [Handler h] -> IO (Manager h) |
80 | newManager servAddr handlers = do | 85 | newManager servAddr handlers = do |
81 | sock <- bindServ | 86 | sock <- bindServ |
82 | tran <- newIORef seedTransaction | 87 | tran <- newIORef seedTransaction |
83 | calls <- newIORef M.empty | 88 | calls <- newIORef M.empty |
84 | return $ Manager sock tran calls handlers | 89 | return $ Manager sock defaultQueryTimeout tran calls handlers |
85 | where | 90 | where |
86 | bindServ = do | 91 | bindServ = do |
87 | let family = sockAddrFamily servAddr | 92 | let family = sockAddrFamily servAddr |
@@ -116,6 +121,8 @@ registerQuery cid ref = do | |||
116 | atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ()) | 121 | atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ()) |
117 | return ares | 122 | return ares |
118 | 123 | ||
124 | -- simultaneous M.lookup and M.delete guarantees that we never get two | ||
125 | -- or more responses to the same query | ||
119 | unregisterQuery :: CallId -> PendingCalls -> IO (Maybe CallRes) | 126 | unregisterQuery :: CallId -> PendingCalls -> IO (Maybe CallRes) |
120 | unregisterQuery cid ref = do | 127 | unregisterQuery cid ref = do |
121 | atomicModifyIORef' ref $ swap . | 128 | atomicModifyIORef' ref $ swap . |
@@ -123,13 +130,11 @@ unregisterQuery cid ref = do | |||
123 | 130 | ||
124 | queryResponse :: BEncode a => CallRes -> IO a | 131 | queryResponse :: BEncode a => CallRes -> IO a |
125 | queryResponse ares = do | 132 | queryResponse ares = do |
126 | res <- readMVar ares | 133 | res <- readMVar ares |
127 | case res of | 134 | KResponse {..} <- either throwIO pure res |
128 | Left e -> throwIO e | 135 | case fromBEncode respVals of |
129 | Right (KResponse {..}) -> | 136 | Right r -> pure r |
130 | case fromBEncode respVals of | 137 | Left e -> throwIO $ decodeError e respId |
131 | Left e -> throwIO (KError ProtocolError (BC.pack e) respId) | ||
132 | Right a -> return a | ||
133 | 138 | ||
134 | query :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m b | 139 | query :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m b |
135 | query addr params = do | 140 | query addr params = do |
@@ -138,11 +143,17 @@ query addr params = do | |||
138 | tid <- genTransactionId transactionCounter | 143 | tid <- genTransactionId transactionCounter |
139 | let Method name = method :: Method a b | 144 | let Method name = method :: Method a b |
140 | let q = KQuery (toBEncode params) name tid | 145 | let q = KQuery (toBEncode params) name tid |
146 | |||
141 | ares <- registerQuery (tid, addr) pendingCalls | 147 | ares <- registerQuery (tid, addr) pendingCalls |
142 | sendMessage sock addr q | 148 | sendMessage sock addr q |
143 | `onException` unregisterQuery (tid, addr) pendingCalls | 149 | `onException` unregisterQuery (tid, addr) pendingCalls |
144 | res <- queryResponse ares | 150 | |
145 | return res | 151 | mres <- timeout (queryTimeout * 10 ^ 6) $ queryResponse ares |
152 | case mres of | ||
153 | Just res -> return res | ||
154 | Nothing -> do | ||
155 | unregisterQuery (tid, addr) pendingCalls | ||
156 | throwIO $ timeoutExpired tid | ||
146 | 157 | ||
147 | {----------------------------------------------------------------------- | 158 | {----------------------------------------------------------------------- |
148 | -- Handlers | 159 | -- Handlers |
diff --git a/src/Network/KRPC/Message.hs b/src/Network/KRPC/Message.hs index 3bbfb1db..0bd34400 100644 --- a/src/Network/KRPC/Message.hs +++ b/src/Network/KRPC/Message.hs | |||
@@ -30,6 +30,7 @@ module Network.KRPC.Message | |||
30 | , decodeError | 30 | , decodeError |
31 | , unknownMethod | 31 | , unknownMethod |
32 | , unknownMessage | 32 | , unknownMessage |
33 | , timeoutExpired | ||
33 | 34 | ||
34 | -- * Query | 35 | -- * Query |
35 | , KQuery(..) | 36 | , KQuery(..) |
@@ -130,17 +131,28 @@ instance BEncode KError where | |||
130 | 131 | ||
131 | instance Exception KError | 132 | instance Exception KError |
132 | 133 | ||
134 | -- | Happen when some handler fail. | ||
133 | serverError :: SomeException -> TransactionId -> KError | 135 | serverError :: SomeException -> TransactionId -> KError |
134 | serverError e = KError ServerError (BC.pack (show e)) | 136 | serverError e = KError ServerError (BC.pack (show e)) |
135 | 137 | ||
138 | -- | Received 'queryArgs' or 'respVals' can not be decoded. | ||
136 | decodeError :: String -> TransactionId -> KError | 139 | decodeError :: String -> TransactionId -> KError |
137 | decodeError msg = KError ProtocolError (BC.pack msg) | 140 | decodeError msg = KError ProtocolError (BC.pack msg) |
138 | 141 | ||
142 | -- | If /remote/ node send query /this/ node doesn't know about then | ||
143 | -- this error message should be sent in response. | ||
139 | unknownMethod :: MethodName -> TransactionId -> KError | 144 | unknownMethod :: MethodName -> TransactionId -> KError |
140 | unknownMethod = KError MethodUnknown | 145 | unknownMethod = KError MethodUnknown |
141 | 146 | ||
147 | -- | A remote node has send some 'KMessage' this node is unable to | ||
148 | -- decode. | ||
142 | unknownMessage :: String -> KError | 149 | unknownMessage :: String -> KError |
143 | unknownMessage msg = KError ProtocolError (BC.pack msg) "" | 150 | unknownMessage msg = KError ProtocolError (BC.pack msg) unknownTransaction |
151 | |||
152 | -- | A /remote/ node is not responding to the /our/ request the for | ||
153 | -- specified period of time. | ||
154 | timeoutExpired :: TransactionId -> KError | ||
155 | timeoutExpired = KError GenericError "timeout expired" | ||
144 | 156 | ||
145 | {----------------------------------------------------------------------- | 157 | {----------------------------------------------------------------------- |
146 | -- Query messages | 158 | -- Query messages |