From 3dae31030c20ed9ad831dfba88db781ebe71ca54 Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Thu, 27 Dec 2018 19:52:08 -0500 Subject: TCP-related Debug info. --- src/Network/QueryResponse.hs | 1 + src/Network/QueryResponse/TCP.hs | 36 +++++++++++++++++++++++------------- src/Network/Tox/TCP.hs | 24 +++++++++++++++--------- 3 files changed, 39 insertions(+), 22 deletions(-) (limited to 'src') diff --git a/src/Network/QueryResponse.hs b/src/Network/QueryResponse.hs index 0fbbc929..13160d31 100644 --- a/src/Network/QueryResponse.hs +++ b/src/Network/QueryResponse.hs @@ -198,6 +198,7 @@ forkListener name client = do thread_id <- forkIO $ do myThreadId >>= flip labelThread ("listener."++name) fix $ awaitMessage client . const + dput XMisc $ "Listener died: " ++ name return $ do closeTransport client killThread thread_id diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs index a606c51d..bad61727 100644 --- a/src/Network/QueryResponse/TCP.hs +++ b/src/Network/QueryResponse/TCP.hs @@ -9,6 +9,7 @@ import Control.Concurrent.Lifted import GHC.Conc (labelThread) #endif +import Control.Arrow import Control.Concurrent.STM import Control.Monad import Data.ByteString (ByteString,hPut) @@ -39,7 +40,7 @@ data TCPSession st } newtype TCPAddress = TCPAddress SockAddr - deriving (Eq,Ord) + deriving (Eq,Ord,Show) instance Hashable TCPAddress where hashWithSalt salt (TCPAddress x) = case x of @@ -67,6 +68,9 @@ killSession :: TCPSession st -> IO () killSession PendingTCPSession = return () killSession TCPSession{tcpThread=t} = killThread t +showStat r = case r of PendingTCPSession -> "pending." + TCPSession {} -> "established." + acquireConnection :: MVar (Maybe (Either a (x, addr))) -> TCPCache (SessionProtocol x y) -> StreamHandshake addr x y @@ -75,6 +79,7 @@ acquireConnection :: MVar (Maybe (Either a (x, addr))) -> IO (Maybe (y -> IO ())) acquireConnection mvar tcpcache stream addr bDoCon = do now <- getPOSIXTime + -- dput XTCP $ "acquireConnection 0 " ++ show (streamAddr stream addr) entry <- atomically $ do c <- readTVar (lru tcpcache) let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c @@ -84,6 +89,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | otherwise -> return () Just (tm, v) -> modifyTVar' (lru tcpcache) $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now) return v + -- dput XTCP $ "acquireConnection 1 " ++ show (streamAddr stream addr, fmap (second showStat) entry) case entry of Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do proto <- getProtocolNumber "tcp" @@ -98,26 +104,28 @@ acquireConnection mvar tcpcache stream addr bDoCon = do ret <- fmap join $ forM mh $ \h -> do st <- streamHello stream addr h dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr) - rthread <- forkIO $ fix $ \loop -> do + signal <- newTVarIO False + rthread <- forkIO $ do + atomically (readTVar signal >>= check) + fix $ \loop -> do x <- streamDecode st dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x case x of Just u -> do m <- timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr) when (isNothing m) $ do - dput XTCP $ "TCP "++show (streamAddr stream addr) ++ "dropped packet." + dput XTCP $ "TCP "++show (streamAddr stream addr) ++ " dropped packet." + tryTakeMVar mvar + return () loop Nothing -> do dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) - atomically $ modifyTVar' (lru tcpcache) + do atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) c <- atomically $ readTVar (lru tcpcache) now <- getPOSIXTime forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do - let stat = case r of - PendingTCPSession -> "pending." - TCPSession {} -> "established." - dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), stat] + dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r] hClose h let showAddr a = show (streamAddr stream a) labelThread rthread ("tcp:"++showAddr addr) @@ -132,6 +140,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do let (rs,c') = MM.takeView (tcpMax tcpcache) $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c writeTVar (lru tcpcache) c' + writeTVar signal True return rs forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do myThreadId >>= flip labelThread ("tcp-close:"++show k) @@ -146,8 +155,9 @@ acquireConnection mvar tcpcache stream addr bDoCon = do when (isNothing ret) $ do atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) return ret - Just (tm, PendingTCPSession) -> do - fmap join $ timeout 10000000 $ atomically $ do + Just (tm, PendingTCPSession) + | not bDoCon -> return Nothing + | otherwise -> fmap join $ timeout 10000000 $ atomically $ do c <- readTVar (lru tcpcache) let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c case v of @@ -166,17 +176,17 @@ closeAll tcpcache stream = do tcpTransport :: Int -- ^ maximum number of TCP links to maintain. -> StreamHandshake addr x y - -> IO (TransportA err addr x (Bool,y)) + -> IO (TCPCache (SessionProtocol x y), TransportA err addr x (Bool,y)) tcpTransport maxcon stream = do msgvar <- newEmptyMVar tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) - return Transport + return $ (,) tcpcache Transport { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing) , sendMessage = \addr (bDoCon,y) -> do t <- forkIO $ do msock <- acquireConnection msgvar tcpcache stream addr bDoCon mapM_ ($ y) msock - `catchIOError` \e -> return () + `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e labelThread t "tcp-send" , closeTransport = closeAll tcpcache stream } diff --git a/src/Network/Tox/TCP.hs b/src/Network/Tox/TCP.hs index 36200586..adb42514 100644 --- a/src/Network/Tox/TCP.hs +++ b/src/Network/Tox/TCP.hs @@ -132,18 +132,22 @@ tcpStream crypto = StreamHandshake dput XTCP $ "TCP exception: " ++ show e return Nothing , streamEncode = \y -> do + dput XTCP $ "TCP(acquire nonce):" ++ show addr ++ " <-- " ++ show y n24 <- takeMVar nsend - dput XTCP $ "TCP:" ++ show addr ++ " <-- " ++ show y + dput XTCP $ "TCP(got nonce):" ++ show addr ++ " <-- " ++ show y let bs = encode $ encrypt (noncef' n24) $ encodePlain y ($ h) -- bracket (takeMVar hvar) (putMVar hvar) $ \h -> hPut h (encode (fromIntegral $ Data.ByteString.length bs :: Word16) <> bs) `catchIOError` \e -> dput XTCP $ "TCP write exception: " ++ show e + dput XTCP $ "TCP(incrementing nonce): " ++ show addr ++ " <-- " ++ show y putMVar nsend (incrementNonce24 n24) + dput XTCP $ "TCP(finished): " ++ show addr ++ " <-- " ++ show y } , streamAddr = nodeAddr } -toxTCP :: TransportCrypto -> IO (TransportA err NodeInfo RelayPacket (Bool,RelayPacket)) +toxTCP :: TransportCrypto -> IO ( TCPCache (SessionProtocol RelayPacket RelayPacket) + , TransportA err NodeInfo RelayPacket (Bool,RelayPacket) ) toxTCP crypto = tcpTransport 30 (tcpStream crypto) tcpSpace :: KademliaSpace NodeId NodeInfo @@ -267,13 +271,14 @@ type RelayClient = Client String PacketNumber Nonce8 NodeInfo (Bool,RelayPacket) newClient :: TransportCrypto -> (MVar (Bool,RelayPacket) -> a) -- ^ store mvar for query -> (a -> RelayPacket -> IO void) -- ^ load mvar for query - -> IO ( TVar (ChaChaDRG, Data.Word64Map.Word64Map a) + -> IO ( ( TVar (ChaChaDRG, Data.Word64Map.Word64Map a) + , TCPCache (SessionProtocol RelayPacket RelayPacket) ) , Client String PacketNumber Nonce8 NodeInfo (Bool,RelayPacket)) newClient crypto store load = do - net <- toxTCP crypto + (tcpcache,net) <- toxTCP crypto drg <- drgNew map_var <- atomically $ newTVar (drg, Data.Word64Map.empty) - return $ (,) map_var Client + return $ (,) (map_var,tcpcache) Client { clientNet = {- XXX: Client type forces this pointless layering. -} layerTransport ((Right .) . (,) . (,) False) (,) net , clientDispatcher = DispatchMethods { classifyInbound = (. snd) $ \case @@ -284,10 +289,11 @@ newClient crypto store load = do OOBRecv k bs -> IsUnsolicited $ handleOOB k bs wut -> IsUnknown (show wut) , lookupHandler = \case - PingPacket -> Just MethodHandler - { methodParse = \(_,RelayPing n8) -> Right () - , methodSerialize = \n8 src dst () -> (False, RelayPong n8) - , methodAction = \src () -> return () + PingPacket -> trace ("tcp-received-ping") $ Just MethodHandler + { methodParse = \case (_,RelayPing n8) -> Right () + _ -> trace ("tcp-non-ping") $ Left "TCP: Non-ping?" + , methodSerialize = \n8 src dst () -> trace ("tcp-made-pong-"++show n8) (False, RelayPong n8) + , methodAction = \src () -> dput XTCP $ "TCP pinged by "++show src } w -> trace ("tcp-lookupHandler: "++show w) $ Just NoReply { methodParse = \x -> Left "tcp-lookuphandler?" -- :: x -> Either err a -- cgit v1.2.3