summaryrefslogtreecommitdiff
path: root/src/Network/QueryResponse/TCP.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/QueryResponse/TCP.hs')
-rw-r--r--src/Network/QueryResponse/TCP.hs192
1 files changed, 0 insertions, 192 deletions
diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs
deleted file mode 100644
index bad61727..00000000
--- a/src/Network/QueryResponse/TCP.hs
+++ /dev/null
@@ -1,192 +0,0 @@
1{-# LANGUAGE GeneralizedNewtypeDeriving #-}
2{-# LANGUAGE CPP #-}
3module Network.QueryResponse.TCP where
4
5#ifdef THREAD_DEBUG
6import Control.Concurrent.Lifted.Instrument
7#else
8import Control.Concurrent.Lifted
9import GHC.Conc (labelThread)
10#endif
11
12import Control.Arrow
13import Control.Concurrent.STM
14import Control.Monad
15import Data.ByteString (ByteString,hPut)
16import Data.Function
17import Data.Hashable
18import Data.Maybe
19import Data.Ord
20import Data.Time.Clock.POSIX
21import Data.Word
22import Network.BSD
23import Network.Socket
24import System.Timeout
25import System.IO
26import System.IO.Error
27
28import DebugTag
29import DPut
30import Connection.Tcp (socketFamily)
31import qualified Data.MinMaxPSQ as MM
32import Network.QueryResponse
33
34data TCPSession st
35 = PendingTCPSession
36 | TCPSession
37 { tcpHandle :: Handle
38 , tcpState :: st
39 , tcpThread :: ThreadId
40 }
41
42newtype TCPAddress = TCPAddress SockAddr
43 deriving (Eq,Ord,Show)
44
45instance Hashable TCPAddress where
46 hashWithSalt salt (TCPAddress x) = case x of
47 SockAddrInet port addr -> hashWithSalt salt (fromIntegral port :: Word16,addr)
48 SockAddrInet6 port b c d -> hashWithSalt salt (fromIntegral port :: Word16,b,c,d)
49 _ -> 0
50
51data TCPCache st = TCPCache
52 { lru :: TVar (MM.MinMaxPSQ' TCPAddress (Down POSIXTime) (TCPSession st))
53 , tcpMax :: Int
54 }
55
56data SessionProtocol x y = SessionProtocol
57 { streamGoodbye :: IO () -- ^ "Goodbye" protocol upon termination.
58 , streamDecode :: IO (Maybe x) -- ^ Parse inbound messages.
59 , streamEncode :: y -> IO () -- ^ Serialize outbound messages.
60 }
61
62data StreamHandshake addr x y = StreamHandshake
63 { streamHello :: addr -> Handle -> IO (SessionProtocol x y) -- ^ "Hello" protocol upon fresh connection.
64 , streamAddr :: addr -> SockAddr
65 }
66
67killSession :: TCPSession st -> IO ()
68killSession PendingTCPSession = return ()
69killSession TCPSession{tcpThread=t} = killThread t
70
71showStat r = case r of PendingTCPSession -> "pending."
72 TCPSession {} -> "established."
73
74acquireConnection :: MVar (Maybe (Either a (x, addr)))
75 -> TCPCache (SessionProtocol x y)
76 -> StreamHandshake addr x y
77 -> addr
78 -> Bool
79 -> IO (Maybe (y -> IO ()))
80acquireConnection mvar tcpcache stream addr bDoCon = do
81 now <- getPOSIXTime
82 -- dput XTCP $ "acquireConnection 0 " ++ show (streamAddr stream addr)
83 entry <- atomically $ do
84 c <- readTVar (lru tcpcache)
85 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c
86 case v of
87 Nothing | bDoCon -> writeTVar (lru tcpcache)
88 $ MM.insert' (TCPAddress $ streamAddr stream addr) PendingTCPSession (Down now) c
89 | otherwise -> return ()
90 Just (tm, v) -> modifyTVar' (lru tcpcache) $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now)
91 return v
92 -- dput XTCP $ "acquireConnection 1 " ++ show (streamAddr stream addr, fmap (second showStat) entry)
93 case entry of
94 Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do
95 proto <- getProtocolNumber "tcp"
96 mh <- catchIOError (do h <- timeout 10000000 $ do
97 sock <- socket (socketFamily $ streamAddr stream addr) Stream proto
98 connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock)
99 h <- socketToHandle sock ReadWriteMode
100 hSetBuffering h NoBuffering
101 return h
102 return h)
103 $ \e -> return Nothing
104 ret <- fmap join $ forM mh $ \h -> do
105 st <- streamHello stream addr h
106 dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr)
107 signal <- newTVarIO False
108 rthread <- forkIO $ do
109 atomically (readTVar signal >>= check)
110 fix $ \loop -> do
111 x <- streamDecode st
112 dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x
113 case x of
114 Just u -> do
115 m <- timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr)
116 when (isNothing m) $ do
117 dput XTCP $ "TCP "++show (streamAddr stream addr) ++ " dropped packet."
118 tryTakeMVar mvar
119 return ()
120 loop
121 Nothing -> do
122 dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr)
123 do atomically $ modifyTVar' (lru tcpcache)
124 $ MM.delete (TCPAddress $ streamAddr stream addr)
125 c <- atomically $ readTVar (lru tcpcache)
126 now <- getPOSIXTime
127 forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do
128 dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r]
129 hClose h
130 let showAddr a = show (streamAddr stream a)
131 labelThread rthread ("tcp:"++showAddr addr)
132 let v = TCPSession
133 { tcpHandle = h
134 , tcpState = st
135 , tcpThread = rthread
136 }
137 t <- getPOSIXTime
138 retires <- atomically $ do
139 c <- readTVar (lru tcpcache)
140 let (rs,c') = MM.takeView (tcpMax tcpcache)
141 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c
142 writeTVar (lru tcpcache) c'
143 writeTVar signal True
144 return rs
145 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do
146 myThreadId >>= flip labelThread ("tcp-close:"++show k)
147 dput XTCP $ "TCP dropped: " ++ show k
148 killSession r
149 case r of TCPSession {tcpState=st,tcpHandle=h} -> do
150 streamGoodbye st
151 hClose h
152 _ -> return ()
153
154 return $ Just $ streamEncode st
155 when (isNothing ret) $ do
156 atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr)
157 return ret
158 Just (tm, PendingTCPSession)
159 | not bDoCon -> return Nothing
160 | otherwise -> fmap join $ timeout 10000000 $ atomically $ do
161 c <- readTVar (lru tcpcache)
162 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c
163 case v of
164 Just (_,TCPSession{tcpState=st}) -> return $ Just $ streamEncode st
165 Nothing -> return Nothing
166 _ -> retry
167 Just (tm, v@TCPSession {tcpState=st}) -> return $ Just $ streamEncode st
168
169closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO ()
170closeAll tcpcache stream = do
171 cache <- atomically $ swapTVar (lru tcpcache) MM.empty
172 forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do
173 killSession r
174 case r of TCPSession{tcpState=st,tcpHandle=h} -> streamGoodbye st >> hClose h
175 _ -> return ()
176
177tcpTransport :: Int -- ^ maximum number of TCP links to maintain.
178 -> StreamHandshake addr x y
179 -> IO (TCPCache (SessionProtocol x y), TransportA err addr x (Bool,y))
180tcpTransport maxcon stream = do
181 msgvar <- newEmptyMVar
182 tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty)
183 return $ (,) tcpcache Transport
184 { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing)
185 , sendMessage = \addr (bDoCon,y) -> do
186 t <- forkIO $ do
187 msock <- acquireConnection msgvar tcpcache stream addr bDoCon
188 mapM_ ($ y) msock
189 `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e
190 labelThread t "tcp-send"
191 , closeTransport = closeAll tcpcache stream
192 }