summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2018-12-02 15:53:35 -0500
committerJoe Crayne <joe@jerkface.net>2018-12-16 14:08:26 -0500
commite64a0efd2ca29257c189343b6dc75f6bee29d66d (patch)
tree3cdef723ca371c7a0be5a851a9d0022d7cfc6fab
parente15b8709e5091808a50630372f278fcbd844d638 (diff)
Tox TCP client transport.
-rw-r--r--src/Crypto/Tox.hs2
-rw-r--r--src/Network/QueryResponse/TCP.hs57
-rw-r--r--src/Network/Tox/TCP.hs85
3 files changed, 118 insertions, 26 deletions
diff --git a/src/Crypto/Tox.hs b/src/Crypto/Tox.hs
index a7f1ae83..1b3d5e5c 100644
--- a/src/Crypto/Tox.hs
+++ b/src/Crypto/Tox.hs
@@ -169,7 +169,7 @@ authAndBytes (Encrypted bs) = (auth,bs')
169-- independently of the value, or the length depends on the value. 169-- independently of the value, or the length depends on the value.
170data Size a 170data Size a
171 = VarSize (a -> Int) 171 = VarSize (a -> Int)
172 | ConstSize !Int 172 | ConstSize { constSize :: !Int }
173 deriving Typeable 173 deriving Typeable
174 174
175instance Contravariant Size where 175instance Contravariant Size where
diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs
index 83ae367f..7efe6966 100644
--- a/src/Network/QueryResponse/TCP.hs
+++ b/src/Network/QueryResponse/TCP.hs
@@ -45,77 +45,84 @@ data TCPCache st = TCPCache
45 , tcpMax :: Int 45 , tcpMax :: Int
46 } 46 }
47 47
48data StreamTransform st x y = StreamTransform 48data SessionProtocol x y = SessionProtocol
49 { streamHello :: Handle -> IO st -- ^ "Hello" protocol upon fresh connection. 49 { streamGoodbye :: Handle -> IO () -- ^ "Goodbye" protocol upon termination.
50 , streamGoodbye :: st -> Handle -> IO () -- ^ "Goodbye" protocol upon termination. 50 , streamDecode :: Handle -> IO (Maybe x) -- ^ Parse inbound messages.
51 , streamDecode :: st -> Handle -> IO (Maybe x) -- ^ Parse inbound messages. 51 , streamEncode :: y -> IO ByteString -- ^ Serialize outbound messages.
52 , streamEncode :: st -> y -> IO ByteString -- ^ Serialize outbound messages.
53 } 52 }
54 53
55acquireConnection :: MVar (Maybe (Either a (x, SockAddr))) 54data StreamHandshake addr x y = StreamHandshake
56 -> TCPCache st 55 { streamHello :: addr -> Handle -> IO (SessionProtocol x y) -- ^ "Hello" protocol upon fresh connection.
57 -> StreamTransform st x y 56 , streamAddr :: addr -> SockAddr
58 -> SockAddr 57 }
58
59acquireConnection :: MVar (Maybe (Either a (x, addr)))
60 -> TCPCache (SessionProtocol x y)
61 -> StreamHandshake addr x y
62 -> addr
59 -> IO (Maybe (y -> IO ())) 63 -> IO (Maybe (y -> IO ()))
60acquireConnection mvar tcpcache stream addr = do 64acquireConnection mvar tcpcache stream addr = do
61 cache <- atomically $ readTVar (lru tcpcache) 65 cache <- atomically $ readTVar (lru tcpcache)
62 case MM.lookup' (TCPAddress addr) cache of 66 case MM.lookup' (TCPAddress $ streamAddr stream addr) cache of
63 Nothing -> do 67 Nothing -> do
64 proto <- getProtocolNumber "tcp" 68 proto <- getProtocolNumber "tcp"
65 sock <- socket (socketFamily addr) Stream proto 69 sock <- socket (socketFamily $ streamAddr stream addr) Stream proto
66 connect sock addr `catchIOError` (\e -> close sock) 70 connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock)
67 h <- socketToHandle sock ReadWriteMode 71 h <- socketToHandle sock ReadWriteMode
68 st <- streamHello stream h 72 st <- streamHello stream addr h
69 t <- getPOSIXTime 73 t <- getPOSIXTime
70 mh <- newMVar h 74 mh <- newMVar h
71 rthread <- forkIO $ fix $ \loop -> do 75 rthread <- forkIO $ fix $ \loop -> do
72 x <- streamDecode stream st h 76 x <- streamDecode st h
73 putMVar mvar $ fmap (\u -> Right (u, addr)) x 77 putMVar mvar $ fmap (\u -> Right (u, addr)) x
74 case x of 78 case x of
75 Just _ -> loop 79 Just _ -> loop
76 Nothing -> do 80 Nothing -> do
77 atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress addr) 81 atomically $ modifyTVar' (lru tcpcache)
82 $ MM.delete (TCPAddress $ streamAddr stream addr)
78 hClose h 83 hClose h
79 labelThread rthread ("tcp:"++show addr) 84 let showAddr a = show (streamAddr stream a)
85 labelThread rthread ("tcp:"++showAddr addr)
80 let v = TCPSession 86 let v = TCPSession
81 { tcpHandle = mh 87 { tcpHandle = mh
82 , tcpState = st 88 , tcpState = st
83 , tcpThread = rthread 89 , tcpThread = rthread
84 } 90 }
85 let (retires,cache') = MM.takeView (tcpMax tcpcache) $ MM.insert' (TCPAddress addr) v t cache 91 let (retires,cache') = MM.takeView (tcpMax tcpcache)
92 $ MM.insert' (TCPAddress $ streamAddr stream addr) v t cache
86 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do 93 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do
87 myThreadId >>= flip labelThread ("tcp-close:"++show k) 94 myThreadId >>= flip labelThread ("tcp-close:"++show k)
88 killThread (tcpThread r) 95 killThread (tcpThread r)
89 h <- takeMVar (tcpHandle r) 96 h <- takeMVar (tcpHandle r)
90 streamGoodbye stream st h 97 streamGoodbye st h
91 hClose h 98 hClose h
92 atomically $ writeTVar (lru tcpcache) cache' 99 atomically $ writeTVar (lru tcpcache) cache'
93 100
94 return $ Just $ \y -> do 101 return $ Just $ \y -> do
95 bs <- streamEncode stream st y 102 bs <- streamEncode st y
96 withMVar mh (`hPut` bs) 103 withMVar mh (`hPut` bs)
97 Just (tm,v) -> do 104 Just (tm,v) -> do
98 t <- getPOSIXTime 105 t <- getPOSIXTime
99 let TCPSession { tcpHandle = mh, tcpState = st } = v 106 let TCPSession { tcpHandle = mh, tcpState = st } = v
100 cache' = MM.insert' (TCPAddress addr) v t cache 107 cache' = MM.insert' (TCPAddress $ streamAddr stream addr) v t cache
101 atomically $ writeTVar (lru tcpcache) cache' 108 atomically $ writeTVar (lru tcpcache) cache'
102 return $ Just $ \y -> do 109 return $ Just $ \y -> do
103 bs <- streamEncode stream st y 110 bs <- streamEncode st y
104 withMVar mh (`hPut` bs) 111 withMVar mh (`hPut` bs)
105 112
106closeAll :: TCPCache st -> StreamTransform st x y -> IO () 113closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO ()
107closeAll tcpcache stream = do 114closeAll tcpcache stream = do
108 cache <- atomically $ readTVar (lru tcpcache) 115 cache <- atomically $ readTVar (lru tcpcache)
109 forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do 116 forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do
110 let st = tcpState r 117 let st = tcpState r
111 killThread (tcpThread r) 118 killThread (tcpThread r)
112 h <- takeMVar $ tcpHandle r 119 h <- takeMVar $ tcpHandle r
113 streamGoodbye stream st h 120 streamGoodbye st h
114 hClose h 121 hClose h
115 122
116tcpTransport :: Int -- ^ maximum number of TCP links to maintain. 123tcpTransport :: Int -- ^ maximum number of TCP links to maintain.
117 -> StreamTransform st x y 124 -> StreamHandshake addr x y
118 -> IO (TransportA err SockAddr x y) 125 -> IO (TransportA err addr x y)
119tcpTransport maxcon stream = do 126tcpTransport maxcon stream = do
120 msgvar <- newEmptyMVar 127 msgvar <- newEmptyMVar
121 tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) 128 tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty)
diff --git a/src/Network/Tox/TCP.hs b/src/Network/Tox/TCP.hs
new file mode 100644
index 00000000..0780f121
--- /dev/null
+++ b/src/Network/Tox/TCP.hs
@@ -0,0 +1,85 @@
1{-# LANGUAGE RecursiveDo #-}
2{-# LANGUAGE PartialTypeSignatures #-}
3{-# LANGUAGE LambdaCase #-}
4{-# LANGUAGE FlexibleContexts #-}
5module Network.Tox.TCP where
6
7import Control.Concurrent
8import Control.Concurrent.STM
9import Data.Functor.Identity
10import Data.Serialize
11import System.IO (Handle)
12
13import Crypto.Tox
14import Data.ByteString (hPut,hGet,ByteString)
15import Data.Tox.Relay
16import Network.Address (setPort,PortNumber,SockAddr)
17import Network.QueryResponse
18import Network.QueryResponse.TCP
19import qualified Network.Tox.NodeId as UDP
20
21
22withSize :: Sized x => (Size x -> m (p x)) -> m (p x)
23withSize f = case size of len -> f len
24
25data NodeInfo = NodeInfo
26 { udpNodeInfo :: UDP.NodeInfo
27 , tcpPort :: PortNumber
28 }
29
30type NodeId = UDP.NodeId
31
32nodeId :: NodeInfo -> NodeId
33nodeId ni = UDP.nodeId $ udpNodeInfo ni
34
35nodeAddr :: NodeInfo -> SockAddr
36nodeAddr ni = setPort (tcpPort ni) $ UDP.nodeAddr $ udpNodeInfo ni
37
38tcpStream :: (Serialize y, Sized y, Serialize x, Sized x) =>
39 TransportCrypto -> StreamHandshake NodeInfo x y
40tcpStream crypto = StreamHandshake
41 { streamHello = \addr h -> do
42 (skey, hello) <- atomically $ do
43 n24 <- transportNewNonce crypto
44 skey <- transportNewKey crypto
45 base24 <- transportNewNonce crypto
46 return $ (,) skey $ Hello $ Asymm
47 { senderKey = transportPublic crypto
48 , asymmNonce = n24
49 , asymmData = pure HelloData
50 { sessionPublicKey = toPublic $ skey
51 , sessionBaseNonce = base24
52 }
53 }
54 noncef <- lookupNonceFunction crypto (transportSecret crypto) (UDP.id2key $ nodeId addr)
55 hPut h $ encode $ encryptPayload (noncef $ helloNonce hello) hello
56 welcomeE <- withSize $ fmap decode . hGet h . constSize
57 let Right welcome = welcomeE >>= \w -> decryptPayload (noncef $ welcomeNonce w) w
58 noncef' <- lookupNonceFunction crypto skey (sessionPublicKey $ runIdentity $ welcomeData welcome)
59 nsend <- newMVar (sessionBaseNonce $ runIdentity $ helloData hello)
60 nread <- newMVar (sessionBaseNonce $ runIdentity $ welcomeData welcome)
61 let them = sessionPublicKey $ runIdentity $ welcomeData welcome
62 return SessionProtocol
63 { streamGoodbye = \h -> return () -- No goodbye packet? Seems rude.
64 , streamDecode = \h -> do
65 decode <$> hGet h 2 >>= \case
66 Left _ -> return Nothing
67 Right len -> do
68 decode <$> hGet h len >>= \case
69 Left _ -> return Nothing
70 Right x -> do
71 n24 <- takeMVar nread
72 let r = decrypt (noncef' n24) x >>= decodePlain
73 putMVar nread (incrementNonce24 n24)
74 return $ either (const Nothing) Just r
75 , streamEncode = \y -> do
76 n24 <- takeMVar nsend
77 let bs = encode $ encrypt (noncef' n24) $ encodePlain y
78 putMVar nsend (incrementNonce24 n24)
79 return bs -- XXX: Should we wait until this bytestring is sent before putting the nonce back in the MVar?
80 }
81 , streamAddr = nodeAddr
82 }
83
84toxTCP :: TransportCrypto -> IO (Transport err NodeInfo RelayPacket)
85toxTCP crypto = tcpTransport 30 (tcpStream crypto)