diff options
Diffstat (limited to 'src/Network/QueryResponse/TCP.hs')
-rw-r--r-- | src/Network/QueryResponse/TCP.hs | 57 |
1 files changed, 32 insertions, 25 deletions
diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs index 83ae367f..7efe6966 100644 --- a/src/Network/QueryResponse/TCP.hs +++ b/src/Network/QueryResponse/TCP.hs | |||
@@ -45,77 +45,84 @@ data TCPCache st = TCPCache | |||
45 | , tcpMax :: Int | 45 | , tcpMax :: Int |
46 | } | 46 | } |
47 | 47 | ||
48 | data StreamTransform st x y = StreamTransform | 48 | data SessionProtocol x y = SessionProtocol |
49 | { streamHello :: Handle -> IO st -- ^ "Hello" protocol upon fresh connection. | 49 | { streamGoodbye :: Handle -> IO () -- ^ "Goodbye" protocol upon termination. |
50 | , streamGoodbye :: st -> Handle -> IO () -- ^ "Goodbye" protocol upon termination. | 50 | , streamDecode :: Handle -> IO (Maybe x) -- ^ Parse inbound messages. |
51 | , streamDecode :: st -> Handle -> IO (Maybe x) -- ^ Parse inbound messages. | 51 | , streamEncode :: y -> IO ByteString -- ^ Serialize outbound messages. |
52 | , streamEncode :: st -> y -> IO ByteString -- ^ Serialize outbound messages. | ||
53 | } | 52 | } |
54 | 53 | ||
55 | acquireConnection :: MVar (Maybe (Either a (x, SockAddr))) | 54 | data StreamHandshake addr x y = StreamHandshake |
56 | -> TCPCache st | 55 | { streamHello :: addr -> Handle -> IO (SessionProtocol x y) -- ^ "Hello" protocol upon fresh connection. |
57 | -> StreamTransform st x y | 56 | , streamAddr :: addr -> SockAddr |
58 | -> SockAddr | 57 | } |
58 | |||
59 | acquireConnection :: MVar (Maybe (Either a (x, addr))) | ||
60 | -> TCPCache (SessionProtocol x y) | ||
61 | -> StreamHandshake addr x y | ||
62 | -> addr | ||
59 | -> IO (Maybe (y -> IO ())) | 63 | -> IO (Maybe (y -> IO ())) |
60 | acquireConnection mvar tcpcache stream addr = do | 64 | acquireConnection mvar tcpcache stream addr = do |
61 | cache <- atomically $ readTVar (lru tcpcache) | 65 | cache <- atomically $ readTVar (lru tcpcache) |
62 | case MM.lookup' (TCPAddress addr) cache of | 66 | case MM.lookup' (TCPAddress $ streamAddr stream addr) cache of |
63 | Nothing -> do | 67 | Nothing -> do |
64 | proto <- getProtocolNumber "tcp" | 68 | proto <- getProtocolNumber "tcp" |
65 | sock <- socket (socketFamily addr) Stream proto | 69 | sock <- socket (socketFamily $ streamAddr stream addr) Stream proto |
66 | connect sock addr `catchIOError` (\e -> close sock) | 70 | connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) |
67 | h <- socketToHandle sock ReadWriteMode | 71 | h <- socketToHandle sock ReadWriteMode |
68 | st <- streamHello stream h | 72 | st <- streamHello stream addr h |
69 | t <- getPOSIXTime | 73 | t <- getPOSIXTime |
70 | mh <- newMVar h | 74 | mh <- newMVar h |
71 | rthread <- forkIO $ fix $ \loop -> do | 75 | rthread <- forkIO $ fix $ \loop -> do |
72 | x <- streamDecode stream st h | 76 | x <- streamDecode st h |
73 | putMVar mvar $ fmap (\u -> Right (u, addr)) x | 77 | putMVar mvar $ fmap (\u -> Right (u, addr)) x |
74 | case x of | 78 | case x of |
75 | Just _ -> loop | 79 | Just _ -> loop |
76 | Nothing -> do | 80 | Nothing -> do |
77 | atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress addr) | 81 | atomically $ modifyTVar' (lru tcpcache) |
82 | $ MM.delete (TCPAddress $ streamAddr stream addr) | ||
78 | hClose h | 83 | hClose h |
79 | labelThread rthread ("tcp:"++show addr) | 84 | let showAddr a = show (streamAddr stream a) |
85 | labelThread rthread ("tcp:"++showAddr addr) | ||
80 | let v = TCPSession | 86 | let v = TCPSession |
81 | { tcpHandle = mh | 87 | { tcpHandle = mh |
82 | , tcpState = st | 88 | , tcpState = st |
83 | , tcpThread = rthread | 89 | , tcpThread = rthread |
84 | } | 90 | } |
85 | let (retires,cache') = MM.takeView (tcpMax tcpcache) $ MM.insert' (TCPAddress addr) v t cache | 91 | let (retires,cache') = MM.takeView (tcpMax tcpcache) |
92 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v t cache | ||
86 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do | 93 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do |
87 | myThreadId >>= flip labelThread ("tcp-close:"++show k) | 94 | myThreadId >>= flip labelThread ("tcp-close:"++show k) |
88 | killThread (tcpThread r) | 95 | killThread (tcpThread r) |
89 | h <- takeMVar (tcpHandle r) | 96 | h <- takeMVar (tcpHandle r) |
90 | streamGoodbye stream st h | 97 | streamGoodbye st h |
91 | hClose h | 98 | hClose h |
92 | atomically $ writeTVar (lru tcpcache) cache' | 99 | atomically $ writeTVar (lru tcpcache) cache' |
93 | 100 | ||
94 | return $ Just $ \y -> do | 101 | return $ Just $ \y -> do |
95 | bs <- streamEncode stream st y | 102 | bs <- streamEncode st y |
96 | withMVar mh (`hPut` bs) | 103 | withMVar mh (`hPut` bs) |
97 | Just (tm,v) -> do | 104 | Just (tm,v) -> do |
98 | t <- getPOSIXTime | 105 | t <- getPOSIXTime |
99 | let TCPSession { tcpHandle = mh, tcpState = st } = v | 106 | let TCPSession { tcpHandle = mh, tcpState = st } = v |
100 | cache' = MM.insert' (TCPAddress addr) v t cache | 107 | cache' = MM.insert' (TCPAddress $ streamAddr stream addr) v t cache |
101 | atomically $ writeTVar (lru tcpcache) cache' | 108 | atomically $ writeTVar (lru tcpcache) cache' |
102 | return $ Just $ \y -> do | 109 | return $ Just $ \y -> do |
103 | bs <- streamEncode stream st y | 110 | bs <- streamEncode st y |
104 | withMVar mh (`hPut` bs) | 111 | withMVar mh (`hPut` bs) |
105 | 112 | ||
106 | closeAll :: TCPCache st -> StreamTransform st x y -> IO () | 113 | closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO () |
107 | closeAll tcpcache stream = do | 114 | closeAll tcpcache stream = do |
108 | cache <- atomically $ readTVar (lru tcpcache) | 115 | cache <- atomically $ readTVar (lru tcpcache) |
109 | forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do | 116 | forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do |
110 | let st = tcpState r | 117 | let st = tcpState r |
111 | killThread (tcpThread r) | 118 | killThread (tcpThread r) |
112 | h <- takeMVar $ tcpHandle r | 119 | h <- takeMVar $ tcpHandle r |
113 | streamGoodbye stream st h | 120 | streamGoodbye st h |
114 | hClose h | 121 | hClose h |
115 | 122 | ||
116 | tcpTransport :: Int -- ^ maximum number of TCP links to maintain. | 123 | tcpTransport :: Int -- ^ maximum number of TCP links to maintain. |
117 | -> StreamTransform st x y | 124 | -> StreamHandshake addr x y |
118 | -> IO (TransportA err SockAddr x y) | 125 | -> IO (TransportA err addr x y) |
119 | tcpTransport maxcon stream = do | 126 | tcpTransport maxcon stream = do |
120 | msgvar <- newEmptyMVar | 127 | msgvar <- newEmptyMVar |
121 | tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) | 128 | tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) |