summaryrefslogtreecommitdiff
path: root/dht/src/Network/QueryResponse
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2020-01-03 15:35:23 -0500
committerJoe Crayne <joe@jerkface.net>2020-01-03 17:26:06 -0500
commit31b799222cb76cd0002d9a3cc5b340a7b6fed139 (patch)
tree8b834e455529fb270375e4967d1acad56553544f /dht/src/Network/QueryResponse
parent1e03ed3670a8386ede93a09fa0c67785e7da6478 (diff)
server library.
Diffstat (limited to 'dht/src/Network/QueryResponse')
-rw-r--r--dht/src/Network/QueryResponse/TCP.hs223
1 files changed, 0 insertions, 223 deletions
diff --git a/dht/src/Network/QueryResponse/TCP.hs b/dht/src/Network/QueryResponse/TCP.hs
deleted file mode 100644
index 0028a5b6..00000000
--- a/dht/src/Network/QueryResponse/TCP.hs
+++ /dev/null
@@ -1,223 +0,0 @@
1{-# LANGUAGE CPP #-}
2{-# LANGUAGE GeneralizedNewtypeDeriving #-}
3{-# LANGUAGE LambdaCase #-}
4module Network.QueryResponse.TCP where
5
6#ifdef THREAD_DEBUG
7import Control.Concurrent.Lifted.Instrument
8#else
9import Control.Concurrent.Lifted
10import GHC.Conc (labelThread)
11#endif
12
13import Control.Arrow
14import Control.Concurrent.STM
15import Control.Concurrent.STM.TMVar
16import Control.Monad
17import Data.ByteString (ByteString,hPut)
18import Data.Function
19import Data.Hashable
20import Data.Maybe
21import Data.Ord
22import Data.Time.Clock.POSIX
23import Data.Word
24import Data.String (IsString(..))
25import Network.BSD
26import Network.Socket as Socket
27import System.Timeout
28import System.IO
29import System.IO.Error
30
31import DebugTag
32import DebugUtil
33import DPut
34import Connection.Tcp (socketFamily)
35import qualified Data.MinMaxPSQ as MM
36import Network.QueryResponse
37
38data TCPSession st
39 = PendingTCPSession
40 | TCPSession
41 { tcpHandle :: Handle
42 , tcpState :: st
43 , tcpThread :: ThreadId
44 }
45
46newtype TCPAddress = TCPAddress SockAddr
47 deriving (Eq,Ord,Show)
48
49instance Hashable TCPAddress where
50 hashWithSalt salt (TCPAddress x) = case x of
51 SockAddrInet port addr -> hashWithSalt salt (fromIntegral port :: Word16,addr)
52 SockAddrInet6 port b c d -> hashWithSalt salt (fromIntegral port :: Word16,b,c,d)
53 _ -> 0
54
55data TCPCache st = TCPCache
56 { lru :: TVar (MM.MinMaxPSQ' TCPAddress (Down POSIXTime) (TCPSession st))
57 , tcpMax :: Int
58 }
59
60-- This is a suitable /st/ parameter to 'TCPCache'
61data SessionProtocol x y = SessionProtocol
62 { streamGoodbye :: IO () -- ^ "Goodbye" protocol upon termination.
63 , streamDecode :: IO (Maybe x) -- ^ Parse inbound messages.
64 , streamEncode :: y -> IO () -- ^ Serialize outbound messages.
65 }
66
67data StreamHandshake addr x y = StreamHandshake
68 { streamHello :: addr -> Handle -> IO (SessionProtocol x y) -- ^ "Hello" protocol upon fresh connection.
69 , streamAddr :: addr -> SockAddr
70 }
71
72killSession :: TCPSession st -> IO ()
73killSession PendingTCPSession = return ()
74killSession TCPSession{tcpThread=t} = killThread t
75
76showStat :: IsString p => TCPSession st -> p
77showStat r = case r of PendingTCPSession -> "pending."
78 TCPSession {} -> "established."
79
80tcp_timeout :: Int
81tcp_timeout = 10000000
82
83acquireConnection :: TMVar (Arrival a addr x)
84 -> TCPCache (SessionProtocol x y)
85 -> StreamHandshake addr x y
86 -> addr
87 -> Bool
88 -> IO (Maybe (y -> IO ()))
89acquireConnection mvar tcpcache stream addr bDoCon = do
90 now <- getPOSIXTime
91 -- dput XTCP $ "acquireConnection 0 " ++ show (streamAddr stream addr)
92 entry <- atomically $ do
93 c <- readTVar (lru tcpcache)
94 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c
95 case v of
96 Nothing | bDoCon -> writeTVar (lru tcpcache)
97 $ MM.insert' (TCPAddress $ streamAddr stream addr) PendingTCPSession (Down now) c
98 | otherwise -> return ()
99 Just (tm, v) -> writeTVar (lru tcpcache)
100 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now) c
101 return v
102 -- dput XTCP $ "acquireConnection 1 " ++ show (streamAddr stream addr, fmap (second showStat) entry)
103 case entry of
104 Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do
105 proto <- getProtocolNumber "tcp"
106 sock <- socket (socketFamily $ streamAddr stream addr) Stream proto
107 mh <- catchIOError (do h <- timeout tcp_timeout $ do
108 connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock)
109 h <- socketToHandle sock ReadWriteMode
110 hSetBuffering h NoBuffering
111 return h
112 return h)
113 $ \e -> return Nothing
114 when (isNothing mh) $ do
115 atomically $ modifyTVar' (lru tcpcache)
116 $ MM.delete (TCPAddress $ streamAddr stream addr)
117 Socket.close sock
118 ret <- fmap join $ forM mh $ \h -> do
119 mst <- catchIOError (Just <$> streamHello stream addr h)
120 (\e -> return Nothing)
121 case mst of
122 Nothing -> do
123 atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr)
124 return Nothing
125 Just st -> do
126 dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr)
127 signal <- newTVarIO False
128 let showAddr a = show (streamAddr stream a)
129 rthread <- forkLabeled ("tcp:"++showAddr addr) $ do
130 atomically (readTVar signal >>= check)
131 fix $ \loop -> do
132 x <- streamDecode st
133 dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x
134 case x of
135 Just u -> do
136 m <- timeout tcp_timeout $ atomically (putTMVar mvar $ Arrival addr u)
137 when (isNothing m) $ do
138 dput XTCP $ "TCP "++show (streamAddr stream addr) ++ " dropped packet."
139 atomically $ tryTakeTMVar mvar
140 return ()
141 loop
142 Nothing -> do
143 dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr)
144 do atomically $ modifyTVar' (lru tcpcache)
145 $ MM.delete (TCPAddress $ streamAddr stream addr)
146 c <- atomically $ readTVar (lru tcpcache)
147 now <- getPOSIXTime
148 forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do
149 dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r]
150 mreport <- timeout tcp_timeout $ threadReport False -- XXX: Paranoid timeout
151 case mreport of
152 Just treport -> dput XTCP treport
153 Nothing -> dput XTCP "TCP ERROR: threadReport timed out."
154 hClose h `catchIOError` \e -> return ()
155 let v = TCPSession
156 { tcpHandle = h
157 , tcpState = st
158 , tcpThread = rthread
159 }
160 t <- getPOSIXTime
161 retires <- atomically $ do
162 c <- readTVar (lru tcpcache)
163 let (rs,c') = MM.takeView (tcpMax tcpcache)
164 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c
165 writeTVar (lru tcpcache) c'
166 writeTVar signal True
167 return rs
168 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkLabeled ("tcp-close:"++show k) $ do
169 dput XTCP $ "TCP dropped: " ++ show k
170 killSession r
171 case r of TCPSession {tcpState=st,tcpHandle=h} -> do
172 streamGoodbye st
173 hClose h
174 `catchIOError` \e -> return ()
175 _ -> return ()
176
177 return $ Just $ streamEncode st
178 when (isNothing ret) $ do
179 atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr)
180 return ret
181 Just (tm, PendingTCPSession)
182 | not bDoCon -> return Nothing
183 | otherwise -> fmap join $ timeout tcp_timeout $ atomically $ do
184 c <- readTVar (lru tcpcache)
185 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c
186 case v of
187 Just (_,TCPSession{tcpState=st}) -> return $ Just $ streamEncode st
188 Nothing -> return Nothing
189 _ -> retry
190 Just (tm, v@TCPSession {tcpState=st}) -> return $ Just $ streamEncode st
191
192closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO ()
193closeAll tcpcache stream = do
194 dput XTCP "TCP.closeAll called."
195 cache <- atomically $ swapTVar (lru tcpcache) MM.empty
196 forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do
197 killSession r
198 case r of TCPSession{tcpState=st,tcpHandle=h} -> catchIOError (streamGoodbye st >> hClose h)
199 (\e -> return ())
200 _ -> return ()
201
202-- Use a cache of TCP client connections for sending (and receiving) packets.
203-- The boolean value prepended to the message allows the sender to specify
204-- whether or not a new connection will be initiated if neccessary. If 'False'
205-- is passed, then the packet will be sent only if there already exists a
206-- connection.
207tcpTransport :: Int -- ^ maximum number of TCP links to maintain.
208 -> StreamHandshake addr x y
209 -> IO (TCPCache (SessionProtocol x y), TransportA err addr x (Bool,y))
210tcpTransport maxcon stream = do
211 msgvar <- atomically newEmptyTMVar
212 tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty)
213 return $ (,) tcpcache Transport
214 { awaitMessage = \f -> takeTMVar msgvar >>= \x -> return $ do
215 f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Terminated)
216 , sendMessage = \addr (bDoCon,y) -> do
217 void . forkLabeled "tcp-send" $ do
218 msock <- acquireConnection msgvar tcpcache stream addr bDoCon
219 mapM_ ($ y) msock
220 `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e
221 , setActive = \case False -> closeAll tcpcache stream >> atomically (putTMVar msgvar Terminated)
222 True -> return ()
223 }