summaryrefslogtreecommitdiff
path: root/src/Network/QueryResponse/TCP.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/QueryResponse/TCP.hs')
-rw-r--r--src/Network/QueryResponse/TCP.hs32
1 files changed, 21 insertions, 11 deletions
diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs
index 328a19e1..efeab305 100644
--- a/src/Network/QueryResponse/TCP.hs
+++ b/src/Network/QueryResponse/TCP.hs
@@ -19,9 +19,12 @@ import Data.Time.Clock.POSIX
19import Data.Word 19import Data.Word
20import Network.BSD 20import Network.BSD
21import Network.Socket 21import Network.Socket
22import System.Timeout
22import System.IO 23import System.IO
23import System.IO.Error 24import System.IO.Error
24 25
26import DebugTag
27import DPut
25import Connection.Tcp (socketFamily) 28import Connection.Tcp (socketFamily)
26import qualified Data.MinMaxPSQ as MM 29import qualified Data.MinMaxPSQ as MM
27import Network.QueryResponse 30import Network.QueryResponse
@@ -61,11 +64,12 @@ acquireConnection :: MVar (Maybe (Either a (x, addr)))
61 -> TCPCache (SessionProtocol x y) 64 -> TCPCache (SessionProtocol x y)
62 -> StreamHandshake addr x y 65 -> StreamHandshake addr x y
63 -> addr 66 -> addr
67 -> Bool
64 -> IO (Maybe (y -> IO ())) 68 -> IO (Maybe (y -> IO ()))
65acquireConnection mvar tcpcache stream addr = do 69acquireConnection mvar tcpcache stream addr bDoCon = do
66 cache <- atomically $ readTVar (lru tcpcache) 70 cache <- atomically $ readTVar (lru tcpcache)
67 case MM.lookup' (TCPAddress $ streamAddr stream addr) cache of 71 case MM.lookup' (TCPAddress $ streamAddr stream addr) cache of
68 Nothing -> do 72 Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do
69 proto <- getProtocolNumber "tcp" 73 proto <- getProtocolNumber "tcp"
70 mh <- catchIOError (do sock <- socket (socketFamily $ streamAddr stream addr) Stream proto 74 mh <- catchIOError (do sock <- socket (socketFamily $ streamAddr stream addr) Stream proto
71 connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) 75 connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock)
@@ -77,10 +81,12 @@ acquireConnection mvar tcpcache stream addr = do
77 t <- getPOSIXTime 81 t <- getPOSIXTime
78 rthread <- forkIO $ fix $ \loop -> do 82 rthread <- forkIO $ fix $ \loop -> do
79 x <- streamDecode st 83 x <- streamDecode st
80 putMVar mvar $ fmap (\u -> Right (u, addr)) x
81 case x of 84 case x of
82 Just _ -> loop 85 Just u -> do
86 timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr)
87 loop
83 Nothing -> do 88 Nothing -> do
89 dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr)
84 atomically $ modifyTVar' (lru tcpcache) 90 atomically $ modifyTVar' (lru tcpcache)
85 $ MM.delete (TCPAddress $ streamAddr stream addr) 91 $ MM.delete (TCPAddress $ streamAddr stream addr)
86 hClose h 92 hClose h
@@ -91,14 +97,18 @@ acquireConnection mvar tcpcache stream addr = do
91 , tcpState = st 97 , tcpState = st
92 , tcpThread = rthread 98 , tcpThread = rthread
93 } 99 }
94 let (retires,cache') = MM.takeView (tcpMax tcpcache) 100 retires <- atomically $ do
95 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) cache 101 c <- readTVar (lru tcpcache)
102 let (rs,c') = MM.takeView (tcpMax tcpcache)
103 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c
104 writeTVar (lru tcpcache) c'
105 return rs
96 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do 106 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do
97 myThreadId >>= flip labelThread ("tcp-close:"++show k) 107 myThreadId >>= flip labelThread ("tcp-close:"++show k)
108 dput XTCP $ "TCP dropped: " ++ show k
98 killThread (tcpThread r) 109 killThread (tcpThread r)
99 streamGoodbye st 110 streamGoodbye st
100 hClose (tcpHandle r) 111 hClose (tcpHandle r)
101 atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress k)
102 112
103 return $ Just $ streamEncode st 113 return $ Just $ streamEncode st
104 Just (tm,v) -> do 114 Just (tm,v) -> do
@@ -119,15 +129,15 @@ closeAll tcpcache stream = do
119 129
120tcpTransport :: Int -- ^ maximum number of TCP links to maintain. 130tcpTransport :: Int -- ^ maximum number of TCP links to maintain.
121 -> StreamHandshake addr x y 131 -> StreamHandshake addr x y
122 -> IO (TransportA err addr x y) 132 -> IO (TransportA err addr x (Bool,y))
123tcpTransport maxcon stream = do 133tcpTransport maxcon stream = do
124 msgvar <- newEmptyMVar 134 msgvar <- newEmptyMVar
125 tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) 135 tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty)
126 return Transport 136 return Transport
127 { awaitMessage = (takeMVar msgvar >>=) 137 { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing)
128 , sendMessage = \addr y -> do 138 , sendMessage = \addr (bDoCon,y) -> do
129 t <- forkIO $ do 139 t <- forkIO $ do
130 msock <- acquireConnection msgvar tcpcache stream addr 140 msock <- acquireConnection msgvar tcpcache stream addr bDoCon
131 mapM_ ($ y) msock 141 mapM_ ($ y) msock
132 `catchIOError` \e -> return () 142 `catchIOError` \e -> return ()
133 labelThread t "tcp-send" 143 labelThread t "tcp-send"