diff options
-rw-r--r-- | dht/src/Network/QueryResponse/TCP.hs | 18 |
1 files changed, 11 insertions, 7 deletions
diff --git a/dht/src/Network/QueryResponse/TCP.hs b/dht/src/Network/QueryResponse/TCP.hs index defdf6a2..67c19512 100644 --- a/dht/src/Network/QueryResponse/TCP.hs +++ b/dht/src/Network/QueryResponse/TCP.hs | |||
@@ -21,7 +21,7 @@ import Data.Time.Clock.POSIX | |||
21 | import Data.Word | 21 | import Data.Word |
22 | import Data.String (IsString(..)) | 22 | import Data.String (IsString(..)) |
23 | import Network.BSD | 23 | import Network.BSD |
24 | import Network.Socket | 24 | import Network.Socket as Socket |
25 | import System.Timeout | 25 | import System.Timeout |
26 | import System.IO | 26 | import System.IO |
27 | import System.IO.Error | 27 | import System.IO.Error |
@@ -94,23 +94,25 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
94 | Nothing | bDoCon -> writeTVar (lru tcpcache) | 94 | Nothing | bDoCon -> writeTVar (lru tcpcache) |
95 | $ MM.insert' (TCPAddress $ streamAddr stream addr) PendingTCPSession (Down now) c | 95 | $ MM.insert' (TCPAddress $ streamAddr stream addr) PendingTCPSession (Down now) c |
96 | | otherwise -> return () | 96 | | otherwise -> return () |
97 | Just (tm, v) -> modifyTVar' (lru tcpcache) $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now) | 97 | Just (tm, v) -> writeTVar (lru tcpcache) |
98 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now) c | ||
98 | return v | 99 | return v |
99 | -- dput XTCP $ "acquireConnection 1 " ++ show (streamAddr stream addr, fmap (second showStat) entry) | 100 | -- dput XTCP $ "acquireConnection 1 " ++ show (streamAddr stream addr, fmap (second showStat) entry) |
100 | case entry of | 101 | case entry of |
101 | Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do | 102 | Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do |
102 | proto <- getProtocolNumber "tcp" | 103 | proto <- getProtocolNumber "tcp" |
104 | sock <- socket (socketFamily $ streamAddr stream addr) Stream proto | ||
103 | mh <- catchIOError (do h <- timeout tcp_timeout $ do | 105 | mh <- catchIOError (do h <- timeout tcp_timeout $ do |
104 | sock <- socket (socketFamily $ streamAddr stream addr) Stream proto | ||
105 | connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) | 106 | connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) |
106 | h <- socketToHandle sock ReadWriteMode | 107 | h <- socketToHandle sock ReadWriteMode |
107 | hSetBuffering h NoBuffering | 108 | hSetBuffering h NoBuffering |
108 | return h | 109 | return h |
109 | return h) | 110 | return h) |
110 | $ \e -> return Nothing | 111 | $ \e -> return Nothing |
111 | when (isNothing mh) | 112 | when (isNothing mh) $ do |
112 | $ atomically $ modifyTVar' (lru tcpcache) | 113 | atomically $ modifyTVar' (lru tcpcache) |
113 | $ MM.delete (TCPAddress $ streamAddr stream addr) | 114 | $ MM.delete (TCPAddress $ streamAddr stream addr) |
115 | Socket.close sock | ||
114 | ret <- fmap join $ forM mh $ \h -> do | 116 | ret <- fmap join $ forM mh $ \h -> do |
115 | mst <- catchIOError (Just <$> streamHello stream addr h) | 117 | mst <- catchIOError (Just <$> streamHello stream addr h) |
116 | (\e -> return Nothing) | 118 | (\e -> return Nothing) |
@@ -147,7 +149,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
147 | case mreport of | 149 | case mreport of |
148 | Just treport -> dput XTCP treport | 150 | Just treport -> dput XTCP treport |
149 | Nothing -> dput XTCP "TCP ERROR: threadReport timed out." | 151 | Nothing -> dput XTCP "TCP ERROR: threadReport timed out." |
150 | hClose h | 152 | hClose h `catchIOError` \e -> return () |
151 | let v = TCPSession | 153 | let v = TCPSession |
152 | { tcpHandle = h | 154 | { tcpHandle = h |
153 | , tcpState = st | 155 | , tcpState = st |
@@ -167,6 +169,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
167 | case r of TCPSession {tcpState=st,tcpHandle=h} -> do | 169 | case r of TCPSession {tcpState=st,tcpHandle=h} -> do |
168 | streamGoodbye st | 170 | streamGoodbye st |
169 | hClose h | 171 | hClose h |
172 | `catchIOError` \e -> return () | ||
170 | _ -> return () | 173 | _ -> return () |
171 | 174 | ||
172 | return $ Just $ streamEncode st | 175 | return $ Just $ streamEncode st |
@@ -190,7 +193,8 @@ closeAll tcpcache stream = do | |||
190 | cache <- atomically $ swapTVar (lru tcpcache) MM.empty | 193 | cache <- atomically $ swapTVar (lru tcpcache) MM.empty |
191 | forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do | 194 | forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do |
192 | killSession r | 195 | killSession r |
193 | case r of TCPSession{tcpState=st,tcpHandle=h} -> streamGoodbye st >> hClose h | 196 | case r of TCPSession{tcpState=st,tcpHandle=h} -> catchIOError (streamGoodbye st >> hClose h) |
197 | (\e -> return ()) | ||
194 | _ -> return () | 198 | _ -> return () |
195 | 199 | ||
196 | -- Use a cache of TCP client connections for sending (and receiving) packets. | 200 | -- Use a cache of TCP client connections for sending (and receiving) packets. |