summaryrefslogtreecommitdiff
path: root/src/Network/KRPC/Manager.hs
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-06-04 22:39:14 -0400
committerjoe <joe@jerkface.net>2017-06-04 22:39:14 -0400
commit219d72ebde4bab5a516a86608dcb3aede75c1611 (patch)
treedf111d38c3532b9342f30c1bad98ef095569d54f /src/Network/KRPC/Manager.hs
parent713cee07450697e40811e74059739da02dd604c7 (diff)
WIP: Adapting DHT to Tox network.
Diffstat (limited to 'src/Network/KRPC/Manager.hs')
-rw-r--r--src/Network/KRPC/Manager.hs137
1 files changed, 117 insertions, 20 deletions
diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs
index 66de6548..e7f0563b 100644
--- a/src/Network/KRPC/Manager.hs
+++ b/src/Network/KRPC/Manager.hs
@@ -55,9 +55,13 @@ import Control.Monad
55import Control.Monad.Logger 55import Control.Monad.Logger
56import Control.Monad.Reader 56import Control.Monad.Reader
57import Control.Monad.Trans.Control 57import Control.Monad.Trans.Control
58#ifdef VERSION_bencoding
58import Data.BEncode as BE 59import Data.BEncode as BE
59import Data.BEncode.Internal as BE 60import Data.BEncode.Internal as BE
60import Data.BEncode.Pretty (showBEncode) 61import Data.BEncode.Pretty (showBEncode)
62#else
63import qualified Data.Tox as Tox
64#endif
61import qualified Data.ByteString.Base16 as Base16 65import qualified Data.ByteString.Base16 as Base16
62import Data.ByteString as BS 66import Data.ByteString as BS
63import Data.ByteString.Char8 as BC 67import Data.ByteString.Char8 as BC
@@ -67,6 +71,7 @@ import Data.IORef
67import Data.List as L 71import Data.List as L
68import Data.Map as M 72import Data.Map as M
69import Data.Monoid 73import Data.Monoid
74import Data.Serialize as S
70import Data.Text as T 75import Data.Text as T
71import Data.Text.Encoding as T 76import Data.Text.Encoding as T
72import Data.Tuple 77import Data.Tuple
@@ -128,10 +133,10 @@ type KResult = Either KError KResponse
128 133
129type TransactionCounter = IORef Int 134type TransactionCounter = IORef Int
130type CallId = (TransactionId, SockAddr) 135type CallId = (TransactionId, SockAddr)
131type CallRes = MVar (BValue, KResult) 136type CallRes = MVar (KQueryArgs, KResult) -- (raw response, decoded response)
132type PendingCalls = IORef (Map CallId CallRes) 137type PendingCalls = IORef (Map CallId CallRes)
133 138
134type HandlerBody h = SockAddr -> BValue -> h (BE.Result BValue) 139type HandlerBody h = SockAddr -> KQueryArgs -> h (Either String KQueryArgs)
135 140
136-- | Handler is a function which will be invoked then some /remote/ 141-- | Handler is a function which will be invoked then some /remote/
137-- node querying /this/ node. 142-- node querying /this/ node.
@@ -223,8 +228,13 @@ withManager opts addr hs = bracket (newManager opts addr hs) closeManager
223-- TODO prettify log messages 228-- TODO prettify log messages
224querySignature :: MethodName -> TransactionId -> SockAddr -> Text 229querySignature :: MethodName -> TransactionId -> SockAddr -> Text
225querySignature name transaction addr = T.concat 230querySignature name transaction addr = T.concat
231#ifdef VERSION_bencoding
226 [ "&", T.decodeUtf8 name 232 [ "&", T.decodeUtf8 name
227 , " #", T.decodeUtf8 (Base16.encode transaction) -- T.decodeUtf8 transaction 233 , " #", T.decodeUtf8 (Base16.encode transaction) -- T.decodeUtf8 transaction
234#else
235 [ "&", T.pack (show name)
236 , " #", T.decodeUtf8 (Base16.encode $ S.encode transaction)
237#endif
228 , " @", T.pack (show addr) 238 , " @", T.pack (show addr)
229 ] 239 ]
230 240
@@ -243,14 +253,24 @@ data QueryFailure
243 253
244instance Exception QueryFailure 254instance Exception QueryFailure
245 255
256#ifdef VERSION_bencoding
246sendMessage :: MonadIO m => BEncode a => Socket -> SockAddr -> a -> m () 257sendMessage :: MonadIO m => BEncode a => Socket -> SockAddr -> a -> m ()
247sendMessage sock addr a = do 258sendMessage sock addr a = do
248 liftIO $ sendManyTo sock (BL.toChunks (BE.encode a)) addr 259 liftIO $ sendManyTo sock (BL.toChunks (BE.encode a)) addr
260#else
261sendMessage :: MonadIO m => Socket -> SockAddr -> BC.ByteString -> m ()
262sendMessage sock addr a = do
263 liftIO $ sendManyTo sock [a] addr
264#endif
249 265
250genTransactionId :: TransactionCounter -> IO TransactionId 266genTransactionId :: TransactionCounter -> IO TransactionId
251genTransactionId ref = do 267genTransactionId ref = do
252 cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur) 268 cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur)
269#ifdef VERSION_bencoding
253 return $ BC.pack (show cur) 270 return $ BC.pack (show cur)
271#else
272 return $ either (error "failed to create TransactionId") id $ S.decode $ BC.pack (L.take 24 $ show cur ++ L.repeat ' ')
273#endif
254 274
255-- | How many times 'query' call have been performed. 275-- | How many times 'query' call have been performed.
256getQueryCount :: MonadKRPC h m => m Int 276getQueryCount :: MonadKRPC h m => m Int
@@ -274,8 +294,13 @@ unregisterQuery cid ref = do
274 294
275 295
276-- (sendmsg EINVAL) 296-- (sendmsg EINVAL)
297#ifdef VERSION_bencoding
277sendQuery :: BEncode a => Socket -> SockAddr -> a -> IO () 298sendQuery :: BEncode a => Socket -> SockAddr -> a -> IO ()
278sendQuery sock addr q = handle sockError $ sendMessage sock addr q 299sendQuery sock addr q = handle sockError $ sendMessage sock addr q
300#else
301sendQuery :: Serialize a => Socket -> SockAddr -> a -> IO ()
302sendQuery sock addr q = handle sockError $ sendMessage sock addr (S.encode q)
303#endif
279 where 304 where
280 sockError :: IOError -> IO () 305 sockError :: IOError -> IO ()
281 sockError _ = throwIO SendFailed 306 sockError _ = throwIO SendFailed
@@ -295,11 +320,11 @@ query' addr params = queryK addr params (const (,))
295-- | Enqueue a query, but give us the complete BEncoded content sent by the 320-- | Enqueue a query, but give us the complete BEncoded content sent by the
296-- remote Node. This is useful for handling extensions that this library does 321-- remote Node. This is useful for handling extensions that this library does
297-- not otherwise support. 322-- not otherwise support.
298queryRaw :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, BValue) 323queryRaw :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, KQueryArgs)
299queryRaw addr params = queryK addr params (\raw x _ -> (x,raw)) 324queryRaw addr params = queryK addr params (\raw x _ -> (x,raw))
300 325
301queryK :: forall h m a b x. (MonadKRPC h m, KRPC a b) => 326queryK :: forall h m a b x. (MonadKRPC h m, KRPC a b) =>
302 SockAddr -> a -> (BValue -> b -> Maybe ReflectedIP -> x) -> m x 327 SockAddr -> a -> (KQueryArgs -> b -> Maybe ReflectedIP -> x) -> m x
303queryK addr params kont = do 328queryK addr params kont = do
304 Manager {..} <- getManager 329 Manager {..} <- getManager
305 tid <- liftIO $ genTransactionId transactionCounter 330 tid <- liftIO $ genTransactionId transactionCounter
@@ -310,17 +335,29 @@ queryK addr params kont = do
310 mres <- liftIO $ do 335 mres <- liftIO $ do
311 ares <- registerQuery (tid, addr) pendingCalls 336 ares <- registerQuery (tid, addr) pendingCalls
312 337
338#ifdef VERSION_bencoding
313 let q = KQuery (toBEncode params) (methodName queryMethod) tid 339 let q = KQuery (toBEncode params) (methodName queryMethod) tid
340#else
341 let q = Tox.Message (methodName queryMethod) cli tid params
342 cli = error "TODO TOX client node id"
343#endif
314 sendQuery sock addr q 344 sendQuery sock addr q
315 `onException` unregisterQuery (tid, addr) pendingCalls 345 `onException` unregisterQuery (tid, addr) pendingCalls
316 346
317 timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do 347 timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do
318 (raw,res) <- readMVar ares 348 (raw,res) <- readMVar ares -- MVar (KQueryArgs, KResult)
319 case res of 349 case res of
350#ifdef VERSION_bencoding
320 Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m) 351 Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m)
321 Right (KResponse {..}) -> 352 Right (KResponse {..}) ->
322 case fromBEncode respVals of 353 case fromBEncode respVals of
323 Right r -> pure $ kont raw r respIP 354 Right r -> pure $ kont raw r respIP
355#else
356 Left _ -> throwIO $ QueryFailed GenericError "TODO: TOX ERROR"
357 Right (Tox.Message {..}) ->
358 case S.decode msgPayload of
359 Right r -> pure $ kont raw r Nothing
360#endif
324 Left e -> throwIO $ QueryFailed ProtocolError (T.pack e) 361 Left e -> throwIO $ QueryFailed ProtocolError (T.pack e)
325 362
326 case mres of 363 case mres of
@@ -377,51 +414,87 @@ handler body = (name, wrapper)
377 where 414 where
378 Method name = method :: Method a b 415 Method name = method :: Method a b
379 wrapper addr args = 416 wrapper addr args =
417#ifdef VERSION_bencoding
380 case fromBEncode args of 418 case fromBEncode args of
419#else
420 case S.decode args of
421#endif
381 Left e -> return $ Left e 422 Left e -> return $ Left e
382 Right a -> do 423 Right a -> do
383 r <- body addr a 424 r <- body addr a
425#ifdef VERSION_bencoding
384 return $ Right $ toBEncode r 426 return $ Right $ toBEncode r
427#else
428 return $ Right $ S.encode r
429#endif
385 430
386runHandler :: MonadKRPC h m 431runHandler :: MonadKRPC h m
387 => HandlerBody h -> SockAddr -> KQuery -> m KResult 432 => HandlerBody h -> SockAddr -> KQuery -> m KResult
388runHandler h addr KQuery {..} = Lifted.catches wrapper failbacks 433runHandler h addr m = Lifted.catches wrapper failbacks
389 where 434 where
390 signature = querySignature queryMethod queryId addr 435 signature = querySignature (queryMethod m) (queryId m) addr
391 436
392 wrapper = do 437 wrapper = do
393 $(logDebugS) "handler.quered" signature 438 $(logDebugS) "handler.quered" signature
394 result <- liftHandler (h addr queryArgs) 439 result <- liftHandler (h addr (queryArgs m))
395 440
396 case result of 441 case result of
397 Left msg -> do 442 Left msg -> do
398 $(logDebugS) "handler.bad_query" $ signature <> " !" <> T.pack msg 443 $(logDebugS) "handler.bad_query" $ signature <> " !" <> T.pack msg
399 return $ Left $ KError ProtocolError (BC.pack msg) queryId 444#ifdef VERSION_bencoding
445 return $ Left $ KError ProtocolError (BC.pack msg) (queryId m)
446#else
447 return $ Left $ decodeError "TODO TOX ProtocolError" (queryId m)
448#endif
400 449
401 Right a -> do 450 Right a -> do -- KQueryArgs
402 $(logDebugS) "handler.success" signature 451 $(logDebugS) "handler.success" signature
403 return $ Right $ KResponse a queryId (Just $ ReflectedIP addr) 452#ifdef VERSION_bencoding
453 return $ Right $ KResponse a (queryId m) (Just $ ReflectedIP addr)
454#else
455 let cli = error "TODO TOX client node id"
456 messageid = error "TODO TOX message response id"
457 -- TODO: ReflectedIP addr ??
458 return $ Right $ Tox.Message messageid cli (queryId m) a
459#endif
404 460
405 failbacks = 461 failbacks =
406 [ E.Handler $ \ (e :: HandlerFailure) -> do 462 [ E.Handler $ \ (e :: HandlerFailure) -> do
407 $(logDebugS) "handler.failed" signature 463 $(logDebugS) "handler.failed" signature
408 return $ Left $ KError ProtocolError (prettyHF e) queryId 464#ifdef VERSION_bencoding
465 return $ Left $ KError ProtocolError (prettyHF e) (queryId m)
466#else
467 return $ Left $ decodeError "TODO TOX ProtocolError 2" (queryId m)
468#endif
469
409 470
410 -- may happen if handler makes query and fail 471 -- may happen if handler makes query and fail
411 , E.Handler $ \ (e :: QueryFailure) -> do 472 , E.Handler $ \ (e :: QueryFailure) -> do
412 return $ Left $ KError ServerError (prettyQF e) queryId 473#ifdef VERSION_bencoding
474 return $ Left $ KError ServerError (prettyQF e) (queryId m)
475#else
476 return $ Left $ decodeError "TODO TOX ServerError" (queryId m)
477#endif
413 478
414 -- since handler thread exit after sendMessage we can safely 479 -- since handler thread exit after sendMessage we can safely
415 -- suppress async exception here 480 -- suppress async exception here
416 , E.Handler $ \ (e :: SomeException) -> do 481 , E.Handler $ \ (e :: SomeException) -> do
417 return $ Left $ KError GenericError (BC.pack (show e)) queryId 482#ifdef VERSION_bencoding
483 return $ Left $ KError GenericError (BC.pack (show e)) (queryId m)
484#else
485 return $ Left $ decodeError "TODO TOX GenericError" (queryId m)
486#endif
418 ] 487 ]
419 488
420dispatchHandler :: MonadKRPC h m => KQuery -> SockAddr -> m KResult 489dispatchHandler :: MonadKRPC h m => KQuery -> SockAddr -> m KResult
421dispatchHandler q @ KQuery {..} addr = do 490dispatchHandler q addr = do
422 Manager {..} <- getManager 491 Manager {..} <- getManager
423 case L.lookup queryMethod handlers of 492 case L.lookup (queryMethod q) handlers of
424 Nothing -> return $ Left $ KError MethodUnknown queryMethod queryId 493#ifdef VERSION_bencoding
494 Nothing -> return $ Left $ KError MethodUnknown (queryMethod q) (queryId q)
495#else
496 Nothing -> return $ Left $ decodeError "TODO TOX Error MethodUnknown" (queryId q)
497#endif
425 Just h -> runHandler h addr q 498 Just h -> runHandler h addr q
426 499
427{----------------------------------------------------------------------- 500{-----------------------------------------------------------------------
@@ -435,11 +508,12 @@ dispatchHandler q @ KQuery {..} addr = do
435-- peer B fork too many threads 508-- peer B fork too many threads
436-- ... space leak 509-- ... space leak
437-- 510--
438handleQuery :: MonadKRPC h m => BValue -> KQuery -> SockAddr -> m () 511handleQuery :: MonadKRPC h m => KQueryArgs -> KQuery -> SockAddr -> m ()
439handleQuery raw q addr = void $ fork $ do 512handleQuery raw q addr = void $ fork $ do
440 myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery" 513 myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery"
441 Manager {..} <- getManager 514 Manager {..} <- getManager
442 res <- dispatchHandler q addr 515 res <- dispatchHandler q addr
516#ifdef VERSION_bencoding
443 let resbe = either toBEncode toBEncode res 517 let resbe = either toBEncode toBEncode res
444 $(logOther "q") $ T.unlines 518 $(logOther "q") $ T.unlines
445 [ either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode raw) 519 [ either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode raw)
@@ -447,21 +521,36 @@ handleQuery raw q addr = void $ fork $ do
447 , either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode resbe) 521 , either (const "<unicode-fail>") id $ T.decodeUtf8' (BL.toStrict $ showBEncode resbe)
448 ] 522 ]
449 sendMessage sock addr resbe 523 sendMessage sock addr resbe
524#else
525 -- Errors not sent for Tox.
526 either (const $ return ()) (sendMessage sock addr . S.encode) res
527#endif
450 528
451handleResponse :: MonadKRPC h m => BValue -> KResult -> SockAddr -> m () 529handleResponse :: MonadKRPC h m => KQueryArgs -> KResult -> SockAddr -> m ()
452handleResponse raw result addr = do 530handleResponse raw result addr = do
453 Manager {..} <- getManager 531 Manager {..} <- getManager
454 liftIO $ do 532 liftIO $ do
533#ifdef VERSION_bencoding
455 let resultId = either errorId respId result 534 let resultId = either errorId respId result
535#else
536 let resultId = either Tox.msgNonce Tox.msgNonce result
537#endif
456 mcall <- unregisterQuery (resultId, addr) pendingCalls 538 mcall <- unregisterQuery (resultId, addr) pendingCalls
457 case mcall of 539 case mcall of
458 Nothing -> return () 540 Nothing -> return ()
459 Just ares -> putMVar ares (raw,result) 541 Just ares -> putMVar ares (raw,result)
460 542
461handleMessage :: MonadKRPC h m => BValue -> KMessage -> SockAddr -> m () 543#ifdef VERSION_bencoding
544handleMessage :: MonadKRPC h m => KQueryArgs -> KMessage -> SockAddr -> m ()
462handleMessage raw (Q q) = handleQuery raw q 545handleMessage raw (Q q) = handleQuery raw q
463handleMessage raw (R r) = handleResponse raw (Right r) 546handleMessage raw (R r) = handleResponse raw (Right r)
464handleMessage raw (E e) = handleResponse raw (Left e) 547handleMessage raw (E e) = handleResponse raw (Left e)
548#else
549handleMessage :: MonadKRPC h m => KQueryArgs -> Tox.Message BC.ByteString -> SockAddr -> m ()
550handleMessage raw q | Tox.isQuery q = handleQuery raw q
551handleMessage raw r | Tox.isResponse r = handleResponse raw (Right r)
552handleMessage raw e | Tox.isError e = handleResponse raw (Left e)
553#endif
465 554
466listener :: MonadKRPC h m => m () 555listener :: MonadKRPC h m => m ()
467listener = do 556listener = do
@@ -469,9 +558,17 @@ listener = do
469 fix $ \again -> do 558 fix $ \again -> do
470 (bs, addr) <- liftIO $ do 559 (bs, addr) <- liftIO $ do
471 handle exceptions $ BS.recvFrom sock (optMaxMsgSize options) 560 handle exceptions $ BS.recvFrom sock (optMaxMsgSize options)
561#ifdef VERSION_bencoding
472 case BE.parse bs >>= \r -> (,) r <$> BE.decode bs of 562 case BE.parse bs >>= \r -> (,) r <$> BE.decode bs of
563#else
564 case return bs >>= \r -> (,) r <$> decode bs of
565#endif
473 -- TODO ignore unknown messages at all? 566 -- TODO ignore unknown messages at all?
567#ifdef VERSION_bencoding
474 Left e -> liftIO $ sendMessage sock addr $ unknownMessage e 568 Left e -> liftIO $ sendMessage sock addr $ unknownMessage e
569#else
570 Left _ -> return () -- TODO TOX send unknownMessage error
571#endif
475 Right (raw,m) -> handleMessage raw m addr 572 Right (raw,m) -> handleMessage raw m addr
476 again 573 again
477 where 574 where