summaryrefslogtreecommitdiff
path: root/src/Network/QueryResponse
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2018-12-02 16:36:03 -0500
committerJoe Crayne <joe@jerkface.net>2018-12-16 14:08:26 -0500
commit93e51f29cc6c25455a5f91dbc1c3e678922523fd (patch)
treed9911cdc48cf4de510fcfc5693def6cb9d1b6e81 /src/Network/QueryResponse
parente64a0efd2ca29257c189343b6dc75f6bee29d66d (diff)
TCP cache: Use (Down POSIXTime) for MinMax priority.
Diffstat (limited to 'src/Network/QueryResponse')
-rw-r--r--src/Network/QueryResponse/TCP.hs42
1 files changed, 18 insertions, 24 deletions
diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs
index 7efe6966..c0bdcd3c 100644
--- a/src/Network/QueryResponse/TCP.hs
+++ b/src/Network/QueryResponse/TCP.hs
@@ -14,6 +14,7 @@ import Control.Monad
14import Data.ByteString (ByteString,hPut) 14import Data.ByteString (ByteString,hPut)
15import Data.Function 15import Data.Function
16import Data.Hashable 16import Data.Hashable
17import Data.Ord
17import Data.Time.Clock.POSIX 18import Data.Time.Clock.POSIX
18import Data.Word 19import Data.Word
19import Network.BSD 20import Network.BSD
@@ -26,7 +27,7 @@ import qualified Data.MinMaxPSQ as MM
26import Network.QueryResponse 27import Network.QueryResponse
27 28
28data TCPSession st = TCPSession 29data TCPSession st = TCPSession
29 { tcpHandle :: MVar Handle 30 { tcpHandle :: Handle
30 , tcpState :: st 31 , tcpState :: st
31 , tcpThread :: ThreadId 32 , tcpThread :: ThreadId
32 } 33 }
@@ -41,14 +42,14 @@ instance Hashable TCPAddress where
41 _ -> 0 42 _ -> 0
42 43
43data TCPCache st = TCPCache 44data TCPCache st = TCPCache
44 { lru :: TVar (MM.MinMaxPSQ' TCPAddress POSIXTime (TCPSession st)) 45 { lru :: TVar (MM.MinMaxPSQ' TCPAddress (Down POSIXTime) (TCPSession st))
45 , tcpMax :: Int 46 , tcpMax :: Int
46 } 47 }
47 48
48data SessionProtocol x y = SessionProtocol 49data SessionProtocol x y = SessionProtocol
49 { streamGoodbye :: Handle -> IO () -- ^ "Goodbye" protocol upon termination. 50 { streamGoodbye :: IO () -- ^ "Goodbye" protocol upon termination.
50 , streamDecode :: Handle -> IO (Maybe x) -- ^ Parse inbound messages. 51 , streamDecode :: IO (Maybe x) -- ^ Parse inbound messages.
51 , streamEncode :: y -> IO ByteString -- ^ Serialize outbound messages. 52 , streamEncode :: y -> IO () -- ^ Serialize outbound messages.
52 } 53 }
53 54
54data StreamHandshake addr x y = StreamHandshake 55data StreamHandshake addr x y = StreamHandshake
@@ -71,9 +72,8 @@ acquireConnection mvar tcpcache stream addr = do
71 h <- socketToHandle sock ReadWriteMode 72 h <- socketToHandle sock ReadWriteMode
72 st <- streamHello stream addr h 73 st <- streamHello stream addr h
73 t <- getPOSIXTime 74 t <- getPOSIXTime
74 mh <- newMVar h
75 rthread <- forkIO $ fix $ \loop -> do 75 rthread <- forkIO $ fix $ \loop -> do
76 x <- streamDecode st h 76 x <- streamDecode st
77 putMVar mvar $ fmap (\u -> Right (u, addr)) x 77 putMVar mvar $ fmap (\u -> Right (u, addr)) x
78 case x of 78 case x of
79 Just _ -> loop 79 Just _ -> loop
@@ -84,41 +84,35 @@ acquireConnection mvar tcpcache stream addr = do
84 let showAddr a = show (streamAddr stream a) 84 let showAddr a = show (streamAddr stream a)
85 labelThread rthread ("tcp:"++showAddr addr) 85 labelThread rthread ("tcp:"++showAddr addr)
86 let v = TCPSession 86 let v = TCPSession
87 { tcpHandle = mh 87 { tcpHandle = h
88 , tcpState = st 88 , tcpState = st
89 , tcpThread = rthread 89 , tcpThread = rthread
90 } 90 }
91 let (retires,cache') = MM.takeView (tcpMax tcpcache) 91 let (retires,cache') = MM.takeView (tcpMax tcpcache)
92 $ MM.insert' (TCPAddress $ streamAddr stream addr) v t cache 92 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) cache
93 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do 93 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do
94 myThreadId >>= flip labelThread ("tcp-close:"++show k) 94 myThreadId >>= flip labelThread ("tcp-close:"++show k)
95 killThread (tcpThread r) 95 killThread (tcpThread r)
96 h <- takeMVar (tcpHandle r) 96 streamGoodbye st
97 streamGoodbye st h 97 hClose (tcpHandle r)
98 hClose h
99 atomically $ writeTVar (lru tcpcache) cache' 98 atomically $ writeTVar (lru tcpcache) cache'
100 99
101 return $ Just $ \y -> do 100 return $ Just $ streamEncode st
102 bs <- streamEncode st y
103 withMVar mh (`hPut` bs)
104 Just (tm,v) -> do 101 Just (tm,v) -> do
105 t <- getPOSIXTime 102 t <- getPOSIXTime
106 let TCPSession { tcpHandle = mh, tcpState = st } = v 103 let TCPSession { tcpHandle = h, tcpState = st } = v
107 cache' = MM.insert' (TCPAddress $ streamAddr stream addr) v t cache 104 cache' = MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) cache
108 atomically $ writeTVar (lru tcpcache) cache' 105 atomically $ writeTVar (lru tcpcache) cache'
109 return $ Just $ \y -> do 106 return $ Just $ streamEncode st
110 bs <- streamEncode st y
111 withMVar mh (`hPut` bs)
112 107
113closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO () 108closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO ()
114closeAll tcpcache stream = do 109closeAll tcpcache stream = do
115 cache <- atomically $ readTVar (lru tcpcache) 110 cache <- atomically $ swapTVar (lru tcpcache) MM.empty
116 forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do 111 forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do
117 let st = tcpState r 112 let st = tcpState r
118 killThread (tcpThread r) 113 killThread (tcpThread r)
119 h <- takeMVar $ tcpHandle r 114 streamGoodbye st
120 streamGoodbye st h 115 hClose (tcpHandle r)
121 hClose h
122 116
123tcpTransport :: Int -- ^ maximum number of TCP links to maintain. 117tcpTransport :: Int -- ^ maximum number of TCP links to maintain.
124 -> StreamHandshake addr x y 118 -> StreamHandshake addr x y