From 219d72ebde4bab5a516a86608dcb3aede75c1611 Mon Sep 17 00:00:00 2001 From: joe Date: Sun, 4 Jun 2017 22:39:14 -0400 Subject: WIP: Adapting DHT to Tox network. --- src/Network/KRPC/Manager.hs | 137 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 117 insertions(+), 20 deletions(-) (limited to 'src/Network/KRPC/Manager.hs') diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index 66de6548..e7f0563b 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -55,9 +55,13 @@ import Control.Monad import Control.Monad.Logger import Control.Monad.Reader import Control.Monad.Trans.Control +#ifdef VERSION_bencoding import Data.BEncode as BE import Data.BEncode.Internal as BE import Data.BEncode.Pretty (showBEncode) +#else +import qualified Data.Tox as Tox +#endif import qualified Data.ByteString.Base16 as Base16 import Data.ByteString as BS import Data.ByteString.Char8 as BC @@ -67,6 +71,7 @@ import Data.IORef import Data.List as L import Data.Map as M import Data.Monoid +import Data.Serialize as S import Data.Text as T import Data.Text.Encoding as T import Data.Tuple @@ -128,10 +133,10 @@ type KResult = Either KError KResponse type TransactionCounter = IORef Int type CallId = (TransactionId, SockAddr) -type CallRes = MVar (BValue, KResult) +type CallRes = MVar (KQueryArgs, KResult) -- (raw response, decoded response) type PendingCalls = IORef (Map CallId CallRes) -type HandlerBody h = SockAddr -> BValue -> h (BE.Result BValue) +type HandlerBody h = SockAddr -> KQueryArgs -> h (Either String KQueryArgs) -- | Handler is a function which will be invoked then some /remote/ -- node querying /this/ node. @@ -223,8 +228,13 @@ withManager opts addr hs = bracket (newManager opts addr hs) closeManager -- TODO prettify log messages querySignature :: MethodName -> TransactionId -> SockAddr -> Text querySignature name transaction addr = T.concat +#ifdef VERSION_bencoding [ "&", T.decodeUtf8 name , " #", T.decodeUtf8 (Base16.encode transaction) -- T.decodeUtf8 transaction +#else + [ "&", T.pack (show name) + , " #", T.decodeUtf8 (Base16.encode $ S.encode transaction) +#endif , " @", T.pack (show addr) ] @@ -243,14 +253,24 @@ data QueryFailure instance Exception QueryFailure +#ifdef VERSION_bencoding sendMessage :: MonadIO m => BEncode a => Socket -> SockAddr -> a -> m () sendMessage sock addr a = do liftIO $ sendManyTo sock (BL.toChunks (BE.encode a)) addr +#else +sendMessage :: MonadIO m => Socket -> SockAddr -> BC.ByteString -> m () +sendMessage sock addr a = do + liftIO $ sendManyTo sock [a] addr +#endif genTransactionId :: TransactionCounter -> IO TransactionId genTransactionId ref = do cur <- atomicModifyIORef' ref $ \ cur -> (succ cur, cur) +#ifdef VERSION_bencoding return $ BC.pack (show cur) +#else + return $ either (error "failed to create TransactionId") id $ S.decode $ BC.pack (L.take 24 $ show cur ++ L.repeat ' ') +#endif -- | How many times 'query' call have been performed. getQueryCount :: MonadKRPC h m => m Int @@ -274,8 +294,13 @@ unregisterQuery cid ref = do -- (sendmsg EINVAL) +#ifdef VERSION_bencoding sendQuery :: BEncode a => Socket -> SockAddr -> a -> IO () sendQuery sock addr q = handle sockError $ sendMessage sock addr q +#else +sendQuery :: Serialize a => Socket -> SockAddr -> a -> IO () +sendQuery sock addr q = handle sockError $ sendMessage sock addr (S.encode q) +#endif where sockError :: IOError -> IO () sockError _ = throwIO SendFailed @@ -295,11 +320,11 @@ query' addr params = queryK addr params (const (,)) -- | Enqueue a query, but give us the complete BEncoded content sent by the -- remote Node. This is useful for handling extensions that this library does -- not otherwise support. -queryRaw :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, BValue) +queryRaw :: forall h m a b. (MonadKRPC h m, KRPC a b) => SockAddr -> a -> m (b, KQueryArgs) queryRaw addr params = queryK addr params (\raw x _ -> (x,raw)) queryK :: forall h m a b x. (MonadKRPC h m, KRPC a b) => - SockAddr -> a -> (BValue -> b -> Maybe ReflectedIP -> x) -> m x + SockAddr -> a -> (KQueryArgs -> b -> Maybe ReflectedIP -> x) -> m x queryK addr params kont = do Manager {..} <- getManager tid <- liftIO $ genTransactionId transactionCounter @@ -310,17 +335,29 @@ queryK addr params kont = do mres <- liftIO $ do ares <- registerQuery (tid, addr) pendingCalls +#ifdef VERSION_bencoding let q = KQuery (toBEncode params) (methodName queryMethod) tid +#else + let q = Tox.Message (methodName queryMethod) cli tid params + cli = error "TODO TOX client node id" +#endif sendQuery sock addr q `onException` unregisterQuery (tid, addr) pendingCalls timeout (optQueryTimeout options * 10 ^ (6 :: Int)) $ do - (raw,res) <- readMVar ares + (raw,res) <- readMVar ares -- MVar (KQueryArgs, KResult) case res of +#ifdef VERSION_bencoding Left (KError c m _) -> throwIO $ QueryFailed c (T.decodeUtf8 m) Right (KResponse {..}) -> case fromBEncode respVals of Right r -> pure $ kont raw r respIP +#else + Left _ -> throwIO $ QueryFailed GenericError "TODO: TOX ERROR" + Right (Tox.Message {..}) -> + case S.decode msgPayload of + Right r -> pure $ kont raw r Nothing +#endif Left e -> throwIO $ QueryFailed ProtocolError (T.pack e) case mres of @@ -377,51 +414,87 @@ handler body = (name, wrapper) where Method name = method :: Method a b wrapper addr args = +#ifdef VERSION_bencoding case fromBEncode args of +#else + case S.decode args of +#endif Left e -> return $ Left e Right a -> do r <- body addr a +#ifdef VERSION_bencoding return $ Right $ toBEncode r +#else + return $ Right $ S.encode r +#endif runHandler :: MonadKRPC h m => HandlerBody h -> SockAddr -> KQuery -> m KResult -runHandler h addr KQuery {..} = Lifted.catches wrapper failbacks +runHandler h addr m = Lifted.catches wrapper failbacks where - signature = querySignature queryMethod queryId addr + signature = querySignature (queryMethod m) (queryId m) addr wrapper = do $(logDebugS) "handler.quered" signature - result <- liftHandler (h addr queryArgs) + result <- liftHandler (h addr (queryArgs m)) case result of Left msg -> do $(logDebugS) "handler.bad_query" $ signature <> " !" <> T.pack msg - return $ Left $ KError ProtocolError (BC.pack msg) queryId +#ifdef VERSION_bencoding + return $ Left $ KError ProtocolError (BC.pack msg) (queryId m) +#else + return $ Left $ decodeError "TODO TOX ProtocolError" (queryId m) +#endif - Right a -> do + Right a -> do -- KQueryArgs $(logDebugS) "handler.success" signature - return $ Right $ KResponse a queryId (Just $ ReflectedIP addr) +#ifdef VERSION_bencoding + return $ Right $ KResponse a (queryId m) (Just $ ReflectedIP addr) +#else + let cli = error "TODO TOX client node id" + messageid = error "TODO TOX message response id" + -- TODO: ReflectedIP addr ?? + return $ Right $ Tox.Message messageid cli (queryId m) a +#endif failbacks = [ E.Handler $ \ (e :: HandlerFailure) -> do $(logDebugS) "handler.failed" signature - return $ Left $ KError ProtocolError (prettyHF e) queryId +#ifdef VERSION_bencoding + return $ Left $ KError ProtocolError (prettyHF e) (queryId m) +#else + return $ Left $ decodeError "TODO TOX ProtocolError 2" (queryId m) +#endif + -- may happen if handler makes query and fail , E.Handler $ \ (e :: QueryFailure) -> do - return $ Left $ KError ServerError (prettyQF e) queryId +#ifdef VERSION_bencoding + return $ Left $ KError ServerError (prettyQF e) (queryId m) +#else + return $ Left $ decodeError "TODO TOX ServerError" (queryId m) +#endif -- since handler thread exit after sendMessage we can safely -- suppress async exception here , E.Handler $ \ (e :: SomeException) -> do - return $ Left $ KError GenericError (BC.pack (show e)) queryId +#ifdef VERSION_bencoding + return $ Left $ KError GenericError (BC.pack (show e)) (queryId m) +#else + return $ Left $ decodeError "TODO TOX GenericError" (queryId m) +#endif ] dispatchHandler :: MonadKRPC h m => KQuery -> SockAddr -> m KResult -dispatchHandler q @ KQuery {..} addr = do +dispatchHandler q addr = do Manager {..} <- getManager - case L.lookup queryMethod handlers of - Nothing -> return $ Left $ KError MethodUnknown queryMethod queryId + case L.lookup (queryMethod q) handlers of +#ifdef VERSION_bencoding + Nothing -> return $ Left $ KError MethodUnknown (queryMethod q) (queryId q) +#else + Nothing -> return $ Left $ decodeError "TODO TOX Error MethodUnknown" (queryId q) +#endif Just h -> runHandler h addr q {----------------------------------------------------------------------- @@ -435,11 +508,12 @@ dispatchHandler q @ KQuery {..} addr = do -- peer B fork too many threads -- ... space leak -- -handleQuery :: MonadKRPC h m => BValue -> KQuery -> SockAddr -> m () +handleQuery :: MonadKRPC h m => KQueryArgs -> KQuery -> SockAddr -> m () handleQuery raw q addr = void $ fork $ do myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery" Manager {..} <- getManager res <- dispatchHandler q addr +#ifdef VERSION_bencoding let resbe = either toBEncode toBEncode res $(logOther "q") $ T.unlines [ either (const "") id $ T.decodeUtf8' (BL.toStrict $ showBEncode raw) @@ -447,21 +521,36 @@ handleQuery raw q addr = void $ fork $ do , either (const "") id $ T.decodeUtf8' (BL.toStrict $ showBEncode resbe) ] sendMessage sock addr resbe +#else + -- Errors not sent for Tox. + either (const $ return ()) (sendMessage sock addr . S.encode) res +#endif -handleResponse :: MonadKRPC h m => BValue -> KResult -> SockAddr -> m () +handleResponse :: MonadKRPC h m => KQueryArgs -> KResult -> SockAddr -> m () handleResponse raw result addr = do Manager {..} <- getManager liftIO $ do +#ifdef VERSION_bencoding let resultId = either errorId respId result +#else + let resultId = either Tox.msgNonce Tox.msgNonce result +#endif mcall <- unregisterQuery (resultId, addr) pendingCalls case mcall of Nothing -> return () Just ares -> putMVar ares (raw,result) -handleMessage :: MonadKRPC h m => BValue -> KMessage -> SockAddr -> m () +#ifdef VERSION_bencoding +handleMessage :: MonadKRPC h m => KQueryArgs -> KMessage -> SockAddr -> m () handleMessage raw (Q q) = handleQuery raw q handleMessage raw (R r) = handleResponse raw (Right r) handleMessage raw (E e) = handleResponse raw (Left e) +#else +handleMessage :: MonadKRPC h m => KQueryArgs -> Tox.Message BC.ByteString -> SockAddr -> m () +handleMessage raw q | Tox.isQuery q = handleQuery raw q +handleMessage raw r | Tox.isResponse r = handleResponse raw (Right r) +handleMessage raw e | Tox.isError e = handleResponse raw (Left e) +#endif listener :: MonadKRPC h m => m () listener = do @@ -469,9 +558,17 @@ listener = do fix $ \again -> do (bs, addr) <- liftIO $ do handle exceptions $ BS.recvFrom sock (optMaxMsgSize options) +#ifdef VERSION_bencoding case BE.parse bs >>= \r -> (,) r <$> BE.decode bs of +#else + case return bs >>= \r -> (,) r <$> decode bs of +#endif -- TODO ignore unknown messages at all? +#ifdef VERSION_bencoding Left e -> liftIO $ sendMessage sock addr $ unknownMessage e +#else + Left _ -> return () -- TODO TOX send unknownMessage error +#endif Right (raw,m) -> handleMessage raw m addr again where -- cgit v1.2.3