summaryrefslogtreecommitdiff
path: root/src/Network/QueryResponse/TCP.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/QueryResponse/TCP.hs')
-rw-r--r--src/Network/QueryResponse/TCP.hs57
1 files changed, 32 insertions, 25 deletions
diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs
index 83ae367f..7efe6966 100644
--- a/src/Network/QueryResponse/TCP.hs
+++ b/src/Network/QueryResponse/TCP.hs
@@ -45,77 +45,84 @@ data TCPCache st = TCPCache
45 , tcpMax :: Int 45 , tcpMax :: Int
46 } 46 }
47 47
48data StreamTransform st x y = StreamTransform 48data SessionProtocol x y = SessionProtocol
49 { streamHello :: Handle -> IO st -- ^ "Hello" protocol upon fresh connection. 49 { streamGoodbye :: Handle -> IO () -- ^ "Goodbye" protocol upon termination.
50 , streamGoodbye :: st -> Handle -> IO () -- ^ "Goodbye" protocol upon termination. 50 , streamDecode :: Handle -> IO (Maybe x) -- ^ Parse inbound messages.
51 , streamDecode :: st -> Handle -> IO (Maybe x) -- ^ Parse inbound messages. 51 , streamEncode :: y -> IO ByteString -- ^ Serialize outbound messages.
52 , streamEncode :: st -> y -> IO ByteString -- ^ Serialize outbound messages.
53 } 52 }
54 53
55acquireConnection :: MVar (Maybe (Either a (x, SockAddr))) 54data StreamHandshake addr x y = StreamHandshake
56 -> TCPCache st 55 { streamHello :: addr -> Handle -> IO (SessionProtocol x y) -- ^ "Hello" protocol upon fresh connection.
57 -> StreamTransform st x y 56 , streamAddr :: addr -> SockAddr
58 -> SockAddr 57 }
58
59acquireConnection :: MVar (Maybe (Either a (x, addr)))
60 -> TCPCache (SessionProtocol x y)
61 -> StreamHandshake addr x y
62 -> addr
59 -> IO (Maybe (y -> IO ())) 63 -> IO (Maybe (y -> IO ()))
60acquireConnection mvar tcpcache stream addr = do 64acquireConnection mvar tcpcache stream addr = do
61 cache <- atomically $ readTVar (lru tcpcache) 65 cache <- atomically $ readTVar (lru tcpcache)
62 case MM.lookup' (TCPAddress addr) cache of 66 case MM.lookup' (TCPAddress $ streamAddr stream addr) cache of
63 Nothing -> do 67 Nothing -> do
64 proto <- getProtocolNumber "tcp" 68 proto <- getProtocolNumber "tcp"
65 sock <- socket (socketFamily addr) Stream proto 69 sock <- socket (socketFamily $ streamAddr stream addr) Stream proto
66 connect sock addr `catchIOError` (\e -> close sock) 70 connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock)
67 h <- socketToHandle sock ReadWriteMode 71 h <- socketToHandle sock ReadWriteMode
68 st <- streamHello stream h 72 st <- streamHello stream addr h
69 t <- getPOSIXTime 73 t <- getPOSIXTime
70 mh <- newMVar h 74 mh <- newMVar h
71 rthread <- forkIO $ fix $ \loop -> do 75 rthread <- forkIO $ fix $ \loop -> do
72 x <- streamDecode stream st h 76 x <- streamDecode st h
73 putMVar mvar $ fmap (\u -> Right (u, addr)) x 77 putMVar mvar $ fmap (\u -> Right (u, addr)) x
74 case x of 78 case x of
75 Just _ -> loop 79 Just _ -> loop
76 Nothing -> do 80 Nothing -> do
77 atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress addr) 81 atomically $ modifyTVar' (lru tcpcache)
82 $ MM.delete (TCPAddress $ streamAddr stream addr)
78 hClose h 83 hClose h
79 labelThread rthread ("tcp:"++show addr) 84 let showAddr a = show (streamAddr stream a)
85 labelThread rthread ("tcp:"++showAddr addr)
80 let v = TCPSession 86 let v = TCPSession
81 { tcpHandle = mh 87 { tcpHandle = mh
82 , tcpState = st 88 , tcpState = st
83 , tcpThread = rthread 89 , tcpThread = rthread
84 } 90 }
85 let (retires,cache') = MM.takeView (tcpMax tcpcache) $ MM.insert' (TCPAddress addr) v t cache 91 let (retires,cache') = MM.takeView (tcpMax tcpcache)
92 $ MM.insert' (TCPAddress $ streamAddr stream addr) v t cache
86 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do 93 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do
87 myThreadId >>= flip labelThread ("tcp-close:"++show k) 94 myThreadId >>= flip labelThread ("tcp-close:"++show k)
88 killThread (tcpThread r) 95 killThread (tcpThread r)
89 h <- takeMVar (tcpHandle r) 96 h <- takeMVar (tcpHandle r)
90 streamGoodbye stream st h 97 streamGoodbye st h
91 hClose h 98 hClose h
92 atomically $ writeTVar (lru tcpcache) cache' 99 atomically $ writeTVar (lru tcpcache) cache'
93 100
94 return $ Just $ \y -> do 101 return $ Just $ \y -> do
95 bs <- streamEncode stream st y 102 bs <- streamEncode st y
96 withMVar mh (`hPut` bs) 103 withMVar mh (`hPut` bs)
97 Just (tm,v) -> do 104 Just (tm,v) -> do
98 t <- getPOSIXTime 105 t <- getPOSIXTime
99 let TCPSession { tcpHandle = mh, tcpState = st } = v 106 let TCPSession { tcpHandle = mh, tcpState = st } = v
100 cache' = MM.insert' (TCPAddress addr) v t cache 107 cache' = MM.insert' (TCPAddress $ streamAddr stream addr) v t cache
101 atomically $ writeTVar (lru tcpcache) cache' 108 atomically $ writeTVar (lru tcpcache) cache'
102 return $ Just $ \y -> do 109 return $ Just $ \y -> do
103 bs <- streamEncode stream st y 110 bs <- streamEncode st y
104 withMVar mh (`hPut` bs) 111 withMVar mh (`hPut` bs)
105 112
106closeAll :: TCPCache st -> StreamTransform st x y -> IO () 113closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO ()
107closeAll tcpcache stream = do 114closeAll tcpcache stream = do
108 cache <- atomically $ readTVar (lru tcpcache) 115 cache <- atomically $ readTVar (lru tcpcache)
109 forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do 116 forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do
110 let st = tcpState r 117 let st = tcpState r
111 killThread (tcpThread r) 118 killThread (tcpThread r)
112 h <- takeMVar $ tcpHandle r 119 h <- takeMVar $ tcpHandle r
113 streamGoodbye stream st h 120 streamGoodbye st h
114 hClose h 121 hClose h
115 122
116tcpTransport :: Int -- ^ maximum number of TCP links to maintain. 123tcpTransport :: Int -- ^ maximum number of TCP links to maintain.
117 -> StreamTransform st x y 124 -> StreamHandshake addr x y
118 -> IO (TransportA err SockAddr x y) 125 -> IO (TransportA err addr x y)
119tcpTransport maxcon stream = do 126tcpTransport maxcon stream = do
120 msgvar <- newEmptyMVar 127 msgvar <- newEmptyMVar
121 tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) 128 tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty)