diff options
Diffstat (limited to 'src/Network/QueryResponse/TCP.hs')
-rw-r--r-- | src/Network/QueryResponse/TCP.hs | 32 |
1 files changed, 21 insertions, 11 deletions
diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs index 328a19e1..efeab305 100644 --- a/src/Network/QueryResponse/TCP.hs +++ b/src/Network/QueryResponse/TCP.hs | |||
@@ -19,9 +19,12 @@ import Data.Time.Clock.POSIX | |||
19 | import Data.Word | 19 | import Data.Word |
20 | import Network.BSD | 20 | import Network.BSD |
21 | import Network.Socket | 21 | import Network.Socket |
22 | import System.Timeout | ||
22 | import System.IO | 23 | import System.IO |
23 | import System.IO.Error | 24 | import System.IO.Error |
24 | 25 | ||
26 | import DebugTag | ||
27 | import DPut | ||
25 | import Connection.Tcp (socketFamily) | 28 | import Connection.Tcp (socketFamily) |
26 | import qualified Data.MinMaxPSQ as MM | 29 | import qualified Data.MinMaxPSQ as MM |
27 | import Network.QueryResponse | 30 | import Network.QueryResponse |
@@ -61,11 +64,12 @@ acquireConnection :: MVar (Maybe (Either a (x, addr))) | |||
61 | -> TCPCache (SessionProtocol x y) | 64 | -> TCPCache (SessionProtocol x y) |
62 | -> StreamHandshake addr x y | 65 | -> StreamHandshake addr x y |
63 | -> addr | 66 | -> addr |
67 | -> Bool | ||
64 | -> IO (Maybe (y -> IO ())) | 68 | -> IO (Maybe (y -> IO ())) |
65 | acquireConnection mvar tcpcache stream addr = do | 69 | acquireConnection mvar tcpcache stream addr bDoCon = do |
66 | cache <- atomically $ readTVar (lru tcpcache) | 70 | cache <- atomically $ readTVar (lru tcpcache) |
67 | case MM.lookup' (TCPAddress $ streamAddr stream addr) cache of | 71 | case MM.lookup' (TCPAddress $ streamAddr stream addr) cache of |
68 | Nothing -> do | 72 | Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do |
69 | proto <- getProtocolNumber "tcp" | 73 | proto <- getProtocolNumber "tcp" |
70 | mh <- catchIOError (do sock <- socket (socketFamily $ streamAddr stream addr) Stream proto | 74 | mh <- catchIOError (do sock <- socket (socketFamily $ streamAddr stream addr) Stream proto |
71 | connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) | 75 | connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) |
@@ -77,10 +81,12 @@ acquireConnection mvar tcpcache stream addr = do | |||
77 | t <- getPOSIXTime | 81 | t <- getPOSIXTime |
78 | rthread <- forkIO $ fix $ \loop -> do | 82 | rthread <- forkIO $ fix $ \loop -> do |
79 | x <- streamDecode st | 83 | x <- streamDecode st |
80 | putMVar mvar $ fmap (\u -> Right (u, addr)) x | ||
81 | case x of | 84 | case x of |
82 | Just _ -> loop | 85 | Just u -> do |
86 | timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr) | ||
87 | loop | ||
83 | Nothing -> do | 88 | Nothing -> do |
89 | dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) | ||
84 | atomically $ modifyTVar' (lru tcpcache) | 90 | atomically $ modifyTVar' (lru tcpcache) |
85 | $ MM.delete (TCPAddress $ streamAddr stream addr) | 91 | $ MM.delete (TCPAddress $ streamAddr stream addr) |
86 | hClose h | 92 | hClose h |
@@ -91,14 +97,18 @@ acquireConnection mvar tcpcache stream addr = do | |||
91 | , tcpState = st | 97 | , tcpState = st |
92 | , tcpThread = rthread | 98 | , tcpThread = rthread |
93 | } | 99 | } |
94 | let (retires,cache') = MM.takeView (tcpMax tcpcache) | 100 | retires <- atomically $ do |
95 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) cache | 101 | c <- readTVar (lru tcpcache) |
102 | let (rs,c') = MM.takeView (tcpMax tcpcache) | ||
103 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c | ||
104 | writeTVar (lru tcpcache) c' | ||
105 | return rs | ||
96 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do | 106 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do |
97 | myThreadId >>= flip labelThread ("tcp-close:"++show k) | 107 | myThreadId >>= flip labelThread ("tcp-close:"++show k) |
108 | dput XTCP $ "TCP dropped: " ++ show k | ||
98 | killThread (tcpThread r) | 109 | killThread (tcpThread r) |
99 | streamGoodbye st | 110 | streamGoodbye st |
100 | hClose (tcpHandle r) | 111 | hClose (tcpHandle r) |
101 | atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress k) | ||
102 | 112 | ||
103 | return $ Just $ streamEncode st | 113 | return $ Just $ streamEncode st |
104 | Just (tm,v) -> do | 114 | Just (tm,v) -> do |
@@ -119,15 +129,15 @@ closeAll tcpcache stream = do | |||
119 | 129 | ||
120 | tcpTransport :: Int -- ^ maximum number of TCP links to maintain. | 130 | tcpTransport :: Int -- ^ maximum number of TCP links to maintain. |
121 | -> StreamHandshake addr x y | 131 | -> StreamHandshake addr x y |
122 | -> IO (TransportA err addr x y) | 132 | -> IO (TransportA err addr x (Bool,y)) |
123 | tcpTransport maxcon stream = do | 133 | tcpTransport maxcon stream = do |
124 | msgvar <- newEmptyMVar | 134 | msgvar <- newEmptyMVar |
125 | tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) | 135 | tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) |
126 | return Transport | 136 | return Transport |
127 | { awaitMessage = (takeMVar msgvar >>=) | 137 | { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing) |
128 | , sendMessage = \addr y -> do | 138 | , sendMessage = \addr (bDoCon,y) -> do |
129 | t <- forkIO $ do | 139 | t <- forkIO $ do |
130 | msock <- acquireConnection msgvar tcpcache stream addr | 140 | msock <- acquireConnection msgvar tcpcache stream addr bDoCon |
131 | mapM_ ($ y) msock | 141 | mapM_ ($ y) msock |
132 | `catchIOError` \e -> return () | 142 | `catchIOError` \e -> return () |
133 | labelThread t "tcp-send" | 143 | labelThread t "tcp-send" |