From 59aa0062c15610015a6bce077be5da1d3ed34019 Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Fri, 30 Nov 2018 01:58:43 -0500 Subject: More work on TCP relay. --- examples/toxrelay.hs | 173 ++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 151 insertions(+), 22 deletions(-) (limited to 'examples') diff --git a/examples/toxrelay.hs b/examples/toxrelay.hs index fdf0c011..f03605f9 100644 --- a/examples/toxrelay.hs +++ b/examples/toxrelay.hs @@ -1,20 +1,30 @@ +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} +import Control.Concurrent.MVar +import Control.Concurrent.STM import Control.Exception import Control.Monad -import Control.Concurrent.STM import qualified Data.ByteString as B import Data.Function -import qualified Data.IntMap as IntMap - ;import Data.IntMap (IntMap) +import Data.Functor.Identity +import qualified Data.IntMap as IntMap + ;import Data.IntMap (IntMap) +import qualified Data.Map as Map + ;import Data.Map (Map) import Data.Serialize +import Data.Word import System.IO -import Data.Functor.Identity +import System.IO.Error +import System.Timeout import Crypto.Tox +import qualified Data.IntervalSet as IntSet + ;import Data.IntervalSet (IntSet) import Data.Tox.Relay -import Network.StreamServer import Network.Address (getBindAddress) +import Network.StreamServer import Network.Tox (newCrypto) @@ -22,16 +32,49 @@ import Network.Tox (newCrypto) hGetPrefixed :: Serialize a => Handle -> IO (Either String a) hGetPrefixed h = do mlen <- runGet getWord16be <$> B.hGet h 2 + -- We treat parse-fail the same as EOF. fmap join $ forM mlen $ \len -> runGet get <$> B.hGet h (fromIntegral len) hGetSized :: forall x. (Sized x, Serialize x) => Handle -> IO (Either String x) -hGetSized h = runGet get <$> B.hGet h len +hGetSized h = runGet get <$> B.hGet h len -- We treat parse-fail the same as EOF. where ConstSize len = size :: Size x -relaySession :: TransportCrypto -> TVar (IntMap Handle) -> (RelayPacket -> IO ()) -> sock -> Int -> Handle -> IO () -relaySession crypto cons dispatch _ conid h = do - atomically $ modifyTVar' cons $ IntMap.insert conid h +data RelaySession = RelaySession + { indexPool :: IntSet -- ^ Ints that are either solicited or associated. + , solicited :: Map PublicKey Int -- ^ Reserved ids, not yet in associated. + , associated :: IntMap (RelayPacket -> IO ()) -- ^ Peers this session is connected to. + } + +freshSession :: RelaySession +freshSession = RelaySession + { indexPool = IntSet.empty + , solicited = Map.empty + , associated = IntMap.empty + } + +disconnect :: TVar (Map PublicKey (RelayPacket -> IO (),TVar RelaySession)) + -> PublicKey + -> IO () +disconnect cons who = join $ atomically $ do + Map.lookup who <$> readTVar cons + >>= \case + Nothing -> return $ return () + Just (_,session) -> do + modifyTVar' cons $ Map.delete who + RelaySession { associated = cs } <- readTVar session + return $ let notifyPeer i send = (send (DisconnectNotification $ key2c i) >>) + in IntMap.foldrWithKey notifyPeer (return ()) cs + +relaySession :: TransportCrypto + -> TVar (Map PublicKey (RelayPacket -> IO (),TVar RelaySession)) + -> sock + -> Int + -> Handle + -> IO () +relaySession crypto cons _ conid h = do + -- atomically $ modifyTVar' cons $ IntMap.insert conid h + -- mhello <- fmap (>>= \h -> decryptPayload (computeSharedSecret me (helloFrom h) (helloNonce h)) h) $ hGetSized h (hGetSized h >>=) $ mapM_ $ \helloE -> do @@ -41,38 +84,124 @@ relaySession crypto cons dispatch _ conid h = do noncef <- lookupNonceFunction crypto me them let mhello = decryptPayload (noncef $ helloNonce helloE) helloE - forM_ mhello $ \hello -> do + let _ = hello :: Hello Identity (me',welcome) <- atomically $ do skey <- transportNewKey crypto dta <- HelloData (toPublic skey) <$> transportNewNonce crypto w24 <- transportNewNonce crypto return (skey, Welcome w24 $ pure dta) + B.hPut h $ encode $ encryptPayload (noncef $ welcomeNonce welcome) welcome - let them' = sessionPublicKey (runIdentity $ helloData hello) - noncef' <- lookupNonceFunction crypto me' them' + noncef' <- let them' = sessionPublicKey (runIdentity $ helloData hello) + in lookupNonceFunction crypto me' them' + + sendPacket <- do + v <- newMVar (sessionBaseNonce $ runIdentity $ welcomeData welcome) + return $ \p -> do + n24 <- takeMVar v + let bs = encode $ encrypt (noncef' n24) $ encodePlain (p :: RelayPacket) + do B.hPut h $ encode (fromIntegral (B.length bs) :: Word16) + B.hPut h bs + `catchIOError` \_ -> return () + putMVar v (incrementNonce24 n24) + + let readPacket n24 = (>>= decrypt (noncef' n24) >=> decodePlain) <$> hGetPrefixed h + base = sessionBaseNonce $ runIdentity $ helloData hello + + -- You get 3 seconds to send a session packet. + mpkt0 <- join <$> timeout 3000000 (either (const Nothing) Just <$> readPacket base) + forM_ mpkt0 $ \pkt0 -> do + + disconnect cons (helloFrom hello) + session <- atomically $ do + session <- newTVar freshSession + modifyTVar' cons $ Map.insert (helloFrom hello) (sendPacket,session) + return session + + handlePacket cons (helloFrom hello) sendPacket session pkt0 + + flip fix (incrementNonce24 base) $ \loop n24 -> do + m <- readPacket n24 + forM_ m $ \p -> do + handlePacket cons (helloFrom hello) sendPacket session p + loop (incrementNonce24 n24) + `finally` + disconnect cons (helloFrom hello) + +data R = R { routingRequest :: PublicKey -> IO ConId + , reply :: RelayPacket -> IO () + , routeOOB :: PublicKey -> IO (Maybe (RelayPacket -> IO ())) + } + +handlePacket :: TVar (Map PublicKey (RelayPacket -> IO (), TVar RelaySession)) + -> PublicKey + -> (RelayPacket -> IO ()) + -> TVar RelaySession + -> RelayPacket + -> IO () +handlePacket cons me sendToMe session = \case + RoutingRequest them -> join $ atomically $ do + mySession <- readTVar session + mi <- case Map.lookup them (solicited mySession) of + Nothing -> fmap join $ forM (IntSet.nearestOutsider 0 (indexPool mySession)) $ \i -> do + if -120 <= i && i <= 119 + then do + writeTVar session mySession + { indexPool = IntSet.insert i (indexPool mySession) + , solicited = Map.insert them i (solicited mySession) + } + return $ Just i + else return Nothing -- No more slots available. + Just i -> return $ Just i + notifyConnect <- fmap (join . join) $ forM mi $ \i -> do + mp <- Map.lookup them <$> readTVar cons + forM mp $ \(sendToThem,peer) -> do + theirSession <- readTVar peer + forM (Map.lookup me $ solicited theirSession) $ \reserved_id -> do + writeTVar peer theirSession + { solicited = Map.delete me (solicited theirSession) + , associated = IntMap.insert reserved_id sendToMe (associated theirSession) + } + writeTVar session mySession + { solicited = Map.delete them (solicited mySession) + , associated = IntMap.insert i sendToThem (associated mySession) + } + return $ do sendToThem $ ConnectNotification (key2c reserved_id) + sendToMe $ ConnectNotification (key2c i) + return $ do sendToMe $ RoutingResponse (maybe badcon key2c mi) them + sequence_ notifyConnect - let _ = hello :: Hello Identity - flip fix (sessionBaseNonce $ runIdentity $ helloData hello) $ \loop n24 -> do - m <- (>>= decrypt (noncef' n24) >=> decodePlain) <$> hGetPrefixed h - forM_ m $ \p -> do - dispatch p - loop (incrementNonce24 n24) - `finally` - atomically (modifyTVar' cons $ IntMap.delete conid) + RelayPing x -> sendToMe $ RelayPong x -- TODO x==0 is invalid. Do we care? + + OOBSend them bs -> do + m <- atomically $ Map.lookup them <$> readTVar cons + forM_ m $ \(sendToThem,_) -> sendToThem $ OOBRecv me bs + + RelayData con bs -> join $ atomically $ do + -- Data: Data packets can only be sent and received if the + -- corresponding connection_id is connection (a Connect notification + -- has been received from it) if the server receives a Data packet for + -- a non connected or existent connection it will discard it. + mySession <- readTVar session + return $ sequence_ $ do + i <- c2key con + sendToThem <- IntMap.lookup i $ associated mySession + return $ sendToThem $ RelayData _todo bs + _ -> return () main :: IO () main = do crypto <- newCrypto - cons <- newTVarIO IntMap.empty + cons <- newTVarIO Map.empty a <- getBindAddress "33445" True h <- streamServer ServerConfig { serverWarn = hPutStrLn stderr - , serverSession = relaySession crypto cons print + , serverSession = relaySession crypto cons } a -- cgit v1.2.3