summaryrefslogtreecommitdiff
path: root/src/Network/KRPC/Manager.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/KRPC/Manager.hs')
-rw-r--r--src/Network/KRPC/Manager.hs485
1 files changed, 485 insertions, 0 deletions
diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs
new file mode 100644
index 00000000..c90c92f9
--- /dev/null
+++ b/src/Network/KRPC/Manager.hs
@@ -0,0 +1,485 @@
1-- |
2-- Copyright : (c) Sam Truzjan 2013, 2014
3-- License : BSD3
4-- Maintainer : pxqr.sta@gmail.com
5-- Stability : experimental
6-- Portability : portable
7--
8-- Normally, you don't need to import this module.
9--
10{-# LANGUAGE OverloadedStrings #-}
11{-# LANGUAGE FlexibleInstances #-}
12{-# LANGUAGE FlexibleContexts #-}
13{-# LANGUAGE ScopedTypeVariables #-}
14{-# LANGUAGE DefaultSignatures #-}
15{-# LANGUAGE MultiParamTypeClasses #-}
16{-# LANGUAGE FunctionalDependencies #-}
17{-# LANGUAGE DeriveDataTypeable #-}
18{-# LANGUAGE TemplateHaskell #-}
19module Network.KRPC.Manager
20 ( -- * Manager
21 MonadKRPC (..)
22 , Options (..)
23 , Manager
24 , newManager
25 , closeManager
26 , withManager
27 , isActive
28 , listen
29
30 -- * Queries
31 , QueryFailure (..)
32 , query
33 , query'
34 , queryRaw
35 , getQueryCount
36
37 -- * Handlers
38 , HandlerFailure (..)
39 , Handler
40 , handler
41 ) where
42
43import Control.Applicative
44import Control.Concurrent
45import Control.Concurrent.Lifted (fork)
46import Control.Exception hiding (Handler)
47import qualified Control.Exception.Lifted as E (Handler (..))
48import Control.Exception.Lifted as Lifted (catches, finally)
49import Control.Monad
50import Control.Monad.Logger
51import Control.Monad.Reader
52import Control.Monad.Trans.Control
53import Data.BEncode as BE
54import Data.BEncode.Internal as BE
55import Data.BEncode.Pretty (showBEncode)
56import Data.ByteString as BS
57import Data.ByteString.Char8 as BC
58import Data.ByteString.Lazy as BL
59import Data.Default.Class
60import Data.IORef
61import Data.List as L
62import Data.Map as M
63import Data.Monoid
64import Data.Text as T
65import Data.Text.Encoding as T
66import Data.Tuple
67import Data.Typeable
68import Network.KRPC.Message
69import Network.KRPC.Method
70import Network.Socket hiding (listen)
71import Network.Socket.ByteString as BS
72import System.IO.Error
73import System.Timeout
74
75
76{-----------------------------------------------------------------------
77-- Options
78-----------------------------------------------------------------------}
79
80-- | RPC manager options.
81data Options = Options
82 { -- | Initial 'TransactionId' incremented with each 'query';
83 optSeedTransaction :: {-# UNPACK #-} !Int
84
85 -- | Time to wait for response from remote node, in seconds.
86 , optQueryTimeout :: {-# UNPACK #-} !Int
87
88 -- | Maximum number of bytes to receive.
89 , optMaxMsgSize :: {-# UNPACK #-} !Int
90 } deriving (Show, Eq)
91
92defaultSeedTransaction :: Int
93defaultSeedTransaction = 0
94
95defaultQueryTimeout :: Int
96defaultQueryTimeout = 120
97
98defaultMaxMsgSize :: Int
99defaultMaxMsgSize = 64 * 1024
100
101-- | Permissive defaults.
102instance Default Options where
103 def = Options
104 { optSeedTransaction = defaultSeedTransaction
105 , optQueryTimeout = defaultQueryTimeout
106 , optMaxMsgSize = defaultMaxMsgSize
107 }
108
109validateOptions :: Options -> IO ()
110validateOptions Options {..}
111 | optQueryTimeout < 1
112 = throwIO (userError "krpc: non-positive query timeout")
113 | optMaxMsgSize < 1
114 = throwIO (userError "krpc: non-positive buffer size")
115 | otherwise = return ()
116
117{-----------------------------------------------------------------------
118-- Options
119-----------------------------------------------------------------------}
120
121type KResult = Either KError KResponse
122
123type TransactionCounter = IORef Int
124type CallId = (TransactionId, SockAddr)
125type CallRes = MVar (BValue, KResult)
126type PendingCalls = IORef (Map CallId CallRes)
127
128type HandlerBody h = SockAddr -> BValue -> h (BE.Result BValue)
129
130-- | Handler is a function which will be invoked then some /remote/
131-- node querying /this/ node.
132type Handler h = (MethodName, HandlerBody h)
133
134-- | Keep track pending queries made by /this/ node and handle queries
135-- made by /remote/ nodes.
136data Manager h = Manager
137 { sock :: !Socket
138 , options :: !Options
139 , listenerThread :: !(MVar ThreadId)
140 , transactionCounter :: {-# UNPACK #-} !TransactionCounter
141 , pendingCalls :: {-# UNPACK #-} !PendingCalls
142 , handlers :: [Handler h]
143 }
144
145-- | A monad which can perform or handle queries.
146class (MonadBaseControl IO m, MonadLogger m, MonadIO m)
147 => MonadKRPC h m | m -> h where
148
149 -- | Ask for manager.
150 getManager :: m (Manager h)
151
152 default getManager :: MonadReader (Manager h) m => m (Manager h)
153 getManager = ask
154
155 -- | Can be used to add logging for instance.
156 liftHandler :: h a -> m a
157
158 default liftHandler :: m a -> m a
159 liftHandler = id
160
161instance (MonadBaseControl IO h, MonadLogger h, MonadIO h)
162 => MonadKRPC h (ReaderT (Manager h) h) where
163
164 liftHandler = lift
165
166sockAddrFamily :: SockAddr -> Family
167sockAddrFamily (SockAddrInet _ _ ) = AF_INET
168sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6
169sockAddrFamily (SockAddrUnix _ ) = AF_UNIX
170sockAddrFamily (SockAddrCan _ ) = AF_CAN
171
172-- | Bind socket to the specified address. To enable query handling
173-- run 'listen'.
174newManager :: Options -- ^ various protocol options;
175 -> SockAddr -- ^ address to listen on;
176 -> [Handler h] -- ^ handlers to run on incoming queries.
177 -> IO (Manager h) -- ^ new rpc manager.
178newManager opts @ Options {..} servAddr handlers = do
179 validateOptions opts
180 sock <- bindServ
181 tref <- newEmptyMVar
182 tran <- newIORef optSeedTransaction
183 calls <- newIORef M.empty
184 return $ Manager sock opts tref tran calls handlers
185 where
186 bindServ = do
187 let family = sockAddrFamily servAddr
188 sock <- socket family Datagram defaultProtocol
189 when (family == AF_INET6) $ do
190 setSocketOption sock IPv6Only 0
191 bindSocket sock servAddr
192 return sock
193
194-- | Unblock all pending calls and close socket.
195closeManager :: Manager m -> IO ()
196closeManager Manager {..} = do
197 maybe (return ()) killThread =<< tryTakeMVar listenerThread
198 -- TODO unblock calls
199 close sock
200
201-- | Check if the manager is still active. Manager becomes active
202-- until 'closeManager' called.
203isActive :: Manager m -> IO Bool
204isActive Manager {..} = liftIO $ isBound sock
205{-# INLINE isActive #-}
206
207-- | Normally you should use Control.Monad.Trans.Resource.allocate
208-- function.
209withManager :: Options -> SockAddr -> [Handler h]
210 -> (Manager h -> IO a) -> IO a
211withManager opts addr hs = bracket (newManager opts addr hs) closeManager
212
213{-----------------------------------------------------------------------
214-- Logging
215-----------------------------------------------------------------------}
216
217-- TODO prettify log messages
218querySignature :: MethodName -> TransactionId -> SockAddr -> Text
219querySignature name transaction addr = T.concat
220 [ "&", T.decodeUtf8 name
221 , " #", T.decodeUtf8 transaction
222 , " @", T.pack (show addr)
223 ]
224
225{-----------------------------------------------------------------------
226-- Client
227-----------------------------------------------------------------------}
228-- we don't need to know about TransactionId while performing query,
229-- so we introduce QueryFailure exceptions
230
231-- | Used to signal 'query' errors.
232data QueryFailure
233 = SendFailed -- ^ unable to send query;
234 | QueryFailed ErrorCode Text -- ^ remote node return error;
235 | TimeoutExpired -- ^ remote node not responding.
236 deriving (Show, Eq, Typeable)
237
238instance Exception QueryFailure
239
240sendMessage :: MonadIO m => BEncode a => Socket -> SockAddr -> a -> m ()
241sendMessage sock addr a = do
242 liftIO $ sendManyTo sock (BL.toChunks (BE.encode a)) addr
243
244genTransactionId :: TransactionCounter -> IO TransactionId
245genTransactionId ref = do
246 cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur)
247 return $ BC.pack (show cur)
248
249-- | How many times 'query' call have been performed.
250getQueryCount :: MonadKRPC h m => m Int
251getQueryCount = do
252 Manager {..} <- getManager
253 curTrans <- liftIO $ readIORef transactionCounter
254 return $ curTrans - optSeedTransaction options
255
256registerQuery :: CallId -> PendingCalls -> IO CallRes
257registerQuery cid ref = do
258 ares <- newEmptyMVar
259 atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ())
260 return ares
261
262-- simultaneous M.lookup and M.delete guarantees that we never get two
263-- or more responses to the same query
264unregisterQuery :: CallId -> PendingCalls -> IO (Maybe CallRes)
265unregisterQuery cid ref = do
266 atomicModifyIORef' ref $ swap .
267 M.updateLookupWithKey (const (const Nothing)) cid
268
269
270-- (sendmsg EINVAL)
271sendQuery :: BEncode a => Socket -> SockAddr -> a -> IO ()
272sendQuery sock addr q = handle sockError $ sendMessage sock addr q
273 where
274 sockError :: IOError -> IO ()
275 sockError _ = throwIO SendFailed
276
277-- | Enqueue query to the given node.
278--
279-- This function should throw 'QueryFailure' exception if quered node
280-- respond with @error@ message or the query timeout expires.
281--
282query :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m b
283query addr params = queryK addr params (\_ x _ -> x)
284
285-- | Like 'query' but possibly returns your externally routable IP address.
286query' :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, Maybe ReflectedIP)
287query' addr params = queryK addr params (const (,))
288
289-- | Enqueue a query, but give us the complete BEncoded content sent by the
290-- remote Node. This is useful for handling extensions that this library does
291-- not otherwise support.
292queryRaw :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, BValue)
293queryRaw addr params = queryK addr params (\raw x _ -> (x,raw))
294
295queryK :: forall h m a b x. (MonadKRPC h m, KRPC a b) =>
296 SockAddr -> a -> (BValue -> b -> Maybe ReflectedIP -> x) -> m x
297queryK addr params kont = do
298 Manager {..} <- getManager
299 tid <- liftIO $ genTransactionId transactionCounter
300 let queryMethod = method :: Method a b
301 let signature = querySignature (methodName queryMethod) tid addr
302 $(logDebugS) "query.sending" signature
303
304 mres <- liftIO $ do
305 ares <- registerQuery (tid, addr) pendingCalls
306
307 let q = KQuery (toBEncode params) (methodName queryMethod) tid
308 sendQuery sock addr q
309 `onException` unregisterQuery (tid, addr) pendingCalls
310
311 timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do
312 (raw,res) <- readMVar ares
313 case res of
314 Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m)
315 Right (KResponse {..}) ->
316 case fromBEncode respVals of
317 Right r -> pure $ kont raw r respIP
318 Left e -> throwIO $ QueryFailed ProtocolError (T.pack e)
319
320 case mres of
321 Just res -> do
322 $(logDebugS) "query.responded" $ signature
323 return res
324
325 Nothing -> do
326 _ <- liftIO $ unregisterQuery (tid, addr) pendingCalls
327 $(logWarnS) "query.not_responding" $ signature <> " for " <>
328 T.pack (show (optQueryTimeout options)) <> " seconds"
329 throw $ TimeoutExpired
330
331{-----------------------------------------------------------------------
332-- Handlers
333-----------------------------------------------------------------------}
334-- we already throw:
335--
336-- * ErrorCode(MethodUnknown) in the 'dispatchHandler';
337--
338-- * ErrorCode(ServerError) in the 'runHandler';
339--
340-- * ErrorCode(GenericError) in the 'runHandler' (those can be
341-- async exception too)
342--
343-- so HandlerFailure should cover *only* 'ProtocolError's.
344
345-- | Used to signal protocol errors.
346data HandlerFailure
347 = BadAddress -- ^ for e.g.: node calls herself;
348 | InvalidParameter Text -- ^ for e.g.: bad session token.
349 deriving (Show, Eq, Typeable)
350
351instance Exception HandlerFailure
352
353prettyHF :: HandlerFailure -> BS.ByteString
354prettyHF BadAddress = T.encodeUtf8 "bad address"
355prettyHF (InvalidParameter reason) = T.encodeUtf8 $
356 "invalid parameter: " <> reason
357
358prettyQF :: QueryFailure -> BS.ByteString
359prettyQF e = T.encodeUtf8 $ "handler fail while performing query: "
360 <> T.pack (show e)
361
362-- | Make handler from handler function. Any thrown exception will be
363-- supressed and send over the wire back to the querying node.
364--
365-- If the handler make some 'query' normally it /should/ handle
366-- corresponding 'QueryFailure's.
367--
368handler :: forall h a b. (KRPC a b, Monad h)
369 => (SockAddr -> a -> h b) -> Handler h
370handler body = (name, wrapper)
371 where
372 Method name = method :: Method a b
373 wrapper addr args =
374 case fromBEncode args of
375 Left e -> return $ Left e
376 Right a -> do
377 r <- body addr a
378 return $ Right $ toBEncode r
379
380runHandler :: MonadKRPC h m
381 => HandlerBody h -> SockAddr -> KQuery -> m KResult
382runHandler h addr KQuery {..} = Lifted.catches wrapper failbacks
383 where
384 signature = querySignature queryMethod queryId addr
385
386 wrapper = do
387 $(logDebugS) "handler.quered" signature
388 result <- liftHandler (h addr queryArgs)
389
390 case result of
391 Left msg -> do
392 $(logDebugS) "handler.bad_query" $ signature <> " !" <> T.pack msg
393 return $ Left $ KError ProtocolError (BC.pack msg) queryId
394
395 Right a -> do
396 $(logDebugS) "handler.success" signature
397 return $ Right $ KResponse a queryId (Just $ ReflectedIP addr)
398
399 failbacks =
400 [ E.Handler $ \ (e :: HandlerFailure) -> do
401 $(logDebugS) "handler.failed" signature
402 return $ Left $ KError ProtocolError (prettyHF e) queryId
403
404 -- may happen if handler makes query and fail
405 , E.Handler $ \ (e :: QueryFailure) -> do
406 return $ Left $ KError ServerError (prettyQF e) queryId
407
408 -- since handler thread exit after sendMessage we can safely
409 -- suppress async exception here
410 , E.Handler $ \ (e :: SomeException) -> do
411 return $ Left $ KError GenericError (BC.pack (show e)) queryId
412 ]
413
414dispatchHandler :: MonadKRPC h m => KQuery -> SockAddr -> m KResult
415dispatchHandler q @ KQuery {..} addr = do
416 Manager {..} <- getManager
417 case L.lookup queryMethod handlers of
418 Nothing -> return $ Left $ KError MethodUnknown queryMethod queryId
419 Just h -> runHandler h addr q
420
421{-----------------------------------------------------------------------
422-- Listener
423-----------------------------------------------------------------------}
424
425-- TODO bound amount of parallel handler *threads*:
426--
427-- peer A flooding with find_node
428-- peer B trying to ping peer C
429-- peer B fork too many threads
430-- ... space leak
431--
432handleQuery :: MonadKRPC h m => BValue -> KQuery -> SockAddr -> m ()
433handleQuery raw q addr = void $ fork $ do
434 Manager {..} <- getManager
435 res <- dispatchHandler q addr
436 let resbe = either toBEncode toBEncode res
437 $(logOther "q") $ T.unlines
438 [ either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode raw)
439 , "==>"
440 , either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode resbe)
441 ]
442 sendMessage sock addr resbe
443
444handleResponse :: MonadKRPC h m => BValue -> KResult -> SockAddr -> m ()
445handleResponse raw result addr = do
446 Manager {..} <- getManager
447 liftIO $ do
448 let resultId = either errorId respId result
449 mcall <- unregisterQuery (resultId, addr) pendingCalls
450 case mcall of
451 Nothing -> return ()
452 Just ares -> putMVar ares (raw,result)
453
454handleMessage :: MonadKRPC h m => BValue -> KMessage -> SockAddr -> m ()
455handleMessage raw (Q q) = handleQuery raw q
456handleMessage raw (R r) = handleResponse raw (Right r)
457handleMessage raw (E e) = handleResponse raw (Left e)
458
459listener :: MonadKRPC h m => m ()
460listener = do
461 Manager {..} <- getManager
462 forever $ do
463 (bs, addr) <- liftIO $ do
464 handle exceptions $ BS.recvFrom sock (optMaxMsgSize options)
465
466 case BE.parse bs >>= \r -> (,) r <$> BE.decode bs of
467 -- TODO ignore unknown messages at all?
468 Left e -> liftIO $ sendMessage sock addr $ unknownMessage e
469 Right (raw,m) -> handleMessage raw m addr
470 where
471 exceptions :: IOError -> IO (BS.ByteString, SockAddr)
472 exceptions e
473 -- packets with empty payload may trigger eof exception
474 | isEOFError e = return ("", SockAddrInet 0 0)
475 | otherwise = throwIO e
476
477-- | Should be run before any 'query', otherwise they will never
478-- succeed.
479listen :: MonadKRPC h m => m ()
480listen = do
481 Manager {..} <- getManager
482 tid <- fork $ do
483 listener `Lifted.finally`
484 liftIO (takeMVar listenerThread)
485 liftIO $ putMVar listenerThread tid