From 37eb345d1ffb40e814766b6df8134ca21e6987a7 Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Mon, 17 Dec 2018 00:47:42 -0500 Subject: tcp: this seems to work. --- src/Network/QueryResponse/TCP.hs | 147 ++++++++++++++++++++++++--------------- 1 file changed, 92 insertions(+), 55 deletions(-) (limited to 'src/Network/QueryResponse') diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs index efeab305..a606c51d 100644 --- a/src/Network/QueryResponse/TCP.hs +++ b/src/Network/QueryResponse/TCP.hs @@ -14,6 +14,7 @@ import Control.Monad import Data.ByteString (ByteString,hPut) import Data.Function import Data.Hashable +import Data.Maybe import Data.Ord import Data.Time.Clock.POSIX import Data.Word @@ -29,11 +30,13 @@ import Connection.Tcp (socketFamily) import qualified Data.MinMaxPSQ as MM import Network.QueryResponse -data TCPSession st = TCPSession - { tcpHandle :: Handle - , tcpState :: st - , tcpThread :: ThreadId - } +data TCPSession st + = PendingTCPSession + | TCPSession + { tcpHandle :: Handle + , tcpState :: st + , tcpThread :: ThreadId + } newtype TCPAddress = TCPAddress SockAddr deriving (Eq,Ord) @@ -60,6 +63,10 @@ data StreamHandshake addr x y = StreamHandshake , streamAddr :: addr -> SockAddr } +killSession :: TCPSession st -> IO () +killSession PendingTCPSession = return () +killSession TCPSession{tcpThread=t} = killThread t + acquireConnection :: MVar (Maybe (Either a (x, addr))) -> TCPCache (SessionProtocol x y) -> StreamHandshake addr x y @@ -67,65 +74,95 @@ acquireConnection :: MVar (Maybe (Either a (x, addr))) -> Bool -> IO (Maybe (y -> IO ())) acquireConnection mvar tcpcache stream addr bDoCon = do - cache <- atomically $ readTVar (lru tcpcache) - case MM.lookup' (TCPAddress $ streamAddr stream addr) cache of + now <- getPOSIXTime + entry <- atomically $ do + c <- readTVar (lru tcpcache) + let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c + case v of + Nothing | bDoCon -> writeTVar (lru tcpcache) + $ MM.insert' (TCPAddress $ streamAddr stream addr) PendingTCPSession (Down now) c + | otherwise -> return () + Just (tm, v) -> modifyTVar' (lru tcpcache) $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now) + return v + case entry of Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do proto <- getProtocolNumber "tcp" - mh <- catchIOError (do sock <- socket (socketFamily $ streamAddr stream addr) Stream proto - connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) - h <- socketToHandle sock ReadWriteMode - return $ Just h) + mh <- catchIOError (do h <- timeout 10000000 $ do + sock <- socket (socketFamily $ streamAddr stream addr) Stream proto + connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) + h <- socketToHandle sock ReadWriteMode + hSetBuffering h NoBuffering + return h + return h) $ \e -> return Nothing - fmap join $ forM mh $ \h -> do - st <- streamHello stream addr h - t <- getPOSIXTime - rthread <- forkIO $ fix $ \loop -> do - x <- streamDecode st - case x of - Just u -> do - timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr) - loop - Nothing -> do - dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) - atomically $ modifyTVar' (lru tcpcache) - $ MM.delete (TCPAddress $ streamAddr stream addr) - hClose h - let showAddr a = show (streamAddr stream a) - labelThread rthread ("tcp:"++showAddr addr) - let v = TCPSession - { tcpHandle = h - , tcpState = st - , tcpThread = rthread - } - retires <- atomically $ do - c <- readTVar (lru tcpcache) - let (rs,c') = MM.takeView (tcpMax tcpcache) - $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c - writeTVar (lru tcpcache) c' - return rs - forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do - myThreadId >>= flip labelThread ("tcp-close:"++show k) - dput XTCP $ "TCP dropped: " ++ show k - killThread (tcpThread r) - streamGoodbye st - hClose (tcpHandle r) + ret <- fmap join $ forM mh $ \h -> do + st <- streamHello stream addr h + dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr) + rthread <- forkIO $ fix $ \loop -> do + x <- streamDecode st + dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x + case x of + Just u -> do + m <- timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr) + when (isNothing m) $ do + dput XTCP $ "TCP "++show (streamAddr stream addr) ++ "dropped packet." + loop + Nothing -> do + dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) + atomically $ modifyTVar' (lru tcpcache) + $ MM.delete (TCPAddress $ streamAddr stream addr) + c <- atomically $ readTVar (lru tcpcache) + now <- getPOSIXTime + forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do + let stat = case r of + PendingTCPSession -> "pending." + TCPSession {} -> "established." + dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), stat] + hClose h + let showAddr a = show (streamAddr stream a) + labelThread rthread ("tcp:"++showAddr addr) + let v = TCPSession + { tcpHandle = h + , tcpState = st + , tcpThread = rthread + } + t <- getPOSIXTime + retires <- atomically $ do + c <- readTVar (lru tcpcache) + let (rs,c') = MM.takeView (tcpMax tcpcache) + $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c + writeTVar (lru tcpcache) c' + return rs + forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do + myThreadId >>= flip labelThread ("tcp-close:"++show k) + dput XTCP $ "TCP dropped: " ++ show k + killSession r + case r of TCPSession {tcpState=st,tcpHandle=h} -> do + streamGoodbye st + hClose h + _ -> return () - return $ Just $ streamEncode st - Just (tm,v) -> do - t <- getPOSIXTime - let TCPSession { tcpHandle = h, tcpState = st } = v - cache' = MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) cache - atomically $ writeTVar (lru tcpcache) cache' - return $ Just $ streamEncode st + return $ Just $ streamEncode st + when (isNothing ret) $ do + atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) + return ret + Just (tm, PendingTCPSession) -> do + fmap join $ timeout 10000000 $ atomically $ do + c <- readTVar (lru tcpcache) + let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c + case v of + Just (_,TCPSession{tcpState=st}) -> return $ Just $ streamEncode st + Nothing -> return Nothing + _ -> retry + Just (tm, v@TCPSession {tcpState=st}) -> return $ Just $ streamEncode st closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO () closeAll tcpcache stream = do cache <- atomically $ swapTVar (lru tcpcache) MM.empty forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do - let st = tcpState r - killThread (tcpThread r) - streamGoodbye st - hClose (tcpHandle r) + killSession r + case r of TCPSession{tcpState=st,tcpHandle=h} -> streamGoodbye st >> hClose h + _ -> return () tcpTransport :: Int -- ^ maximum number of TCP links to maintain. -> StreamHandshake addr x y -- cgit v1.2.3