diff options
Diffstat (limited to 'src/Network/KRPC/Manager.hs')
-rw-r--r-- | src/Network/KRPC/Manager.hs | 137 |
1 files changed, 117 insertions, 20 deletions
diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index 66de6548..e7f0563b 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs | |||
@@ -55,9 +55,13 @@ import Control.Monad | |||
55 | import Control.Monad.Logger | 55 | import Control.Monad.Logger |
56 | import Control.Monad.Reader | 56 | import Control.Monad.Reader |
57 | import Control.Monad.Trans.Control | 57 | import Control.Monad.Trans.Control |
58 | #ifdef VERSION_bencoding | ||
58 | import Data.BEncode as BE | 59 | import Data.BEncode as BE |
59 | import Data.BEncode.Internal as BE | 60 | import Data.BEncode.Internal as BE |
60 | import Data.BEncode.Pretty (showBEncode) | 61 | import Data.BEncode.Pretty (showBEncode) |
62 | #else | ||
63 | import qualified Data.Tox as Tox | ||
64 | #endif | ||
61 | import qualified Data.ByteString.Base16 as Base16 | 65 | import qualified Data.ByteString.Base16 as Base16 |
62 | import Data.ByteString as BS | 66 | import Data.ByteString as BS |
63 | import Data.ByteString.Char8 as BC | 67 | import Data.ByteString.Char8 as BC |
@@ -67,6 +71,7 @@ import Data.IORef | |||
67 | import Data.List as L | 71 | import Data.List as L |
68 | import Data.Map as M | 72 | import Data.Map as M |
69 | import Data.Monoid | 73 | import Data.Monoid |
74 | import Data.Serialize as S | ||
70 | import Data.Text as T | 75 | import Data.Text as T |
71 | import Data.Text.Encoding as T | 76 | import Data.Text.Encoding as T |
72 | import Data.Tuple | 77 | import Data.Tuple |
@@ -128,10 +133,10 @@ type KResult = Either KError KResponse | |||
128 | 133 | ||
129 | type TransactionCounter = IORef Int | 134 | type TransactionCounter = IORef Int |
130 | type CallId = (TransactionId, SockAddr) | 135 | type CallId = (TransactionId, SockAddr) |
131 | type CallRes = MVar (BValue, KResult) | 136 | type CallRes = MVar (KQueryArgs, KResult) -- (raw response, decoded response) |
132 | type PendingCalls = IORef (Map CallId CallRes) | 137 | type PendingCalls = IORef (Map CallId CallRes) |
133 | 138 | ||
134 | type HandlerBody h = SockAddr -> BValue -> h (BE.Result BValue) | 139 | type HandlerBody h = SockAddr -> KQueryArgs -> h (Either String KQueryArgs) |
135 | 140 | ||
136 | -- | Handler is a function which will be invoked then some /remote/ | 141 | -- | Handler is a function which will be invoked then some /remote/ |
137 | -- node querying /this/ node. | 142 | -- node querying /this/ node. |
@@ -223,8 +228,13 @@ withManager opts addr hs = bracket (newManager opts addr hs) closeManager | |||
223 | -- TODO prettify log messages | 228 | -- TODO prettify log messages |
224 | querySignature :: MethodName -> TransactionId -> SockAddr -> Text | 229 | querySignature :: MethodName -> TransactionId -> SockAddr -> Text |
225 | querySignature name transaction addr = T.concat | 230 | querySignature name transaction addr = T.concat |
231 | #ifdef VERSION_bencoding | ||
226 | [ "&", T.decodeUtf8 name | 232 | [ "&", T.decodeUtf8 name |
227 | , " #", T.decodeUtf8 (Base16.encode transaction) -- T.decodeUtf8 transaction | 233 | , " #", T.decodeUtf8 (Base16.encode transaction) -- T.decodeUtf8 transaction |
234 | #else | ||
235 | [ "&", T.pack (show name) | ||
236 | , " #", T.decodeUtf8 (Base16.encode $ S.encode transaction) | ||
237 | #endif | ||
228 | , " @", T.pack (show addr) | 238 | , " @", T.pack (show addr) |
229 | ] | 239 | ] |
230 | 240 | ||
@@ -243,14 +253,24 @@ data QueryFailure | |||
243 | 253 | ||
244 | instance Exception QueryFailure | 254 | instance Exception QueryFailure |
245 | 255 | ||
256 | #ifdef VERSION_bencoding | ||
246 | sendMessage :: MonadIO m => BEncode a => Socket -> SockAddr -> a -> m () | 257 | sendMessage :: MonadIO m => BEncode a => Socket -> SockAddr -> a -> m () |
247 | sendMessage sock addr a = do | 258 | sendMessage sock addr a = do |
248 | liftIO $ sendManyTo sock (BL.toChunks (BE.encode a)) addr | 259 | liftIO $ sendManyTo sock (BL.toChunks (BE.encode a)) addr |
260 | #else | ||
261 | sendMessage :: MonadIO m => Socket -> SockAddr -> BC.ByteString -> m () | ||
262 | sendMessage sock addr a = do | ||
263 | liftIO $ sendManyTo sock [a] addr | ||
264 | #endif | ||
249 | 265 | ||
250 | genTransactionId :: TransactionCounter -> IO TransactionId | 266 | genTransactionId :: TransactionCounter -> IO TransactionId |
251 | genTransactionId ref = do | 267 | genTransactionId ref = do |
252 | cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur) | 268 | cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur) |
269 | #ifdef VERSION_bencoding | ||
253 | return $ BC.pack (show cur) | 270 | return $ BC.pack (show cur) |
271 | #else | ||
272 | return $ either (error "failed to create TransactionId") id $ S.decode $ BC.pack (L.take 24 $ show cur ++ L.repeat ' ') | ||
273 | #endif | ||
254 | 274 | ||
255 | -- | How many times 'query' call have been performed. | 275 | -- | How many times 'query' call have been performed. |
256 | getQueryCount :: MonadKRPC h m => m Int | 276 | getQueryCount :: MonadKRPC h m => m Int |
@@ -274,8 +294,13 @@ unregisterQuery cid ref = do | |||
274 | 294 | ||
275 | 295 | ||
276 | -- (sendmsg EINVAL) | 296 | -- (sendmsg EINVAL) |
297 | #ifdef VERSION_bencoding | ||
277 | sendQuery :: BEncode a => Socket -> SockAddr -> a -> IO () | 298 | sendQuery :: BEncode a => Socket -> SockAddr -> a -> IO () |
278 | sendQuery sock addr q = handle sockError $ sendMessage sock addr q | 299 | sendQuery sock addr q = handle sockError $ sendMessage sock addr q |
300 | #else | ||
301 | sendQuery :: Serialize a => Socket -> SockAddr -> a -> IO () | ||
302 | sendQuery sock addr q = handle sockError $ sendMessage sock addr (S.encode q) | ||
303 | #endif | ||
279 | where | 304 | where |
280 | sockError :: IOError -> IO () | 305 | sockError :: IOError -> IO () |
281 | sockError _ = throwIO SendFailed | 306 | sockError _ = throwIO SendFailed |
@@ -295,11 +320,11 @@ query' addr params = queryK addr params (const (,)) | |||
295 | -- | Enqueue a query, but give us the complete BEncoded content sent by the | 320 | -- | Enqueue a query, but give us the complete BEncoded content sent by the |
296 | -- remote Node. This is useful for handling extensions that this library does | 321 | -- remote Node. This is useful for handling extensions that this library does |
297 | -- not otherwise support. | 322 | -- not otherwise support. |
298 | queryRaw :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, BValue) | 323 | queryRaw :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, KQueryArgs) |
299 | queryRaw addr params = queryK addr params (\raw x _ -> (x,raw)) | 324 | queryRaw addr params = queryK addr params (\raw x _ -> (x,raw)) |
300 | 325 | ||
301 | queryK :: forall h m a b x. (MonadKRPC h m, KRPC a b) => | 326 | queryK :: forall h m a b x. (MonadKRPC h m, KRPC a b) => |
302 | SockAddr -> a -> (BValue -> b -> Maybe ReflectedIP -> x) -> m x | 327 | SockAddr -> a -> (KQueryArgs -> b -> Maybe ReflectedIP -> x) -> m x |
303 | queryK addr params kont = do | 328 | queryK addr params kont = do |
304 | Manager {..} <- getManager | 329 | Manager {..} <- getManager |
305 | tid <- liftIO $ genTransactionId transactionCounter | 330 | tid <- liftIO $ genTransactionId transactionCounter |
@@ -310,17 +335,29 @@ queryK addr params kont = do | |||
310 | mres <- liftIO $ do | 335 | mres <- liftIO $ do |
311 | ares <- registerQuery (tid, addr) pendingCalls | 336 | ares <- registerQuery (tid, addr) pendingCalls |
312 | 337 | ||
338 | #ifdef VERSION_bencoding | ||
313 | let q = KQuery (toBEncode params) (methodName queryMethod) tid | 339 | let q = KQuery (toBEncode params) (methodName queryMethod) tid |
340 | #else | ||
341 | let q = Tox.Message (methodName queryMethod) cli tid params | ||
342 | cli = error "TODO TOX client node id" | ||
343 | #endif | ||
314 | sendQuery sock addr q | 344 | sendQuery sock addr q |
315 | `onException` unregisterQuery (tid, addr) pendingCalls | 345 | `onException` unregisterQuery (tid, addr) pendingCalls |
316 | 346 | ||
317 | timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do | 347 | timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do |
318 | (raw,res) <- readMVar ares | 348 | (raw,res) <- readMVar ares -- MVar (KQueryArgs, KResult) |
319 | case res of | 349 | case res of |
350 | #ifdef VERSION_bencoding | ||
320 | Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m) | 351 | Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m) |
321 | Right (KResponse {..}) -> | 352 | Right (KResponse {..}) -> |
322 | case fromBEncode respVals of | 353 | case fromBEncode respVals of |
323 | Right r -> pure $ kont raw r respIP | 354 | Right r -> pure $ kont raw r respIP |
355 | #else | ||
356 | Left _ -> throwIO $ QueryFailed GenericError "TODO: TOX ERROR" | ||
357 | Right (Tox.Message {..}) -> | ||
358 | case S.decode msgPayload of | ||
359 | Right r -> pure $ kont raw r Nothing | ||
360 | #endif | ||
324 | Left e -> throwIO $ QueryFailed ProtocolError (T.pack e) | 361 | Left e -> throwIO $ QueryFailed ProtocolError (T.pack e) |
325 | 362 | ||
326 | case mres of | 363 | case mres of |
@@ -377,51 +414,87 @@ handler body = (name, wrapper) | |||
377 | where | 414 | where |
378 | Method name = method :: Method a b | 415 | Method name = method :: Method a b |
379 | wrapper addr args = | 416 | wrapper addr args = |
417 | #ifdef VERSION_bencoding | ||
380 | case fromBEncode args of | 418 | case fromBEncode args of |
419 | #else | ||
420 | case S.decode args of | ||
421 | #endif | ||
381 | Left e -> return $ Left e | 422 | Left e -> return $ Left e |
382 | Right a -> do | 423 | Right a -> do |
383 | r <- body addr a | 424 | r <- body addr a |
425 | #ifdef VERSION_bencoding | ||
384 | return $ Right $ toBEncode r | 426 | return $ Right $ toBEncode r |
427 | #else | ||
428 | return $ Right $ S.encode r | ||
429 | #endif | ||
385 | 430 | ||
386 | runHandler :: MonadKRPC h m | 431 | runHandler :: MonadKRPC h m |
387 | => HandlerBody h -> SockAddr -> KQuery -> m KResult | 432 | => HandlerBody h -> SockAddr -> KQuery -> m KResult |
388 | runHandler h addr KQuery {..} = Lifted.catches wrapper failbacks | 433 | runHandler h addr m = Lifted.catches wrapper failbacks |
389 | where | 434 | where |
390 | signature = querySignature queryMethod queryId addr | 435 | signature = querySignature (queryMethod m) (queryId m) addr |
391 | 436 | ||
392 | wrapper = do | 437 | wrapper = do |
393 | $(logDebugS) "handler.quered" signature | 438 | $(logDebugS) "handler.quered" signature |
394 | result <- liftHandler (h addr queryArgs) | 439 | result <- liftHandler (h addr (queryArgs m)) |
395 | 440 | ||
396 | case result of | 441 | case result of |
397 | Left msg -> do | 442 | Left msg -> do |
398 | $(logDebugS) "handler.bad_query" $ signature <> " !" <> T.pack msg | 443 | $(logDebugS) "handler.bad_query" $ signature <> " !" <> T.pack msg |
399 | return $ Left $ KError ProtocolError (BC.pack msg) queryId | 444 | #ifdef VERSION_bencoding |
445 | return $ Left $ KError ProtocolError (BC.pack msg) (queryId m) | ||
446 | #else | ||
447 | return $ Left $ decodeError "TODO TOX ProtocolError" (queryId m) | ||
448 | #endif | ||
400 | 449 | ||
401 | Right a -> do | 450 | Right a -> do -- KQueryArgs |
402 | $(logDebugS) "handler.success" signature | 451 | $(logDebugS) "handler.success" signature |
403 | return $ Right $ KResponse a queryId (Just $ ReflectedIP addr) | 452 | #ifdef VERSION_bencoding |
453 | return $ Right $ KResponse a (queryId m) (Just $ ReflectedIP addr) | ||
454 | #else | ||
455 | let cli = error "TODO TOX client node id" | ||
456 | messageid = error "TODO TOX message response id" | ||
457 | -- TODO: ReflectedIP addr ?? | ||
458 | return $ Right $ Tox.Message messageid cli (queryId m) a | ||
459 | #endif | ||
404 | 460 | ||
405 | failbacks = | 461 | failbacks = |
406 | [ E.Handler $ \ (e :: HandlerFailure) -> do | 462 | [ E.Handler $ \ (e :: HandlerFailure) -> do |
407 | $(logDebugS) "handler.failed" signature | 463 | $(logDebugS) "handler.failed" signature |
408 | return $ Left $ KError ProtocolError (prettyHF e) queryId | 464 | #ifdef VERSION_bencoding |
465 | return $ Left $ KError ProtocolError (prettyHF e) (queryId m) | ||
466 | #else | ||
467 | return $ Left $ decodeError "TODO TOX ProtocolError 2" (queryId m) | ||
468 | #endif | ||
469 | |||
409 | 470 | ||
410 | -- may happen if handler makes query and fail | 471 | -- may happen if handler makes query and fail |
411 | , E.Handler $ \ (e :: QueryFailure) -> do | 472 | , E.Handler $ \ (e :: QueryFailure) -> do |
412 | return $ Left $ KError ServerError (prettyQF e) queryId | 473 | #ifdef VERSION_bencoding |
474 | return $ Left $ KError ServerError (prettyQF e) (queryId m) | ||
475 | #else | ||
476 | return $ Left $ decodeError "TODO TOX ServerError" (queryId m) | ||
477 | #endif | ||
413 | 478 | ||
414 | -- since handler thread exit after sendMessage we can safely | 479 | -- since handler thread exit after sendMessage we can safely |
415 | -- suppress async exception here | 480 | -- suppress async exception here |
416 | , E.Handler $ \ (e :: SomeException) -> do | 481 | , E.Handler $ \ (e :: SomeException) -> do |
417 | return $ Left $ KError GenericError (BC.pack (show e)) queryId | 482 | #ifdef VERSION_bencoding |
483 | return $ Left $ KError GenericError (BC.pack (show e)) (queryId m) | ||
484 | #else | ||
485 | return $ Left $ decodeError "TODO TOX GenericError" (queryId m) | ||
486 | #endif | ||
418 | ] | 487 | ] |
419 | 488 | ||
420 | dispatchHandler :: MonadKRPC h m => KQuery -> SockAddr -> m KResult | 489 | dispatchHandler :: MonadKRPC h m => KQuery -> SockAddr -> m KResult |
421 | dispatchHandler q @ KQuery {..} addr = do | 490 | dispatchHandler q addr = do |
422 | Manager {..} <- getManager | 491 | Manager {..} <- getManager |
423 | case L.lookup queryMethod handlers of | 492 | case L.lookup (queryMethod q) handlers of |
424 | Nothing -> return $ Left $ KError MethodUnknown queryMethod queryId | 493 | #ifdef VERSION_bencoding |
494 | Nothing -> return $ Left $ KError MethodUnknown (queryMethod q) (queryId q) | ||
495 | #else | ||
496 | Nothing -> return $ Left $ decodeError "TODO TOX Error MethodUnknown" (queryId q) | ||
497 | #endif | ||
425 | Just h -> runHandler h addr q | 498 | Just h -> runHandler h addr q |
426 | 499 | ||
427 | {----------------------------------------------------------------------- | 500 | {----------------------------------------------------------------------- |
@@ -435,11 +508,12 @@ dispatchHandler q @ KQuery {..} addr = do | |||
435 | -- peer B fork too many threads | 508 | -- peer B fork too many threads |
436 | -- ... space leak | 509 | -- ... space leak |
437 | -- | 510 | -- |
438 | handleQuery :: MonadKRPC h m => BValue -> KQuery -> SockAddr -> m () | 511 | handleQuery :: MonadKRPC h m => KQueryArgs -> KQuery -> SockAddr -> m () |
439 | handleQuery raw q addr = void $ fork $ do | 512 | handleQuery raw q addr = void $ fork $ do |
440 | myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery" | 513 | myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery" |
441 | Manager {..} <- getManager | 514 | Manager {..} <- getManager |
442 | res <- dispatchHandler q addr | 515 | res <- dispatchHandler q addr |
516 | #ifdef VERSION_bencoding | ||
443 | let resbe = either toBEncode toBEncode res | 517 | let resbe = either toBEncode toBEncode res |
444 | $(logOther "q") $ T.unlines | 518 | $(logOther "q") $ T.unlines |
445 | [ either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode raw) | 519 | [ either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode raw) |
@@ -447,21 +521,36 @@ handleQuery raw q addr = void $ fork $ do | |||
447 | , either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode resbe) | 521 | , either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode resbe) |
448 | ] | 522 | ] |
449 | sendMessage sock addr resbe | 523 | sendMessage sock addr resbe |
524 | #else | ||
525 | -- Errors not sent for Tox. | ||
526 | either (const $ return ()) (sendMessage sock addr . S.encode) res | ||
527 | #endif | ||
450 | 528 | ||
451 | handleResponse :: MonadKRPC h m => BValue -> KResult -> SockAddr -> m () | 529 | handleResponse :: MonadKRPC h m => KQueryArgs -> KResult -> SockAddr -> m () |
452 | handleResponse raw result addr = do | 530 | handleResponse raw result addr = do |
453 | Manager {..} <- getManager | 531 | Manager {..} <- getManager |
454 | liftIO $ do | 532 | liftIO $ do |
533 | #ifdef VERSION_bencoding | ||
455 | let resultId = either errorId respId result | 534 | let resultId = either errorId respId result |
535 | #else | ||
536 | let resultId = either Tox.msgNonce Tox.msgNonce result | ||
537 | #endif | ||
456 | mcall <- unregisterQuery (resultId, addr) pendingCalls | 538 | mcall <- unregisterQuery (resultId, addr) pendingCalls |
457 | case mcall of | 539 | case mcall of |
458 | Nothing -> return () | 540 | Nothing -> return () |
459 | Just ares -> putMVar ares (raw,result) | 541 | Just ares -> putMVar ares (raw,result) |
460 | 542 | ||
461 | handleMessage :: MonadKRPC h m => BValue -> KMessage -> SockAddr -> m () | 543 | #ifdef VERSION_bencoding |
544 | handleMessage :: MonadKRPC h m => KQueryArgs -> KMessage -> SockAddr -> m () | ||
462 | handleMessage raw (Q q) = handleQuery raw q | 545 | handleMessage raw (Q q) = handleQuery raw q |
463 | handleMessage raw (R r) = handleResponse raw (Right r) | 546 | handleMessage raw (R r) = handleResponse raw (Right r) |
464 | handleMessage raw (E e) = handleResponse raw (Left e) | 547 | handleMessage raw (E e) = handleResponse raw (Left e) |
548 | #else | ||
549 | handleMessage :: MonadKRPC h m => KQueryArgs -> Tox.Message BC.ByteString -> SockAddr -> m () | ||
550 | handleMessage raw q | Tox.isQuery q = handleQuery raw q | ||
551 | handleMessage raw r | Tox.isResponse r = handleResponse raw (Right r) | ||
552 | handleMessage raw e | Tox.isError e = handleResponse raw (Left e) | ||
553 | #endif | ||
465 | 554 | ||
466 | listener :: MonadKRPC h m => m () | 555 | listener :: MonadKRPC h m => m () |
467 | listener = do | 556 | listener = do |
@@ -469,9 +558,17 @@ listener = do | |||
469 | fix $ \again -> do | 558 | fix $ \again -> do |
470 | (bs, addr) <- liftIO $ do | 559 | (bs, addr) <- liftIO $ do |
471 | handle exceptions $ BS.recvFrom sock (optMaxMsgSize options) | 560 | handle exceptions $ BS.recvFrom sock (optMaxMsgSize options) |
561 | #ifdef VERSION_bencoding | ||
472 | case BE.parse bs >>= \r -> (,) r <$> BE.decode bs of | 562 | case BE.parse bs >>= \r -> (,) r <$> BE.decode bs of |
563 | #else | ||
564 | case return bs >>= \r -> (,) r <$> decode bs of | ||
565 | #endif | ||
473 | -- TODO ignore unknown messages at all? | 566 | -- TODO ignore unknown messages at all? |
567 | #ifdef VERSION_bencoding | ||
474 | Left e -> liftIO $ sendMessage sock addr $ unknownMessage e | 568 | Left e -> liftIO $ sendMessage sock addr $ unknownMessage e |
569 | #else | ||
570 | Left _ -> return () -- TODO TOX send unknownMessage error | ||
571 | #endif | ||
475 | Right (raw,m) -> handleMessage raw m addr | 572 | Right (raw,m) -> handleMessage raw m addr |
476 | again | 573 | again |
477 | where | 574 | where |