summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2020-01-28 17:23:02 -0500
committerJoe Crayne <joe@jerkface.net>2020-01-28 17:26:25 -0500
commit1e16567904a147b842070c1d98c83dc3e9c00c98 (patch)
treed3435b1b6ed89de751770b70da38a0e93a13950f
parentd1eef43fde09b670dfedad9742a70224fcdc941f (diff)
Kill tcp session on exception.HEADmaster
-rw-r--r--dht/src/Network/Tox/TCP.hs16
-rw-r--r--server/src/Network/QueryResponse/TCP.hs79
2 files changed, 50 insertions, 45 deletions
diff --git a/dht/src/Network/Tox/TCP.hs b/dht/src/Network/Tox/TCP.hs
index 36289e19..e3780675 100644
--- a/dht/src/Network/Tox/TCP.hs
+++ b/dht/src/Network/Tox/TCP.hs
@@ -146,16 +146,12 @@ tcpStream crypto mkst = StreamHandshake
146 dput XTCP $ "TCP exception: " ++ show e 146 dput XTCP $ "TCP exception: " ++ show e
147 return Nothing 147 return Nothing
148 , streamEncode = \y -> do 148 , streamEncode = \y -> do
149 -- dput XTCP $ "TCP(acquire nonce):" ++ show addr ++ " <-- " ++ show y 149 -- We need this to throw so the tcp session state can be cleaned up elsewhere.
150 n24 <- takeMVar nsend 150 bracket (takeMVar nsend) (putMVar nsend . incrementNonce24)
151 -- dput XTCP $ "TCP(got nonce):" ++ show addr ++ " <-- " ++ show y 151 $ \n24 -> do
152 let bs = encode $ encrypt (noncef' n24) $ encodePlain y 152 let bs = encode $ encrypt (noncef' n24) $ encodePlain y
153 ($ h) -- bracket (takeMVar hvar) (putMVar hvar) 153 hPut h (encode (fromIntegral $ Data.ByteString.length bs :: Word16) <> bs)
154 $ \h -> hPut h (encode (fromIntegral $ Data.ByteString.length bs :: Word16) <> bs) 154 dput XTCP $ "TCP: " ++ show addr ++ " <-- " ++ show y
155 `catchIOError` \e -> dput XTCP $ "TCP write exception: " ++ show e
156 -- dput XTCP $ "TCP(incrementing nonce): " ++ show addr ++ " <-- " ++ show y
157 putMVar nsend (incrementNonce24 n24)
158 dput XTCP $ "TCP: " ++ show addr ++ " <-- " ++ show y
159 } 155 }
160 , streamAddr = nodeAddr 156 , streamAddr = nodeAddr
161 } 157 }
diff --git a/server/src/Network/QueryResponse/TCP.hs b/server/src/Network/QueryResponse/TCP.hs
index 24aacd98..639212cb 100644
--- a/server/src/Network/QueryResponse/TCP.hs
+++ b/server/src/Network/QueryResponse/TCP.hs
@@ -71,9 +71,12 @@ data StreamHandshake addr x y = StreamHandshake
71 , streamAddr :: addr -> SockAddr 71 , streamAddr :: addr -> SockAddr
72 } 72 }
73 73
74killSession :: TCPSession st -> IO () 74killSession :: TCPSession (SessionProtocol x y) -> IO ()
75killSession PendingTCPSession = return () 75killSession TCPSession{tcpState=st,tcpHandle=h,tcpThread=t} = do
76killSession TCPSession{tcpThread=t} = killThread t 76 catchIOError (streamGoodbye st >> hClose h)
77 (\e -> return ())
78 killThread t
79killSession _ = return ()
77 80
78showStat :: IsString p => TCPSession st -> p 81showStat :: IsString p => TCPSession st -> p
79showStat r = case r of PendingTCPSession -> "pending." 82showStat r = case r of PendingTCPSession -> "pending."
@@ -82,6 +85,30 @@ showStat r = case r of PendingTCPSession -> "pending."
82tcp_timeout :: Int 85tcp_timeout :: Int
83tcp_timeout = 10000000 86tcp_timeout = 10000000
84 87
88
89removeOnFail :: TCPCache (SessionProtocol x y) -> Handle -> TCPAddress -> IO () -> IO ()
90removeOnFail tcpcache h addr action = action `catchIOError` \e -> do
91 join $ atomically $ do
92 c <- readTVar (lru tcpcache)
93 case MM.lookup' addr c of
94 Just (tm, v@TCPSession {tcpHandle=stored}) | h == stored -> do
95 modifyTVar' (lru tcpcache) $ MM.delete addr
96 return $ killSession v
97 _ -> return $ return ()
98 dput XTCP $ "TCP-send " ++ show addr ++ " " ++ show e
99
100lookupSession :: TCPCache st -> POSIXTime -> TCPAddress -> Bool -> STM (Maybe (Down POSIXTime, TCPSession st))
101lookupSession tcpcache now saddr bDoCon = do
102 c <- readTVar (lru tcpcache)
103 let v = MM.lookup' saddr c
104 case v of
105 Nothing | bDoCon -> writeTVar (lru tcpcache)
106 $ MM.insert' saddr PendingTCPSession (Down now) c
107 | otherwise -> return ()
108 Just (tm, v) -> writeTVar (lru tcpcache)
109 $ MM.insert' saddr v (Down now) c
110 return v
111
85acquireConnection :: TMVar (Arrival a addr x) 112acquireConnection :: TMVar (Arrival a addr x)
86 -> TCPCache (SessionProtocol x y) 113 -> TCPCache (SessionProtocol x y)
87 -> StreamHandshake addr x y 114 -> StreamHandshake addr x y
@@ -91,16 +118,8 @@ acquireConnection :: TMVar (Arrival a addr x)
91acquireConnection mvar tcpcache stream addr bDoCon = do 118acquireConnection mvar tcpcache stream addr bDoCon = do
92 now <- getPOSIXTime 119 now <- getPOSIXTime
93 -- dput XTCP $ "acquireConnection 0 " ++ show (streamAddr stream addr) 120 -- dput XTCP $ "acquireConnection 0 " ++ show (streamAddr stream addr)
94 entry <- atomically $ do 121 let saddr = TCPAddress $ streamAddr stream addr
95 c <- readTVar (lru tcpcache) 122 entry <- atomically $ lookupSession tcpcache now saddr bDoCon
96 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c
97 case v of
98 Nothing | bDoCon -> writeTVar (lru tcpcache)
99 $ MM.insert' (TCPAddress $ streamAddr stream addr) PendingTCPSession (Down now) c
100 | otherwise -> return ()
101 Just (tm, v) -> writeTVar (lru tcpcache)
102 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now) c
103 return v
104 -- dput XTCP $ "acquireConnection 1 " ++ show (streamAddr stream addr, fmap (second showStat) entry) 123 -- dput XTCP $ "acquireConnection 1 " ++ show (streamAddr stream addr, fmap (second showStat) entry)
105 case entry of 124 case entry of
106 Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do 125 Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do
@@ -114,15 +133,14 @@ acquireConnection mvar tcpcache stream addr bDoCon = do
114 return h) 133 return h)
115 $ \e -> return Nothing 134 $ \e -> return Nothing
116 when (isNothing mh) $ do 135 when (isNothing mh) $ do
117 atomically $ modifyTVar' (lru tcpcache) 136 atomically $ modifyTVar' (lru tcpcache) $ MM.delete saddr
118 $ MM.delete (TCPAddress $ streamAddr stream addr)
119 Socket.close sock 137 Socket.close sock
120 ret <- fmap join $ forM mh $ \h -> do 138 ret <- fmap join $ forM mh $ \h -> do
121 mst <- catchIOError (Just <$> streamHello stream addr h) 139 mst <- catchIOError (Just <$> streamHello stream addr h)
122 (\e -> return Nothing) 140 (\e -> return Nothing)
123 case mst of 141 case mst of
124 Nothing -> do 142 Nothing -> do
125 atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) 143 atomically $ modifyTVar' (lru tcpcache) $ MM.delete saddr
126 return Nothing 144 return Nothing
127 Just st -> do 145 Just st -> do
128 dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr) 146 dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr)
@@ -143,8 +161,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do
143 loop 161 loop
144 Nothing -> do 162 Nothing -> do
145 dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) 163 dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr)
146 do atomically $ modifyTVar' (lru tcpcache) 164 do atomically $ modifyTVar' (lru tcpcache) $ MM.delete saddr
147 $ MM.delete (TCPAddress $ streamAddr stream addr)
148 c <- atomically $ readTVar (lru tcpcache) 165 c <- atomically $ readTVar (lru tcpcache)
149 now <- getPOSIXTime 166 now <- getPOSIXTime
150 forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do 167 forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do
@@ -163,33 +180,28 @@ acquireConnection mvar tcpcache stream addr bDoCon = do
163 retires <- atomically $ do 180 retires <- atomically $ do
164 c <- readTVar (lru tcpcache) 181 c <- readTVar (lru tcpcache)
165 let (rs,c') = MM.takeView (tcpMax tcpcache) 182 let (rs,c') = MM.takeView (tcpMax tcpcache)
166 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c 183 $ MM.insert' saddr v (Down t) c
167 writeTVar (lru tcpcache) c' 184 writeTVar (lru tcpcache) c'
168 writeTVar signal True 185 writeTVar signal True
169 return rs 186 return rs
170 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkLabeled ("tcp-close:"++show k) $ do 187 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkLabeled ("tcp-close:"++show k) $ do
171 dput XTCP $ "TCP dropped: " ++ show k 188 dput XTCP $ "TCP dropped: " ++ show k
172 killSession r 189 killSession r
173 case r of TCPSession {tcpState=st,tcpHandle=h} -> do 190 return $ Just $ \y -> removeOnFail tcpcache h saddr $ streamEncode st y
174 streamGoodbye st
175 hClose h
176 `catchIOError` \e -> return ()
177 _ -> return ()
178
179 return $ Just $ streamEncode st
180 when (isNothing ret) $ do 191 when (isNothing ret) $ do
181 atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) 192 atomically $ modifyTVar' (lru tcpcache) $ MM.delete saddr
182 return ret 193 return ret
183 Just (tm, PendingTCPSession) 194 Just (tm, PendingTCPSession)
184 | not bDoCon -> return Nothing 195 | not bDoCon -> return Nothing
185 | otherwise -> fmap join $ timeout tcp_timeout $ atomically $ do 196 | otherwise -> fmap join $ timeout tcp_timeout $ atomically $ do
186 c <- readTVar (lru tcpcache) 197 c <- readTVar (lru tcpcache)
187 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c 198 let v = MM.lookup' saddr c
188 case v of 199 case v of
189 Just (_,TCPSession{tcpState=st}) -> return $ Just $ streamEncode st 200 Just (_,TCPSession{tcpHandle=h,tcpState=st})
190 Nothing -> return Nothing 201 -> return $ Just $ \y -> removeOnFail tcpcache h saddr $ streamEncode st y
191 _ -> retry 202 Nothing -> return Nothing
192 Just (tm, v@TCPSession {tcpState=st}) -> return $ Just $ streamEncode st 203 _ -> retry
204 Just (tm, v@TCPSession {tcpHandle=h,tcpState=st}) -> return $ Just $ \y -> removeOnFail tcpcache h saddr $ streamEncode st y
193 205
194closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO () 206closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO ()
195closeAll tcpcache stream = do 207closeAll tcpcache stream = do
@@ -197,9 +209,6 @@ closeAll tcpcache stream = do
197 cache <- atomically $ swapTVar (lru tcpcache) MM.empty 209 cache <- atomically $ swapTVar (lru tcpcache) MM.empty
198 forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do 210 forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do
199 killSession r 211 killSession r
200 case r of TCPSession{tcpState=st,tcpHandle=h} -> catchIOError (streamGoodbye st >> hClose h)
201 (\e -> return ())
202 _ -> return ()
203 212
204-- Use a cache of TCP client connections for sending (and receiving) packets. 213-- Use a cache of TCP client connections for sending (and receiving) packets.
205-- The boolean value prepended to the message allows the sender to specify 214-- The boolean value prepended to the message allows the sender to specify