diff options
author | Joe Crayne <joe@jerkface.net> | 2018-12-17 00:47:42 -0500 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2018-12-17 00:47:42 -0500 |
commit | 37eb345d1ffb40e814766b6df8134ca21e6987a7 (patch) | |
tree | b915748ce6806af9a88ce1115cedcdb7bdaa22ba /src/Network | |
parent | 7dcd6ce16ce480791e4207e2c6d554a76fe75d6e (diff) |
tcp: this seems to work.
Diffstat (limited to 'src/Network')
-rw-r--r-- | src/Network/QueryResponse/TCP.hs | 147 | ||||
-rw-r--r-- | src/Network/Tox/TCP.hs | 38 |
2 files changed, 117 insertions, 68 deletions
diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs index efeab305..a606c51d 100644 --- a/src/Network/QueryResponse/TCP.hs +++ b/src/Network/QueryResponse/TCP.hs | |||
@@ -14,6 +14,7 @@ import Control.Monad | |||
14 | import Data.ByteString (ByteString,hPut) | 14 | import Data.ByteString (ByteString,hPut) |
15 | import Data.Function | 15 | import Data.Function |
16 | import Data.Hashable | 16 | import Data.Hashable |
17 | import Data.Maybe | ||
17 | import Data.Ord | 18 | import Data.Ord |
18 | import Data.Time.Clock.POSIX | 19 | import Data.Time.Clock.POSIX |
19 | import Data.Word | 20 | import Data.Word |
@@ -29,11 +30,13 @@ import Connection.Tcp (socketFamily) | |||
29 | import qualified Data.MinMaxPSQ as MM | 30 | import qualified Data.MinMaxPSQ as MM |
30 | import Network.QueryResponse | 31 | import Network.QueryResponse |
31 | 32 | ||
32 | data TCPSession st = TCPSession | 33 | data TCPSession st |
33 | { tcpHandle :: Handle | 34 | = PendingTCPSession |
34 | , tcpState :: st | 35 | | TCPSession |
35 | , tcpThread :: ThreadId | 36 | { tcpHandle :: Handle |
36 | } | 37 | , tcpState :: st |
38 | , tcpThread :: ThreadId | ||
39 | } | ||
37 | 40 | ||
38 | newtype TCPAddress = TCPAddress SockAddr | 41 | newtype TCPAddress = TCPAddress SockAddr |
39 | deriving (Eq,Ord) | 42 | deriving (Eq,Ord) |
@@ -60,6 +63,10 @@ data StreamHandshake addr x y = StreamHandshake | |||
60 | , streamAddr :: addr -> SockAddr | 63 | , streamAddr :: addr -> SockAddr |
61 | } | 64 | } |
62 | 65 | ||
66 | killSession :: TCPSession st -> IO () | ||
67 | killSession PendingTCPSession = return () | ||
68 | killSession TCPSession{tcpThread=t} = killThread t | ||
69 | |||
63 | acquireConnection :: MVar (Maybe (Either a (x, addr))) | 70 | acquireConnection :: MVar (Maybe (Either a (x, addr))) |
64 | -> TCPCache (SessionProtocol x y) | 71 | -> TCPCache (SessionProtocol x y) |
65 | -> StreamHandshake addr x y | 72 | -> StreamHandshake addr x y |
@@ -67,65 +74,95 @@ acquireConnection :: MVar (Maybe (Either a (x, addr))) | |||
67 | -> Bool | 74 | -> Bool |
68 | -> IO (Maybe (y -> IO ())) | 75 | -> IO (Maybe (y -> IO ())) |
69 | acquireConnection mvar tcpcache stream addr bDoCon = do | 76 | acquireConnection mvar tcpcache stream addr bDoCon = do |
70 | cache <- atomically $ readTVar (lru tcpcache) | 77 | now <- getPOSIXTime |
71 | case MM.lookup' (TCPAddress $ streamAddr stream addr) cache of | 78 | entry <- atomically $ do |
79 | c <- readTVar (lru tcpcache) | ||
80 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c | ||
81 | case v of | ||
82 | Nothing | bDoCon -> writeTVar (lru tcpcache) | ||
83 | $ MM.insert' (TCPAddress $ streamAddr stream addr) PendingTCPSession (Down now) c | ||
84 | | otherwise -> return () | ||
85 | Just (tm, v) -> modifyTVar' (lru tcpcache) $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now) | ||
86 | return v | ||
87 | case entry of | ||
72 | Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do | 88 | Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do |
73 | proto <- getProtocolNumber "tcp" | 89 | proto <- getProtocolNumber "tcp" |
74 | mh <- catchIOError (do sock <- socket (socketFamily $ streamAddr stream addr) Stream proto | 90 | mh <- catchIOError (do h <- timeout 10000000 $ do |
75 | connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) | 91 | sock <- socket (socketFamily $ streamAddr stream addr) Stream proto |
76 | h <- socketToHandle sock ReadWriteMode | 92 | connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) |
77 | return $ Just h) | 93 | h <- socketToHandle sock ReadWriteMode |
94 | hSetBuffering h NoBuffering | ||
95 | return h | ||
96 | return h) | ||
78 | $ \e -> return Nothing | 97 | $ \e -> return Nothing |
79 | fmap join $ forM mh $ \h -> do | 98 | ret <- fmap join $ forM mh $ \h -> do |
80 | st <- streamHello stream addr h | 99 | st <- streamHello stream addr h |
81 | t <- getPOSIXTime | 100 | dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr) |
82 | rthread <- forkIO $ fix $ \loop -> do | 101 | rthread <- forkIO $ fix $ \loop -> do |
83 | x <- streamDecode st | 102 | x <- streamDecode st |
84 | case x of | 103 | dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x |
85 | Just u -> do | 104 | case x of |
86 | timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr) | 105 | Just u -> do |
87 | loop | 106 | m <- timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr) |
88 | Nothing -> do | 107 | when (isNothing m) $ do |
89 | dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) | 108 | dput XTCP $ "TCP "++show (streamAddr stream addr) ++ "dropped packet." |
90 | atomically $ modifyTVar' (lru tcpcache) | 109 | loop |
91 | $ MM.delete (TCPAddress $ streamAddr stream addr) | 110 | Nothing -> do |
92 | hClose h | 111 | dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) |
93 | let showAddr a = show (streamAddr stream a) | 112 | atomically $ modifyTVar' (lru tcpcache) |
94 | labelThread rthread ("tcp:"++showAddr addr) | 113 | $ MM.delete (TCPAddress $ streamAddr stream addr) |
95 | let v = TCPSession | 114 | c <- atomically $ readTVar (lru tcpcache) |
96 | { tcpHandle = h | 115 | now <- getPOSIXTime |
97 | , tcpState = st | 116 | forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do |
98 | , tcpThread = rthread | 117 | let stat = case r of |
99 | } | 118 | PendingTCPSession -> "pending." |
100 | retires <- atomically $ do | 119 | TCPSession {} -> "established." |
101 | c <- readTVar (lru tcpcache) | 120 | dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), stat] |
102 | let (rs,c') = MM.takeView (tcpMax tcpcache) | 121 | hClose h |
103 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c | 122 | let showAddr a = show (streamAddr stream a) |
104 | writeTVar (lru tcpcache) c' | 123 | labelThread rthread ("tcp:"++showAddr addr) |
105 | return rs | 124 | let v = TCPSession |
106 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do | 125 | { tcpHandle = h |
107 | myThreadId >>= flip labelThread ("tcp-close:"++show k) | 126 | , tcpState = st |
108 | dput XTCP $ "TCP dropped: " ++ show k | 127 | , tcpThread = rthread |
109 | killThread (tcpThread r) | 128 | } |
110 | streamGoodbye st | 129 | t <- getPOSIXTime |
111 | hClose (tcpHandle r) | 130 | retires <- atomically $ do |
131 | c <- readTVar (lru tcpcache) | ||
132 | let (rs,c') = MM.takeView (tcpMax tcpcache) | ||
133 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c | ||
134 | writeTVar (lru tcpcache) c' | ||
135 | return rs | ||
136 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do | ||
137 | myThreadId >>= flip labelThread ("tcp-close:"++show k) | ||
138 | dput XTCP $ "TCP dropped: " ++ show k | ||
139 | killSession r | ||
140 | case r of TCPSession {tcpState=st,tcpHandle=h} -> do | ||
141 | streamGoodbye st | ||
142 | hClose h | ||
143 | _ -> return () | ||
112 | 144 | ||
113 | return $ Just $ streamEncode st | 145 | return $ Just $ streamEncode st |
114 | Just (tm,v) -> do | 146 | when (isNothing ret) $ do |
115 | t <- getPOSIXTime | 147 | atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) |
116 | let TCPSession { tcpHandle = h, tcpState = st } = v | 148 | return ret |
117 | cache' = MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) cache | 149 | Just (tm, PendingTCPSession) -> do |
118 | atomically $ writeTVar (lru tcpcache) cache' | 150 | fmap join $ timeout 10000000 $ atomically $ do |
119 | return $ Just $ streamEncode st | 151 | c <- readTVar (lru tcpcache) |
152 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c | ||
153 | case v of | ||
154 | Just (_,TCPSession{tcpState=st}) -> return $ Just $ streamEncode st | ||
155 | Nothing -> return Nothing | ||
156 | _ -> retry | ||
157 | Just (tm, v@TCPSession {tcpState=st}) -> return $ Just $ streamEncode st | ||
120 | 158 | ||
121 | closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO () | 159 | closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO () |
122 | closeAll tcpcache stream = do | 160 | closeAll tcpcache stream = do |
123 | cache <- atomically $ swapTVar (lru tcpcache) MM.empty | 161 | cache <- atomically $ swapTVar (lru tcpcache) MM.empty |
124 | forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do | 162 | forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do |
125 | let st = tcpState r | 163 | killSession r |
126 | killThread (tcpThread r) | 164 | case r of TCPSession{tcpState=st,tcpHandle=h} -> streamGoodbye st >> hClose h |
127 | streamGoodbye st | 165 | _ -> return () |
128 | hClose (tcpHandle r) | ||
129 | 166 | ||
130 | tcpTransport :: Int -- ^ maximum number of TCP links to maintain. | 167 | tcpTransport :: Int -- ^ maximum number of TCP links to maintain. |
131 | -> StreamHandshake addr x y | 168 | -> StreamHandshake addr x y |
diff --git a/src/Network/Tox/TCP.hs b/src/Network/Tox/TCP.hs index 71050fe8..36200586 100644 --- a/src/Network/Tox/TCP.hs +++ b/src/Network/Tox/TCP.hs | |||
@@ -11,6 +11,7 @@ import Debug.Trace | |||
11 | import Control.Arrow | 11 | import Control.Arrow |
12 | import Control.Concurrent | 12 | import Control.Concurrent |
13 | import Control.Concurrent.STM | 13 | import Control.Concurrent.STM |
14 | import Control.Exception | ||
14 | import Control.Monad | 15 | import Control.Monad |
15 | import Crypto.Random | 16 | import Crypto.Random |
16 | import Data.Aeson (ToJSON(..),FromJSON(..)) | 17 | import Data.Aeson (ToJSON(..),FromJSON(..)) |
@@ -92,7 +93,7 @@ tcpStream crypto = StreamHandshake | |||
92 | nil = SessionProtocol | 93 | nil = SessionProtocol |
93 | { streamGoodbye = return () | 94 | { streamGoodbye = return () |
94 | , streamDecode = return Nothing | 95 | , streamDecode = return Nothing |
95 | , streamEncode = \y -> return () | 96 | , streamEncode = \y -> dput XTCP $ "TCP nil <-- " ++ show y |
96 | } | 97 | } |
97 | either (\_ -> return nil) id $ mwelcome <&> \welcome -> do | 98 | either (\_ -> return nil) id $ mwelcome <&> \welcome -> do |
98 | dput XTCP $ "TCP:" ++ show addr ++ " --> " ++ show welcome | 99 | dput XTCP $ "TCP:" ++ show addr ++ " --> " ++ show welcome |
@@ -100,12 +101,13 @@ tcpStream crypto = StreamHandshake | |||
100 | nsend <- newMVar (sessionBaseNonce $ runIdentity $ helloData hello) | 101 | nsend <- newMVar (sessionBaseNonce $ runIdentity $ helloData hello) |
101 | nread <- newMVar (sessionBaseNonce $ runIdentity $ welcomeData welcome) | 102 | nread <- newMVar (sessionBaseNonce $ runIdentity $ welcomeData welcome) |
102 | let them = sessionPublicKey $ runIdentity $ welcomeData welcome | 103 | let them = sessionPublicKey $ runIdentity $ welcomeData welcome |
104 | hvar <- newMVar h | ||
103 | return SessionProtocol | 105 | return SessionProtocol |
104 | { streamGoodbye = do | 106 | { streamGoodbye = do |
105 | dput XTCP $ "Closing " ++ show addr | 107 | dput XTCP $ "Closing " ++ show addr |
106 | return () -- No goodbye packet? Seems rude. | 108 | return () -- No goodbye packet? Seems rude. |
107 | , streamDecode = | 109 | , streamDecode = |
108 | let go = decode <$> hGet h 2 >>= \case | 110 | let go h = decode <$> hGet h 2 >>= \case |
109 | Left e -> do | 111 | Left e -> do |
110 | dput XTCP $ "TCP: Failed to get length: " ++ e | 112 | dput XTCP $ "TCP: Failed to get length: " ++ e |
111 | return Nothing | 113 | return Nothing |
@@ -115,25 +117,27 @@ tcpStream crypto = StreamHandshake | |||
115 | dput XTCP $ "TCP: Failed to decode packet." | 117 | dput XTCP $ "TCP: Failed to decode packet." |
116 | return Nothing | 118 | return Nothing |
117 | Right x -> do | 119 | Right x -> do |
118 | m24 <- timeout 100000 (takeMVar nread) | 120 | m24 <- timeout 1000000 (takeMVar nread) |
119 | fmap join $ forM m24 $ \n24 -> do | 121 | fmap join $ forM m24 $ \n24 -> do |
120 | let r = decrypt (noncef' n24) x >>= decodePlain | 122 | let r = decrypt (noncef' n24) x >>= decodePlain |
121 | putMVar nread (incrementNonce24 n24) | 123 | putMVar nread (incrementNonce24 n24) |
122 | either (dput XTCP) | 124 | either (dput XTCP . ("TCP decryption: " ++)) |
123 | (\x' -> do | 125 | (\x' -> do |
124 | dput XTCP $ "TCP:" ++ show addr ++ " --> " ++ show x' | 126 | dput XTCP $ "TCP:" ++ show addr ++ " --> " ++ show x' |
125 | return ()) | 127 | return ()) |
126 | r | 128 | r |
127 | return $ either (const Nothing) Just r | 129 | return $ either (const Nothing) Just r |
128 | in go `catchIOError` \e -> do | 130 | in bracket (takeMVar hvar) (putMVar hvar) |
129 | dput XTCP $ "TCP exception: " ++ show e | 131 | $ \h -> go h `catchIOError` \e -> do |
130 | return Nothing | 132 | dput XTCP $ "TCP exception: " ++ show e |
133 | return Nothing | ||
131 | , streamEncode = \y -> do | 134 | , streamEncode = \y -> do |
132 | n24 <- takeMVar nsend | 135 | n24 <- takeMVar nsend |
133 | dput XTCP $ "TCP:" ++ show addr ++ " <-- " ++ show y | 136 | dput XTCP $ "TCP:" ++ show addr ++ " <-- " ++ show y |
134 | let bs = encode $ encrypt (noncef' n24) $ encodePlain y | 137 | let bs = encode $ encrypt (noncef' n24) $ encodePlain y |
135 | hPut h (encode (fromIntegral $ Data.ByteString.length bs :: Word16) <> bs) | 138 | ($ h) -- bracket (takeMVar hvar) (putMVar hvar) |
136 | `catchIOError` \e -> dput XTCP $ "TCP write exception: " ++ show e | 139 | $ \h -> hPut h (encode (fromIntegral $ Data.ByteString.length bs :: Word16) <> bs) |
140 | `catchIOError` \e -> dput XTCP $ "TCP write exception: " ++ show e | ||
137 | putMVar nsend (incrementNonce24 n24) | 141 | putMVar nsend (incrementNonce24 n24) |
138 | } | 142 | } |
139 | , streamAddr = nodeAddr | 143 | , streamAddr = nodeAddr |
@@ -154,7 +158,7 @@ nodeSearch tcp = Search | |||
154 | } | 158 | } |
155 | -} | 159 | -} |
156 | 160 | ||
157 | data TCPClient err meth tid = TCPClient | 161 | data TCPClient err tid = TCPClient |
158 | { tcpCrypto :: TransportCrypto | 162 | { tcpCrypto :: TransportCrypto |
159 | , tcpClient :: Client err PacketNumber tid NodeInfo (Bool,RelayPacket) | 163 | , tcpClient :: Client err PacketNumber tid NodeInfo (Bool,RelayPacket) |
160 | , tcpGetGateway :: UDP.NodeInfo -> STM (Maybe NodeInfo) | 164 | , tcpGetGateway :: UDP.NodeInfo -> STM (Maybe NodeInfo) |
@@ -181,10 +185,10 @@ getTCPNodes tcp seeking dst = do | |||
181 | return $ Just ts | 185 | return $ Just ts |
182 | -} | 186 | -} |
183 | 187 | ||
184 | getUDPNodes :: TCPClient err () Nonce8 -> NodeId -> UDP.NodeInfo -> IO (Maybe ([UDP.NodeInfo], [UDP.NodeInfo], Maybe ())) | 188 | getUDPNodes :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (Maybe ([UDP.NodeInfo], [UDP.NodeInfo], Maybe ())) |
185 | getUDPNodes tcp seeking dst = fmap fst <$> getUDPNodes' tcp seeking dst | 189 | getUDPNodes tcp seeking dst = fmap fst <$> getUDPNodes' tcp seeking dst |
186 | 190 | ||
187 | getUDPNodes' :: TCPClient err () Nonce8 -> NodeId -> UDP.NodeInfo -> IO (Maybe (([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()), NodeInfo)) | 191 | getUDPNodes' :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (Maybe (([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()), NodeInfo)) |
188 | getUDPNodes' tcp seeking dst0 = do | 192 | getUDPNodes' tcp seeking dst0 = do |
189 | mgateway <- atomically $ tcpGetGateway tcp dst0 | 193 | mgateway <- atomically $ tcpGetGateway tcp dst0 |
190 | fmap join $ forM mgateway $ \gateway -> do | 194 | fmap join $ forM mgateway $ \gateway -> do |
@@ -201,7 +205,14 @@ getUDPNodes' tcp seeking dst0 = do | |||
201 | wrap2 <- lookupNonceFunction (tcpCrypto tcp) b (UDP.id2key $ UDP.nodeId dst) | 205 | wrap2 <- lookupNonceFunction (tcpCrypto tcp) b (UDP.id2key $ UDP.nodeId dst) |
202 | wrap1 <- lookupNonceFunction (tcpCrypto tcp) c (UDP.id2key $ nodeId gateway) | 206 | wrap1 <- lookupNonceFunction (tcpCrypto tcp) c (UDP.id2key $ nodeId gateway) |
203 | wrap0 <- lookupNonceFunction (tcpCrypto tcp) (transportSecret $ tcpCrypto tcp) (UDP.id2key $ UDP.nodeId dst) | 207 | wrap0 <- lookupNonceFunction (tcpCrypto tcp) (transportSecret $ tcpCrypto tcp) (UDP.id2key $ UDP.nodeId dst) |
204 | let meth = MethodSerializer -- MethodSerializer Nonce8 NodeInfo RelayPacket meth AnnounceRequest (Either String AnnounceResponse) | 208 | let meth :: MethodSerializer |
209 | Nonce8 | ||
210 | a -- NodeInfo | ||
211 | (Bool, RelayPacket) | ||
212 | PacketNumber | ||
213 | AnnounceRequest | ||
214 | (Either String AnnounceResponse) | ||
215 | meth = MethodSerializer | ||
205 | { methodTimeout = \tid addr -> return (addr,12000000) -- 12 second timeout | 216 | { methodTimeout = \tid addr -> return (addr,12000000) -- 12 second timeout |
206 | , method = OnionPacketID -- meth | 217 | , method = OnionPacketID -- meth |
207 | , wrapQuery = \n8 src gateway x -> (,) True $ | 218 | , wrapQuery = \n8 src gateway x -> (,) True $ |
@@ -271,6 +282,7 @@ newClient crypto store load = do | |||
271 | OnionPacketResponse (OnionAnnounceResponse n8 n24 ciphered) -> IsResponse n8 | 282 | OnionPacketResponse (OnionAnnounceResponse n8 n24 ciphered) -> IsResponse n8 |
272 | OnionPacketResponse o@(OnionToRouteResponse _) -> IsUnsolicited $ handle2route o | 283 | OnionPacketResponse o@(OnionToRouteResponse _) -> IsUnsolicited $ handle2route o |
273 | OOBRecv k bs -> IsUnsolicited $ handleOOB k bs | 284 | OOBRecv k bs -> IsUnsolicited $ handleOOB k bs |
285 | wut -> IsUnknown (show wut) | ||
274 | , lookupHandler = \case | 286 | , lookupHandler = \case |
275 | PingPacket -> Just MethodHandler | 287 | PingPacket -> Just MethodHandler |
276 | { methodParse = \(_,RelayPing n8) -> Right () | 288 | { methodParse = \(_,RelayPing n8) -> Right () |