From 3dae31030c20ed9ad831dfba88db781ebe71ca54 Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Thu, 27 Dec 2018 19:52:08 -0500 Subject: TCP-related Debug info. --- src/Network/QueryResponse/TCP.hs | 36 +++++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 13 deletions(-) (limited to 'src/Network/QueryResponse/TCP.hs') diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs index a606c51d..bad61727 100644 --- a/src/Network/QueryResponse/TCP.hs +++ b/src/Network/QueryResponse/TCP.hs @@ -9,6 +9,7 @@ import Control.Concurrent.Lifted import GHC.Conc (labelThread) #endif +import Control.Arrow import Control.Concurrent.STM import Control.Monad import Data.ByteString (ByteString,hPut) @@ -39,7 +40,7 @@ data TCPSession st } newtype TCPAddress = TCPAddress SockAddr - deriving (Eq,Ord) + deriving (Eq,Ord,Show) instance Hashable TCPAddress where hashWithSalt salt (TCPAddress x) = case x of @@ -67,6 +68,9 @@ killSession :: TCPSession st -> IO () killSession PendingTCPSession = return () killSession TCPSession{tcpThread=t} = killThread t +showStat r = case r of PendingTCPSession -> "pending." + TCPSession {} -> "established." + acquireConnection :: MVar (Maybe (Either a (x, addr))) -> TCPCache (SessionProtocol x y) -> StreamHandshake addr x y @@ -75,6 +79,7 @@ acquireConnection :: MVar (Maybe (Either a (x, addr))) -> IO (Maybe (y -> IO ())) acquireConnection mvar tcpcache stream addr bDoCon = do now <- getPOSIXTime + -- dput XTCP $ "acquireConnection 0 " ++ show (streamAddr stream addr) entry <- atomically $ do c <- readTVar (lru tcpcache) let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c @@ -84,6 +89,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | otherwise -> return () Just (tm, v) -> modifyTVar' (lru tcpcache) $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now) return v + -- dput XTCP $ "acquireConnection 1 " ++ show (streamAddr stream addr, fmap (second showStat) entry) case entry of Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do proto <- getProtocolNumber "tcp" @@ -98,26 +104,28 @@ acquireConnection mvar tcpcache stream addr bDoCon = do ret <- fmap join $ forM mh $ \h -> do st <- streamHello stream addr h dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr) - rthread <- forkIO $ fix $ \loop -> do + signal <- newTVarIO False + rthread <- forkIO $ do + atomically (readTVar signal >>= check) + fix $ \loop -> do x <- streamDecode st dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x case x of Just u -> do m <- timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr) when (isNothing m) $ do - dput XTCP $ "TCP "++show (streamAddr stream addr) ++ "dropped packet." + dput XTCP $ "TCP "++show (streamAddr stream addr) ++ " dropped packet." + tryTakeMVar mvar + return () loop Nothing -> do dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) - atomically $ modifyTVar' (lru tcpcache) + do atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) c <- atomically $ readTVar (lru tcpcache) now <- getPOSIXTime forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do - let stat = case r of - PendingTCPSession -> "pending." - TCPSession {} -> "established." - dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), stat] + dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r] hClose h let showAddr a = show (streamAddr stream a) labelThread rthread ("tcp:"++showAddr addr) @@ -132,6 +140,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do let (rs,c') = MM.takeView (tcpMax tcpcache) $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c writeTVar (lru tcpcache) c' + writeTVar signal True return rs forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do myThreadId >>= flip labelThread ("tcp-close:"++show k) @@ -146,8 +155,9 @@ acquireConnection mvar tcpcache stream addr bDoCon = do when (isNothing ret) $ do atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) return ret - Just (tm, PendingTCPSession) -> do - fmap join $ timeout 10000000 $ atomically $ do + Just (tm, PendingTCPSession) + | not bDoCon -> return Nothing + | otherwise -> fmap join $ timeout 10000000 $ atomically $ do c <- readTVar (lru tcpcache) let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c case v of @@ -166,17 +176,17 @@ closeAll tcpcache stream = do tcpTransport :: Int -- ^ maximum number of TCP links to maintain. -> StreamHandshake addr x y - -> IO (TransportA err addr x (Bool,y)) + -> IO (TCPCache (SessionProtocol x y), TransportA err addr x (Bool,y)) tcpTransport maxcon stream = do msgvar <- newEmptyMVar tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) - return Transport + return $ (,) tcpcache Transport { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing) , sendMessage = \addr (bDoCon,y) -> do t <- forkIO $ do msock <- acquireConnection msgvar tcpcache stream addr bDoCon mapM_ ($ y) msock - `catchIOError` \e -> return () + `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e labelThread t "tcp-send" , closeTransport = closeAll tcpcache stream } -- cgit v1.2.3