summaryrefslogtreecommitdiff
path: root/src/Network/QueryResponse
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/QueryResponse')
-rw-r--r--src/Network/QueryResponse/TCP.hs147
1 files changed, 92 insertions, 55 deletions
diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs
index efeab305..a606c51d 100644
--- a/src/Network/QueryResponse/TCP.hs
+++ b/src/Network/QueryResponse/TCP.hs
@@ -14,6 +14,7 @@ import Control.Monad
14import Data.ByteString (ByteString,hPut) 14import Data.ByteString (ByteString,hPut)
15import Data.Function 15import Data.Function
16import Data.Hashable 16import Data.Hashable
17import Data.Maybe
17import Data.Ord 18import Data.Ord
18import Data.Time.Clock.POSIX 19import Data.Time.Clock.POSIX
19import Data.Word 20import Data.Word
@@ -29,11 +30,13 @@ import Connection.Tcp (socketFamily)
29import qualified Data.MinMaxPSQ as MM 30import qualified Data.MinMaxPSQ as MM
30import Network.QueryResponse 31import Network.QueryResponse
31 32
32data TCPSession st = TCPSession 33data TCPSession st
33 { tcpHandle :: Handle 34 = PendingTCPSession
34 , tcpState :: st 35 | TCPSession
35 , tcpThread :: ThreadId 36 { tcpHandle :: Handle
36 } 37 , tcpState :: st
38 , tcpThread :: ThreadId
39 }
37 40
38newtype TCPAddress = TCPAddress SockAddr 41newtype TCPAddress = TCPAddress SockAddr
39 deriving (Eq,Ord) 42 deriving (Eq,Ord)
@@ -60,6 +63,10 @@ data StreamHandshake addr x y = StreamHandshake
60 , streamAddr :: addr -> SockAddr 63 , streamAddr :: addr -> SockAddr
61 } 64 }
62 65
66killSession :: TCPSession st -> IO ()
67killSession PendingTCPSession = return ()
68killSession TCPSession{tcpThread=t} = killThread t
69
63acquireConnection :: MVar (Maybe (Either a (x, addr))) 70acquireConnection :: MVar (Maybe (Either a (x, addr)))
64 -> TCPCache (SessionProtocol x y) 71 -> TCPCache (SessionProtocol x y)
65 -> StreamHandshake addr x y 72 -> StreamHandshake addr x y
@@ -67,65 +74,95 @@ acquireConnection :: MVar (Maybe (Either a (x, addr)))
67 -> Bool 74 -> Bool
68 -> IO (Maybe (y -> IO ())) 75 -> IO (Maybe (y -> IO ()))
69acquireConnection mvar tcpcache stream addr bDoCon = do 76acquireConnection mvar tcpcache stream addr bDoCon = do
70 cache <- atomically $ readTVar (lru tcpcache) 77 now <- getPOSIXTime
71 case MM.lookup' (TCPAddress $ streamAddr stream addr) cache of 78 entry <- atomically $ do
79 c <- readTVar (lru tcpcache)
80 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c
81 case v of
82 Nothing | bDoCon -> writeTVar (lru tcpcache)
83 $ MM.insert' (TCPAddress $ streamAddr stream addr) PendingTCPSession (Down now) c
84 | otherwise -> return ()
85 Just (tm, v) -> modifyTVar' (lru tcpcache) $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now)
86 return v
87 case entry of
72 Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do 88 Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do
73 proto <- getProtocolNumber "tcp" 89 proto <- getProtocolNumber "tcp"
74 mh <- catchIOError (do sock <- socket (socketFamily $ streamAddr stream addr) Stream proto 90 mh <- catchIOError (do h <- timeout 10000000 $ do
75 connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) 91 sock <- socket (socketFamily $ streamAddr stream addr) Stream proto
76 h <- socketToHandle sock ReadWriteMode 92 connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock)
77 return $ Just h) 93 h <- socketToHandle sock ReadWriteMode
94 hSetBuffering h NoBuffering
95 return h
96 return h)
78 $ \e -> return Nothing 97 $ \e -> return Nothing
79 fmap join $ forM mh $ \h -> do 98 ret <- fmap join $ forM mh $ \h -> do
80 st <- streamHello stream addr h 99 st <- streamHello stream addr h
81 t <- getPOSIXTime 100 dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr)
82 rthread <- forkIO $ fix $ \loop -> do 101 rthread <- forkIO $ fix $ \loop -> do
83 x <- streamDecode st 102 x <- streamDecode st
84 case x of 103 dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x
85 Just u -> do 104 case x of
86 timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr) 105 Just u -> do
87 loop 106 m <- timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr)
88 Nothing -> do 107 when (isNothing m) $ do
89 dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) 108 dput XTCP $ "TCP "++show (streamAddr stream addr) ++ "dropped packet."
90 atomically $ modifyTVar' (lru tcpcache) 109 loop
91 $ MM.delete (TCPAddress $ streamAddr stream addr) 110 Nothing -> do
92 hClose h 111 dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr)
93 let showAddr a = show (streamAddr stream a) 112 atomically $ modifyTVar' (lru tcpcache)
94 labelThread rthread ("tcp:"++showAddr addr) 113 $ MM.delete (TCPAddress $ streamAddr stream addr)
95 let v = TCPSession 114 c <- atomically $ readTVar (lru tcpcache)
96 { tcpHandle = h 115 now <- getPOSIXTime
97 , tcpState = st 116 forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do
98 , tcpThread = rthread 117 let stat = case r of
99 } 118 PendingTCPSession -> "pending."
100 retires <- atomically $ do 119 TCPSession {} -> "established."
101 c <- readTVar (lru tcpcache) 120 dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), stat]
102 let (rs,c') = MM.takeView (tcpMax tcpcache) 121 hClose h
103 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c 122 let showAddr a = show (streamAddr stream a)
104 writeTVar (lru tcpcache) c' 123 labelThread rthread ("tcp:"++showAddr addr)
105 return rs 124 let v = TCPSession
106 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do 125 { tcpHandle = h
107 myThreadId >>= flip labelThread ("tcp-close:"++show k) 126 , tcpState = st
108 dput XTCP $ "TCP dropped: " ++ show k 127 , tcpThread = rthread
109 killThread (tcpThread r) 128 }
110 streamGoodbye st 129 t <- getPOSIXTime
111 hClose (tcpHandle r) 130 retires <- atomically $ do
131 c <- readTVar (lru tcpcache)
132 let (rs,c') = MM.takeView (tcpMax tcpcache)
133 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c
134 writeTVar (lru tcpcache) c'
135 return rs
136 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do
137 myThreadId >>= flip labelThread ("tcp-close:"++show k)
138 dput XTCP $ "TCP dropped: " ++ show k
139 killSession r
140 case r of TCPSession {tcpState=st,tcpHandle=h} -> do
141 streamGoodbye st
142 hClose h
143 _ -> return ()
112 144
113 return $ Just $ streamEncode st 145 return $ Just $ streamEncode st
114 Just (tm,v) -> do 146 when (isNothing ret) $ do
115 t <- getPOSIXTime 147 atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr)
116 let TCPSession { tcpHandle = h, tcpState = st } = v 148 return ret
117 cache' = MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) cache 149 Just (tm, PendingTCPSession) -> do
118 atomically $ writeTVar (lru tcpcache) cache' 150 fmap join $ timeout 10000000 $ atomically $ do
119 return $ Just $ streamEncode st 151 c <- readTVar (lru tcpcache)
152 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c
153 case v of
154 Just (_,TCPSession{tcpState=st}) -> return $ Just $ streamEncode st
155 Nothing -> return Nothing
156 _ -> retry
157 Just (tm, v@TCPSession {tcpState=st}) -> return $ Just $ streamEncode st
120 158
121closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO () 159closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO ()
122closeAll tcpcache stream = do 160closeAll tcpcache stream = do
123 cache <- atomically $ swapTVar (lru tcpcache) MM.empty 161 cache <- atomically $ swapTVar (lru tcpcache) MM.empty
124 forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do 162 forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do
125 let st = tcpState r 163 killSession r
126 killThread (tcpThread r) 164 case r of TCPSession{tcpState=st,tcpHandle=h} -> streamGoodbye st >> hClose h
127 streamGoodbye st 165 _ -> return ()
128 hClose (tcpHandle r)
129 166
130tcpTransport :: Int -- ^ maximum number of TCP links to maintain. 167tcpTransport :: Int -- ^ maximum number of TCP links to maintain.
131 -> StreamHandshake addr x y 168 -> StreamHandshake addr x y