diff options
Diffstat (limited to 'src/Network')
-rw-r--r-- | src/Network/QueryResponse/TCP.hs | 32 | ||||
-rw-r--r-- | src/Network/Tox.hs | 2 | ||||
-rw-r--r-- | src/Network/Tox/TCP.hs | 76 |
3 files changed, 65 insertions, 45 deletions
diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs index 328a19e1..efeab305 100644 --- a/src/Network/QueryResponse/TCP.hs +++ b/src/Network/QueryResponse/TCP.hs | |||
@@ -19,9 +19,12 @@ import Data.Time.Clock.POSIX | |||
19 | import Data.Word | 19 | import Data.Word |
20 | import Network.BSD | 20 | import Network.BSD |
21 | import Network.Socket | 21 | import Network.Socket |
22 | import System.Timeout | ||
22 | import System.IO | 23 | import System.IO |
23 | import System.IO.Error | 24 | import System.IO.Error |
24 | 25 | ||
26 | import DebugTag | ||
27 | import DPut | ||
25 | import Connection.Tcp (socketFamily) | 28 | import Connection.Tcp (socketFamily) |
26 | import qualified Data.MinMaxPSQ as MM | 29 | import qualified Data.MinMaxPSQ as MM |
27 | import Network.QueryResponse | 30 | import Network.QueryResponse |
@@ -61,11 +64,12 @@ acquireConnection :: MVar (Maybe (Either a (x, addr))) | |||
61 | -> TCPCache (SessionProtocol x y) | 64 | -> TCPCache (SessionProtocol x y) |
62 | -> StreamHandshake addr x y | 65 | -> StreamHandshake addr x y |
63 | -> addr | 66 | -> addr |
67 | -> Bool | ||
64 | -> IO (Maybe (y -> IO ())) | 68 | -> IO (Maybe (y -> IO ())) |
65 | acquireConnection mvar tcpcache stream addr = do | 69 | acquireConnection mvar tcpcache stream addr bDoCon = do |
66 | cache <- atomically $ readTVar (lru tcpcache) | 70 | cache <- atomically $ readTVar (lru tcpcache) |
67 | case MM.lookup' (TCPAddress $ streamAddr stream addr) cache of | 71 | case MM.lookup' (TCPAddress $ streamAddr stream addr) cache of |
68 | Nothing -> do | 72 | Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do |
69 | proto <- getProtocolNumber "tcp" | 73 | proto <- getProtocolNumber "tcp" |
70 | mh <- catchIOError (do sock <- socket (socketFamily $ streamAddr stream addr) Stream proto | 74 | mh <- catchIOError (do sock <- socket (socketFamily $ streamAddr stream addr) Stream proto |
71 | connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) | 75 | connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) |
@@ -77,10 +81,12 @@ acquireConnection mvar tcpcache stream addr = do | |||
77 | t <- getPOSIXTime | 81 | t <- getPOSIXTime |
78 | rthread <- forkIO $ fix $ \loop -> do | 82 | rthread <- forkIO $ fix $ \loop -> do |
79 | x <- streamDecode st | 83 | x <- streamDecode st |
80 | putMVar mvar $ fmap (\u -> Right (u, addr)) x | ||
81 | case x of | 84 | case x of |
82 | Just _ -> loop | 85 | Just u -> do |
86 | timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr) | ||
87 | loop | ||
83 | Nothing -> do | 88 | Nothing -> do |
89 | dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) | ||
84 | atomically $ modifyTVar' (lru tcpcache) | 90 | atomically $ modifyTVar' (lru tcpcache) |
85 | $ MM.delete (TCPAddress $ streamAddr stream addr) | 91 | $ MM.delete (TCPAddress $ streamAddr stream addr) |
86 | hClose h | 92 | hClose h |
@@ -91,14 +97,18 @@ acquireConnection mvar tcpcache stream addr = do | |||
91 | , tcpState = st | 97 | , tcpState = st |
92 | , tcpThread = rthread | 98 | , tcpThread = rthread |
93 | } | 99 | } |
94 | let (retires,cache') = MM.takeView (tcpMax tcpcache) | 100 | retires <- atomically $ do |
95 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) cache | 101 | c <- readTVar (lru tcpcache) |
102 | let (rs,c') = MM.takeView (tcpMax tcpcache) | ||
103 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c | ||
104 | writeTVar (lru tcpcache) c' | ||
105 | return rs | ||
96 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do | 106 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do |
97 | myThreadId >>= flip labelThread ("tcp-close:"++show k) | 107 | myThreadId >>= flip labelThread ("tcp-close:"++show k) |
108 | dput XTCP $ "TCP dropped: " ++ show k | ||
98 | killThread (tcpThread r) | 109 | killThread (tcpThread r) |
99 | streamGoodbye st | 110 | streamGoodbye st |
100 | hClose (tcpHandle r) | 111 | hClose (tcpHandle r) |
101 | atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress k) | ||
102 | 112 | ||
103 | return $ Just $ streamEncode st | 113 | return $ Just $ streamEncode st |
104 | Just (tm,v) -> do | 114 | Just (tm,v) -> do |
@@ -119,15 +129,15 @@ closeAll tcpcache stream = do | |||
119 | 129 | ||
120 | tcpTransport :: Int -- ^ maximum number of TCP links to maintain. | 130 | tcpTransport :: Int -- ^ maximum number of TCP links to maintain. |
121 | -> StreamHandshake addr x y | 131 | -> StreamHandshake addr x y |
122 | -> IO (TransportA err addr x y) | 132 | -> IO (TransportA err addr x (Bool,y)) |
123 | tcpTransport maxcon stream = do | 133 | tcpTransport maxcon stream = do |
124 | msgvar <- newEmptyMVar | 134 | msgvar <- newEmptyMVar |
125 | tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) | 135 | tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) |
126 | return Transport | 136 | return Transport |
127 | { awaitMessage = (takeMVar msgvar >>=) | 137 | { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing) |
128 | , sendMessage = \addr y -> do | 138 | , sendMessage = \addr (bDoCon,y) -> do |
129 | t <- forkIO $ do | 139 | t <- forkIO $ do |
130 | msock <- acquireConnection msgvar tcpcache stream addr | 140 | msock <- acquireConnection msgvar tcpcache stream addr bDoCon |
131 | mapM_ ($ y) msock | 141 | mapM_ ($ y) msock |
132 | `catchIOError` \e -> return () | 142 | `catchIOError` \e -> return () |
133 | labelThread t "tcp-send" | 143 | labelThread t "tcp-send" |
diff --git a/src/Network/Tox.hs b/src/Network/Tox.hs index d0304170..c14339e4 100644 --- a/src/Network/Tox.hs +++ b/src/Network/Tox.hs | |||
@@ -328,7 +328,7 @@ newToxOverTransport keydb addr onNewSession suppliedDHTKey udp tcp = do | |||
328 | (orouter,otbl) <- newOnionRouter crypto (dput XRoutes) | 328 | (orouter,otbl) <- newOnionRouter crypto (dput XRoutes) |
329 | (cryptonet,dhtcrypt,onioncrypt,dtacrypt,handshakes) | 329 | (cryptonet,dhtcrypt,onioncrypt,dtacrypt,handshakes) |
330 | <- toxTransport crypto orouter lookupClose udp | 330 | <- toxTransport crypto orouter lookupClose udp |
331 | (sendMessage (clientNet $ tcpClient $ tcpKademliaClient orouter)) | 331 | (\dst x -> sendMessage (clientNet $ tcpClient $ tcpKademliaClient orouter) dst (True,x)) |
332 | tcp | 332 | tcp |
333 | sessions <- initSessions (sendMessage cryptonet) | 333 | sessions <- initSessions (sendMessage cryptonet) |
334 | 334 | ||
diff --git a/src/Network/Tox/TCP.hs b/src/Network/Tox/TCP.hs index a7881c24..71050fe8 100644 --- a/src/Network/Tox/TCP.hs +++ b/src/Network/Tox/TCP.hs | |||
@@ -7,6 +7,7 @@ module Network.Tox.TCP | |||
7 | , NodeInfo(..) | 7 | , NodeInfo(..) |
8 | ) where | 8 | ) where |
9 | 9 | ||
10 | import Debug.Trace | ||
10 | import Control.Arrow | 11 | import Control.Arrow |
11 | import Control.Concurrent | 12 | import Control.Concurrent |
12 | import Control.Concurrent.STM | 13 | import Control.Concurrent.STM |
@@ -27,6 +28,7 @@ import qualified Data.Vector as Vector | |||
27 | import Network.Socket (SockAddr(..)) | 28 | import Network.Socket (SockAddr(..)) |
28 | import qualified Text.ParserCombinators.ReadP as RP | 29 | import qualified Text.ParserCombinators.ReadP as RP |
29 | import System.IO.Error | 30 | import System.IO.Error |
31 | import System.Timeout | ||
30 | 32 | ||
31 | import ControlMaybe | 33 | import ControlMaybe |
32 | import Crypto.Tox | 34 | import Crypto.Tox |
@@ -113,15 +115,16 @@ tcpStream crypto = StreamHandshake | |||
113 | dput XTCP $ "TCP: Failed to decode packet." | 115 | dput XTCP $ "TCP: Failed to decode packet." |
114 | return Nothing | 116 | return Nothing |
115 | Right x -> do | 117 | Right x -> do |
116 | n24 <- takeMVar nread | 118 | m24 <- timeout 100000 (takeMVar nread) |
117 | let r = decrypt (noncef' n24) x >>= decodePlain | 119 | fmap join $ forM m24 $ \n24 -> do |
118 | putMVar nread (incrementNonce24 n24) | 120 | let r = decrypt (noncef' n24) x >>= decodePlain |
119 | either (dput XTCP) | 121 | putMVar nread (incrementNonce24 n24) |
120 | (\x' -> do | 122 | either (dput XTCP) |
121 | dput XTCP $ "TCP:" ++ show addr ++ " --> " ++ show x' | 123 | (\x' -> do |
122 | return ()) | 124 | dput XTCP $ "TCP:" ++ show addr ++ " --> " ++ show x' |
123 | r | 125 | return ()) |
124 | return $ either (const Nothing) Just r | 126 | r |
127 | return $ either (const Nothing) Just r | ||
125 | in go `catchIOError` \e -> do | 128 | in go `catchIOError` \e -> do |
126 | dput XTCP $ "TCP exception: " ++ show e | 129 | dput XTCP $ "TCP exception: " ++ show e |
127 | return Nothing | 130 | return Nothing |
@@ -129,14 +132,14 @@ tcpStream crypto = StreamHandshake | |||
129 | n24 <- takeMVar nsend | 132 | n24 <- takeMVar nsend |
130 | dput XTCP $ "TCP:" ++ show addr ++ " <-- " ++ show y | 133 | dput XTCP $ "TCP:" ++ show addr ++ " <-- " ++ show y |
131 | let bs = encode $ encrypt (noncef' n24) $ encodePlain y | 134 | let bs = encode $ encrypt (noncef' n24) $ encodePlain y |
132 | hPut h $ encode (fromIntegral $ Data.ByteString.length bs :: Word16) | 135 | hPut h (encode (fromIntegral $ Data.ByteString.length bs :: Word16) <> bs) |
133 | <> bs | 136 | `catchIOError` \e -> dput XTCP $ "TCP write exception: " ++ show e |
134 | putMVar nsend (incrementNonce24 n24) | 137 | putMVar nsend (incrementNonce24 n24) |
135 | } | 138 | } |
136 | , streamAddr = nodeAddr | 139 | , streamAddr = nodeAddr |
137 | } | 140 | } |
138 | 141 | ||
139 | toxTCP :: TransportCrypto -> IO (Transport err NodeInfo RelayPacket) | 142 | toxTCP :: TransportCrypto -> IO (TransportA err NodeInfo RelayPacket (Bool,RelayPacket)) |
140 | toxTCP crypto = tcpTransport 30 (tcpStream crypto) | 143 | toxTCP crypto = tcpTransport 30 (tcpStream crypto) |
141 | 144 | ||
142 | tcpSpace :: KademliaSpace NodeId NodeInfo | 145 | tcpSpace :: KademliaSpace NodeId NodeInfo |
@@ -153,7 +156,7 @@ nodeSearch tcp = Search | |||
153 | 156 | ||
154 | data TCPClient err meth tid = TCPClient | 157 | data TCPClient err meth tid = TCPClient |
155 | { tcpCrypto :: TransportCrypto | 158 | { tcpCrypto :: TransportCrypto |
156 | , tcpClient :: Client err () tid NodeInfo RelayPacket | 159 | , tcpClient :: Client err PacketNumber tid NodeInfo (Bool,RelayPacket) |
157 | , tcpGetGateway :: UDP.NodeInfo -> STM (Maybe NodeInfo) | 160 | , tcpGetGateway :: UDP.NodeInfo -> STM (Maybe NodeInfo) |
158 | } | 161 | } |
159 | 162 | ||
@@ -200,8 +203,8 @@ getUDPNodes' tcp seeking dst0 = do | |||
200 | wrap0 <- lookupNonceFunction (tcpCrypto tcp) (transportSecret $ tcpCrypto tcp) (UDP.id2key $ UDP.nodeId dst) | 203 | wrap0 <- lookupNonceFunction (tcpCrypto tcp) (transportSecret $ tcpCrypto tcp) (UDP.id2key $ UDP.nodeId dst) |
201 | let meth = MethodSerializer -- MethodSerializer Nonce8 NodeInfo RelayPacket meth AnnounceRequest (Either String AnnounceResponse) | 204 | let meth = MethodSerializer -- MethodSerializer Nonce8 NodeInfo RelayPacket meth AnnounceRequest (Either String AnnounceResponse) |
202 | { methodTimeout = \tid addr -> return (addr,12000000) -- 12 second timeout | 205 | { methodTimeout = \tid addr -> return (addr,12000000) -- 12 second timeout |
203 | , method = () -- meth | 206 | , method = OnionPacketID -- meth |
204 | , wrapQuery = \n8 src gateway x -> | 207 | , wrapQuery = \n8 src gateway x -> (,) True $ |
205 | OnionPacket n24 $ Addressed (UDP.nodeAddr dst) | 208 | OnionPacket n24 $ Addressed (UDP.nodeAddr dst) |
206 | $ wrapOnionPure b (wrap2 n24) (nodeAddr gateway') | 209 | $ wrapOnionPure b (wrap2 n24) (nodeAddr gateway') |
207 | $ wrapOnionPure c (wrap1 n24) (UDP.nodeAddr dst) | 210 | $ wrapOnionPure c (wrap1 n24) (UDP.nodeAddr dst) |
@@ -212,7 +215,7 @@ getUDPNodes' tcp seeking dst0 = do | |||
212 | , asymmData = pure (x,n8) | 215 | , asymmData = pure (x,n8) |
213 | } | 216 | } |
214 | , unwrapResponse = \case | 217 | , unwrapResponse = \case |
215 | OnionPacketResponse (OnionAnnounceResponse _ n24' r) | 218 | (_,OnionPacketResponse (OnionAnnounceResponse _ n24' r)) |
216 | -> decrypt (wrap0 n24') r >>= decodePlain | 219 | -> decrypt (wrap0 n24') r >>= decodePlain |
217 | x -> Left $ "getUDPNodes: unwrapResponse fail " ++ show x | 220 | x -> Left $ "getUDPNodes: unwrapResponse fail " ++ show x |
218 | } | 221 | } |
@@ -222,26 +225,28 @@ getUDPNodes' tcp seeking dst0 = do | |||
222 | return ( (ns,ns, const () <$> mb), gateway ) | 225 | return ( (ns,ns, const () <$> mb), gateway ) |
223 | 226 | ||
224 | 227 | ||
225 | handleOOB :: PublicKey -> ByteString -> NodeInfo -> NodeInfo -> IO (Maybe (RelayPacket -> RelayPacket)) | 228 | handleOOB :: PublicKey -> ByteString -> NodeInfo -> NodeInfo -> IO (Maybe (x -> x)) |
226 | handleOOB k bs src dst = do | 229 | handleOOB k bs src dst = do |
227 | dput XMisc $ "TODO: handleOOB " ++ show src | 230 | dput XMisc $ "TODO: handleOOB " ++ show src |
228 | return Nothing | 231 | return Nothing |
229 | 232 | ||
230 | handle2route :: OnionMessage Encrypted -> NodeInfo -> NodeInfo -> IO (Maybe (RelayPacket -> RelayPacket)) | 233 | handle2route :: OnionMessage Encrypted -> NodeInfo -> NodeInfo -> IO (Maybe (x -> x)) |
231 | handle2route o src dst = do | 234 | handle2route o src dst = do |
232 | dput XMisc $ "TODO: handle2route " ++ show src | 235 | dput XMisc $ "TODO: handle2route " ++ show src |
233 | return Nothing | 236 | return Nothing |
234 | 237 | ||
235 | tcpPing :: Client err () Nonce8 addr RelayPacket -> addr -> IO (Maybe ()) | 238 | tcpPing :: Show addr => Client err PacketNumber Nonce8 addr (Bool,RelayPacket) -> addr -> IO (Maybe ()) |
236 | tcpPing client dst = sendQuery client meth () dst | 239 | tcpPing client dst = do |
240 | dput XTCP $ "tcpPing " ++ show dst | ||
241 | sendQuery client meth () dst | ||
237 | where meth = MethodSerializer | 242 | where meth = MethodSerializer |
238 | { wrapQuery = \n8 src dst () -> RelayPing n8 | 243 | { wrapQuery = \n8 src dst () -> (True,RelayPing n8) |
239 | , unwrapResponse = \_ -> () | 244 | , unwrapResponse = \_ -> () |
240 | , methodTimeout = \n8 dst -> return (dst,5000000) | 245 | , methodTimeout = \n8 dst -> return (dst,5000000) |
241 | , method = () | 246 | , method = PingPacket |
242 | } | 247 | } |
243 | 248 | ||
244 | type RelayClient = Client String () Nonce8 NodeInfo RelayPacket | 249 | type RelayClient = Client String PacketNumber Nonce8 NodeInfo (Bool,RelayPacket) |
245 | 250 | ||
246 | -- | Create a new TCP relay client. Because polymorphic existential record | 251 | -- | Create a new TCP relay client. Because polymorphic existential record |
247 | -- updates are currently hard with GHC, this function accepts parameters for | 252 | -- updates are currently hard with GHC, this function accepts parameters for |
@@ -249,29 +254,34 @@ type RelayClient = Client String () Nonce8 NodeInfo RelayPacket | |||
249 | -- defaults are 'id' and 'tryPutMVar'. The resulting customized table state | 254 | -- defaults are 'id' and 'tryPutMVar'. The resulting customized table state |
250 | -- will be returned to the caller along with the new client. | 255 | -- will be returned to the caller along with the new client. |
251 | newClient :: TransportCrypto | 256 | newClient :: TransportCrypto |
252 | -> (MVar RelayPacket -> a) -- ^ store mvar for query | 257 | -> (MVar (Bool,RelayPacket) -> a) -- ^ store mvar for query |
253 | -> (a -> RelayPacket -> IO void) -- ^ load mvar for query | 258 | -> (a -> RelayPacket -> IO void) -- ^ load mvar for query |
254 | -> IO ( TVar (ChaChaDRG, Data.Word64Map.Word64Map a) | 259 | -> IO ( TVar (ChaChaDRG, Data.Word64Map.Word64Map a) |
255 | , Client String () Nonce8 NodeInfo RelayPacket) | 260 | , Client String PacketNumber Nonce8 NodeInfo (Bool,RelayPacket)) |
256 | newClient crypto store load = do | 261 | newClient crypto store load = do |
257 | net <- toxTCP crypto | 262 | net <- toxTCP crypto |
258 | drg <- drgNew | 263 | drg <- drgNew |
259 | map_var <- atomically $ newTVar (drg, Data.Word64Map.empty) | 264 | map_var <- atomically $ newTVar (drg, Data.Word64Map.empty) |
260 | return $ (,) map_var Client | 265 | return $ (,) map_var Client |
261 | { clientNet = net | 266 | { clientNet = {- XXX: Client type forces this pointless layering. -} layerTransport ((Right .) . (,) . (,) False) (,) net |
262 | , clientDispatcher = DispatchMethods | 267 | , clientDispatcher = DispatchMethods |
263 | { classifyInbound = \case | 268 | { classifyInbound = (. snd) $ \case |
264 | RelayPing n -> IsQuery () n | 269 | RelayPing n -> IsQuery PingPacket n |
265 | RelayPong n -> IsResponse n | 270 | RelayPong n -> IsResponse n |
266 | OnionPacketResponse (OnionAnnounceResponse n8 n24 ciphered) -> IsResponse n8 | 271 | OnionPacketResponse (OnionAnnounceResponse n8 n24 ciphered) -> IsResponse n8 |
267 | OnionPacketResponse o@(OnionToRouteResponse _) -> IsUnsolicited $ handle2route o | 272 | OnionPacketResponse o@(OnionToRouteResponse _) -> IsUnsolicited $ handle2route o |
268 | OOBRecv k bs -> IsUnsolicited $ handleOOB k bs | 273 | OOBRecv k bs -> IsUnsolicited $ handleOOB k bs |
269 | , lookupHandler = \() -> Just MethodHandler | 274 | , lookupHandler = \case |
270 | { methodParse = \(RelayPing n8) -> Right () | 275 | PingPacket -> Just MethodHandler |
271 | , methodSerialize = \n8 src dst () -> RelayPong n8 | 276 | { methodParse = \(_,RelayPing n8) -> Right () |
277 | , methodSerialize = \n8 src dst () -> (False, RelayPong n8) | ||
272 | , methodAction = \src () -> return () | 278 | , methodAction = \src () -> return () |
273 | } | 279 | } |
274 | , tableMethods = transactionMethods' store load (contramap (\(Nonce8 w64) -> w64) w64MapMethods) | 280 | w -> trace ("tcp-lookupHandler: "++show w) $ Just NoReply |
281 | { methodParse = \x -> Left "tcp-lookuphandler?" -- :: x -> Either err a | ||
282 | , noreplyAction = \addr a -> dput XTCP $ "tcp-lookupHandler: "++show w | ||
283 | } | ||
284 | , tableMethods = transactionMethods' store (\x -> load x . snd) (contramap (\(Nonce8 w64) -> w64) w64MapMethods) | ||
275 | $ first (either error Nonce8 . decode) . randomBytesGenerate 8 | 285 | $ first (either error Nonce8 . decode) . randomBytesGenerate 8 |
276 | } | 286 | } |
277 | , clientErrorReporter = logErrors { reportTimeout = reportTimeout ignoreErrors } | 287 | , clientErrorReporter = logErrors { reportTimeout = reportTimeout ignoreErrors } |