summaryrefslogtreecommitdiff
path: root/server/src/Network/QueryResponse/TCP.hs
diff options
context:
space:
mode:
Diffstat (limited to 'server/src/Network/QueryResponse/TCP.hs')
-rw-r--r--server/src/Network/QueryResponse/TCP.hs225
1 files changed, 225 insertions, 0 deletions
diff --git a/server/src/Network/QueryResponse/TCP.hs b/server/src/Network/QueryResponse/TCP.hs
new file mode 100644
index 00000000..8b1b432b
--- /dev/null
+++ b/server/src/Network/QueryResponse/TCP.hs
@@ -0,0 +1,225 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE GeneralizedNewtypeDeriving #-}
3{-# LANGUAGE LambdaCase #-}
4{-# LANGUAGE OverloadedStrings #-}
5module Network.QueryResponse.TCP where
6
7#ifdef THREAD_DEBUG
8import Control.Concurrent.Lifted.Instrument
9#else
10import Control.Concurrent.Lifted
11import GHC.Conc (labelThread,forkIO)
12import ForkLabeled
13#endif
14
15import Control.Arrow
16import Control.Concurrent.STM
17import Control.Concurrent.STM.TMVar
18import Control.Monad
19import Data.ByteString (ByteString,hPut)
20import Data.Function
21import Data.Hashable
22import Data.Maybe
23import Data.Ord
24import Data.Time.Clock.POSIX
25import Data.Word
26import Data.String (IsString(..))
27import Network.BSD
28import Network.Socket as Socket
29import System.Timeout
30import System.IO
31import System.IO.Error
32
33import DebugTag
34import DebugUtil
35import DPut
36import Connection.Tcp (socketFamily)
37import qualified Data.MinMaxPSQ as MM
38import Network.QueryResponse
39
40data TCPSession st
41 = PendingTCPSession
42 | TCPSession
43 { tcpHandle :: Handle
44 , tcpState :: st
45 , tcpThread :: ThreadId
46 }
47
48newtype TCPAddress = TCPAddress SockAddr
49 deriving (Eq,Ord,Show)
50
51instance Hashable TCPAddress where
52 hashWithSalt salt (TCPAddress x) = case x of
53 SockAddrInet port addr -> hashWithSalt salt (fromIntegral port :: Word16,addr)
54 SockAddrInet6 port b c d -> hashWithSalt salt (fromIntegral port :: Word16,b,c,d)
55 _ -> 0
56
57data TCPCache st = TCPCache
58 { lru :: TVar (MM.MinMaxPSQ' TCPAddress (Down POSIXTime) (TCPSession st))
59 , tcpMax :: Int
60 }
61
62-- This is a suitable /st/ parameter to 'TCPCache'
63data SessionProtocol x y = SessionProtocol
64 { streamGoodbye :: IO () -- ^ "Goodbye" protocol upon termination.
65 , streamDecode :: IO (Maybe x) -- ^ Parse inbound messages.
66 , streamEncode :: y -> IO () -- ^ Serialize outbound messages.
67 }
68
69data StreamHandshake addr x y = StreamHandshake
70 { streamHello :: addr -> Handle -> IO (SessionProtocol x y) -- ^ "Hello" protocol upon fresh connection.
71 , streamAddr :: addr -> SockAddr
72 }
73
74killSession :: TCPSession st -> IO ()
75killSession PendingTCPSession = return ()
76killSession TCPSession{tcpThread=t} = killThread t
77
78showStat :: IsString p => TCPSession st -> p
79showStat r = case r of PendingTCPSession -> "pending."
80 TCPSession {} -> "established."
81
82tcp_timeout :: Int
83tcp_timeout = 10000000
84
85acquireConnection :: TMVar (Arrival a addr x)
86 -> TCPCache (SessionProtocol x y)
87 -> StreamHandshake addr x y
88 -> addr
89 -> Bool
90 -> IO (Maybe (y -> IO ()))
91acquireConnection mvar tcpcache stream addr bDoCon = do
92 now <- getPOSIXTime
93 -- dput XTCP $ "acquireConnection 0 " ++ show (streamAddr stream addr)
94 entry <- atomically $ do
95 c <- readTVar (lru tcpcache)
96 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c
97 case v of
98 Nothing | bDoCon -> writeTVar (lru tcpcache)
99 $ MM.insert' (TCPAddress $ streamAddr stream addr) PendingTCPSession (Down now) c
100 | otherwise -> return ()
101 Just (tm, v) -> writeTVar (lru tcpcache)
102 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now) c
103 return v
104 -- dput XTCP $ "acquireConnection 1 " ++ show (streamAddr stream addr, fmap (second showStat) entry)
105 case entry of
106 Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do
107 proto <- getProtocolNumber "tcp"
108 sock <- socket (socketFamily $ streamAddr stream addr) Stream proto
109 mh <- catchIOError (do h <- timeout tcp_timeout $ do
110 connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock)
111 h <- socketToHandle sock ReadWriteMode
112 hSetBuffering h NoBuffering
113 return h
114 return h)
115 $ \e -> return Nothing
116 when (isNothing mh) $ do
117 atomically $ modifyTVar' (lru tcpcache)
118 $ MM.delete (TCPAddress $ streamAddr stream addr)
119 Socket.close sock
120 ret <- fmap join $ forM mh $ \h -> do
121 mst <- catchIOError (Just <$> streamHello stream addr h)
122 (\e -> return Nothing)
123 case mst of
124 Nothing -> do
125 atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr)
126 return Nothing
127 Just st -> do
128 dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr)
129 signal <- newTVarIO False
130 let showAddr a = show (streamAddr stream a)
131 rthread <- forkLabeled ("tcp:"++showAddr addr) $ do
132 atomically (readTVar signal >>= check)
133 fix $ \loop -> do
134 x <- streamDecode st
135 dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x
136 case x of
137 Just u -> do
138 m <- timeout tcp_timeout $ atomically (putTMVar mvar $ Arrival addr u)
139 when (isNothing m) $ do
140 dput XTCP $ "TCP "++show (streamAddr stream addr) ++ " dropped packet."
141 atomically $ tryTakeTMVar mvar
142 return ()
143 loop
144 Nothing -> do
145 dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr)
146 do atomically $ modifyTVar' (lru tcpcache)
147 $ MM.delete (TCPAddress $ streamAddr stream addr)
148 c <- atomically $ readTVar (lru tcpcache)
149 now <- getPOSIXTime
150 forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do
151 dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r]
152 mreport <- timeout tcp_timeout $ threadReport False -- XXX: Paranoid timeout
153 case mreport of
154 Just treport -> dput XTCP treport
155 Nothing -> dput XTCP "TCP ERROR: threadReport timed out."
156 hClose h `catchIOError` \e -> return ()
157 let v = TCPSession
158 { tcpHandle = h
159 , tcpState = st
160 , tcpThread = rthread
161 }
162 t <- getPOSIXTime
163 retires <- atomically $ do
164 c <- readTVar (lru tcpcache)
165 let (rs,c') = MM.takeView (tcpMax tcpcache)
166 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c
167 writeTVar (lru tcpcache) c'
168 writeTVar signal True
169 return rs
170 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkLabeled ("tcp-close:"++show k) $ do
171 dput XTCP $ "TCP dropped: " ++ show k
172 killSession r
173 case r of TCPSession {tcpState=st,tcpHandle=h} -> do
174 streamGoodbye st
175 hClose h
176 `catchIOError` \e -> return ()
177 _ -> return ()
178
179 return $ Just $ streamEncode st
180 when (isNothing ret) $ do
181 atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr)
182 return ret
183 Just (tm, PendingTCPSession)
184 | not bDoCon -> return Nothing
185 | otherwise -> fmap join $ timeout tcp_timeout $ atomically $ do
186 c <- readTVar (lru tcpcache)
187 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c
188 case v of
189 Just (_,TCPSession{tcpState=st}) -> return $ Just $ streamEncode st
190 Nothing -> return Nothing
191 _ -> retry
192 Just (tm, v@TCPSession {tcpState=st}) -> return $ Just $ streamEncode st
193
194closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO ()
195closeAll tcpcache stream = do
196 dput XTCP "TCP.closeAll called."
197 cache <- atomically $ swapTVar (lru tcpcache) MM.empty
198 forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do
199 killSession r
200 case r of TCPSession{tcpState=st,tcpHandle=h} -> catchIOError (streamGoodbye st >> hClose h)
201 (\e -> return ())
202 _ -> return ()
203
204-- Use a cache of TCP client connections for sending (and receiving) packets.
205-- The boolean value prepended to the message allows the sender to specify
206-- whether or not a new connection will be initiated if neccessary. If 'False'
207-- is passed, then the packet will be sent only if there already exists a
208-- connection.
209tcpTransport :: Int -- ^ maximum number of TCP links to maintain.
210 -> StreamHandshake addr x y
211 -> IO (TCPCache (SessionProtocol x y), TransportA err addr x (Bool,y))
212tcpTransport maxcon stream = do
213 msgvar <- atomically newEmptyTMVar
214 tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty)
215 return $ (,) tcpcache Transport
216 { awaitMessage = \f -> takeTMVar msgvar >>= \x -> return $ do
217 f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Terminated)
218 , sendMessage = \addr (bDoCon,y) -> do
219 void . forkLabeled "tcp-send" $ do
220 msock <- acquireConnection msgvar tcpcache stream addr bDoCon
221 mapM_ ($ y) msock
222 `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e
223 , setActive = \case False -> closeAll tcpcache stream >> atomically (putTMVar msgvar Terminated)
224 True -> return ()
225 }