diff options
author | Joe Crayne <joe@jerkface.net> | 2018-11-30 01:58:43 -0500 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2018-12-16 14:08:26 -0500 |
commit | 59aa0062c15610015a6bce077be5da1d3ed34019 (patch) | |
tree | 19f397e4edec56e8c9aa9e3d008d3d1905ee466b /examples/toxrelay.hs | |
parent | 6ab923f538f0a090e09da37154d5ce0fbe657dac (diff) |
More work on TCP relay.
Diffstat (limited to 'examples/toxrelay.hs')
-rw-r--r-- | examples/toxrelay.hs | 173 |
1 files changed, 151 insertions, 22 deletions
diff --git a/examples/toxrelay.hs b/examples/toxrelay.hs index fdf0c011..f03605f9 100644 --- a/examples/toxrelay.hs +++ b/examples/toxrelay.hs | |||
@@ -1,20 +1,30 @@ | |||
1 | {-# LANGUAGE LambdaCase #-} | ||
2 | {-# LANGUAGE RecordWildCards #-} | ||
1 | {-# LANGUAGE ScopedTypeVariables #-} | 3 | {-# LANGUAGE ScopedTypeVariables #-} |
2 | 4 | ||
5 | import Control.Concurrent.MVar | ||
6 | import Control.Concurrent.STM | ||
3 | import Control.Exception | 7 | import Control.Exception |
4 | import Control.Monad | 8 | import Control.Monad |
5 | import Control.Concurrent.STM | ||
6 | import qualified Data.ByteString as B | 9 | import qualified Data.ByteString as B |
7 | import Data.Function | 10 | import Data.Function |
8 | import qualified Data.IntMap as IntMap | 11 | import Data.Functor.Identity |
9 | ;import Data.IntMap (IntMap) | 12 | import qualified Data.IntMap as IntMap |
13 | ;import Data.IntMap (IntMap) | ||
14 | import qualified Data.Map as Map | ||
15 | ;import Data.Map (Map) | ||
10 | import Data.Serialize | 16 | import Data.Serialize |
17 | import Data.Word | ||
11 | import System.IO | 18 | import System.IO |
12 | import Data.Functor.Identity | 19 | import System.IO.Error |
20 | import System.Timeout | ||
13 | 21 | ||
14 | import Crypto.Tox | 22 | import Crypto.Tox |
23 | import qualified Data.IntervalSet as IntSet | ||
24 | ;import Data.IntervalSet (IntSet) | ||
15 | import Data.Tox.Relay | 25 | import Data.Tox.Relay |
16 | import Network.StreamServer | ||
17 | import Network.Address (getBindAddress) | 26 | import Network.Address (getBindAddress) |
27 | import Network.StreamServer | ||
18 | import Network.Tox (newCrypto) | 28 | import Network.Tox (newCrypto) |
19 | 29 | ||
20 | 30 | ||
@@ -22,16 +32,49 @@ import Network.Tox (newCrypto) | |||
22 | hGetPrefixed :: Serialize a => Handle -> IO (Either String a) | 32 | hGetPrefixed :: Serialize a => Handle -> IO (Either String a) |
23 | hGetPrefixed h = do | 33 | hGetPrefixed h = do |
24 | mlen <- runGet getWord16be <$> B.hGet h 2 | 34 | mlen <- runGet getWord16be <$> B.hGet h 2 |
35 | -- We treat parse-fail the same as EOF. | ||
25 | fmap join $ forM mlen $ \len -> runGet get <$> B.hGet h (fromIntegral len) | 36 | fmap join $ forM mlen $ \len -> runGet get <$> B.hGet h (fromIntegral len) |
26 | 37 | ||
27 | hGetSized :: forall x. (Sized x, Serialize x) => Handle -> IO (Either String x) | 38 | hGetSized :: forall x. (Sized x, Serialize x) => Handle -> IO (Either String x) |
28 | hGetSized h = runGet get <$> B.hGet h len | 39 | hGetSized h = runGet get <$> B.hGet h len -- We treat parse-fail the same as EOF. |
29 | where | 40 | where |
30 | ConstSize len = size :: Size x | 41 | ConstSize len = size :: Size x |
31 | 42 | ||
32 | relaySession :: TransportCrypto -> TVar (IntMap Handle) -> (RelayPacket -> IO ()) -> sock -> Int -> Handle -> IO () | 43 | data RelaySession = RelaySession |
33 | relaySession crypto cons dispatch _ conid h = do | 44 | { indexPool :: IntSet -- ^ Ints that are either solicited or associated. |
34 | atomically $ modifyTVar' cons $ IntMap.insert conid h | 45 | , solicited :: Map PublicKey Int -- ^ Reserved ids, not yet in associated. |
46 | , associated :: IntMap (RelayPacket -> IO ()) -- ^ Peers this session is connected to. | ||
47 | } | ||
48 | |||
49 | freshSession :: RelaySession | ||
50 | freshSession = RelaySession | ||
51 | { indexPool = IntSet.empty | ||
52 | , solicited = Map.empty | ||
53 | , associated = IntMap.empty | ||
54 | } | ||
55 | |||
56 | disconnect :: TVar (Map PublicKey (RelayPacket -> IO (),TVar RelaySession)) | ||
57 | -> PublicKey | ||
58 | -> IO () | ||
59 | disconnect cons who = join $ atomically $ do | ||
60 | Map.lookup who <$> readTVar cons | ||
61 | >>= \case | ||
62 | Nothing -> return $ return () | ||
63 | Just (_,session) -> do | ||
64 | modifyTVar' cons $ Map.delete who | ||
65 | RelaySession { associated = cs } <- readTVar session | ||
66 | return $ let notifyPeer i send = (send (DisconnectNotification $ key2c i) >>) | ||
67 | in IntMap.foldrWithKey notifyPeer (return ()) cs | ||
68 | |||
69 | relaySession :: TransportCrypto | ||
70 | -> TVar (Map PublicKey (RelayPacket -> IO (),TVar RelaySession)) | ||
71 | -> sock | ||
72 | -> Int | ||
73 | -> Handle | ||
74 | -> IO () | ||
75 | relaySession crypto cons _ conid h = do | ||
76 | -- atomically $ modifyTVar' cons $ IntMap.insert conid h | ||
77 | |||
35 | -- mhello <- fmap (>>= \h -> decryptPayload (computeSharedSecret me (helloFrom h) (helloNonce h)) h) $ hGetSized h | 78 | -- mhello <- fmap (>>= \h -> decryptPayload (computeSharedSecret me (helloFrom h) (helloNonce h)) h) $ hGetSized h |
36 | 79 | ||
37 | (hGetSized h >>=) $ mapM_ $ \helloE -> do | 80 | (hGetSized h >>=) $ mapM_ $ \helloE -> do |
@@ -41,38 +84,124 @@ relaySession crypto cons dispatch _ conid h = do | |||
41 | 84 | ||
42 | noncef <- lookupNonceFunction crypto me them | 85 | noncef <- lookupNonceFunction crypto me them |
43 | let mhello = decryptPayload (noncef $ helloNonce helloE) helloE | 86 | let mhello = decryptPayload (noncef $ helloNonce helloE) helloE |
44 | |||
45 | forM_ mhello $ \hello -> do | 87 | forM_ mhello $ \hello -> do |
88 | let _ = hello :: Hello Identity | ||
46 | 89 | ||
47 | (me',welcome) <- atomically $ do | 90 | (me',welcome) <- atomically $ do |
48 | skey <- transportNewKey crypto | 91 | skey <- transportNewKey crypto |
49 | dta <- HelloData (toPublic skey) <$> transportNewNonce crypto | 92 | dta <- HelloData (toPublic skey) <$> transportNewNonce crypto |
50 | w24 <- transportNewNonce crypto | 93 | w24 <- transportNewNonce crypto |
51 | return (skey, Welcome w24 $ pure dta) | 94 | return (skey, Welcome w24 $ pure dta) |
95 | |||
52 | B.hPut h $ encode $ encryptPayload (noncef $ welcomeNonce welcome) welcome | 96 | B.hPut h $ encode $ encryptPayload (noncef $ welcomeNonce welcome) welcome |
53 | 97 | ||
54 | let them' = sessionPublicKey (runIdentity $ helloData hello) | 98 | noncef' <- let them' = sessionPublicKey (runIdentity $ helloData hello) |
55 | noncef' <- lookupNonceFunction crypto me' them' | 99 | in lookupNonceFunction crypto me' them' |
100 | |||
101 | sendPacket <- do | ||
102 | v <- newMVar (sessionBaseNonce $ runIdentity $ welcomeData welcome) | ||
103 | return $ \p -> do | ||
104 | n24 <- takeMVar v | ||
105 | let bs = encode $ encrypt (noncef' n24) $ encodePlain (p :: RelayPacket) | ||
106 | do B.hPut h $ encode (fromIntegral (B.length bs) :: Word16) | ||
107 | B.hPut h bs | ||
108 | `catchIOError` \_ -> return () | ||
109 | putMVar v (incrementNonce24 n24) | ||
110 | |||
111 | let readPacket n24 = (>>= decrypt (noncef' n24) >=> decodePlain) <$> hGetPrefixed h | ||
112 | base = sessionBaseNonce $ runIdentity $ helloData hello | ||
113 | |||
114 | -- You get 3 seconds to send a session packet. | ||
115 | mpkt0 <- join <$> timeout 3000000 (either (const Nothing) Just <$> readPacket base) | ||
116 | forM_ mpkt0 $ \pkt0 -> do | ||
117 | |||
118 | disconnect cons (helloFrom hello) | ||
119 | session <- atomically $ do | ||
120 | session <- newTVar freshSession | ||
121 | modifyTVar' cons $ Map.insert (helloFrom hello) (sendPacket,session) | ||
122 | return session | ||
123 | |||
124 | handlePacket cons (helloFrom hello) sendPacket session pkt0 | ||
125 | |||
126 | flip fix (incrementNonce24 base) $ \loop n24 -> do | ||
127 | m <- readPacket n24 | ||
128 | forM_ m $ \p -> do | ||
129 | handlePacket cons (helloFrom hello) sendPacket session p | ||
130 | loop (incrementNonce24 n24) | ||
131 | `finally` | ||
132 | disconnect cons (helloFrom hello) | ||
133 | |||
134 | data R = R { routingRequest :: PublicKey -> IO ConId | ||
135 | , reply :: RelayPacket -> IO () | ||
136 | , routeOOB :: PublicKey -> IO (Maybe (RelayPacket -> IO ())) | ||
137 | } | ||
138 | |||
139 | handlePacket :: TVar (Map PublicKey (RelayPacket -> IO (), TVar RelaySession)) | ||
140 | -> PublicKey | ||
141 | -> (RelayPacket -> IO ()) | ||
142 | -> TVar RelaySession | ||
143 | -> RelayPacket | ||
144 | -> IO () | ||
145 | handlePacket cons me sendToMe session = \case | ||
146 | RoutingRequest them -> join $ atomically $ do | ||
147 | mySession <- readTVar session | ||
148 | mi <- case Map.lookup them (solicited mySession) of | ||
149 | Nothing -> fmap join $ forM (IntSet.nearestOutsider 0 (indexPool mySession)) $ \i -> do | ||
150 | if -120 <= i && i <= 119 | ||
151 | then do | ||
152 | writeTVar session mySession | ||
153 | { indexPool = IntSet.insert i (indexPool mySession) | ||
154 | , solicited = Map.insert them i (solicited mySession) | ||
155 | } | ||
156 | return $ Just i | ||
157 | else return Nothing -- No more slots available. | ||
158 | Just i -> return $ Just i | ||
159 | notifyConnect <- fmap (join . join) $ forM mi $ \i -> do | ||
160 | mp <- Map.lookup them <$> readTVar cons | ||
161 | forM mp $ \(sendToThem,peer) -> do | ||
162 | theirSession <- readTVar peer | ||
163 | forM (Map.lookup me $ solicited theirSession) $ \reserved_id -> do | ||
164 | writeTVar peer theirSession | ||
165 | { solicited = Map.delete me (solicited theirSession) | ||
166 | , associated = IntMap.insert reserved_id sendToMe (associated theirSession) | ||
167 | } | ||
168 | writeTVar session mySession | ||
169 | { solicited = Map.delete them (solicited mySession) | ||
170 | , associated = IntMap.insert i sendToThem (associated mySession) | ||
171 | } | ||
172 | return $ do sendToThem $ ConnectNotification (key2c reserved_id) | ||
173 | sendToMe $ ConnectNotification (key2c i) | ||
174 | return $ do sendToMe $ RoutingResponse (maybe badcon key2c mi) them | ||
175 | sequence_ notifyConnect | ||
56 | 176 | ||
57 | let _ = hello :: Hello Identity | 177 | RelayPing x -> sendToMe $ RelayPong x -- TODO x==0 is invalid. Do we care? |
58 | flip fix (sessionBaseNonce $ runIdentity $ helloData hello) $ \loop n24 -> do | 178 | |
59 | m <- (>>= decrypt (noncef' n24) >=> decodePlain) <$> hGetPrefixed h | 179 | OOBSend them bs -> do |
60 | forM_ m $ \p -> do | 180 | m <- atomically $ Map.lookup them <$> readTVar cons |
61 | dispatch p | 181 | forM_ m $ \(sendToThem,_) -> sendToThem $ OOBRecv me bs |
62 | loop (incrementNonce24 n24) | 182 | |
63 | `finally` | 183 | RelayData con bs -> join $ atomically $ do |
64 | atomically (modifyTVar' cons $ IntMap.delete conid) | 184 | -- Data: Data packets can only be sent and received if the |
185 | -- corresponding connection_id is connection (a Connect notification | ||
186 | -- has been received from it) if the server receives a Data packet for | ||
187 | -- a non connected or existent connection it will discard it. | ||
188 | mySession <- readTVar session | ||
189 | return $ sequence_ $ do | ||
190 | i <- c2key con | ||
191 | sendToThem <- IntMap.lookup i $ associated mySession | ||
192 | return $ sendToThem $ RelayData _todo bs | ||
65 | 193 | ||
194 | _ -> return () | ||
66 | 195 | ||
67 | 196 | ||
68 | main :: IO () | 197 | main :: IO () |
69 | main = do | 198 | main = do |
70 | crypto <- newCrypto | 199 | crypto <- newCrypto |
71 | cons <- newTVarIO IntMap.empty | 200 | cons <- newTVarIO Map.empty |
72 | a <- getBindAddress "33445" True | 201 | a <- getBindAddress "33445" True |
73 | h <- streamServer ServerConfig | 202 | h <- streamServer ServerConfig |
74 | { serverWarn = hPutStrLn stderr | 203 | { serverWarn = hPutStrLn stderr |
75 | , serverSession = relaySession crypto cons print | 204 | , serverSession = relaySession crypto cons |
76 | } | 205 | } |
77 | a | 206 | a |
78 | 207 | ||