diff options
author | Joe Crayne <joe@jerkface.net> | 2020-01-28 17:23:02 -0500 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-28 17:26:25 -0500 |
commit | 1e16567904a147b842070c1d98c83dc3e9c00c98 (patch) | |
tree | d3435b1b6ed89de751770b70da38a0e93a13950f /server/src/Network | |
parent | d1eef43fde09b670dfedad9742a70224fcdc941f (diff) |
Diffstat (limited to 'server/src/Network')
-rw-r--r-- | server/src/Network/QueryResponse/TCP.hs | 79 |
1 files changed, 44 insertions, 35 deletions
diff --git a/server/src/Network/QueryResponse/TCP.hs b/server/src/Network/QueryResponse/TCP.hs index 24aacd98..639212cb 100644 --- a/server/src/Network/QueryResponse/TCP.hs +++ b/server/src/Network/QueryResponse/TCP.hs | |||
@@ -71,9 +71,12 @@ data StreamHandshake addr x y = StreamHandshake | |||
71 | , streamAddr :: addr -> SockAddr | 71 | , streamAddr :: addr -> SockAddr |
72 | } | 72 | } |
73 | 73 | ||
74 | killSession :: TCPSession st -> IO () | 74 | killSession :: TCPSession (SessionProtocol x y) -> IO () |
75 | killSession PendingTCPSession = return () | 75 | killSession TCPSession{tcpState=st,tcpHandle=h,tcpThread=t} = do |
76 | killSession TCPSession{tcpThread=t} = killThread t | 76 | catchIOError (streamGoodbye st >> hClose h) |
77 | (\e -> return ()) | ||
78 | killThread t | ||
79 | killSession _ = return () | ||
77 | 80 | ||
78 | showStat :: IsString p => TCPSession st -> p | 81 | showStat :: IsString p => TCPSession st -> p |
79 | showStat r = case r of PendingTCPSession -> "pending." | 82 | showStat r = case r of PendingTCPSession -> "pending." |
@@ -82,6 +85,30 @@ showStat r = case r of PendingTCPSession -> "pending." | |||
82 | tcp_timeout :: Int | 85 | tcp_timeout :: Int |
83 | tcp_timeout = 10000000 | 86 | tcp_timeout = 10000000 |
84 | 87 | ||
88 | |||
89 | removeOnFail :: TCPCache (SessionProtocol x y) -> Handle -> TCPAddress -> IO () -> IO () | ||
90 | removeOnFail tcpcache h addr action = action `catchIOError` \e -> do | ||
91 | join $ atomically $ do | ||
92 | c <- readTVar (lru tcpcache) | ||
93 | case MM.lookup' addr c of | ||
94 | Just (tm, v@TCPSession {tcpHandle=stored}) | h == stored -> do | ||
95 | modifyTVar' (lru tcpcache) $ MM.delete addr | ||
96 | return $ killSession v | ||
97 | _ -> return $ return () | ||
98 | dput XTCP $ "TCP-send " ++ show addr ++ " " ++ show e | ||
99 | |||
100 | lookupSession :: TCPCache st -> POSIXTime -> TCPAddress -> Bool -> STM (Maybe (Down POSIXTime, TCPSession st)) | ||
101 | lookupSession tcpcache now saddr bDoCon = do | ||
102 | c <- readTVar (lru tcpcache) | ||
103 | let v = MM.lookup' saddr c | ||
104 | case v of | ||
105 | Nothing | bDoCon -> writeTVar (lru tcpcache) | ||
106 | $ MM.insert' saddr PendingTCPSession (Down now) c | ||
107 | | otherwise -> return () | ||
108 | Just (tm, v) -> writeTVar (lru tcpcache) | ||
109 | $ MM.insert' saddr v (Down now) c | ||
110 | return v | ||
111 | |||
85 | acquireConnection :: TMVar (Arrival a addr x) | 112 | acquireConnection :: TMVar (Arrival a addr x) |
86 | -> TCPCache (SessionProtocol x y) | 113 | -> TCPCache (SessionProtocol x y) |
87 | -> StreamHandshake addr x y | 114 | -> StreamHandshake addr x y |
@@ -91,16 +118,8 @@ acquireConnection :: TMVar (Arrival a addr x) | |||
91 | acquireConnection mvar tcpcache stream addr bDoCon = do | 118 | acquireConnection mvar tcpcache stream addr bDoCon = do |
92 | now <- getPOSIXTime | 119 | now <- getPOSIXTime |
93 | -- dput XTCP $ "acquireConnection 0 " ++ show (streamAddr stream addr) | 120 | -- dput XTCP $ "acquireConnection 0 " ++ show (streamAddr stream addr) |
94 | entry <- atomically $ do | 121 | let saddr = TCPAddress $ streamAddr stream addr |
95 | c <- readTVar (lru tcpcache) | 122 | entry <- atomically $ lookupSession tcpcache now saddr bDoCon |
96 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c | ||
97 | case v of | ||
98 | Nothing | bDoCon -> writeTVar (lru tcpcache) | ||
99 | $ MM.insert' (TCPAddress $ streamAddr stream addr) PendingTCPSession (Down now) c | ||
100 | | otherwise -> return () | ||
101 | Just (tm, v) -> writeTVar (lru tcpcache) | ||
102 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now) c | ||
103 | return v | ||
104 | -- dput XTCP $ "acquireConnection 1 " ++ show (streamAddr stream addr, fmap (second showStat) entry) | 123 | -- dput XTCP $ "acquireConnection 1 " ++ show (streamAddr stream addr, fmap (second showStat) entry) |
105 | case entry of | 124 | case entry of |
106 | Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do | 125 | Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do |
@@ -114,15 +133,14 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
114 | return h) | 133 | return h) |
115 | $ \e -> return Nothing | 134 | $ \e -> return Nothing |
116 | when (isNothing mh) $ do | 135 | when (isNothing mh) $ do |
117 | atomically $ modifyTVar' (lru tcpcache) | 136 | atomically $ modifyTVar' (lru tcpcache) $ MM.delete saddr |
118 | $ MM.delete (TCPAddress $ streamAddr stream addr) | ||
119 | Socket.close sock | 137 | Socket.close sock |
120 | ret <- fmap join $ forM mh $ \h -> do | 138 | ret <- fmap join $ forM mh $ \h -> do |
121 | mst <- catchIOError (Just <$> streamHello stream addr h) | 139 | mst <- catchIOError (Just <$> streamHello stream addr h) |
122 | (\e -> return Nothing) | 140 | (\e -> return Nothing) |
123 | case mst of | 141 | case mst of |
124 | Nothing -> do | 142 | Nothing -> do |
125 | atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) | 143 | atomically $ modifyTVar' (lru tcpcache) $ MM.delete saddr |
126 | return Nothing | 144 | return Nothing |
127 | Just st -> do | 145 | Just st -> do |
128 | dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr) | 146 | dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr) |
@@ -143,8 +161,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
143 | loop | 161 | loop |
144 | Nothing -> do | 162 | Nothing -> do |
145 | dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) | 163 | dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) |
146 | do atomically $ modifyTVar' (lru tcpcache) | 164 | do atomically $ modifyTVar' (lru tcpcache) $ MM.delete saddr |
147 | $ MM.delete (TCPAddress $ streamAddr stream addr) | ||
148 | c <- atomically $ readTVar (lru tcpcache) | 165 | c <- atomically $ readTVar (lru tcpcache) |
149 | now <- getPOSIXTime | 166 | now <- getPOSIXTime |
150 | forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do | 167 | forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do |
@@ -163,33 +180,28 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
163 | retires <- atomically $ do | 180 | retires <- atomically $ do |
164 | c <- readTVar (lru tcpcache) | 181 | c <- readTVar (lru tcpcache) |
165 | let (rs,c') = MM.takeView (tcpMax tcpcache) | 182 | let (rs,c') = MM.takeView (tcpMax tcpcache) |
166 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c | 183 | $ MM.insert' saddr v (Down t) c |
167 | writeTVar (lru tcpcache) c' | 184 | writeTVar (lru tcpcache) c' |
168 | writeTVar signal True | 185 | writeTVar signal True |
169 | return rs | 186 | return rs |
170 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkLabeled ("tcp-close:"++show k) $ do | 187 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkLabeled ("tcp-close:"++show k) $ do |
171 | dput XTCP $ "TCP dropped: " ++ show k | 188 | dput XTCP $ "TCP dropped: " ++ show k |
172 | killSession r | 189 | killSession r |
173 | case r of TCPSession {tcpState=st,tcpHandle=h} -> do | 190 | return $ Just $ \y -> removeOnFail tcpcache h saddr $ streamEncode st y |
174 | streamGoodbye st | ||
175 | hClose h | ||
176 | `catchIOError` \e -> return () | ||
177 | _ -> return () | ||
178 | |||
179 | return $ Just $ streamEncode st | ||
180 | when (isNothing ret) $ do | 191 | when (isNothing ret) $ do |
181 | atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) | 192 | atomically $ modifyTVar' (lru tcpcache) $ MM.delete saddr |
182 | return ret | 193 | return ret |
183 | Just (tm, PendingTCPSession) | 194 | Just (tm, PendingTCPSession) |
184 | | not bDoCon -> return Nothing | 195 | | not bDoCon -> return Nothing |
185 | | otherwise -> fmap join $ timeout tcp_timeout $ atomically $ do | 196 | | otherwise -> fmap join $ timeout tcp_timeout $ atomically $ do |
186 | c <- readTVar (lru tcpcache) | 197 | c <- readTVar (lru tcpcache) |
187 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c | 198 | let v = MM.lookup' saddr c |
188 | case v of | 199 | case v of |
189 | Just (_,TCPSession{tcpState=st}) -> return $ Just $ streamEncode st | 200 | Just (_,TCPSession{tcpHandle=h,tcpState=st}) |
190 | Nothing -> return Nothing | 201 | -> return $ Just $ \y -> removeOnFail tcpcache h saddr $ streamEncode st y |
191 | _ -> retry | 202 | Nothing -> return Nothing |
192 | Just (tm, v@TCPSession {tcpState=st}) -> return $ Just $ streamEncode st | 203 | _ -> retry |
204 | Just (tm, v@TCPSession {tcpHandle=h,tcpState=st}) -> return $ Just $ \y -> removeOnFail tcpcache h saddr $ streamEncode st y | ||
193 | 205 | ||
194 | closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO () | 206 | closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO () |
195 | closeAll tcpcache stream = do | 207 | closeAll tcpcache stream = do |
@@ -197,9 +209,6 @@ closeAll tcpcache stream = do | |||
197 | cache <- atomically $ swapTVar (lru tcpcache) MM.empty | 209 | cache <- atomically $ swapTVar (lru tcpcache) MM.empty |
198 | forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do | 210 | forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do |
199 | killSession r | 211 | killSession r |
200 | case r of TCPSession{tcpState=st,tcpHandle=h} -> catchIOError (streamGoodbye st >> hClose h) | ||
201 | (\e -> return ()) | ||
202 | _ -> return () | ||
203 | 212 | ||
204 | -- Use a cache of TCP client connections for sending (and receiving) packets. | 213 | -- Use a cache of TCP client connections for sending (and receiving) packets. |
205 | -- The boolean value prepended to the message allows the sender to specify | 214 | -- The boolean value prepended to the message allows the sender to specify |