summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/Network/KRPC/Manager.hs34
1 files changed, 21 insertions, 13 deletions
diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs
index d561d7b1..bf142738 100644
--- a/src/Network/KRPC/Manager.hs
+++ b/src/Network/KRPC/Manager.hs
@@ -67,10 +67,13 @@ import System.Timeout
67-- | RPC manager options. 67-- | RPC manager options.
68data Options = Options 68data Options = Options
69 { -- | Initial 'TransactionId' incremented with each 'query'; 69 { -- | Initial 'TransactionId' incremented with each 'query';
70 optSeedTransaction :: Int 70 optSeedTransaction :: {-# UNPACK #-} !Int
71 71
72 -- | Time to wait for response from remote node, in seconds. 72 -- | Time to wait for response from remote node, in seconds.
73 , optQueryTimeout :: Int 73 , optQueryTimeout :: {-# UNPACK #-} !Int
74
75 -- | Maximum number of bytes to receive.
76 , optMaxMsgSize :: {-# UNPACK #-} !Int
74 } deriving (Show, Eq) 77 } deriving (Show, Eq)
75 78
76defaultSeedTransaction :: Int 79defaultSeedTransaction :: Int
@@ -79,16 +82,23 @@ defaultSeedTransaction = 0
79defaultQueryTimeout :: Int 82defaultQueryTimeout :: Int
80defaultQueryTimeout = 120 83defaultQueryTimeout = 120
81 84
85defaultMaxMsgSize :: Int
86defaultMaxMsgSize = 64 * 1024
87
82-- | Permissive defaults. 88-- | Permissive defaults.
83instance Default Options where 89instance Default Options where
84 def = Options 90 def = Options
85 { optSeedTransaction = defaultSeedTransaction 91 { optSeedTransaction = defaultSeedTransaction
86 , optQueryTimeout = defaultQueryTimeout 92 , optQueryTimeout = defaultQueryTimeout
93 , optMaxMsgSize = defaultMaxMsgSize
87 } 94 }
88 95
89validateOptions :: Options -> IO () 96validateOptions :: Options -> IO ()
90validateOptions Options {..} 97validateOptions Options {..}
91 | optQueryTimeout < 1 = throwIO (userError "non-positive query timeout") 98 | optQueryTimeout < 1
99 = throwIO (userError "krpc: non-positive query timeout")
100 | optMaxMsgSize < 1
101 = throwIO (userError "krpc: non-positive buffer size")
92 | otherwise = return () 102 | otherwise = return ()
93 103
94{----------------------------------------------------------------------- 104{-----------------------------------------------------------------------
@@ -112,7 +122,7 @@ type Handler h = (MethodName, HandlerBody h)
112-- made by /remote/ nodes. 122-- made by /remote/ nodes.
113data Manager h = Manager 123data Manager h = Manager
114 { sock :: !Socket 124 { sock :: !Socket
115 , queryTimeout :: !Int -- ^ in seconds 125 , options :: !Options
116 , listenerThread :: !(MVar ThreadId) 126 , listenerThread :: !(MVar ThreadId)
117 , transactionCounter :: {-# UNPACK #-} !TransactionCounter 127 , transactionCounter :: {-# UNPACK #-} !TransactionCounter
118 , pendingCalls :: {-# UNPACK #-} !PendingCalls 128 , pendingCalls :: {-# UNPACK #-} !PendingCalls
@@ -157,7 +167,7 @@ newManager opts @ Options {..} servAddr handlers = do
157 tref <- newEmptyMVar 167 tref <- newEmptyMVar
158 tran <- newIORef optSeedTransaction 168 tran <- newIORef optSeedTransaction
159 calls <- newIORef M.empty 169 calls <- newIORef M.empty
160 return $ Manager sock optQueryTimeout tref tran calls handlers 170 return $ Manager sock opts tref tran calls handlers
161 where 171 where
162 bindServ = do 172 bindServ = do
163 let family = sockAddrFamily servAddr 173 let family = sockAddrFamily servAddr
@@ -245,7 +255,7 @@ query addr params = do
245 sendMessage sock addr q 255 sendMessage sock addr q
246 `onException` unregisterQuery (tid, addr) pendingCalls 256 `onException` unregisterQuery (tid, addr) pendingCalls
247 257
248 timeout (queryTimeout * 10 ^ (6 :: Int)) $ do 258 timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do
249 queryResponse ares 259 queryResponse ares
250 260
251 case mres of 261 case mres of
@@ -255,8 +265,8 @@ query addr params = do
255 265
256 Nothing -> do 266 Nothing -> do
257 _ <- liftIO $ unregisterQuery (tid, addr) pendingCalls 267 _ <- liftIO $ unregisterQuery (tid, addr) pendingCalls
258 $(logWarnS) "query.not_responding" $ signature 268 $(logWarnS) "query.not_responding" $ signature <> " for " <>
259 <> " for " <> T.pack (show queryTimeout) <> " seconds" 269 T.pack (show (optQueryTimeout options)) <> " seconds"
260 throw $ timeoutExpired tid 270 throw $ timeoutExpired tid
261 271
262{----------------------------------------------------------------------- 272{-----------------------------------------------------------------------
@@ -332,15 +342,13 @@ handleMessage (Q q) = handleQuery q
332handleMessage (R r) = handleResponse (Right r) 342handleMessage (R r) = handleResponse (Right r)
333handleMessage (E e) = handleResponse (Left e) 343handleMessage (E e) = handleResponse (Left e)
334 344
335-- TODO to options
336maxMsgSize :: Int
337maxMsgSize = 64 * 1024
338
339listener :: MonadKRPC h m => m () 345listener :: MonadKRPC h m => m ()
340listener = do 346listener = do
341 Manager {..} <- getManager 347 Manager {..} <- getManager
342 forever $ do 348 forever $ do
343 (bs, addr) <- liftIO $ handle exceptions $ BS.recvFrom sock maxMsgSize 349 (bs, addr) <- liftIO $ do
350 handle exceptions $ BS.recvFrom sock (optMaxMsgSize options)
351
344 case BE.decode bs of 352 case BE.decode bs of
345 -- TODO ignore unknown messages at all? 353 -- TODO ignore unknown messages at all?
346 Left e -> liftIO $ sendMessage sock addr $ unknownMessage e 354 Left e -> liftIO $ sendMessage sock addr $ unknownMessage e