diff options
Diffstat (limited to 'src/Network/QueryResponse')
-rw-r--r-- | src/Network/QueryResponse/TCP.hs | 147 |
1 files changed, 92 insertions, 55 deletions
diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs index efeab305..a606c51d 100644 --- a/src/Network/QueryResponse/TCP.hs +++ b/src/Network/QueryResponse/TCP.hs | |||
@@ -14,6 +14,7 @@ import Control.Monad | |||
14 | import Data.ByteString (ByteString,hPut) | 14 | import Data.ByteString (ByteString,hPut) |
15 | import Data.Function | 15 | import Data.Function |
16 | import Data.Hashable | 16 | import Data.Hashable |
17 | import Data.Maybe | ||
17 | import Data.Ord | 18 | import Data.Ord |
18 | import Data.Time.Clock.POSIX | 19 | import Data.Time.Clock.POSIX |
19 | import Data.Word | 20 | import Data.Word |
@@ -29,11 +30,13 @@ import Connection.Tcp (socketFamily) | |||
29 | import qualified Data.MinMaxPSQ as MM | 30 | import qualified Data.MinMaxPSQ as MM |
30 | import Network.QueryResponse | 31 | import Network.QueryResponse |
31 | 32 | ||
32 | data TCPSession st = TCPSession | 33 | data TCPSession st |
33 | { tcpHandle :: Handle | 34 | = PendingTCPSession |
34 | , tcpState :: st | 35 | | TCPSession |
35 | , tcpThread :: ThreadId | 36 | { tcpHandle :: Handle |
36 | } | 37 | , tcpState :: st |
38 | , tcpThread :: ThreadId | ||
39 | } | ||
37 | 40 | ||
38 | newtype TCPAddress = TCPAddress SockAddr | 41 | newtype TCPAddress = TCPAddress SockAddr |
39 | deriving (Eq,Ord) | 42 | deriving (Eq,Ord) |
@@ -60,6 +63,10 @@ data StreamHandshake addr x y = StreamHandshake | |||
60 | , streamAddr :: addr -> SockAddr | 63 | , streamAddr :: addr -> SockAddr |
61 | } | 64 | } |
62 | 65 | ||
66 | killSession :: TCPSession st -> IO () | ||
67 | killSession PendingTCPSession = return () | ||
68 | killSession TCPSession{tcpThread=t} = killThread t | ||
69 | |||
63 | acquireConnection :: MVar (Maybe (Either a (x, addr))) | 70 | acquireConnection :: MVar (Maybe (Either a (x, addr))) |
64 | -> TCPCache (SessionProtocol x y) | 71 | -> TCPCache (SessionProtocol x y) |
65 | -> StreamHandshake addr x y | 72 | -> StreamHandshake addr x y |
@@ -67,65 +74,95 @@ acquireConnection :: MVar (Maybe (Either a (x, addr))) | |||
67 | -> Bool | 74 | -> Bool |
68 | -> IO (Maybe (y -> IO ())) | 75 | -> IO (Maybe (y -> IO ())) |
69 | acquireConnection mvar tcpcache stream addr bDoCon = do | 76 | acquireConnection mvar tcpcache stream addr bDoCon = do |
70 | cache <- atomically $ readTVar (lru tcpcache) | 77 | now <- getPOSIXTime |
71 | case MM.lookup' (TCPAddress $ streamAddr stream addr) cache of | 78 | entry <- atomically $ do |
79 | c <- readTVar (lru tcpcache) | ||
80 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c | ||
81 | case v of | ||
82 | Nothing | bDoCon -> writeTVar (lru tcpcache) | ||
83 | $ MM.insert' (TCPAddress $ streamAddr stream addr) PendingTCPSession (Down now) c | ||
84 | | otherwise -> return () | ||
85 | Just (tm, v) -> modifyTVar' (lru tcpcache) $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now) | ||
86 | return v | ||
87 | case entry of | ||
72 | Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do | 88 | Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do |
73 | proto <- getProtocolNumber "tcp" | 89 | proto <- getProtocolNumber "tcp" |
74 | mh <- catchIOError (do sock <- socket (socketFamily $ streamAddr stream addr) Stream proto | 90 | mh <- catchIOError (do h <- timeout 10000000 $ do |
75 | connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) | 91 | sock <- socket (socketFamily $ streamAddr stream addr) Stream proto |
76 | h <- socketToHandle sock ReadWriteMode | 92 | connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) |
77 | return $ Just h) | 93 | h <- socketToHandle sock ReadWriteMode |
94 | hSetBuffering h NoBuffering | ||
95 | return h | ||
96 | return h) | ||
78 | $ \e -> return Nothing | 97 | $ \e -> return Nothing |
79 | fmap join $ forM mh $ \h -> do | 98 | ret <- fmap join $ forM mh $ \h -> do |
80 | st <- streamHello stream addr h | 99 | st <- streamHello stream addr h |
81 | t <- getPOSIXTime | 100 | dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr) |
82 | rthread <- forkIO $ fix $ \loop -> do | 101 | rthread <- forkIO $ fix $ \loop -> do |
83 | x <- streamDecode st | 102 | x <- streamDecode st |
84 | case x of | 103 | dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x |
85 | Just u -> do | 104 | case x of |
86 | timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr) | 105 | Just u -> do |
87 | loop | 106 | m <- timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr) |
88 | Nothing -> do | 107 | when (isNothing m) $ do |
89 | dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) | 108 | dput XTCP $ "TCP "++show (streamAddr stream addr) ++ "dropped packet." |
90 | atomically $ modifyTVar' (lru tcpcache) | 109 | loop |
91 | $ MM.delete (TCPAddress $ streamAddr stream addr) | 110 | Nothing -> do |
92 | hClose h | 111 | dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) |
93 | let showAddr a = show (streamAddr stream a) | 112 | atomically $ modifyTVar' (lru tcpcache) |
94 | labelThread rthread ("tcp:"++showAddr addr) | 113 | $ MM.delete (TCPAddress $ streamAddr stream addr) |
95 | let v = TCPSession | 114 | c <- atomically $ readTVar (lru tcpcache) |
96 | { tcpHandle = h | 115 | now <- getPOSIXTime |
97 | , tcpState = st | 116 | forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do |
98 | , tcpThread = rthread | 117 | let stat = case r of |
99 | } | 118 | PendingTCPSession -> "pending." |
100 | retires <- atomically $ do | 119 | TCPSession {} -> "established." |
101 | c <- readTVar (lru tcpcache) | 120 | dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), stat] |
102 | let (rs,c') = MM.takeView (tcpMax tcpcache) | 121 | hClose h |
103 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c | 122 | let showAddr a = show (streamAddr stream a) |
104 | writeTVar (lru tcpcache) c' | 123 | labelThread rthread ("tcp:"++showAddr addr) |
105 | return rs | 124 | let v = TCPSession |
106 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do | 125 | { tcpHandle = h |
107 | myThreadId >>= flip labelThread ("tcp-close:"++show k) | 126 | , tcpState = st |
108 | dput XTCP $ "TCP dropped: " ++ show k | 127 | , tcpThread = rthread |
109 | killThread (tcpThread r) | 128 | } |
110 | streamGoodbye st | 129 | t <- getPOSIXTime |
111 | hClose (tcpHandle r) | 130 | retires <- atomically $ do |
131 | c <- readTVar (lru tcpcache) | ||
132 | let (rs,c') = MM.takeView (tcpMax tcpcache) | ||
133 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c | ||
134 | writeTVar (lru tcpcache) c' | ||
135 | return rs | ||
136 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do | ||
137 | myThreadId >>= flip labelThread ("tcp-close:"++show k) | ||
138 | dput XTCP $ "TCP dropped: " ++ show k | ||
139 | killSession r | ||
140 | case r of TCPSession {tcpState=st,tcpHandle=h} -> do | ||
141 | streamGoodbye st | ||
142 | hClose h | ||
143 | _ -> return () | ||
112 | 144 | ||
113 | return $ Just $ streamEncode st | 145 | return $ Just $ streamEncode st |
114 | Just (tm,v) -> do | 146 | when (isNothing ret) $ do |
115 | t <- getPOSIXTime | 147 | atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) |
116 | let TCPSession { tcpHandle = h, tcpState = st } = v | 148 | return ret |
117 | cache' = MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) cache | 149 | Just (tm, PendingTCPSession) -> do |
118 | atomically $ writeTVar (lru tcpcache) cache' | 150 | fmap join $ timeout 10000000 $ atomically $ do |
119 | return $ Just $ streamEncode st | 151 | c <- readTVar (lru tcpcache) |
152 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c | ||
153 | case v of | ||
154 | Just (_,TCPSession{tcpState=st}) -> return $ Just $ streamEncode st | ||
155 | Nothing -> return Nothing | ||
156 | _ -> retry | ||
157 | Just (tm, v@TCPSession {tcpState=st}) -> return $ Just $ streamEncode st | ||
120 | 158 | ||
121 | closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO () | 159 | closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO () |
122 | closeAll tcpcache stream = do | 160 | closeAll tcpcache stream = do |
123 | cache <- atomically $ swapTVar (lru tcpcache) MM.empty | 161 | cache <- atomically $ swapTVar (lru tcpcache) MM.empty |
124 | forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do | 162 | forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do |
125 | let st = tcpState r | 163 | killSession r |
126 | killThread (tcpThread r) | 164 | case r of TCPSession{tcpState=st,tcpHandle=h} -> streamGoodbye st >> hClose h |
127 | streamGoodbye st | 165 | _ -> return () |
128 | hClose (tcpHandle r) | ||
129 | 166 | ||
130 | tcpTransport :: Int -- ^ maximum number of TCP links to maintain. | 167 | tcpTransport :: Int -- ^ maximum number of TCP links to maintain. |
131 | -> StreamHandshake addr x y | 168 | -> StreamHandshake addr x y |