diff options
Diffstat (limited to 'src/Network/QueryResponse/TCP.hs')
-rw-r--r-- | src/Network/QueryResponse/TCP.hs | 192 |
1 files changed, 0 insertions, 192 deletions
diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs deleted file mode 100644 index bad61727..00000000 --- a/src/Network/QueryResponse/TCP.hs +++ /dev/null | |||
@@ -1,192 +0,0 @@ | |||
1 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
2 | {-# LANGUAGE CPP #-} | ||
3 | module Network.QueryResponse.TCP where | ||
4 | |||
5 | #ifdef THREAD_DEBUG | ||
6 | import Control.Concurrent.Lifted.Instrument | ||
7 | #else | ||
8 | import Control.Concurrent.Lifted | ||
9 | import GHC.Conc (labelThread) | ||
10 | #endif | ||
11 | |||
12 | import Control.Arrow | ||
13 | import Control.Concurrent.STM | ||
14 | import Control.Monad | ||
15 | import Data.ByteString (ByteString,hPut) | ||
16 | import Data.Function | ||
17 | import Data.Hashable | ||
18 | import Data.Maybe | ||
19 | import Data.Ord | ||
20 | import Data.Time.Clock.POSIX | ||
21 | import Data.Word | ||
22 | import Network.BSD | ||
23 | import Network.Socket | ||
24 | import System.Timeout | ||
25 | import System.IO | ||
26 | import System.IO.Error | ||
27 | |||
28 | import DebugTag | ||
29 | import DPut | ||
30 | import Connection.Tcp (socketFamily) | ||
31 | import qualified Data.MinMaxPSQ as MM | ||
32 | import Network.QueryResponse | ||
33 | |||
34 | data TCPSession st | ||
35 | = PendingTCPSession | ||
36 | | TCPSession | ||
37 | { tcpHandle :: Handle | ||
38 | , tcpState :: st | ||
39 | , tcpThread :: ThreadId | ||
40 | } | ||
41 | |||
42 | newtype TCPAddress = TCPAddress SockAddr | ||
43 | deriving (Eq,Ord,Show) | ||
44 | |||
45 | instance Hashable TCPAddress where | ||
46 | hashWithSalt salt (TCPAddress x) = case x of | ||
47 | SockAddrInet port addr -> hashWithSalt salt (fromIntegral port :: Word16,addr) | ||
48 | SockAddrInet6 port b c d -> hashWithSalt salt (fromIntegral port :: Word16,b,c,d) | ||
49 | _ -> 0 | ||
50 | |||
51 | data TCPCache st = TCPCache | ||
52 | { lru :: TVar (MM.MinMaxPSQ' TCPAddress (Down POSIXTime) (TCPSession st)) | ||
53 | , tcpMax :: Int | ||
54 | } | ||
55 | |||
56 | data SessionProtocol x y = SessionProtocol | ||
57 | { streamGoodbye :: IO () -- ^ "Goodbye" protocol upon termination. | ||
58 | , streamDecode :: IO (Maybe x) -- ^ Parse inbound messages. | ||
59 | , streamEncode :: y -> IO () -- ^ Serialize outbound messages. | ||
60 | } | ||
61 | |||
62 | data StreamHandshake addr x y = StreamHandshake | ||
63 | { streamHello :: addr -> Handle -> IO (SessionProtocol x y) -- ^ "Hello" protocol upon fresh connection. | ||
64 | , streamAddr :: addr -> SockAddr | ||
65 | } | ||
66 | |||
67 | killSession :: TCPSession st -> IO () | ||
68 | killSession PendingTCPSession = return () | ||
69 | killSession TCPSession{tcpThread=t} = killThread t | ||
70 | |||
71 | showStat r = case r of PendingTCPSession -> "pending." | ||
72 | TCPSession {} -> "established." | ||
73 | |||
74 | acquireConnection :: MVar (Maybe (Either a (x, addr))) | ||
75 | -> TCPCache (SessionProtocol x y) | ||
76 | -> StreamHandshake addr x y | ||
77 | -> addr | ||
78 | -> Bool | ||
79 | -> IO (Maybe (y -> IO ())) | ||
80 | acquireConnection mvar tcpcache stream addr bDoCon = do | ||
81 | now <- getPOSIXTime | ||
82 | -- dput XTCP $ "acquireConnection 0 " ++ show (streamAddr stream addr) | ||
83 | entry <- atomically $ do | ||
84 | c <- readTVar (lru tcpcache) | ||
85 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c | ||
86 | case v of | ||
87 | Nothing | bDoCon -> writeTVar (lru tcpcache) | ||
88 | $ MM.insert' (TCPAddress $ streamAddr stream addr) PendingTCPSession (Down now) c | ||
89 | | otherwise -> return () | ||
90 | Just (tm, v) -> modifyTVar' (lru tcpcache) $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now) | ||
91 | return v | ||
92 | -- dput XTCP $ "acquireConnection 1 " ++ show (streamAddr stream addr, fmap (second showStat) entry) | ||
93 | case entry of | ||
94 | Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do | ||
95 | proto <- getProtocolNumber "tcp" | ||
96 | mh <- catchIOError (do h <- timeout 10000000 $ do | ||
97 | sock <- socket (socketFamily $ streamAddr stream addr) Stream proto | ||
98 | connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) | ||
99 | h <- socketToHandle sock ReadWriteMode | ||
100 | hSetBuffering h NoBuffering | ||
101 | return h | ||
102 | return h) | ||
103 | $ \e -> return Nothing | ||
104 | ret <- fmap join $ forM mh $ \h -> do | ||
105 | st <- streamHello stream addr h | ||
106 | dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr) | ||
107 | signal <- newTVarIO False | ||
108 | rthread <- forkIO $ do | ||
109 | atomically (readTVar signal >>= check) | ||
110 | fix $ \loop -> do | ||
111 | x <- streamDecode st | ||
112 | dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x | ||
113 | case x of | ||
114 | Just u -> do | ||
115 | m <- timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr) | ||
116 | when (isNothing m) $ do | ||
117 | dput XTCP $ "TCP "++show (streamAddr stream addr) ++ " dropped packet." | ||
118 | tryTakeMVar mvar | ||
119 | return () | ||
120 | loop | ||
121 | Nothing -> do | ||
122 | dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) | ||
123 | do atomically $ modifyTVar' (lru tcpcache) | ||
124 | $ MM.delete (TCPAddress $ streamAddr stream addr) | ||
125 | c <- atomically $ readTVar (lru tcpcache) | ||
126 | now <- getPOSIXTime | ||
127 | forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do | ||
128 | dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r] | ||
129 | hClose h | ||
130 | let showAddr a = show (streamAddr stream a) | ||
131 | labelThread rthread ("tcp:"++showAddr addr) | ||
132 | let v = TCPSession | ||
133 | { tcpHandle = h | ||
134 | , tcpState = st | ||
135 | , tcpThread = rthread | ||
136 | } | ||
137 | t <- getPOSIXTime | ||
138 | retires <- atomically $ do | ||
139 | c <- readTVar (lru tcpcache) | ||
140 | let (rs,c') = MM.takeView (tcpMax tcpcache) | ||
141 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c | ||
142 | writeTVar (lru tcpcache) c' | ||
143 | writeTVar signal True | ||
144 | return rs | ||
145 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do | ||
146 | myThreadId >>= flip labelThread ("tcp-close:"++show k) | ||
147 | dput XTCP $ "TCP dropped: " ++ show k | ||
148 | killSession r | ||
149 | case r of TCPSession {tcpState=st,tcpHandle=h} -> do | ||
150 | streamGoodbye st | ||
151 | hClose h | ||
152 | _ -> return () | ||
153 | |||
154 | return $ Just $ streamEncode st | ||
155 | when (isNothing ret) $ do | ||
156 | atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) | ||
157 | return ret | ||
158 | Just (tm, PendingTCPSession) | ||
159 | | not bDoCon -> return Nothing | ||
160 | | otherwise -> fmap join $ timeout 10000000 $ atomically $ do | ||
161 | c <- readTVar (lru tcpcache) | ||
162 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c | ||
163 | case v of | ||
164 | Just (_,TCPSession{tcpState=st}) -> return $ Just $ streamEncode st | ||
165 | Nothing -> return Nothing | ||
166 | _ -> retry | ||
167 | Just (tm, v@TCPSession {tcpState=st}) -> return $ Just $ streamEncode st | ||
168 | |||
169 | closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO () | ||
170 | closeAll tcpcache stream = do | ||
171 | cache <- atomically $ swapTVar (lru tcpcache) MM.empty | ||
172 | forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do | ||
173 | killSession r | ||
174 | case r of TCPSession{tcpState=st,tcpHandle=h} -> streamGoodbye st >> hClose h | ||
175 | _ -> return () | ||
176 | |||
177 | tcpTransport :: Int -- ^ maximum number of TCP links to maintain. | ||
178 | -> StreamHandshake addr x y | ||
179 | -> IO (TCPCache (SessionProtocol x y), TransportA err addr x (Bool,y)) | ||
180 | tcpTransport maxcon stream = do | ||
181 | msgvar <- newEmptyMVar | ||
182 | tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) | ||
183 | return $ (,) tcpcache Transport | ||
184 | { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing) | ||
185 | , sendMessage = \addr (bDoCon,y) -> do | ||
186 | t <- forkIO $ do | ||
187 | msock <- acquireConnection msgvar tcpcache stream addr bDoCon | ||
188 | mapM_ ($ y) msock | ||
189 | `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e | ||
190 | labelThread t "tcp-send" | ||
191 | , closeTransport = closeAll tcpcache stream | ||
192 | } | ||