diff options
author | Sam Truzjan <pxqr.sta@gmail.com> | 2014-01-07 04:38:02 +0400 |
---|---|---|
committer | Sam Truzjan <pxqr.sta@gmail.com> | 2014-01-07 04:38:02 +0400 |
commit | a9a0be92f7db16e1d7afe3422e56b7d7d2a63ec9 (patch) | |
tree | 1bc56ddf219bedb3211c33286cf14574e481f290 | |
parent | 018afe46b911c14472cf1a8cf315912e5c687e04 (diff) |
Allow to configure max buffer size
-rw-r--r-- | src/Network/KRPC/Manager.hs | 34 |
1 files changed, 21 insertions, 13 deletions
diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index d561d7b1..bf142738 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs | |||
@@ -67,10 +67,13 @@ import System.Timeout | |||
67 | -- | RPC manager options. | 67 | -- | RPC manager options. |
68 | data Options = Options | 68 | data Options = Options |
69 | { -- | Initial 'TransactionId' incremented with each 'query'; | 69 | { -- | Initial 'TransactionId' incremented with each 'query'; |
70 | optSeedTransaction :: Int | 70 | optSeedTransaction :: {-# UNPACK #-} !Int |
71 | 71 | ||
72 | -- | Time to wait for response from remote node, in seconds. | 72 | -- | Time to wait for response from remote node, in seconds. |
73 | , optQueryTimeout :: Int | 73 | , optQueryTimeout :: {-# UNPACK #-} !Int |
74 | |||
75 | -- | Maximum number of bytes to receive. | ||
76 | , optMaxMsgSize :: {-# UNPACK #-} !Int | ||
74 | } deriving (Show, Eq) | 77 | } deriving (Show, Eq) |
75 | 78 | ||
76 | defaultSeedTransaction :: Int | 79 | defaultSeedTransaction :: Int |
@@ -79,16 +82,23 @@ defaultSeedTransaction = 0 | |||
79 | defaultQueryTimeout :: Int | 82 | defaultQueryTimeout :: Int |
80 | defaultQueryTimeout = 120 | 83 | defaultQueryTimeout = 120 |
81 | 84 | ||
85 | defaultMaxMsgSize :: Int | ||
86 | defaultMaxMsgSize = 64 * 1024 | ||
87 | |||
82 | -- | Permissive defaults. | 88 | -- | Permissive defaults. |
83 | instance Default Options where | 89 | instance Default Options where |
84 | def = Options | 90 | def = Options |
85 | { optSeedTransaction = defaultSeedTransaction | 91 | { optSeedTransaction = defaultSeedTransaction |
86 | , optQueryTimeout = defaultQueryTimeout | 92 | , optQueryTimeout = defaultQueryTimeout |
93 | , optMaxMsgSize = defaultMaxMsgSize | ||
87 | } | 94 | } |
88 | 95 | ||
89 | validateOptions :: Options -> IO () | 96 | validateOptions :: Options -> IO () |
90 | validateOptions Options {..} | 97 | validateOptions Options {..} |
91 | | optQueryTimeout < 1 = throwIO (userError "non-positive query timeout") | 98 | | optQueryTimeout < 1 |
99 | = throwIO (userError "krpc: non-positive query timeout") | ||
100 | | optMaxMsgSize < 1 | ||
101 | = throwIO (userError "krpc: non-positive buffer size") | ||
92 | | otherwise = return () | 102 | | otherwise = return () |
93 | 103 | ||
94 | {----------------------------------------------------------------------- | 104 | {----------------------------------------------------------------------- |
@@ -112,7 +122,7 @@ type Handler h = (MethodName, HandlerBody h) | |||
112 | -- made by /remote/ nodes. | 122 | -- made by /remote/ nodes. |
113 | data Manager h = Manager | 123 | data Manager h = Manager |
114 | { sock :: !Socket | 124 | { sock :: !Socket |
115 | , queryTimeout :: !Int -- ^ in seconds | 125 | , options :: !Options |
116 | , listenerThread :: !(MVar ThreadId) | 126 | , listenerThread :: !(MVar ThreadId) |
117 | , transactionCounter :: {-# UNPACK #-} !TransactionCounter | 127 | , transactionCounter :: {-# UNPACK #-} !TransactionCounter |
118 | , pendingCalls :: {-# UNPACK #-} !PendingCalls | 128 | , pendingCalls :: {-# UNPACK #-} !PendingCalls |
@@ -157,7 +167,7 @@ newManager opts @ Options {..} servAddr handlers = do | |||
157 | tref <- newEmptyMVar | 167 | tref <- newEmptyMVar |
158 | tran <- newIORef optSeedTransaction | 168 | tran <- newIORef optSeedTransaction |
159 | calls <- newIORef M.empty | 169 | calls <- newIORef M.empty |
160 | return $ Manager sock optQueryTimeout tref tran calls handlers | 170 | return $ Manager sock opts tref tran calls handlers |
161 | where | 171 | where |
162 | bindServ = do | 172 | bindServ = do |
163 | let family = sockAddrFamily servAddr | 173 | let family = sockAddrFamily servAddr |
@@ -245,7 +255,7 @@ query addr params = do | |||
245 | sendMessage sock addr q | 255 | sendMessage sock addr q |
246 | `onException` unregisterQuery (tid, addr) pendingCalls | 256 | `onException` unregisterQuery (tid, addr) pendingCalls |
247 | 257 | ||
248 | timeout (queryTimeout * 10 ^ (6 :: Int)) $ do | 258 | timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do |
249 | queryResponse ares | 259 | queryResponse ares |
250 | 260 | ||
251 | case mres of | 261 | case mres of |
@@ -255,8 +265,8 @@ query addr params = do | |||
255 | 265 | ||
256 | Nothing -> do | 266 | Nothing -> do |
257 | _ <- liftIO $ unregisterQuery (tid, addr) pendingCalls | 267 | _ <- liftIO $ unregisterQuery (tid, addr) pendingCalls |
258 | $(logWarnS) "query.not_responding" $ signature | 268 | $(logWarnS) "query.not_responding" $ signature <> " for " <> |
259 | <> " for " <> T.pack (show queryTimeout) <> " seconds" | 269 | T.pack (show (optQueryTimeout options)) <> " seconds" |
260 | throw $ timeoutExpired tid | 270 | throw $ timeoutExpired tid |
261 | 271 | ||
262 | {----------------------------------------------------------------------- | 272 | {----------------------------------------------------------------------- |
@@ -332,15 +342,13 @@ handleMessage (Q q) = handleQuery q | |||
332 | handleMessage (R r) = handleResponse (Right r) | 342 | handleMessage (R r) = handleResponse (Right r) |
333 | handleMessage (E e) = handleResponse (Left e) | 343 | handleMessage (E e) = handleResponse (Left e) |
334 | 344 | ||
335 | -- TODO to options | ||
336 | maxMsgSize :: Int | ||
337 | maxMsgSize = 64 * 1024 | ||
338 | |||
339 | listener :: MonadKRPC h m => m () | 345 | listener :: MonadKRPC h m => m () |
340 | listener = do | 346 | listener = do |
341 | Manager {..} <- getManager | 347 | Manager {..} <- getManager |
342 | forever $ do | 348 | forever $ do |
343 | (bs, addr) <- liftIO $ handle exceptions $ BS.recvFrom sock maxMsgSize | 349 | (bs, addr) <- liftIO $ do |
350 | handle exceptions $ BS.recvFrom sock (optMaxMsgSize options) | ||
351 | |||
344 | case BE.decode bs of | 352 | case BE.decode bs of |
345 | -- TODO ignore unknown messages at all? | 353 | -- TODO ignore unknown messages at all? |
346 | Left e -> liftIO $ sendMessage sock addr $ unknownMessage e | 354 | Left e -> liftIO $ sendMessage sock addr $ unknownMessage e |