diff options
author | joe <joe@jerkface.net> | 2017-01-18 21:24:38 -0500 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-01-18 21:24:38 -0500 |
commit | 1d7dd944e0a13d3f09b65f7629d1f96098ea7974 (patch) | |
tree | 6c02f4d9d6e95f9a2d596c1854d5938daeeeddcc /src/Network | |
parent | 3c9e37d4f349ba2b4395cb10b5a3671decf89d68 (diff) | |
parent | a8498921ddf37e864968a3865e3e254352b5d285 (diff) |
Merge branch 'krpc' into dht-only
Diffstat (limited to 'src/Network')
-rw-r--r-- | src/Network/KRPC.hs | 91 | ||||
-rw-r--r-- | src/Network/KRPC/Manager.hs | 485 | ||||
-rw-r--r-- | src/Network/KRPC/Message.hs | 289 | ||||
-rw-r--r-- | src/Network/KRPC/Method.hs | 87 |
4 files changed, 952 insertions, 0 deletions
diff --git a/src/Network/KRPC.hs b/src/Network/KRPC.hs new file mode 100644 index 00000000..d185fb4c --- /dev/null +++ b/src/Network/KRPC.hs | |||
@@ -0,0 +1,91 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013, 2014 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- This module provides safe remote procedure call. One important | ||
9 | -- point is exceptions and errors, so to be able handle them | ||
10 | -- properly we need to investigate a bit about how this all works. | ||
11 | -- Internally, in order to make method invokation KRPC makes the | ||
12 | -- following steps: | ||
13 | -- | ||
14 | -- * Caller serialize arguments to bencoded bytestrings; | ||
15 | -- | ||
16 | -- * Caller send bytestring data over UDP to the callee; | ||
17 | -- | ||
18 | -- * Callee receive and decode arguments to the method and method | ||
19 | -- name. If it can't decode then it send 'ProtocolError' back to the | ||
20 | -- caller; | ||
21 | -- | ||
22 | -- * Callee search for the @method name@ in the method table. | ||
23 | -- If it not present in the table then callee send 'MethodUnknown' | ||
24 | -- back to the caller; | ||
25 | -- | ||
26 | -- * Callee check if argument names match. If not it send | ||
27 | -- 'ProtocolError' back; | ||
28 | -- | ||
29 | -- * Callee make the actuall call to the plain old haskell | ||
30 | -- function. If the function throw exception then callee send | ||
31 | -- 'ServerError' back. | ||
32 | -- | ||
33 | -- * Callee serialize result of the function to bencoded bytestring. | ||
34 | -- | ||
35 | -- * Callee encode result to bencoded bytestring and send it back | ||
36 | -- to the caller. | ||
37 | -- | ||
38 | -- * Caller check if return values names match with the signature | ||
39 | -- it called in the first step. | ||
40 | -- | ||
41 | -- * Caller extracts results and finally return results of the | ||
42 | -- procedure call as ordinary haskell values. | ||
43 | -- | ||
44 | -- If every other error occurred then caller get the | ||
45 | -- 'GenericError'. All errors returned by callee are throwed as | ||
46 | -- ordinary haskell exceptions at caller side. Also note that both | ||
47 | -- caller and callee use plain UDP, so KRPC is unreliable. | ||
48 | -- | ||
49 | -- For async 'query' use @async@ package. | ||
50 | -- | ||
51 | -- For protocol details see "Network.KRPC.Message" module. | ||
52 | -- | ||
53 | module Network.KRPC | ||
54 | ( -- * Methods | ||
55 | Method | ||
56 | , KRPC (..) | ||
57 | |||
58 | -- * RPC | ||
59 | -- ** Query | ||
60 | , QueryFailure (..) | ||
61 | , query | ||
62 | , query' | ||
63 | , queryRaw | ||
64 | , getQueryCount | ||
65 | |||
66 | -- ** Handler | ||
67 | , HandlerFailure (..) | ||
68 | , Handler | ||
69 | , handler | ||
70 | |||
71 | -- * Manager | ||
72 | , MonadKRPC (..) | ||
73 | , Options (..) | ||
74 | , def | ||
75 | , Manager | ||
76 | , newManager | ||
77 | , closeManager | ||
78 | , withManager | ||
79 | , isActive | ||
80 | , listen | ||
81 | |||
82 | -- * Re-exports | ||
83 | , ErrorCode (..) | ||
84 | , SockAddr (..) | ||
85 | ) where | ||
86 | |||
87 | import Data.Default.Class | ||
88 | import Network.KRPC.Message | ||
89 | import Network.KRPC.Method | ||
90 | import Network.KRPC.Manager | ||
91 | import Network.Socket (SockAddr (..)) | ||
diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs new file mode 100644 index 00000000..c90c92f9 --- /dev/null +++ b/src/Network/KRPC/Manager.hs | |||
@@ -0,0 +1,485 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013, 2014 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- Normally, you don't need to import this module. | ||
9 | -- | ||
10 | {-# LANGUAGE OverloadedStrings #-} | ||
11 | {-# LANGUAGE FlexibleInstances #-} | ||
12 | {-# LANGUAGE FlexibleContexts #-} | ||
13 | {-# LANGUAGE ScopedTypeVariables #-} | ||
14 | {-# LANGUAGE DefaultSignatures #-} | ||
15 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
16 | {-# LANGUAGE FunctionalDependencies #-} | ||
17 | {-# LANGUAGE DeriveDataTypeable #-} | ||
18 | {-# LANGUAGE TemplateHaskell #-} | ||
19 | module Network.KRPC.Manager | ||
20 | ( -- * Manager | ||
21 | MonadKRPC (..) | ||
22 | , Options (..) | ||
23 | , Manager | ||
24 | , newManager | ||
25 | , closeManager | ||
26 | , withManager | ||
27 | , isActive | ||
28 | , listen | ||
29 | |||
30 | -- * Queries | ||
31 | , QueryFailure (..) | ||
32 | , query | ||
33 | , query' | ||
34 | , queryRaw | ||
35 | , getQueryCount | ||
36 | |||
37 | -- * Handlers | ||
38 | , HandlerFailure (..) | ||
39 | , Handler | ||
40 | , handler | ||
41 | ) where | ||
42 | |||
43 | import Control.Applicative | ||
44 | import Control.Concurrent | ||
45 | import Control.Concurrent.Lifted (fork) | ||
46 | import Control.Exception hiding (Handler) | ||
47 | import qualified Control.Exception.Lifted as E (Handler (..)) | ||
48 | import Control.Exception.Lifted as Lifted (catches, finally) | ||
49 | import Control.Monad | ||
50 | import Control.Monad.Logger | ||
51 | import Control.Monad.Reader | ||
52 | import Control.Monad.Trans.Control | ||
53 | import Data.BEncode as BE | ||
54 | import Data.BEncode.Internal as BE | ||
55 | import Data.BEncode.Pretty (showBEncode) | ||
56 | import Data.ByteString as BS | ||
57 | import Data.ByteString.Char8 as BC | ||
58 | import Data.ByteString.Lazy as BL | ||
59 | import Data.Default.Class | ||
60 | import Data.IORef | ||
61 | import Data.List as L | ||
62 | import Data.Map as M | ||
63 | import Data.Monoid | ||
64 | import Data.Text as T | ||
65 | import Data.Text.Encoding as T | ||
66 | import Data.Tuple | ||
67 | import Data.Typeable | ||
68 | import Network.KRPC.Message | ||
69 | import Network.KRPC.Method | ||
70 | import Network.Socket hiding (listen) | ||
71 | import Network.Socket.ByteString as BS | ||
72 | import System.IO.Error | ||
73 | import System.Timeout | ||
74 | |||
75 | |||
76 | {----------------------------------------------------------------------- | ||
77 | -- Options | ||
78 | -----------------------------------------------------------------------} | ||
79 | |||
80 | -- | RPC manager options. | ||
81 | data Options = Options | ||
82 | { -- | Initial 'TransactionId' incremented with each 'query'; | ||
83 | optSeedTransaction :: {-# UNPACK #-} !Int | ||
84 | |||
85 | -- | Time to wait for response from remote node, in seconds. | ||
86 | , optQueryTimeout :: {-# UNPACK #-} !Int | ||
87 | |||
88 | -- | Maximum number of bytes to receive. | ||
89 | , optMaxMsgSize :: {-# UNPACK #-} !Int | ||
90 | } deriving (Show, Eq) | ||
91 | |||
92 | defaultSeedTransaction :: Int | ||
93 | defaultSeedTransaction = 0 | ||
94 | |||
95 | defaultQueryTimeout :: Int | ||
96 | defaultQueryTimeout = 120 | ||
97 | |||
98 | defaultMaxMsgSize :: Int | ||
99 | defaultMaxMsgSize = 64 * 1024 | ||
100 | |||
101 | -- | Permissive defaults. | ||
102 | instance Default Options where | ||
103 | def = Options | ||
104 | { optSeedTransaction = defaultSeedTransaction | ||
105 | , optQueryTimeout = defaultQueryTimeout | ||
106 | , optMaxMsgSize = defaultMaxMsgSize | ||
107 | } | ||
108 | |||
109 | validateOptions :: Options -> IO () | ||
110 | validateOptions Options {..} | ||
111 | | optQueryTimeout < 1 | ||
112 | = throwIO (userError "krpc: non-positive query timeout") | ||
113 | | optMaxMsgSize < 1 | ||
114 | = throwIO (userError "krpc: non-positive buffer size") | ||
115 | | otherwise = return () | ||
116 | |||
117 | {----------------------------------------------------------------------- | ||
118 | -- Options | ||
119 | -----------------------------------------------------------------------} | ||
120 | |||
121 | type KResult = Either KError KResponse | ||
122 | |||
123 | type TransactionCounter = IORef Int | ||
124 | type CallId = (TransactionId, SockAddr) | ||
125 | type CallRes = MVar (BValue, KResult) | ||
126 | type PendingCalls = IORef (Map CallId CallRes) | ||
127 | |||
128 | type HandlerBody h = SockAddr -> BValue -> h (BE.Result BValue) | ||
129 | |||
130 | -- | Handler is a function which will be invoked then some /remote/ | ||
131 | -- node querying /this/ node. | ||
132 | type Handler h = (MethodName, HandlerBody h) | ||
133 | |||
134 | -- | Keep track pending queries made by /this/ node and handle queries | ||
135 | -- made by /remote/ nodes. | ||
136 | data Manager h = Manager | ||
137 | { sock :: !Socket | ||
138 | , options :: !Options | ||
139 | , listenerThread :: !(MVar ThreadId) | ||
140 | , transactionCounter :: {-# UNPACK #-} !TransactionCounter | ||
141 | , pendingCalls :: {-# UNPACK #-} !PendingCalls | ||
142 | , handlers :: [Handler h] | ||
143 | } | ||
144 | |||
145 | -- | A monad which can perform or handle queries. | ||
146 | class (MonadBaseControl IO m, MonadLogger m, MonadIO m) | ||
147 | => MonadKRPC h m | m -> h where | ||
148 | |||
149 | -- | Ask for manager. | ||
150 | getManager :: m (Manager h) | ||
151 | |||
152 | default getManager :: MonadReader (Manager h) m => m (Manager h) | ||
153 | getManager = ask | ||
154 | |||
155 | -- | Can be used to add logging for instance. | ||
156 | liftHandler :: h a -> m a | ||
157 | |||
158 | default liftHandler :: m a -> m a | ||
159 | liftHandler = id | ||
160 | |||
161 | instance (MonadBaseControl IO h, MonadLogger h, MonadIO h) | ||
162 | => MonadKRPC h (ReaderT (Manager h) h) where | ||
163 | |||
164 | liftHandler = lift | ||
165 | |||
166 | sockAddrFamily :: SockAddr -> Family | ||
167 | sockAddrFamily (SockAddrInet _ _ ) = AF_INET | ||
168 | sockAddrFamily (SockAddrInet6 _ _ _ _) = AF_INET6 | ||
169 | sockAddrFamily (SockAddrUnix _ ) = AF_UNIX | ||
170 | sockAddrFamily (SockAddrCan _ ) = AF_CAN | ||
171 | |||
172 | -- | Bind socket to the specified address. To enable query handling | ||
173 | -- run 'listen'. | ||
174 | newManager :: Options -- ^ various protocol options; | ||
175 | -> SockAddr -- ^ address to listen on; | ||
176 | -> [Handler h] -- ^ handlers to run on incoming queries. | ||
177 | -> IO (Manager h) -- ^ new rpc manager. | ||
178 | newManager opts @ Options {..} servAddr handlers = do | ||
179 | validateOptions opts | ||
180 | sock <- bindServ | ||
181 | tref <- newEmptyMVar | ||
182 | tran <- newIORef optSeedTransaction | ||
183 | calls <- newIORef M.empty | ||
184 | return $ Manager sock opts tref tran calls handlers | ||
185 | where | ||
186 | bindServ = do | ||
187 | let family = sockAddrFamily servAddr | ||
188 | sock <- socket family Datagram defaultProtocol | ||
189 | when (family == AF_INET6) $ do | ||
190 | setSocketOption sock IPv6Only 0 | ||
191 | bindSocket sock servAddr | ||
192 | return sock | ||
193 | |||
194 | -- | Unblock all pending calls and close socket. | ||
195 | closeManager :: Manager m -> IO () | ||
196 | closeManager Manager {..} = do | ||
197 | maybe (return ()) killThread =<< tryTakeMVar listenerThread | ||
198 | -- TODO unblock calls | ||
199 | close sock | ||
200 | |||
201 | -- | Check if the manager is still active. Manager becomes active | ||
202 | -- until 'closeManager' called. | ||
203 | isActive :: Manager m -> IO Bool | ||
204 | isActive Manager {..} = liftIO $ isBound sock | ||
205 | {-# INLINE isActive #-} | ||
206 | |||
207 | -- | Normally you should use Control.Monad.Trans.Resource.allocate | ||
208 | -- function. | ||
209 | withManager :: Options -> SockAddr -> [Handler h] | ||
210 | -> (Manager h -> IO a) -> IO a | ||
211 | withManager opts addr hs = bracket (newManager opts addr hs) closeManager | ||
212 | |||
213 | {----------------------------------------------------------------------- | ||
214 | -- Logging | ||
215 | -----------------------------------------------------------------------} | ||
216 | |||
217 | -- TODO prettify log messages | ||
218 | querySignature :: MethodName -> TransactionId -> SockAddr -> Text | ||
219 | querySignature name transaction addr = T.concat | ||
220 | [ "&", T.decodeUtf8 name | ||
221 | , " #", T.decodeUtf8 transaction | ||
222 | , " @", T.pack (show addr) | ||
223 | ] | ||
224 | |||
225 | {----------------------------------------------------------------------- | ||
226 | -- Client | ||
227 | -----------------------------------------------------------------------} | ||
228 | -- we don't need to know about TransactionId while performing query, | ||
229 | -- so we introduce QueryFailure exceptions | ||
230 | |||
231 | -- | Used to signal 'query' errors. | ||
232 | data QueryFailure | ||
233 | = SendFailed -- ^ unable to send query; | ||
234 | | QueryFailed ErrorCode Text -- ^ remote node return error; | ||
235 | | TimeoutExpired -- ^ remote node not responding. | ||
236 | deriving (Show, Eq, Typeable) | ||
237 | |||
238 | instance Exception QueryFailure | ||
239 | |||
240 | sendMessage :: MonadIO m => BEncode a => Socket -> SockAddr -> a -> m () | ||
241 | sendMessage sock addr a = do | ||
242 | liftIO $ sendManyTo sock (BL.toChunks (BE.encode a)) addr | ||
243 | |||
244 | genTransactionId :: TransactionCounter -> IO TransactionId | ||
245 | genTransactionId ref = do | ||
246 | cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur) | ||
247 | return $ BC.pack (show cur) | ||
248 | |||
249 | -- | How many times 'query' call have been performed. | ||
250 | getQueryCount :: MonadKRPC h m => m Int | ||
251 | getQueryCount = do | ||
252 | Manager {..} <- getManager | ||
253 | curTrans <- liftIO $ readIORef transactionCounter | ||
254 | return $ curTrans - optSeedTransaction options | ||
255 | |||
256 | registerQuery :: CallId -> PendingCalls -> IO CallRes | ||
257 | registerQuery cid ref = do | ||
258 | ares <- newEmptyMVar | ||
259 | atomicModifyIORef' ref $ \ m -> (M.insert cid ares m, ()) | ||
260 | return ares | ||
261 | |||
262 | -- simultaneous M.lookup and M.delete guarantees that we never get two | ||
263 | -- or more responses to the same query | ||
264 | unregisterQuery :: CallId -> PendingCalls -> IO (Maybe CallRes) | ||
265 | unregisterQuery cid ref = do | ||
266 | atomicModifyIORef' ref $ swap . | ||
267 | M.updateLookupWithKey (const (const Nothing)) cid | ||
268 | |||
269 | |||
270 | -- (sendmsg EINVAL) | ||
271 | sendQuery :: BEncode a => Socket -> SockAddr -> a -> IO () | ||
272 | sendQuery sock addr q = handle sockError $ sendMessage sock addr q | ||
273 | where | ||
274 | sockError :: IOError -> IO () | ||
275 | sockError _ = throwIO SendFailed | ||
276 | |||
277 | -- | Enqueue query to the given node. | ||
278 | -- | ||
279 | -- This function should throw 'QueryFailure' exception if quered node | ||
280 | -- respond with @error@ message or the query timeout expires. | ||
281 | -- | ||
282 | query :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m b | ||
283 | query addr params = queryK addr params (\_ x _ -> x) | ||
284 | |||
285 | -- | Like 'query' but possibly returns your externally routable IP address. | ||
286 | query' :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, Maybe ReflectedIP) | ||
287 | query' addr params = queryK addr params (const (,)) | ||
288 | |||
289 | -- | Enqueue a query, but give us the complete BEncoded content sent by the | ||
290 | -- remote Node. This is useful for handling extensions that this library does | ||
291 | -- not otherwise support. | ||
292 | queryRaw :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, BValue) | ||
293 | queryRaw addr params = queryK addr params (\raw x _ -> (x,raw)) | ||
294 | |||
295 | queryK :: forall h m a b x. (MonadKRPC h m, KRPC a b) => | ||
296 | SockAddr -> a -> (BValue -> b -> Maybe ReflectedIP -> x) -> m x | ||
297 | queryK addr params kont = do | ||
298 | Manager {..} <- getManager | ||
299 | tid <- liftIO $ genTransactionId transactionCounter | ||
300 | let queryMethod = method :: Method a b | ||
301 | let signature = querySignature (methodName queryMethod) tid addr | ||
302 | $(logDebugS) "query.sending" signature | ||
303 | |||
304 | mres <- liftIO $ do | ||
305 | ares <- registerQuery (tid, addr) pendingCalls | ||
306 | |||
307 | let q = KQuery (toBEncode params) (methodName queryMethod) tid | ||
308 | sendQuery sock addr q | ||
309 | `onException` unregisterQuery (tid, addr) pendingCalls | ||
310 | |||
311 | timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do | ||
312 | (raw,res) <- readMVar ares | ||
313 | case res of | ||
314 | Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m) | ||
315 | Right (KResponse {..}) -> | ||
316 | case fromBEncode respVals of | ||
317 | Right r -> pure $ kont raw r respIP | ||
318 | Left e -> throwIO $ QueryFailed ProtocolError (T.pack e) | ||
319 | |||
320 | case mres of | ||
321 | Just res -> do | ||
322 | $(logDebugS) "query.responded" $ signature | ||
323 | return res | ||
324 | |||
325 | Nothing -> do | ||
326 | _ <- liftIO $ unregisterQuery (tid, addr) pendingCalls | ||
327 | $(logWarnS) "query.not_responding" $ signature <> " for " <> | ||
328 | T.pack (show (optQueryTimeout options)) <> " seconds" | ||
329 | throw $ TimeoutExpired | ||
330 | |||
331 | {----------------------------------------------------------------------- | ||
332 | -- Handlers | ||
333 | -----------------------------------------------------------------------} | ||
334 | -- we already throw: | ||
335 | -- | ||
336 | -- * ErrorCode(MethodUnknown) in the 'dispatchHandler'; | ||
337 | -- | ||
338 | -- * ErrorCode(ServerError) in the 'runHandler'; | ||
339 | -- | ||
340 | -- * ErrorCode(GenericError) in the 'runHandler' (those can be | ||
341 | -- async exception too) | ||
342 | -- | ||
343 | -- so HandlerFailure should cover *only* 'ProtocolError's. | ||
344 | |||
345 | -- | Used to signal protocol errors. | ||
346 | data HandlerFailure | ||
347 | = BadAddress -- ^ for e.g.: node calls herself; | ||
348 | | InvalidParameter Text -- ^ for e.g.: bad session token. | ||
349 | deriving (Show, Eq, Typeable) | ||
350 | |||
351 | instance Exception HandlerFailure | ||
352 | |||
353 | prettyHF :: HandlerFailure -> BS.ByteString | ||
354 | prettyHF BadAddress = T.encodeUtf8 "bad address" | ||
355 | prettyHF (InvalidParameter reason) = T.encodeUtf8 $ | ||
356 | "invalid parameter: " <> reason | ||
357 | |||
358 | prettyQF :: QueryFailure -> BS.ByteString | ||
359 | prettyQF e = T.encodeUtf8 $ "handler fail while performing query: " | ||
360 | <> T.pack (show e) | ||
361 | |||
362 | -- | Make handler from handler function. Any thrown exception will be | ||
363 | -- supressed and send over the wire back to the querying node. | ||
364 | -- | ||
365 | -- If the handler make some 'query' normally it /should/ handle | ||
366 | -- corresponding 'QueryFailure's. | ||
367 | -- | ||
368 | handler :: forall h a b. (KRPC a b, Monad h) | ||
369 | => (SockAddr -> a -> h b) -> Handler h | ||
370 | handler body = (name, wrapper) | ||
371 | where | ||
372 | Method name = method :: Method a b | ||
373 | wrapper addr args = | ||
374 | case fromBEncode args of | ||
375 | Left e -> return $ Left e | ||
376 | Right a -> do | ||
377 | r <- body addr a | ||
378 | return $ Right $ toBEncode r | ||
379 | |||
380 | runHandler :: MonadKRPC h m | ||
381 | => HandlerBody h -> SockAddr -> KQuery -> m KResult | ||
382 | runHandler h addr KQuery {..} = Lifted.catches wrapper failbacks | ||
383 | where | ||
384 | signature = querySignature queryMethod queryId addr | ||
385 | |||
386 | wrapper = do | ||
387 | $(logDebugS) "handler.quered" signature | ||
388 | result <- liftHandler (h addr queryArgs) | ||
389 | |||
390 | case result of | ||
391 | Left msg -> do | ||
392 | $(logDebugS) "handler.bad_query" $ signature <> " !" <> T.pack msg | ||
393 | return $ Left $ KError ProtocolError (BC.pack msg) queryId | ||
394 | |||
395 | Right a -> do | ||
396 | $(logDebugS) "handler.success" signature | ||
397 | return $ Right $ KResponse a queryId (Just $ ReflectedIP addr) | ||
398 | |||
399 | failbacks = | ||
400 | [ E.Handler $ \ (e :: HandlerFailure) -> do | ||
401 | $(logDebugS) "handler.failed" signature | ||
402 | return $ Left $ KError ProtocolError (prettyHF e) queryId | ||
403 | |||
404 | -- may happen if handler makes query and fail | ||
405 | , E.Handler $ \ (e :: QueryFailure) -> do | ||
406 | return $ Left $ KError ServerError (prettyQF e) queryId | ||
407 | |||
408 | -- since handler thread exit after sendMessage we can safely | ||
409 | -- suppress async exception here | ||
410 | , E.Handler $ \ (e :: SomeException) -> do | ||
411 | return $ Left $ KError GenericError (BC.pack (show e)) queryId | ||
412 | ] | ||
413 | |||
414 | dispatchHandler :: MonadKRPC h m => KQuery -> SockAddr -> m KResult | ||
415 | dispatchHandler q @ KQuery {..} addr = do | ||
416 | Manager {..} <- getManager | ||
417 | case L.lookup queryMethod handlers of | ||
418 | Nothing -> return $ Left $ KError MethodUnknown queryMethod queryId | ||
419 | Just h -> runHandler h addr q | ||
420 | |||
421 | {----------------------------------------------------------------------- | ||
422 | -- Listener | ||
423 | -----------------------------------------------------------------------} | ||
424 | |||
425 | -- TODO bound amount of parallel handler *threads*: | ||
426 | -- | ||
427 | -- peer A flooding with find_node | ||
428 | -- peer B trying to ping peer C | ||
429 | -- peer B fork too many threads | ||
430 | -- ... space leak | ||
431 | -- | ||
432 | handleQuery :: MonadKRPC h m => BValue -> KQuery -> SockAddr -> m () | ||
433 | handleQuery raw q addr = void $ fork $ do | ||
434 | Manager {..} <- getManager | ||
435 | res <- dispatchHandler q addr | ||
436 | let resbe = either toBEncode toBEncode res | ||
437 | $(logOther "q") $ T.unlines | ||
438 | [ either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode raw) | ||
439 | , "==>" | ||
440 | , either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode resbe) | ||
441 | ] | ||
442 | sendMessage sock addr resbe | ||
443 | |||
444 | handleResponse :: MonadKRPC h m => BValue -> KResult -> SockAddr -> m () | ||
445 | handleResponse raw result addr = do | ||
446 | Manager {..} <- getManager | ||
447 | liftIO $ do | ||
448 | let resultId = either errorId respId result | ||
449 | mcall <- unregisterQuery (resultId, addr) pendingCalls | ||
450 | case mcall of | ||
451 | Nothing -> return () | ||
452 | Just ares -> putMVar ares (raw,result) | ||
453 | |||
454 | handleMessage :: MonadKRPC h m => BValue -> KMessage -> SockAddr -> m () | ||
455 | handleMessage raw (Q q) = handleQuery raw q | ||
456 | handleMessage raw (R r) = handleResponse raw (Right r) | ||
457 | handleMessage raw (E e) = handleResponse raw (Left e) | ||
458 | |||
459 | listener :: MonadKRPC h m => m () | ||
460 | listener = do | ||
461 | Manager {..} <- getManager | ||
462 | forever $ do | ||
463 | (bs, addr) <- liftIO $ do | ||
464 | handle exceptions $ BS.recvFrom sock (optMaxMsgSize options) | ||
465 | |||
466 | case BE.parse bs >>= \r -> (,) r <$> BE.decode bs of | ||
467 | -- TODO ignore unknown messages at all? | ||
468 | Left e -> liftIO $ sendMessage sock addr $ unknownMessage e | ||
469 | Right (raw,m) -> handleMessage raw m addr | ||
470 | where | ||
471 | exceptions :: IOError -> IO (BS.ByteString, SockAddr) | ||
472 | exceptions e | ||
473 | -- packets with empty payload may trigger eof exception | ||
474 | | isEOFError e = return ("", SockAddrInet 0 0) | ||
475 | | otherwise = throwIO e | ||
476 | |||
477 | -- | Should be run before any 'query', otherwise they will never | ||
478 | -- succeed. | ||
479 | listen :: MonadKRPC h m => m () | ||
480 | listen = do | ||
481 | Manager {..} <- getManager | ||
482 | tid <- fork $ do | ||
483 | listener `Lifted.finally` | ||
484 | liftIO (takeMVar listenerThread) | ||
485 | liftIO $ putMVar listenerThread tid | ||
diff --git a/src/Network/KRPC/Message.hs b/src/Network/KRPC/Message.hs new file mode 100644 index 00000000..6f4ae620 --- /dev/null +++ b/src/Network/KRPC/Message.hs | |||
@@ -0,0 +1,289 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013, 2014 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- KRPC messages types used in communication. All messages are | ||
9 | -- encoded as bencode dictionary. | ||
10 | -- | ||
11 | -- Normally, you don't need to import this module. | ||
12 | -- | ||
13 | -- See <http://www.bittorrent.org/beps/bep_0005.html#krpc-protocol> | ||
14 | -- | ||
15 | {-# LANGUAGE OverloadedStrings #-} | ||
16 | {-# LANGUAGE FlexibleContexts #-} | ||
17 | {-# LANGUAGE TypeSynonymInstances #-} | ||
18 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
19 | {-# LANGUAGE FunctionalDependencies #-} | ||
20 | {-# LANGUAGE DefaultSignatures #-} | ||
21 | {-# LANGUAGE DeriveDataTypeable #-} | ||
22 | module Network.KRPC.Message | ||
23 | ( -- * Transaction | ||
24 | TransactionId | ||
25 | |||
26 | -- * Error | ||
27 | , ErrorCode (..) | ||
28 | , KError(..) | ||
29 | , decodeError | ||
30 | , unknownMessage | ||
31 | |||
32 | -- * Query | ||
33 | , KQuery(..) | ||
34 | , MethodName | ||
35 | |||
36 | -- * Response | ||
37 | , KResponse(..) | ||
38 | , ReflectedIP(..) | ||
39 | |||
40 | -- * Message | ||
41 | , KMessage (..) | ||
42 | ) where | ||
43 | |||
44 | import Control.Applicative | ||
45 | import Control.Arrow | ||
46 | import Control.Exception.Lifted as Lifted | ||
47 | import Data.BEncode as BE | ||
48 | import Data.ByteString as B | ||
49 | import Data.ByteString.Char8 as BC | ||
50 | import qualified Data.Serialize as S | ||
51 | import Data.Word | ||
52 | import Data.Typeable | ||
53 | import Network.Socket (SockAddr (..),PortNumber,HostAddress) | ||
54 | |||
55 | |||
56 | -- | This transaction ID is generated by the querying node and is | ||
57 | -- echoed in the response, so responses may be correlated with | ||
58 | -- multiple queries to the same node. The transaction ID should be | ||
59 | -- encoded as a short string of binary numbers, typically 2 characters | ||
60 | -- are enough as they cover 2^16 outstanding queries. | ||
61 | type TransactionId = ByteString | ||
62 | |||
63 | unknownTransaction :: TransactionId | ||
64 | unknownTransaction = "" | ||
65 | |||
66 | {----------------------------------------------------------------------- | ||
67 | -- Error messages | ||
68 | -----------------------------------------------------------------------} | ||
69 | |||
70 | -- | Types of RPC errors. | ||
71 | data ErrorCode | ||
72 | -- | Some error doesn't fit in any other category. | ||
73 | = GenericError | ||
74 | |||
75 | -- | Occur when server fail to process procedure call. | ||
76 | | ServerError | ||
77 | |||
78 | -- | Malformed packet, invalid arguments or bad token. | ||
79 | | ProtocolError | ||
80 | |||
81 | -- | Occur when client trying to call method server don't know. | ||
82 | | MethodUnknown | ||
83 | deriving (Show, Read, Eq, Ord, Bounded, Typeable) | ||
84 | |||
85 | -- | According to the table: | ||
86 | -- <http://bittorrent.org/beps/bep_0005.html#errors> | ||
87 | instance Enum ErrorCode where | ||
88 | fromEnum GenericError = 201 | ||
89 | fromEnum ServerError = 202 | ||
90 | fromEnum ProtocolError = 203 | ||
91 | fromEnum MethodUnknown = 204 | ||
92 | {-# INLINE fromEnum #-} | ||
93 | |||
94 | toEnum 201 = GenericError | ||
95 | toEnum 202 = ServerError | ||
96 | toEnum 203 = ProtocolError | ||
97 | toEnum 204 = MethodUnknown | ||
98 | toEnum _ = GenericError | ||
99 | {-# INLINE toEnum #-} | ||
100 | |||
101 | instance BEncode ErrorCode where | ||
102 | toBEncode = toBEncode . fromEnum | ||
103 | {-# INLINE toBEncode #-} | ||
104 | |||
105 | fromBEncode b = toEnum <$> fromBEncode b | ||
106 | {-# INLINE fromBEncode #-} | ||
107 | |||
108 | -- | Errors are sent when a query cannot be fulfilled. Error message | ||
109 | -- can be send only from server to client but not in the opposite | ||
110 | -- direction. | ||
111 | -- | ||
112 | data KError = KError | ||
113 | { errorCode :: !ErrorCode -- ^ the type of error; | ||
114 | , errorMessage :: !ByteString -- ^ human-readable text message; | ||
115 | , errorId :: !TransactionId -- ^ match to the corresponding 'queryId'. | ||
116 | } deriving (Show, Read, Eq, Ord, Typeable) | ||
117 | |||
118 | -- | Errors, or KRPC message dictionaries with a \"y\" value of \"e\", | ||
119 | -- contain one additional key \"e\". The value of \"e\" is a | ||
120 | -- list. The first element is an integer representing the error | ||
121 | -- code. The second element is a string containing the error | ||
122 | -- message. | ||
123 | -- | ||
124 | -- Example Error Packet: | ||
125 | -- | ||
126 | -- > { "t": "aa", "y":"e", "e":[201, "A Generic Error Ocurred"]} | ||
127 | -- | ||
128 | -- or bencoded: | ||
129 | -- | ||
130 | -- > d1:eli201e23:A Generic Error Ocurrede1:t2:aa1:y1:ee | ||
131 | -- | ||
132 | instance BEncode KError where | ||
133 | toBEncode KError {..} = toDict $ | ||
134 | "e" .=! (errorCode, errorMessage) | ||
135 | .: "t" .=! errorId | ||
136 | .: "y" .=! ("e" :: ByteString) | ||
137 | .: endDict | ||
138 | {-# INLINE toBEncode #-} | ||
139 | |||
140 | fromBEncode = fromDict $ do | ||
141 | lookAhead $ match "y" (BString "e") | ||
142 | (code, msg) <- field (req "e") | ||
143 | KError code msg <$>! "t" | ||
144 | {-# INLINE fromBEncode #-} | ||
145 | |||
146 | instance Exception KError | ||
147 | |||
148 | -- | Received 'queryArgs' or 'respVals' can not be decoded. | ||
149 | decodeError :: String -> TransactionId -> KError | ||
150 | decodeError msg = KError ProtocolError (BC.pack msg) | ||
151 | |||
152 | -- | A remote node has send some 'KMessage' this node is unable to | ||
153 | -- decode. | ||
154 | unknownMessage :: String -> KError | ||
155 | unknownMessage msg = KError ProtocolError (BC.pack msg) unknownTransaction | ||
156 | |||
157 | {----------------------------------------------------------------------- | ||
158 | -- Query messages | ||
159 | -----------------------------------------------------------------------} | ||
160 | |||
161 | type MethodName = ByteString | ||
162 | |||
163 | -- | Query used to signal that caller want to make procedure call to | ||
164 | -- callee and pass arguments in. Therefore query may be only sent from | ||
165 | -- client to server but not in the opposite direction. | ||
166 | -- | ||
167 | data KQuery = KQuery | ||
168 | { queryArgs :: !BValue -- ^ values to be passed to method; | ||
169 | , queryMethod :: !MethodName -- ^ method to call; | ||
170 | , queryId :: !TransactionId -- ^ one-time query token. | ||
171 | } deriving (Show, Read, Eq, Ord, Typeable) | ||
172 | |||
173 | -- | Queries, or KRPC message dictionaries with a \"y\" value of | ||
174 | -- \"q\", contain two additional keys; \"q\" and \"a\". Key \"q\" has | ||
175 | -- a string value containing the method name of the query. Key \"a\" | ||
176 | -- has a dictionary value containing named arguments to the query. | ||
177 | -- | ||
178 | -- Example Query packet: | ||
179 | -- | ||
180 | -- > { "t" : "aa", "y" : "q", "q" : "ping", "a" : { "msg" : "hi!" } } | ||
181 | -- | ||
182 | instance BEncode KQuery where | ||
183 | toBEncode KQuery {..} = toDict $ | ||
184 | "a" .=! queryArgs | ||
185 | .: "q" .=! queryMethod | ||
186 | .: "t" .=! queryId | ||
187 | .: "y" .=! ("q" :: ByteString) | ||
188 | .: endDict | ||
189 | {-# INLINE toBEncode #-} | ||
190 | |||
191 | fromBEncode = fromDict $ do | ||
192 | lookAhead $ match "y" (BString "q") | ||
193 | KQuery <$>! "a" <*>! "q" <*>! "t" | ||
194 | {-# INLINE fromBEncode #-} | ||
195 | |||
196 | newtype ReflectedIP = ReflectedIP SockAddr | ||
197 | deriving (Eq, Ord, Show) | ||
198 | |||
199 | instance BEncode ReflectedIP where | ||
200 | toBEncode (ReflectedIP addr) = BString (encodeAddr addr) | ||
201 | fromBEncode (BString bs) = ReflectedIP <$> decodeAddr bs | ||
202 | fromBEncode _ = Left "ReflectedIP should be a bencoded string" | ||
203 | |||
204 | port16 :: Word16 -> PortNumber | ||
205 | port16 = fromIntegral | ||
206 | |||
207 | decodeAddr :: ByteString -> Either String SockAddr | ||
208 | decodeAddr bs | B.length bs == 6 | ||
209 | = ( \(a,p) -> SockAddrInet <$> fmap port16 p <*> a ) | ||
210 | $ (S.runGet S.getWord32host *** S.decode ) | ||
211 | $ B.splitAt 4 bs | ||
212 | decodeAddr bs | B.length bs == 18 | ||
213 | = ( \(a,p) -> flip SockAddrInet6 0 <$> fmap port16 p <*> a <*> pure 0 ) | ||
214 | $ (S.decode *** S.decode ) | ||
215 | $ B.splitAt 16 bs | ||
216 | decodeAddr _ = Left "incorrectly sized address and port" | ||
217 | |||
218 | encodeAddr :: SockAddr -> ByteString | ||
219 | encodeAddr (SockAddrInet port addr) | ||
220 | = S.runPut (S.putWord32host addr >> S.put (fromIntegral port :: Word16)) | ||
221 | encodeAddr (SockAddrInet6 port _ addr _) | ||
222 | = S.runPut (S.put addr >> S.put (fromIntegral port :: Word16)) | ||
223 | encodeAddr _ = B.empty | ||
224 | |||
225 | {----------------------------------------------------------------------- | ||
226 | -- Response messages | ||
227 | -----------------------------------------------------------------------} | ||
228 | |||
229 | -- | Response messages are sent upon successful completion of a | ||
230 | -- query: | ||
231 | -- | ||
232 | -- * KResponse used to signal that callee successufully process a | ||
233 | -- procedure call and to return values from procedure. | ||
234 | -- | ||
235 | -- * KResponse should not be sent if error occurred during RPC, | ||
236 | -- 'KError' should be sent instead. | ||
237 | -- | ||
238 | -- * KResponse can be only sent from server to client. | ||
239 | -- | ||
240 | data KResponse = KResponse | ||
241 | { respVals :: BValue -- ^ 'BDict' containing return values; | ||
242 | , respId :: TransactionId -- ^ match to the corresponding 'queryId'. | ||
243 | , respIP :: Maybe ReflectedIP | ||
244 | } deriving (Show, Eq, Ord, Typeable) | ||
245 | |||
246 | -- | Responses, or KRPC message dictionaries with a \"y\" value of | ||
247 | -- \"r\", contain one additional key \"r\". The value of \"r\" is a | ||
248 | -- dictionary containing named return values. | ||
249 | -- | ||
250 | -- Example Response packet: | ||
251 | -- | ||
252 | -- > { "t" : "aa", "y" : "r", "r" : { "msg" : "you've sent: hi!" } } | ||
253 | -- | ||
254 | instance BEncode KResponse where | ||
255 | toBEncode KResponse {..} = toDict $ | ||
256 | "ip" .=? respIP | ||
257 | .: "r" .=! respVals | ||
258 | .: "t" .=! respId | ||
259 | .: "y" .=! ("r" :: ByteString) | ||
260 | .: endDict | ||
261 | {-# INLINE toBEncode #-} | ||
262 | |||
263 | fromBEncode = fromDict $ do | ||
264 | lookAhead $ match "y" (BString "r") | ||
265 | addr <- optional (field (req "ip")) | ||
266 | (\r t -> KResponse r t addr) <$>! "r" <*>! "t" | ||
267 | {-# INLINE fromBEncode #-} | ||
268 | |||
269 | {----------------------------------------------------------------------- | ||
270 | -- Summed messages | ||
271 | -----------------------------------------------------------------------} | ||
272 | |||
273 | -- | Generic KRPC message. | ||
274 | data KMessage | ||
275 | = Q KQuery | ||
276 | | R KResponse | ||
277 | | E KError | ||
278 | deriving (Show, Eq) | ||
279 | |||
280 | instance BEncode KMessage where | ||
281 | toBEncode (Q q) = toBEncode q | ||
282 | toBEncode (R r) = toBEncode r | ||
283 | toBEncode (E e) = toBEncode e | ||
284 | |||
285 | fromBEncode b = | ||
286 | Q <$> fromBEncode b | ||
287 | <|> R <$> fromBEncode b | ||
288 | <|> E <$> fromBEncode b | ||
289 | <|> decodingError "KMessage: unknown message or message tag" | ||
diff --git a/src/Network/KRPC/Method.hs b/src/Network/KRPC/Method.hs new file mode 100644 index 00000000..916b38a8 --- /dev/null +++ b/src/Network/KRPC/Method.hs | |||
@@ -0,0 +1,87 @@ | |||
1 | -- | | ||
2 | -- Copyright : (c) Sam Truzjan 2013, 2014 | ||
3 | -- License : BSD3 | ||
4 | -- Maintainer : pxqr.sta@gmail.com | ||
5 | -- Stability : experimental | ||
6 | -- Portability : portable | ||
7 | -- | ||
8 | -- Normally, you don't need to import this module. | ||
9 | -- | ||
10 | {-# LANGUAGE RankNTypes #-} | ||
11 | {-# LANGUAGE MultiParamTypeClasses #-} | ||
12 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
13 | {-# LANGUAGE ScopedTypeVariables #-} | ||
14 | {-# LANGUAGE DefaultSignatures #-} | ||
15 | module Network.KRPC.Method | ||
16 | ( Method (..) | ||
17 | , KRPC (..) | ||
18 | ) where | ||
19 | |||
20 | import Data.BEncode (BEncode) | ||
21 | import Data.ByteString.Char8 as BC | ||
22 | import Data.Char | ||
23 | import Data.Monoid | ||
24 | import Data.List as L | ||
25 | import Data.String | ||
26 | import Data.Typeable | ||
27 | import Network.KRPC.Message | ||
28 | |||
29 | |||
30 | -- | Method datatype used to describe method name, parameters and | ||
31 | -- return values of procedure. Client use a method to /invoke/, server | ||
32 | -- /implements/ the method to make the actual work. | ||
33 | -- | ||
34 | -- We use the following fantom types to ensure type-safiety: | ||
35 | -- | ||
36 | -- * param: Type of method parameters. | ||
37 | -- | ||
38 | -- * result: Type of return value of the method. | ||
39 | -- | ||
40 | newtype Method param result = Method { methodName :: MethodName } | ||
41 | deriving (Eq, Ord, IsString, BEncode) | ||
42 | |||
43 | -- | Example: | ||
44 | -- | ||
45 | -- @show (Method \"concat\" :: [Int] Int) == \"concat :: [Int] -> Int\"@ | ||
46 | -- | ||
47 | instance (Typeable a, Typeable b) => Show (Method a b) where | ||
48 | showsPrec _ = showsMethod | ||
49 | |||
50 | showsMethod :: forall a b. ( Typeable a , Typeable b ) => Method a b -> ShowS | ||
51 | showsMethod (Method name) = | ||
52 | showString (BC.unpack name) <> | ||
53 | showString " :: " <> | ||
54 | shows paramsTy <> | ||
55 | showString " -> " <> | ||
56 | shows valuesTy | ||
57 | where | ||
58 | impossible = error "KRPC.showsMethod: impossible" | ||
59 | paramsTy = typeOf (impossible :: a) | ||
60 | valuesTy = typeOf (impossible :: b) | ||
61 | |||
62 | -- | In order to perform or handle KRPC query you need to provide | ||
63 | -- corresponding 'KRPC' class. | ||
64 | -- | ||
65 | -- Example: | ||
66 | -- | ||
67 | -- @ | ||
68 | -- data Ping = Ping Text deriving BEncode | ||
69 | -- data Pong = Pong Text deriving BEncode | ||
70 | -- | ||
71 | -- instance 'KRPC' Ping Pong where | ||
72 | -- method = \"ping\" | ||
73 | -- @ | ||
74 | -- | ||
75 | class (Typeable req, BEncode req, Typeable resp, BEncode resp) | ||
76 | => KRPC req resp where | ||
77 | |||
78 | -- | Method name. Default implementation uses lowercased @req@ | ||
79 | -- datatype name. | ||
80 | -- | ||
81 | method :: Method req resp | ||
82 | |||
83 | -- TODO add underscores | ||
84 | default method :: Typeable req => Method req resp | ||
85 | method = Method $ fromString $ L.map toLower $ show $ typeOf hole | ||
86 | where | ||
87 | hole = error "krpc.method: impossible" :: req | ||