summaryrefslogtreecommitdiff
path: root/src/Network/QueryResponse/TCP.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/QueryResponse/TCP.hs')
-rw-r--r--src/Network/QueryResponse/TCP.hs36
1 files changed, 23 insertions, 13 deletions
diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs
index a606c51d..bad61727 100644
--- a/src/Network/QueryResponse/TCP.hs
+++ b/src/Network/QueryResponse/TCP.hs
@@ -9,6 +9,7 @@ import Control.Concurrent.Lifted
9import GHC.Conc (labelThread) 9import GHC.Conc (labelThread)
10#endif 10#endif
11 11
12import Control.Arrow
12import Control.Concurrent.STM 13import Control.Concurrent.STM
13import Control.Monad 14import Control.Monad
14import Data.ByteString (ByteString,hPut) 15import Data.ByteString (ByteString,hPut)
@@ -39,7 +40,7 @@ data TCPSession st
39 } 40 }
40 41
41newtype TCPAddress = TCPAddress SockAddr 42newtype TCPAddress = TCPAddress SockAddr
42 deriving (Eq,Ord) 43 deriving (Eq,Ord,Show)
43 44
44instance Hashable TCPAddress where 45instance Hashable TCPAddress where
45 hashWithSalt salt (TCPAddress x) = case x of 46 hashWithSalt salt (TCPAddress x) = case x of
@@ -67,6 +68,9 @@ killSession :: TCPSession st -> IO ()
67killSession PendingTCPSession = return () 68killSession PendingTCPSession = return ()
68killSession TCPSession{tcpThread=t} = killThread t 69killSession TCPSession{tcpThread=t} = killThread t
69 70
71showStat r = case r of PendingTCPSession -> "pending."
72 TCPSession {} -> "established."
73
70acquireConnection :: MVar (Maybe (Either a (x, addr))) 74acquireConnection :: MVar (Maybe (Either a (x, addr)))
71 -> TCPCache (SessionProtocol x y) 75 -> TCPCache (SessionProtocol x y)
72 -> StreamHandshake addr x y 76 -> StreamHandshake addr x y
@@ -75,6 +79,7 @@ acquireConnection :: MVar (Maybe (Either a (x, addr)))
75 -> IO (Maybe (y -> IO ())) 79 -> IO (Maybe (y -> IO ()))
76acquireConnection mvar tcpcache stream addr bDoCon = do 80acquireConnection mvar tcpcache stream addr bDoCon = do
77 now <- getPOSIXTime 81 now <- getPOSIXTime
82 -- dput XTCP $ "acquireConnection 0 " ++ show (streamAddr stream addr)
78 entry <- atomically $ do 83 entry <- atomically $ do
79 c <- readTVar (lru tcpcache) 84 c <- readTVar (lru tcpcache)
80 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c 85 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c
@@ -84,6 +89,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do
84 | otherwise -> return () 89 | otherwise -> return ()
85 Just (tm, v) -> modifyTVar' (lru tcpcache) $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now) 90 Just (tm, v) -> modifyTVar' (lru tcpcache) $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now)
86 return v 91 return v
92 -- dput XTCP $ "acquireConnection 1 " ++ show (streamAddr stream addr, fmap (second showStat) entry)
87 case entry of 93 case entry of
88 Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do 94 Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do
89 proto <- getProtocolNumber "tcp" 95 proto <- getProtocolNumber "tcp"
@@ -98,26 +104,28 @@ acquireConnection mvar tcpcache stream addr bDoCon = do
98 ret <- fmap join $ forM mh $ \h -> do 104 ret <- fmap join $ forM mh $ \h -> do
99 st <- streamHello stream addr h 105 st <- streamHello stream addr h
100 dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr) 106 dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr)
101 rthread <- forkIO $ fix $ \loop -> do 107 signal <- newTVarIO False
108 rthread <- forkIO $ do
109 atomically (readTVar signal >>= check)
110 fix $ \loop -> do
102 x <- streamDecode st 111 x <- streamDecode st
103 dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x 112 dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x
104 case x of 113 case x of
105 Just u -> do 114 Just u -> do
106 m <- timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr) 115 m <- timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr)
107 when (isNothing m) $ do 116 when (isNothing m) $ do
108 dput XTCP $ "TCP "++show (streamAddr stream addr) ++ "dropped packet." 117 dput XTCP $ "TCP "++show (streamAddr stream addr) ++ " dropped packet."
118 tryTakeMVar mvar
119 return ()
109 loop 120 loop
110 Nothing -> do 121 Nothing -> do
111 dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) 122 dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr)
112 atomically $ modifyTVar' (lru tcpcache) 123 do atomically $ modifyTVar' (lru tcpcache)
113 $ MM.delete (TCPAddress $ streamAddr stream addr) 124 $ MM.delete (TCPAddress $ streamAddr stream addr)
114 c <- atomically $ readTVar (lru tcpcache) 125 c <- atomically $ readTVar (lru tcpcache)
115 now <- getPOSIXTime 126 now <- getPOSIXTime
116 forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do 127 forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do
117 let stat = case r of 128 dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r]
118 PendingTCPSession -> "pending."
119 TCPSession {} -> "established."
120 dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), stat]
121 hClose h 129 hClose h
122 let showAddr a = show (streamAddr stream a) 130 let showAddr a = show (streamAddr stream a)
123 labelThread rthread ("tcp:"++showAddr addr) 131 labelThread rthread ("tcp:"++showAddr addr)
@@ -132,6 +140,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do
132 let (rs,c') = MM.takeView (tcpMax tcpcache) 140 let (rs,c') = MM.takeView (tcpMax tcpcache)
133 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c 141 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c
134 writeTVar (lru tcpcache) c' 142 writeTVar (lru tcpcache) c'
143 writeTVar signal True
135 return rs 144 return rs
136 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do 145 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do
137 myThreadId >>= flip labelThread ("tcp-close:"++show k) 146 myThreadId >>= flip labelThread ("tcp-close:"++show k)
@@ -146,8 +155,9 @@ acquireConnection mvar tcpcache stream addr bDoCon = do
146 when (isNothing ret) $ do 155 when (isNothing ret) $ do
147 atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) 156 atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr)
148 return ret 157 return ret
149 Just (tm, PendingTCPSession) -> do 158 Just (tm, PendingTCPSession)
150 fmap join $ timeout 10000000 $ atomically $ do 159 | not bDoCon -> return Nothing
160 | otherwise -> fmap join $ timeout 10000000 $ atomically $ do
151 c <- readTVar (lru tcpcache) 161 c <- readTVar (lru tcpcache)
152 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c 162 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c
153 case v of 163 case v of
@@ -166,17 +176,17 @@ closeAll tcpcache stream = do
166 176
167tcpTransport :: Int -- ^ maximum number of TCP links to maintain. 177tcpTransport :: Int -- ^ maximum number of TCP links to maintain.
168 -> StreamHandshake addr x y 178 -> StreamHandshake addr x y
169 -> IO (TransportA err addr x (Bool,y)) 179 -> IO (TCPCache (SessionProtocol x y), TransportA err addr x (Bool,y))
170tcpTransport maxcon stream = do 180tcpTransport maxcon stream = do
171 msgvar <- newEmptyMVar 181 msgvar <- newEmptyMVar
172 tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) 182 tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty)
173 return Transport 183 return $ (,) tcpcache Transport
174 { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing) 184 { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing)
175 , sendMessage = \addr (bDoCon,y) -> do 185 , sendMessage = \addr (bDoCon,y) -> do
176 t <- forkIO $ do 186 t <- forkIO $ do
177 msock <- acquireConnection msgvar tcpcache stream addr bDoCon 187 msock <- acquireConnection msgvar tcpcache stream addr bDoCon
178 mapM_ ($ y) msock 188 mapM_ ($ y) msock
179 `catchIOError` \e -> return () 189 `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e
180 labelThread t "tcp-send" 190 labelThread t "tcp-send"
181 , closeTransport = closeAll tcpcache stream 191 , closeTransport = closeAll tcpcache stream
182 } 192 }