From e64a0efd2ca29257c189343b6dc75f6bee29d66d Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Sun, 2 Dec 2018 15:53:35 -0500 Subject: Tox TCP client transport. --- src/Crypto/Tox.hs | 2 +- src/Network/QueryResponse/TCP.hs | 57 +++++++++++++++------------ src/Network/Tox/TCP.hs | 85 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 118 insertions(+), 26 deletions(-) create mode 100644 src/Network/Tox/TCP.hs diff --git a/src/Crypto/Tox.hs b/src/Crypto/Tox.hs index a7f1ae83..1b3d5e5c 100644 --- a/src/Crypto/Tox.hs +++ b/src/Crypto/Tox.hs @@ -169,7 +169,7 @@ authAndBytes (Encrypted bs) = (auth,bs') -- independently of the value, or the length depends on the value. data Size a = VarSize (a -> Int) - | ConstSize !Int + | ConstSize { constSize :: !Int } deriving Typeable instance Contravariant Size where diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs index 83ae367f..7efe6966 100644 --- a/src/Network/QueryResponse/TCP.hs +++ b/src/Network/QueryResponse/TCP.hs @@ -45,77 +45,84 @@ data TCPCache st = TCPCache , tcpMax :: Int } -data StreamTransform st x y = StreamTransform - { streamHello :: Handle -> IO st -- ^ "Hello" protocol upon fresh connection. - , streamGoodbye :: st -> Handle -> IO () -- ^ "Goodbye" protocol upon termination. - , streamDecode :: st -> Handle -> IO (Maybe x) -- ^ Parse inbound messages. - , streamEncode :: st -> y -> IO ByteString -- ^ Serialize outbound messages. +data SessionProtocol x y = SessionProtocol + { streamGoodbye :: Handle -> IO () -- ^ "Goodbye" protocol upon termination. + , streamDecode :: Handle -> IO (Maybe x) -- ^ Parse inbound messages. + , streamEncode :: y -> IO ByteString -- ^ Serialize outbound messages. } -acquireConnection :: MVar (Maybe (Either a (x, SockAddr))) - -> TCPCache st - -> StreamTransform st x y - -> SockAddr +data StreamHandshake addr x y = StreamHandshake + { streamHello :: addr -> Handle -> IO (SessionProtocol x y) -- ^ "Hello" protocol upon fresh connection. + , streamAddr :: addr -> SockAddr + } + +acquireConnection :: MVar (Maybe (Either a (x, addr))) + -> TCPCache (SessionProtocol x y) + -> StreamHandshake addr x y + -> addr -> IO (Maybe (y -> IO ())) acquireConnection mvar tcpcache stream addr = do cache <- atomically $ readTVar (lru tcpcache) - case MM.lookup' (TCPAddress addr) cache of + case MM.lookup' (TCPAddress $ streamAddr stream addr) cache of Nothing -> do proto <- getProtocolNumber "tcp" - sock <- socket (socketFamily addr) Stream proto - connect sock addr `catchIOError` (\e -> close sock) + sock <- socket (socketFamily $ streamAddr stream addr) Stream proto + connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) h <- socketToHandle sock ReadWriteMode - st <- streamHello stream h + st <- streamHello stream addr h t <- getPOSIXTime mh <- newMVar h rthread <- forkIO $ fix $ \loop -> do - x <- streamDecode stream st h + x <- streamDecode st h putMVar mvar $ fmap (\u -> Right (u, addr)) x case x of Just _ -> loop Nothing -> do - atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress addr) + atomically $ modifyTVar' (lru tcpcache) + $ MM.delete (TCPAddress $ streamAddr stream addr) hClose h - labelThread rthread ("tcp:"++show addr) + let showAddr a = show (streamAddr stream a) + labelThread rthread ("tcp:"++showAddr addr) let v = TCPSession { tcpHandle = mh , tcpState = st , tcpThread = rthread } - let (retires,cache') = MM.takeView (tcpMax tcpcache) $ MM.insert' (TCPAddress addr) v t cache + let (retires,cache') = MM.takeView (tcpMax tcpcache) + $ MM.insert' (TCPAddress $ streamAddr stream addr) v t cache forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do myThreadId >>= flip labelThread ("tcp-close:"++show k) killThread (tcpThread r) h <- takeMVar (tcpHandle r) - streamGoodbye stream st h + streamGoodbye st h hClose h atomically $ writeTVar (lru tcpcache) cache' return $ Just $ \y -> do - bs <- streamEncode stream st y + bs <- streamEncode st y withMVar mh (`hPut` bs) Just (tm,v) -> do t <- getPOSIXTime let TCPSession { tcpHandle = mh, tcpState = st } = v - cache' = MM.insert' (TCPAddress addr) v t cache + cache' = MM.insert' (TCPAddress $ streamAddr stream addr) v t cache atomically $ writeTVar (lru tcpcache) cache' return $ Just $ \y -> do - bs <- streamEncode stream st y + bs <- streamEncode st y withMVar mh (`hPut` bs) -closeAll :: TCPCache st -> StreamTransform st x y -> IO () +closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO () closeAll tcpcache stream = do cache <- atomically $ readTVar (lru tcpcache) forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do let st = tcpState r killThread (tcpThread r) h <- takeMVar $ tcpHandle r - streamGoodbye stream st h + streamGoodbye st h hClose h tcpTransport :: Int -- ^ maximum number of TCP links to maintain. - -> StreamTransform st x y - -> IO (TransportA err SockAddr x y) + -> StreamHandshake addr x y + -> IO (TransportA err addr x y) tcpTransport maxcon stream = do msgvar <- newEmptyMVar tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) diff --git a/src/Network/Tox/TCP.hs b/src/Network/Tox/TCP.hs new file mode 100644 index 00000000..0780f121 --- /dev/null +++ b/src/Network/Tox/TCP.hs @@ -0,0 +1,85 @@ +{-# LANGUAGE RecursiveDo #-} +{-# LANGUAGE PartialTypeSignatures #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE FlexibleContexts #-} +module Network.Tox.TCP where + +import Control.Concurrent +import Control.Concurrent.STM +import Data.Functor.Identity +import Data.Serialize +import System.IO (Handle) + +import Crypto.Tox +import Data.ByteString (hPut,hGet,ByteString) +import Data.Tox.Relay +import Network.Address (setPort,PortNumber,SockAddr) +import Network.QueryResponse +import Network.QueryResponse.TCP +import qualified Network.Tox.NodeId as UDP + + +withSize :: Sized x => (Size x -> m (p x)) -> m (p x) +withSize f = case size of len -> f len + +data NodeInfo = NodeInfo + { udpNodeInfo :: UDP.NodeInfo + , tcpPort :: PortNumber + } + +type NodeId = UDP.NodeId + +nodeId :: NodeInfo -> NodeId +nodeId ni = UDP.nodeId $ udpNodeInfo ni + +nodeAddr :: NodeInfo -> SockAddr +nodeAddr ni = setPort (tcpPort ni) $ UDP.nodeAddr $ udpNodeInfo ni + +tcpStream :: (Serialize y, Sized y, Serialize x, Sized x) => + TransportCrypto -> StreamHandshake NodeInfo x y +tcpStream crypto = StreamHandshake + { streamHello = \addr h -> do + (skey, hello) <- atomically $ do + n24 <- transportNewNonce crypto + skey <- transportNewKey crypto + base24 <- transportNewNonce crypto + return $ (,) skey $ Hello $ Asymm + { senderKey = transportPublic crypto + , asymmNonce = n24 + , asymmData = pure HelloData + { sessionPublicKey = toPublic $ skey + , sessionBaseNonce = base24 + } + } + noncef <- lookupNonceFunction crypto (transportSecret crypto) (UDP.id2key $ nodeId addr) + hPut h $ encode $ encryptPayload (noncef $ helloNonce hello) hello + welcomeE <- withSize $ fmap decode . hGet h . constSize + let Right welcome = welcomeE >>= \w -> decryptPayload (noncef $ welcomeNonce w) w + noncef' <- lookupNonceFunction crypto skey (sessionPublicKey $ runIdentity $ welcomeData welcome) + nsend <- newMVar (sessionBaseNonce $ runIdentity $ helloData hello) + nread <- newMVar (sessionBaseNonce $ runIdentity $ welcomeData welcome) + let them = sessionPublicKey $ runIdentity $ welcomeData welcome + return SessionProtocol + { streamGoodbye = \h -> return () -- No goodbye packet? Seems rude. + , streamDecode = \h -> do + decode <$> hGet h 2 >>= \case + Left _ -> return Nothing + Right len -> do + decode <$> hGet h len >>= \case + Left _ -> return Nothing + Right x -> do + n24 <- takeMVar nread + let r = decrypt (noncef' n24) x >>= decodePlain + putMVar nread (incrementNonce24 n24) + return $ either (const Nothing) Just r + , streamEncode = \y -> do + n24 <- takeMVar nsend + let bs = encode $ encrypt (noncef' n24) $ encodePlain y + putMVar nsend (incrementNonce24 n24) + return bs -- XXX: Should we wait until this bytestring is sent before putting the nonce back in the MVar? + } + , streamAddr = nodeAddr + } + +toxTCP :: TransportCrypto -> IO (Transport err NodeInfo RelayPacket) +toxTCP crypto = tcpTransport 30 (tcpStream crypto) -- cgit v1.2.3