diff options
author | Joe Crayne <joe@jerkface.net> | 2020-01-03 15:35:23 -0500 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-03 17:26:06 -0500 |
commit | 31b799222cb76cd0002d9a3cc5b340a7b6fed139 (patch) | |
tree | 8b834e455529fb270375e4967d1acad56553544f /dht/src/Network/QueryResponse | |
parent | 1e03ed3670a8386ede93a09fa0c67785e7da6478 (diff) |
server library.
Diffstat (limited to 'dht/src/Network/QueryResponse')
-rw-r--r-- | dht/src/Network/QueryResponse/TCP.hs | 223 |
1 files changed, 0 insertions, 223 deletions
diff --git a/dht/src/Network/QueryResponse/TCP.hs b/dht/src/Network/QueryResponse/TCP.hs deleted file mode 100644 index 0028a5b6..00000000 --- a/dht/src/Network/QueryResponse/TCP.hs +++ /dev/null | |||
@@ -1,223 +0,0 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | ||
3 | {-# LANGUAGE LambdaCase #-} | ||
4 | module Network.QueryResponse.TCP where | ||
5 | |||
6 | #ifdef THREAD_DEBUG | ||
7 | import Control.Concurrent.Lifted.Instrument | ||
8 | #else | ||
9 | import Control.Concurrent.Lifted | ||
10 | import GHC.Conc (labelThread) | ||
11 | #endif | ||
12 | |||
13 | import Control.Arrow | ||
14 | import Control.Concurrent.STM | ||
15 | import Control.Concurrent.STM.TMVar | ||
16 | import Control.Monad | ||
17 | import Data.ByteString (ByteString,hPut) | ||
18 | import Data.Function | ||
19 | import Data.Hashable | ||
20 | import Data.Maybe | ||
21 | import Data.Ord | ||
22 | import Data.Time.Clock.POSIX | ||
23 | import Data.Word | ||
24 | import Data.String (IsString(..)) | ||
25 | import Network.BSD | ||
26 | import Network.Socket as Socket | ||
27 | import System.Timeout | ||
28 | import System.IO | ||
29 | import System.IO.Error | ||
30 | |||
31 | import DebugTag | ||
32 | import DebugUtil | ||
33 | import DPut | ||
34 | import Connection.Tcp (socketFamily) | ||
35 | import qualified Data.MinMaxPSQ as MM | ||
36 | import Network.QueryResponse | ||
37 | |||
38 | data TCPSession st | ||
39 | = PendingTCPSession | ||
40 | | TCPSession | ||
41 | { tcpHandle :: Handle | ||
42 | , tcpState :: st | ||
43 | , tcpThread :: ThreadId | ||
44 | } | ||
45 | |||
46 | newtype TCPAddress = TCPAddress SockAddr | ||
47 | deriving (Eq,Ord,Show) | ||
48 | |||
49 | instance Hashable TCPAddress where | ||
50 | hashWithSalt salt (TCPAddress x) = case x of | ||
51 | SockAddrInet port addr -> hashWithSalt salt (fromIntegral port :: Word16,addr) | ||
52 | SockAddrInet6 port b c d -> hashWithSalt salt (fromIntegral port :: Word16,b,c,d) | ||
53 | _ -> 0 | ||
54 | |||
55 | data TCPCache st = TCPCache | ||
56 | { lru :: TVar (MM.MinMaxPSQ' TCPAddress (Down POSIXTime) (TCPSession st)) | ||
57 | , tcpMax :: Int | ||
58 | } | ||
59 | |||
60 | -- This is a suitable /st/ parameter to 'TCPCache' | ||
61 | data SessionProtocol x y = SessionProtocol | ||
62 | { streamGoodbye :: IO () -- ^ "Goodbye" protocol upon termination. | ||
63 | , streamDecode :: IO (Maybe x) -- ^ Parse inbound messages. | ||
64 | , streamEncode :: y -> IO () -- ^ Serialize outbound messages. | ||
65 | } | ||
66 | |||
67 | data StreamHandshake addr x y = StreamHandshake | ||
68 | { streamHello :: addr -> Handle -> IO (SessionProtocol x y) -- ^ "Hello" protocol upon fresh connection. | ||
69 | , streamAddr :: addr -> SockAddr | ||
70 | } | ||
71 | |||
72 | killSession :: TCPSession st -> IO () | ||
73 | killSession PendingTCPSession = return () | ||
74 | killSession TCPSession{tcpThread=t} = killThread t | ||
75 | |||
76 | showStat :: IsString p => TCPSession st -> p | ||
77 | showStat r = case r of PendingTCPSession -> "pending." | ||
78 | TCPSession {} -> "established." | ||
79 | |||
80 | tcp_timeout :: Int | ||
81 | tcp_timeout = 10000000 | ||
82 | |||
83 | acquireConnection :: TMVar (Arrival a addr x) | ||
84 | -> TCPCache (SessionProtocol x y) | ||
85 | -> StreamHandshake addr x y | ||
86 | -> addr | ||
87 | -> Bool | ||
88 | -> IO (Maybe (y -> IO ())) | ||
89 | acquireConnection mvar tcpcache stream addr bDoCon = do | ||
90 | now <- getPOSIXTime | ||
91 | -- dput XTCP $ "acquireConnection 0 " ++ show (streamAddr stream addr) | ||
92 | entry <- atomically $ do | ||
93 | c <- readTVar (lru tcpcache) | ||
94 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c | ||
95 | case v of | ||
96 | Nothing | bDoCon -> writeTVar (lru tcpcache) | ||
97 | $ MM.insert' (TCPAddress $ streamAddr stream addr) PendingTCPSession (Down now) c | ||
98 | | otherwise -> return () | ||
99 | Just (tm, v) -> writeTVar (lru tcpcache) | ||
100 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now) c | ||
101 | return v | ||
102 | -- dput XTCP $ "acquireConnection 1 " ++ show (streamAddr stream addr, fmap (second showStat) entry) | ||
103 | case entry of | ||
104 | Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do | ||
105 | proto <- getProtocolNumber "tcp" | ||
106 | sock <- socket (socketFamily $ streamAddr stream addr) Stream proto | ||
107 | mh <- catchIOError (do h <- timeout tcp_timeout $ do | ||
108 | connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) | ||
109 | h <- socketToHandle sock ReadWriteMode | ||
110 | hSetBuffering h NoBuffering | ||
111 | return h | ||
112 | return h) | ||
113 | $ \e -> return Nothing | ||
114 | when (isNothing mh) $ do | ||
115 | atomically $ modifyTVar' (lru tcpcache) | ||
116 | $ MM.delete (TCPAddress $ streamAddr stream addr) | ||
117 | Socket.close sock | ||
118 | ret <- fmap join $ forM mh $ \h -> do | ||
119 | mst <- catchIOError (Just <$> streamHello stream addr h) | ||
120 | (\e -> return Nothing) | ||
121 | case mst of | ||
122 | Nothing -> do | ||
123 | atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) | ||
124 | return Nothing | ||
125 | Just st -> do | ||
126 | dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr) | ||
127 | signal <- newTVarIO False | ||
128 | let showAddr a = show (streamAddr stream a) | ||
129 | rthread <- forkLabeled ("tcp:"++showAddr addr) $ do | ||
130 | atomically (readTVar signal >>= check) | ||
131 | fix $ \loop -> do | ||
132 | x <- streamDecode st | ||
133 | dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x | ||
134 | case x of | ||
135 | Just u -> do | ||
136 | m <- timeout tcp_timeout $ atomically (putTMVar mvar $ Arrival addr u) | ||
137 | when (isNothing m) $ do | ||
138 | dput XTCP $ "TCP "++show (streamAddr stream addr) ++ " dropped packet." | ||
139 | atomically $ tryTakeTMVar mvar | ||
140 | return () | ||
141 | loop | ||
142 | Nothing -> do | ||
143 | dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) | ||
144 | do atomically $ modifyTVar' (lru tcpcache) | ||
145 | $ MM.delete (TCPAddress $ streamAddr stream addr) | ||
146 | c <- atomically $ readTVar (lru tcpcache) | ||
147 | now <- getPOSIXTime | ||
148 | forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do | ||
149 | dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r] | ||
150 | mreport <- timeout tcp_timeout $ threadReport False -- XXX: Paranoid timeout | ||
151 | case mreport of | ||
152 | Just treport -> dput XTCP treport | ||
153 | Nothing -> dput XTCP "TCP ERROR: threadReport timed out." | ||
154 | hClose h `catchIOError` \e -> return () | ||
155 | let v = TCPSession | ||
156 | { tcpHandle = h | ||
157 | , tcpState = st | ||
158 | , tcpThread = rthread | ||
159 | } | ||
160 | t <- getPOSIXTime | ||
161 | retires <- atomically $ do | ||
162 | c <- readTVar (lru tcpcache) | ||
163 | let (rs,c') = MM.takeView (tcpMax tcpcache) | ||
164 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c | ||
165 | writeTVar (lru tcpcache) c' | ||
166 | writeTVar signal True | ||
167 | return rs | ||
168 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkLabeled ("tcp-close:"++show k) $ do | ||
169 | dput XTCP $ "TCP dropped: " ++ show k | ||
170 | killSession r | ||
171 | case r of TCPSession {tcpState=st,tcpHandle=h} -> do | ||
172 | streamGoodbye st | ||
173 | hClose h | ||
174 | `catchIOError` \e -> return () | ||
175 | _ -> return () | ||
176 | |||
177 | return $ Just $ streamEncode st | ||
178 | when (isNothing ret) $ do | ||
179 | atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) | ||
180 | return ret | ||
181 | Just (tm, PendingTCPSession) | ||
182 | | not bDoCon -> return Nothing | ||
183 | | otherwise -> fmap join $ timeout tcp_timeout $ atomically $ do | ||
184 | c <- readTVar (lru tcpcache) | ||
185 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c | ||
186 | case v of | ||
187 | Just (_,TCPSession{tcpState=st}) -> return $ Just $ streamEncode st | ||
188 | Nothing -> return Nothing | ||
189 | _ -> retry | ||
190 | Just (tm, v@TCPSession {tcpState=st}) -> return $ Just $ streamEncode st | ||
191 | |||
192 | closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO () | ||
193 | closeAll tcpcache stream = do | ||
194 | dput XTCP "TCP.closeAll called." | ||
195 | cache <- atomically $ swapTVar (lru tcpcache) MM.empty | ||
196 | forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do | ||
197 | killSession r | ||
198 | case r of TCPSession{tcpState=st,tcpHandle=h} -> catchIOError (streamGoodbye st >> hClose h) | ||
199 | (\e -> return ()) | ||
200 | _ -> return () | ||
201 | |||
202 | -- Use a cache of TCP client connections for sending (and receiving) packets. | ||
203 | -- The boolean value prepended to the message allows the sender to specify | ||
204 | -- whether or not a new connection will be initiated if neccessary. If 'False' | ||
205 | -- is passed, then the packet will be sent only if there already exists a | ||
206 | -- connection. | ||
207 | tcpTransport :: Int -- ^ maximum number of TCP links to maintain. | ||
208 | -> StreamHandshake addr x y | ||
209 | -> IO (TCPCache (SessionProtocol x y), TransportA err addr x (Bool,y)) | ||
210 | tcpTransport maxcon stream = do | ||
211 | msgvar <- atomically newEmptyTMVar | ||
212 | tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) | ||
213 | return $ (,) tcpcache Transport | ||
214 | { awaitMessage = \f -> takeTMVar msgvar >>= \x -> return $ do | ||
215 | f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Terminated) | ||
216 | , sendMessage = \addr (bDoCon,y) -> do | ||
217 | void . forkLabeled "tcp-send" $ do | ||
218 | msock <- acquireConnection msgvar tcpcache stream addr bDoCon | ||
219 | mapM_ ($ y) msock | ||
220 | `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e | ||
221 | , setActive = \case False -> closeAll tcpcache stream >> atomically (putTMVar msgvar Terminated) | ||
222 | True -> return () | ||
223 | } | ||