diff options
author | Joe Crayne <joe@jerkface.net> | 2018-12-02 15:53:35 -0500 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2018-12-16 14:08:26 -0500 |
commit | e64a0efd2ca29257c189343b6dc75f6bee29d66d (patch) | |
tree | 3cdef723ca371c7a0be5a851a9d0022d7cfc6fab /src/Network | |
parent | e15b8709e5091808a50630372f278fcbd844d638 (diff) |
Tox TCP client transport.
Diffstat (limited to 'src/Network')
-rw-r--r-- | src/Network/QueryResponse/TCP.hs | 57 | ||||
-rw-r--r-- | src/Network/Tox/TCP.hs | 85 |
2 files changed, 117 insertions, 25 deletions
diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs index 83ae367f..7efe6966 100644 --- a/src/Network/QueryResponse/TCP.hs +++ b/src/Network/QueryResponse/TCP.hs | |||
@@ -45,77 +45,84 @@ data TCPCache st = TCPCache | |||
45 | , tcpMax :: Int | 45 | , tcpMax :: Int |
46 | } | 46 | } |
47 | 47 | ||
48 | data StreamTransform st x y = StreamTransform | 48 | data SessionProtocol x y = SessionProtocol |
49 | { streamHello :: Handle -> IO st -- ^ "Hello" protocol upon fresh connection. | 49 | { streamGoodbye :: Handle -> IO () -- ^ "Goodbye" protocol upon termination. |
50 | , streamGoodbye :: st -> Handle -> IO () -- ^ "Goodbye" protocol upon termination. | 50 | , streamDecode :: Handle -> IO (Maybe x) -- ^ Parse inbound messages. |
51 | , streamDecode :: st -> Handle -> IO (Maybe x) -- ^ Parse inbound messages. | 51 | , streamEncode :: y -> IO ByteString -- ^ Serialize outbound messages. |
52 | , streamEncode :: st -> y -> IO ByteString -- ^ Serialize outbound messages. | ||
53 | } | 52 | } |
54 | 53 | ||
55 | acquireConnection :: MVar (Maybe (Either a (x, SockAddr))) | 54 | data StreamHandshake addr x y = StreamHandshake |
56 | -> TCPCache st | 55 | { streamHello :: addr -> Handle -> IO (SessionProtocol x y) -- ^ "Hello" protocol upon fresh connection. |
57 | -> StreamTransform st x y | 56 | , streamAddr :: addr -> SockAddr |
58 | -> SockAddr | 57 | } |
58 | |||
59 | acquireConnection :: MVar (Maybe (Either a (x, addr))) | ||
60 | -> TCPCache (SessionProtocol x y) | ||
61 | -> StreamHandshake addr x y | ||
62 | -> addr | ||
59 | -> IO (Maybe (y -> IO ())) | 63 | -> IO (Maybe (y -> IO ())) |
60 | acquireConnection mvar tcpcache stream addr = do | 64 | acquireConnection mvar tcpcache stream addr = do |
61 | cache <- atomically $ readTVar (lru tcpcache) | 65 | cache <- atomically $ readTVar (lru tcpcache) |
62 | case MM.lookup' (TCPAddress addr) cache of | 66 | case MM.lookup' (TCPAddress $ streamAddr stream addr) cache of |
63 | Nothing -> do | 67 | Nothing -> do |
64 | proto <- getProtocolNumber "tcp" | 68 | proto <- getProtocolNumber "tcp" |
65 | sock <- socket (socketFamily addr) Stream proto | 69 | sock <- socket (socketFamily $ streamAddr stream addr) Stream proto |
66 | connect sock addr `catchIOError` (\e -> close sock) | 70 | connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) |
67 | h <- socketToHandle sock ReadWriteMode | 71 | h <- socketToHandle sock ReadWriteMode |
68 | st <- streamHello stream h | 72 | st <- streamHello stream addr h |
69 | t <- getPOSIXTime | 73 | t <- getPOSIXTime |
70 | mh <- newMVar h | 74 | mh <- newMVar h |
71 | rthread <- forkIO $ fix $ \loop -> do | 75 | rthread <- forkIO $ fix $ \loop -> do |
72 | x <- streamDecode stream st h | 76 | x <- streamDecode st h |
73 | putMVar mvar $ fmap (\u -> Right (u, addr)) x | 77 | putMVar mvar $ fmap (\u -> Right (u, addr)) x |
74 | case x of | 78 | case x of |
75 | Just _ -> loop | 79 | Just _ -> loop |
76 | Nothing -> do | 80 | Nothing -> do |
77 | atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress addr) | 81 | atomically $ modifyTVar' (lru tcpcache) |
82 | $ MM.delete (TCPAddress $ streamAddr stream addr) | ||
78 | hClose h | 83 | hClose h |
79 | labelThread rthread ("tcp:"++show addr) | 84 | let showAddr a = show (streamAddr stream a) |
85 | labelThread rthread ("tcp:"++showAddr addr) | ||
80 | let v = TCPSession | 86 | let v = TCPSession |
81 | { tcpHandle = mh | 87 | { tcpHandle = mh |
82 | , tcpState = st | 88 | , tcpState = st |
83 | , tcpThread = rthread | 89 | , tcpThread = rthread |
84 | } | 90 | } |
85 | let (retires,cache') = MM.takeView (tcpMax tcpcache) $ MM.insert' (TCPAddress addr) v t cache | 91 | let (retires,cache') = MM.takeView (tcpMax tcpcache) |
92 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v t cache | ||
86 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do | 93 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do |
87 | myThreadId >>= flip labelThread ("tcp-close:"++show k) | 94 | myThreadId >>= flip labelThread ("tcp-close:"++show k) |
88 | killThread (tcpThread r) | 95 | killThread (tcpThread r) |
89 | h <- takeMVar (tcpHandle r) | 96 | h <- takeMVar (tcpHandle r) |
90 | streamGoodbye stream st h | 97 | streamGoodbye st h |
91 | hClose h | 98 | hClose h |
92 | atomically $ writeTVar (lru tcpcache) cache' | 99 | atomically $ writeTVar (lru tcpcache) cache' |
93 | 100 | ||
94 | return $ Just $ \y -> do | 101 | return $ Just $ \y -> do |
95 | bs <- streamEncode stream st y | 102 | bs <- streamEncode st y |
96 | withMVar mh (`hPut` bs) | 103 | withMVar mh (`hPut` bs) |
97 | Just (tm,v) -> do | 104 | Just (tm,v) -> do |
98 | t <- getPOSIXTime | 105 | t <- getPOSIXTime |
99 | let TCPSession { tcpHandle = mh, tcpState = st } = v | 106 | let TCPSession { tcpHandle = mh, tcpState = st } = v |
100 | cache' = MM.insert' (TCPAddress addr) v t cache | 107 | cache' = MM.insert' (TCPAddress $ streamAddr stream addr) v t cache |
101 | atomically $ writeTVar (lru tcpcache) cache' | 108 | atomically $ writeTVar (lru tcpcache) cache' |
102 | return $ Just $ \y -> do | 109 | return $ Just $ \y -> do |
103 | bs <- streamEncode stream st y | 110 | bs <- streamEncode st y |
104 | withMVar mh (`hPut` bs) | 111 | withMVar mh (`hPut` bs) |
105 | 112 | ||
106 | closeAll :: TCPCache st -> StreamTransform st x y -> IO () | 113 | closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO () |
107 | closeAll tcpcache stream = do | 114 | closeAll tcpcache stream = do |
108 | cache <- atomically $ readTVar (lru tcpcache) | 115 | cache <- atomically $ readTVar (lru tcpcache) |
109 | forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do | 116 | forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do |
110 | let st = tcpState r | 117 | let st = tcpState r |
111 | killThread (tcpThread r) | 118 | killThread (tcpThread r) |
112 | h <- takeMVar $ tcpHandle r | 119 | h <- takeMVar $ tcpHandle r |
113 | streamGoodbye stream st h | 120 | streamGoodbye st h |
114 | hClose h | 121 | hClose h |
115 | 122 | ||
116 | tcpTransport :: Int -- ^ maximum number of TCP links to maintain. | 123 | tcpTransport :: Int -- ^ maximum number of TCP links to maintain. |
117 | -> StreamTransform st x y | 124 | -> StreamHandshake addr x y |
118 | -> IO (TransportA err SockAddr x y) | 125 | -> IO (TransportA err addr x y) |
119 | tcpTransport maxcon stream = do | 126 | tcpTransport maxcon stream = do |
120 | msgvar <- newEmptyMVar | 127 | msgvar <- newEmptyMVar |
121 | tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) | 128 | tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) |
diff --git a/src/Network/Tox/TCP.hs b/src/Network/Tox/TCP.hs new file mode 100644 index 00000000..0780f121 --- /dev/null +++ b/src/Network/Tox/TCP.hs | |||
@@ -0,0 +1,85 @@ | |||
1 | {-# LANGUAGE RecursiveDo #-} | ||
2 | {-# LANGUAGE PartialTypeSignatures #-} | ||
3 | {-# LANGUAGE LambdaCase #-} | ||
4 | {-# LANGUAGE FlexibleContexts #-} | ||
5 | module Network.Tox.TCP where | ||
6 | |||
7 | import Control.Concurrent | ||
8 | import Control.Concurrent.STM | ||
9 | import Data.Functor.Identity | ||
10 | import Data.Serialize | ||
11 | import System.IO (Handle) | ||
12 | |||
13 | import Crypto.Tox | ||
14 | import Data.ByteString (hPut,hGet,ByteString) | ||
15 | import Data.Tox.Relay | ||
16 | import Network.Address (setPort,PortNumber,SockAddr) | ||
17 | import Network.QueryResponse | ||
18 | import Network.QueryResponse.TCP | ||
19 | import qualified Network.Tox.NodeId as UDP | ||
20 | |||
21 | |||
22 | withSize :: Sized x => (Size x -> m (p x)) -> m (p x) | ||
23 | withSize f = case size of len -> f len | ||
24 | |||
25 | data NodeInfo = NodeInfo | ||
26 | { udpNodeInfo :: UDP.NodeInfo | ||
27 | , tcpPort :: PortNumber | ||
28 | } | ||
29 | |||
30 | type NodeId = UDP.NodeId | ||
31 | |||
32 | nodeId :: NodeInfo -> NodeId | ||
33 | nodeId ni = UDP.nodeId $ udpNodeInfo ni | ||
34 | |||
35 | nodeAddr :: NodeInfo -> SockAddr | ||
36 | nodeAddr ni = setPort (tcpPort ni) $ UDP.nodeAddr $ udpNodeInfo ni | ||
37 | |||
38 | tcpStream :: (Serialize y, Sized y, Serialize x, Sized x) => | ||
39 | TransportCrypto -> StreamHandshake NodeInfo x y | ||
40 | tcpStream crypto = StreamHandshake | ||
41 | { streamHello = \addr h -> do | ||
42 | (skey, hello) <- atomically $ do | ||
43 | n24 <- transportNewNonce crypto | ||
44 | skey <- transportNewKey crypto | ||
45 | base24 <- transportNewNonce crypto | ||
46 | return $ (,) skey $ Hello $ Asymm | ||
47 | { senderKey = transportPublic crypto | ||
48 | , asymmNonce = n24 | ||
49 | , asymmData = pure HelloData | ||
50 | { sessionPublicKey = toPublic $ skey | ||
51 | , sessionBaseNonce = base24 | ||
52 | } | ||
53 | } | ||
54 | noncef <- lookupNonceFunction crypto (transportSecret crypto) (UDP.id2key $ nodeId addr) | ||
55 | hPut h $ encode $ encryptPayload (noncef $ helloNonce hello) hello | ||
56 | welcomeE <- withSize $ fmap decode . hGet h . constSize | ||
57 | let Right welcome = welcomeE >>= \w -> decryptPayload (noncef $ welcomeNonce w) w | ||
58 | noncef' <- lookupNonceFunction crypto skey (sessionPublicKey $ runIdentity $ welcomeData welcome) | ||
59 | nsend <- newMVar (sessionBaseNonce $ runIdentity $ helloData hello) | ||
60 | nread <- newMVar (sessionBaseNonce $ runIdentity $ welcomeData welcome) | ||
61 | let them = sessionPublicKey $ runIdentity $ welcomeData welcome | ||
62 | return SessionProtocol | ||
63 | { streamGoodbye = \h -> return () -- No goodbye packet? Seems rude. | ||
64 | , streamDecode = \h -> do | ||
65 | decode <$> hGet h 2 >>= \case | ||
66 | Left _ -> return Nothing | ||
67 | Right len -> do | ||
68 | decode <$> hGet h len >>= \case | ||
69 | Left _ -> return Nothing | ||
70 | Right x -> do | ||
71 | n24 <- takeMVar nread | ||
72 | let r = decrypt (noncef' n24) x >>= decodePlain | ||
73 | putMVar nread (incrementNonce24 n24) | ||
74 | return $ either (const Nothing) Just r | ||
75 | , streamEncode = \y -> do | ||
76 | n24 <- takeMVar nsend | ||
77 | let bs = encode $ encrypt (noncef' n24) $ encodePlain y | ||
78 | putMVar nsend (incrementNonce24 n24) | ||
79 | return bs -- XXX: Should we wait until this bytestring is sent before putting the nonce back in the MVar? | ||
80 | } | ||
81 | , streamAddr = nodeAddr | ||
82 | } | ||
83 | |||
84 | toxTCP :: TransportCrypto -> IO (Transport err NodeInfo RelayPacket) | ||
85 | toxTCP crypto = tcpTransport 30 (tcpStream crypto) | ||