From 93e51f29cc6c25455a5f91dbc1c3e678922523fd Mon Sep 17 00:00:00 2001 From: Joe Crayne Date: Sun, 2 Dec 2018 16:36:03 -0500 Subject: TCP cache: Use (Down POSIXTime) for MinMax priority. --- src/Network/QueryResponse/TCP.hs | 42 +++++++++++++++++----------------------- 1 file changed, 18 insertions(+), 24 deletions(-) (limited to 'src/Network/QueryResponse') diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs index 7efe6966..c0bdcd3c 100644 --- a/src/Network/QueryResponse/TCP.hs +++ b/src/Network/QueryResponse/TCP.hs @@ -14,6 +14,7 @@ import Control.Monad import Data.ByteString (ByteString,hPut) import Data.Function import Data.Hashable +import Data.Ord import Data.Time.Clock.POSIX import Data.Word import Network.BSD @@ -26,7 +27,7 @@ import qualified Data.MinMaxPSQ as MM import Network.QueryResponse data TCPSession st = TCPSession - { tcpHandle :: MVar Handle + { tcpHandle :: Handle , tcpState :: st , tcpThread :: ThreadId } @@ -41,14 +42,14 @@ instance Hashable TCPAddress where _ -> 0 data TCPCache st = TCPCache - { lru :: TVar (MM.MinMaxPSQ' TCPAddress POSIXTime (TCPSession st)) + { lru :: TVar (MM.MinMaxPSQ' TCPAddress (Down POSIXTime) (TCPSession st)) , tcpMax :: Int } data SessionProtocol x y = SessionProtocol - { streamGoodbye :: Handle -> IO () -- ^ "Goodbye" protocol upon termination. - , streamDecode :: Handle -> IO (Maybe x) -- ^ Parse inbound messages. - , streamEncode :: y -> IO ByteString -- ^ Serialize outbound messages. + { streamGoodbye :: IO () -- ^ "Goodbye" protocol upon termination. + , streamDecode :: IO (Maybe x) -- ^ Parse inbound messages. + , streamEncode :: y -> IO () -- ^ Serialize outbound messages. } data StreamHandshake addr x y = StreamHandshake @@ -71,9 +72,8 @@ acquireConnection mvar tcpcache stream addr = do h <- socketToHandle sock ReadWriteMode st <- streamHello stream addr h t <- getPOSIXTime - mh <- newMVar h rthread <- forkIO $ fix $ \loop -> do - x <- streamDecode st h + x <- streamDecode st putMVar mvar $ fmap (\u -> Right (u, addr)) x case x of Just _ -> loop @@ -84,41 +84,35 @@ acquireConnection mvar tcpcache stream addr = do let showAddr a = show (streamAddr stream a) labelThread rthread ("tcp:"++showAddr addr) let v = TCPSession - { tcpHandle = mh + { tcpHandle = h , tcpState = st , tcpThread = rthread } let (retires,cache') = MM.takeView (tcpMax tcpcache) - $ MM.insert' (TCPAddress $ streamAddr stream addr) v t cache + $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) cache forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do myThreadId >>= flip labelThread ("tcp-close:"++show k) killThread (tcpThread r) - h <- takeMVar (tcpHandle r) - streamGoodbye st h - hClose h + streamGoodbye st + hClose (tcpHandle r) atomically $ writeTVar (lru tcpcache) cache' - return $ Just $ \y -> do - bs <- streamEncode st y - withMVar mh (`hPut` bs) + return $ Just $ streamEncode st Just (tm,v) -> do t <- getPOSIXTime - let TCPSession { tcpHandle = mh, tcpState = st } = v - cache' = MM.insert' (TCPAddress $ streamAddr stream addr) v t cache + let TCPSession { tcpHandle = h, tcpState = st } = v + cache' = MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) cache atomically $ writeTVar (lru tcpcache) cache' - return $ Just $ \y -> do - bs <- streamEncode st y - withMVar mh (`hPut` bs) + return $ Just $ streamEncode st closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO () closeAll tcpcache stream = do - cache <- atomically $ readTVar (lru tcpcache) + cache <- atomically $ swapTVar (lru tcpcache) MM.empty forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do let st = tcpState r killThread (tcpThread r) - h <- takeMVar $ tcpHandle r - streamGoodbye st h - hClose h + streamGoodbye st + hClose (tcpHandle r) tcpTransport :: Int -- ^ maximum number of TCP links to maintain. -> StreamHandshake addr x y -- cgit v1.2.3