summaryrefslogtreecommitdiff
path: root/src/Network
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2018-12-17 00:47:42 -0500
committerJoe Crayne <joe@jerkface.net>2018-12-17 00:47:42 -0500
commit37eb345d1ffb40e814766b6df8134ca21e6987a7 (patch)
treeb915748ce6806af9a88ce1115cedcdb7bdaa22ba /src/Network
parent7dcd6ce16ce480791e4207e2c6d554a76fe75d6e (diff)
tcp: this seems to work.
Diffstat (limited to 'src/Network')
-rw-r--r--src/Network/QueryResponse/TCP.hs147
-rw-r--r--src/Network/Tox/TCP.hs38
2 files changed, 117 insertions, 68 deletions
diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs
index efeab305..a606c51d 100644
--- a/src/Network/QueryResponse/TCP.hs
+++ b/src/Network/QueryResponse/TCP.hs
@@ -14,6 +14,7 @@ import Control.Monad
14import Data.ByteString (ByteString,hPut) 14import Data.ByteString (ByteString,hPut)
15import Data.Function 15import Data.Function
16import Data.Hashable 16import Data.Hashable
17import Data.Maybe
17import Data.Ord 18import Data.Ord
18import Data.Time.Clock.POSIX 19import Data.Time.Clock.POSIX
19import Data.Word 20import Data.Word
@@ -29,11 +30,13 @@ import Connection.Tcp (socketFamily)
29import qualified Data.MinMaxPSQ as MM 30import qualified Data.MinMaxPSQ as MM
30import Network.QueryResponse 31import Network.QueryResponse
31 32
32data TCPSession st = TCPSession 33data TCPSession st
33 { tcpHandle :: Handle 34 = PendingTCPSession
34 , tcpState :: st 35 | TCPSession
35 , tcpThread :: ThreadId 36 { tcpHandle :: Handle
36 } 37 , tcpState :: st
38 , tcpThread :: ThreadId
39 }
37 40
38newtype TCPAddress = TCPAddress SockAddr 41newtype TCPAddress = TCPAddress SockAddr
39 deriving (Eq,Ord) 42 deriving (Eq,Ord)
@@ -60,6 +63,10 @@ data StreamHandshake addr x y = StreamHandshake
60 , streamAddr :: addr -> SockAddr 63 , streamAddr :: addr -> SockAddr
61 } 64 }
62 65
66killSession :: TCPSession st -> IO ()
67killSession PendingTCPSession = return ()
68killSession TCPSession{tcpThread=t} = killThread t
69
63acquireConnection :: MVar (Maybe (Either a (x, addr))) 70acquireConnection :: MVar (Maybe (Either a (x, addr)))
64 -> TCPCache (SessionProtocol x y) 71 -> TCPCache (SessionProtocol x y)
65 -> StreamHandshake addr x y 72 -> StreamHandshake addr x y
@@ -67,65 +74,95 @@ acquireConnection :: MVar (Maybe (Either a (x, addr)))
67 -> Bool 74 -> Bool
68 -> IO (Maybe (y -> IO ())) 75 -> IO (Maybe (y -> IO ()))
69acquireConnection mvar tcpcache stream addr bDoCon = do 76acquireConnection mvar tcpcache stream addr bDoCon = do
70 cache <- atomically $ readTVar (lru tcpcache) 77 now <- getPOSIXTime
71 case MM.lookup' (TCPAddress $ streamAddr stream addr) cache of 78 entry <- atomically $ do
79 c <- readTVar (lru tcpcache)
80 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c
81 case v of
82 Nothing | bDoCon -> writeTVar (lru tcpcache)
83 $ MM.insert' (TCPAddress $ streamAddr stream addr) PendingTCPSession (Down now) c
84 | otherwise -> return ()
85 Just (tm, v) -> modifyTVar' (lru tcpcache) $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now)
86 return v
87 case entry of
72 Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do 88 Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do
73 proto <- getProtocolNumber "tcp" 89 proto <- getProtocolNumber "tcp"
74 mh <- catchIOError (do sock <- socket (socketFamily $ streamAddr stream addr) Stream proto 90 mh <- catchIOError (do h <- timeout 10000000 $ do
75 connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) 91 sock <- socket (socketFamily $ streamAddr stream addr) Stream proto
76 h <- socketToHandle sock ReadWriteMode 92 connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock)
77 return $ Just h) 93 h <- socketToHandle sock ReadWriteMode
94 hSetBuffering h NoBuffering
95 return h
96 return h)
78 $ \e -> return Nothing 97 $ \e -> return Nothing
79 fmap join $ forM mh $ \h -> do 98 ret <- fmap join $ forM mh $ \h -> do
80 st <- streamHello stream addr h 99 st <- streamHello stream addr h
81 t <- getPOSIXTime 100 dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr)
82 rthread <- forkIO $ fix $ \loop -> do 101 rthread <- forkIO $ fix $ \loop -> do
83 x <- streamDecode st 102 x <- streamDecode st
84 case x of 103 dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x
85 Just u -> do 104 case x of
86 timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr) 105 Just u -> do
87 loop 106 m <- timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr)
88 Nothing -> do 107 when (isNothing m) $ do
89 dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) 108 dput XTCP $ "TCP "++show (streamAddr stream addr) ++ "dropped packet."
90 atomically $ modifyTVar' (lru tcpcache) 109 loop
91 $ MM.delete (TCPAddress $ streamAddr stream addr) 110 Nothing -> do
92 hClose h 111 dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr)
93 let showAddr a = show (streamAddr stream a) 112 atomically $ modifyTVar' (lru tcpcache)
94 labelThread rthread ("tcp:"++showAddr addr) 113 $ MM.delete (TCPAddress $ streamAddr stream addr)
95 let v = TCPSession 114 c <- atomically $ readTVar (lru tcpcache)
96 { tcpHandle = h 115 now <- getPOSIXTime
97 , tcpState = st 116 forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do
98 , tcpThread = rthread 117 let stat = case r of
99 } 118 PendingTCPSession -> "pending."
100 retires <- atomically $ do 119 TCPSession {} -> "established."
101 c <- readTVar (lru tcpcache) 120 dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), stat]
102 let (rs,c') = MM.takeView (tcpMax tcpcache) 121 hClose h
103 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c 122 let showAddr a = show (streamAddr stream a)
104 writeTVar (lru tcpcache) c' 123 labelThread rthread ("tcp:"++showAddr addr)
105 return rs 124 let v = TCPSession
106 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do 125 { tcpHandle = h
107 myThreadId >>= flip labelThread ("tcp-close:"++show k) 126 , tcpState = st
108 dput XTCP $ "TCP dropped: " ++ show k 127 , tcpThread = rthread
109 killThread (tcpThread r) 128 }
110 streamGoodbye st 129 t <- getPOSIXTime
111 hClose (tcpHandle r) 130 retires <- atomically $ do
131 c <- readTVar (lru tcpcache)
132 let (rs,c') = MM.takeView (tcpMax tcpcache)
133 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c
134 writeTVar (lru tcpcache) c'
135 return rs
136 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do
137 myThreadId >>= flip labelThread ("tcp-close:"++show k)
138 dput XTCP $ "TCP dropped: " ++ show k
139 killSession r
140 case r of TCPSession {tcpState=st,tcpHandle=h} -> do
141 streamGoodbye st
142 hClose h
143 _ -> return ()
112 144
113 return $ Just $ streamEncode st 145 return $ Just $ streamEncode st
114 Just (tm,v) -> do 146 when (isNothing ret) $ do
115 t <- getPOSIXTime 147 atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr)
116 let TCPSession { tcpHandle = h, tcpState = st } = v 148 return ret
117 cache' = MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) cache 149 Just (tm, PendingTCPSession) -> do
118 atomically $ writeTVar (lru tcpcache) cache' 150 fmap join $ timeout 10000000 $ atomically $ do
119 return $ Just $ streamEncode st 151 c <- readTVar (lru tcpcache)
152 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c
153 case v of
154 Just (_,TCPSession{tcpState=st}) -> return $ Just $ streamEncode st
155 Nothing -> return Nothing
156 _ -> retry
157 Just (tm, v@TCPSession {tcpState=st}) -> return $ Just $ streamEncode st
120 158
121closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO () 159closeAll :: TCPCache (SessionProtocol x y) -> StreamHandshake addr x y -> IO ()
122closeAll tcpcache stream = do 160closeAll tcpcache stream = do
123 cache <- atomically $ swapTVar (lru tcpcache) MM.empty 161 cache <- atomically $ swapTVar (lru tcpcache) MM.empty
124 forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do 162 forM_ (MM.toList cache) $ \(MM.Binding (TCPAddress addr) r tm) -> do
125 let st = tcpState r 163 killSession r
126 killThread (tcpThread r) 164 case r of TCPSession{tcpState=st,tcpHandle=h} -> streamGoodbye st >> hClose h
127 streamGoodbye st 165 _ -> return ()
128 hClose (tcpHandle r)
129 166
130tcpTransport :: Int -- ^ maximum number of TCP links to maintain. 167tcpTransport :: Int -- ^ maximum number of TCP links to maintain.
131 -> StreamHandshake addr x y 168 -> StreamHandshake addr x y
diff --git a/src/Network/Tox/TCP.hs b/src/Network/Tox/TCP.hs
index 71050fe8..36200586 100644
--- a/src/Network/Tox/TCP.hs
+++ b/src/Network/Tox/TCP.hs
@@ -11,6 +11,7 @@ import Debug.Trace
11import Control.Arrow 11import Control.Arrow
12import Control.Concurrent 12import Control.Concurrent
13import Control.Concurrent.STM 13import Control.Concurrent.STM
14import Control.Exception
14import Control.Monad 15import Control.Monad
15import Crypto.Random 16import Crypto.Random
16import Data.Aeson (ToJSON(..),FromJSON(..)) 17import Data.Aeson (ToJSON(..),FromJSON(..))
@@ -92,7 +93,7 @@ tcpStream crypto = StreamHandshake
92 nil = SessionProtocol 93 nil = SessionProtocol
93 { streamGoodbye = return () 94 { streamGoodbye = return ()
94 , streamDecode = return Nothing 95 , streamDecode = return Nothing
95 , streamEncode = \y -> return () 96 , streamEncode = \y -> dput XTCP $ "TCP nil <-- " ++ show y
96 } 97 }
97 either (\_ -> return nil) id $ mwelcome <&> \welcome -> do 98 either (\_ -> return nil) id $ mwelcome <&> \welcome -> do
98 dput XTCP $ "TCP:" ++ show addr ++ " --> " ++ show welcome 99 dput XTCP $ "TCP:" ++ show addr ++ " --> " ++ show welcome
@@ -100,12 +101,13 @@ tcpStream crypto = StreamHandshake
100 nsend <- newMVar (sessionBaseNonce $ runIdentity $ helloData hello) 101 nsend <- newMVar (sessionBaseNonce $ runIdentity $ helloData hello)
101 nread <- newMVar (sessionBaseNonce $ runIdentity $ welcomeData welcome) 102 nread <- newMVar (sessionBaseNonce $ runIdentity $ welcomeData welcome)
102 let them = sessionPublicKey $ runIdentity $ welcomeData welcome 103 let them = sessionPublicKey $ runIdentity $ welcomeData welcome
104 hvar <- newMVar h
103 return SessionProtocol 105 return SessionProtocol
104 { streamGoodbye = do 106 { streamGoodbye = do
105 dput XTCP $ "Closing " ++ show addr 107 dput XTCP $ "Closing " ++ show addr
106 return () -- No goodbye packet? Seems rude. 108 return () -- No goodbye packet? Seems rude.
107 , streamDecode = 109 , streamDecode =
108 let go = decode <$> hGet h 2 >>= \case 110 let go h = decode <$> hGet h 2 >>= \case
109 Left e -> do 111 Left e -> do
110 dput XTCP $ "TCP: Failed to get length: " ++ e 112 dput XTCP $ "TCP: Failed to get length: " ++ e
111 return Nothing 113 return Nothing
@@ -115,25 +117,27 @@ tcpStream crypto = StreamHandshake
115 dput XTCP $ "TCP: Failed to decode packet." 117 dput XTCP $ "TCP: Failed to decode packet."
116 return Nothing 118 return Nothing
117 Right x -> do 119 Right x -> do
118 m24 <- timeout 100000 (takeMVar nread) 120 m24 <- timeout 1000000 (takeMVar nread)
119 fmap join $ forM m24 $ \n24 -> do 121 fmap join $ forM m24 $ \n24 -> do
120 let r = decrypt (noncef' n24) x >>= decodePlain 122 let r = decrypt (noncef' n24) x >>= decodePlain
121 putMVar nread (incrementNonce24 n24) 123 putMVar nread (incrementNonce24 n24)
122 either (dput XTCP) 124 either (dput XTCP . ("TCP decryption: " ++))
123 (\x' -> do 125 (\x' -> do
124 dput XTCP $ "TCP:" ++ show addr ++ " --> " ++ show x' 126 dput XTCP $ "TCP:" ++ show addr ++ " --> " ++ show x'
125 return ()) 127 return ())
126 r 128 r
127 return $ either (const Nothing) Just r 129 return $ either (const Nothing) Just r
128 in go `catchIOError` \e -> do 130 in bracket (takeMVar hvar) (putMVar hvar)
129 dput XTCP $ "TCP exception: " ++ show e 131 $ \h -> go h `catchIOError` \e -> do
130 return Nothing 132 dput XTCP $ "TCP exception: " ++ show e
133 return Nothing
131 , streamEncode = \y -> do 134 , streamEncode = \y -> do
132 n24 <- takeMVar nsend 135 n24 <- takeMVar nsend
133 dput XTCP $ "TCP:" ++ show addr ++ " <-- " ++ show y 136 dput XTCP $ "TCP:" ++ show addr ++ " <-- " ++ show y
134 let bs = encode $ encrypt (noncef' n24) $ encodePlain y 137 let bs = encode $ encrypt (noncef' n24) $ encodePlain y
135 hPut h (encode (fromIntegral $ Data.ByteString.length bs :: Word16) <> bs) 138 ($ h) -- bracket (takeMVar hvar) (putMVar hvar)
136 `catchIOError` \e -> dput XTCP $ "TCP write exception: " ++ show e 139 $ \h -> hPut h (encode (fromIntegral $ Data.ByteString.length bs :: Word16) <> bs)
140 `catchIOError` \e -> dput XTCP $ "TCP write exception: " ++ show e
137 putMVar nsend (incrementNonce24 n24) 141 putMVar nsend (incrementNonce24 n24)
138 } 142 }
139 , streamAddr = nodeAddr 143 , streamAddr = nodeAddr
@@ -154,7 +158,7 @@ nodeSearch tcp = Search
154 } 158 }
155-} 159-}
156 160
157data TCPClient err meth tid = TCPClient 161data TCPClient err tid = TCPClient
158 { tcpCrypto :: TransportCrypto 162 { tcpCrypto :: TransportCrypto
159 , tcpClient :: Client err PacketNumber tid NodeInfo (Bool,RelayPacket) 163 , tcpClient :: Client err PacketNumber tid NodeInfo (Bool,RelayPacket)
160 , tcpGetGateway :: UDP.NodeInfo -> STM (Maybe NodeInfo) 164 , tcpGetGateway :: UDP.NodeInfo -> STM (Maybe NodeInfo)
@@ -181,10 +185,10 @@ getTCPNodes tcp seeking dst = do
181 return $ Just ts 185 return $ Just ts
182-} 186-}
183 187
184getUDPNodes :: TCPClient err () Nonce8 -> NodeId -> UDP.NodeInfo -> IO (Maybe ([UDP.NodeInfo], [UDP.NodeInfo], Maybe ())) 188getUDPNodes :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (Maybe ([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()))
185getUDPNodes tcp seeking dst = fmap fst <$> getUDPNodes' tcp seeking dst 189getUDPNodes tcp seeking dst = fmap fst <$> getUDPNodes' tcp seeking dst
186 190
187getUDPNodes' :: TCPClient err () Nonce8 -> NodeId -> UDP.NodeInfo -> IO (Maybe (([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()), NodeInfo)) 191getUDPNodes' :: TCPClient err Nonce8 -> NodeId -> UDP.NodeInfo -> IO (Maybe (([UDP.NodeInfo], [UDP.NodeInfo], Maybe ()), NodeInfo))
188getUDPNodes' tcp seeking dst0 = do 192getUDPNodes' tcp seeking dst0 = do
189 mgateway <- atomically $ tcpGetGateway tcp dst0 193 mgateway <- atomically $ tcpGetGateway tcp dst0
190 fmap join $ forM mgateway $ \gateway -> do 194 fmap join $ forM mgateway $ \gateway -> do
@@ -201,7 +205,14 @@ getUDPNodes' tcp seeking dst0 = do
201 wrap2 <- lookupNonceFunction (tcpCrypto tcp) b (UDP.id2key $ UDP.nodeId dst) 205 wrap2 <- lookupNonceFunction (tcpCrypto tcp) b (UDP.id2key $ UDP.nodeId dst)
202 wrap1 <- lookupNonceFunction (tcpCrypto tcp) c (UDP.id2key $ nodeId gateway) 206 wrap1 <- lookupNonceFunction (tcpCrypto tcp) c (UDP.id2key $ nodeId gateway)
203 wrap0 <- lookupNonceFunction (tcpCrypto tcp) (transportSecret $ tcpCrypto tcp) (UDP.id2key $ UDP.nodeId dst) 207 wrap0 <- lookupNonceFunction (tcpCrypto tcp) (transportSecret $ tcpCrypto tcp) (UDP.id2key $ UDP.nodeId dst)
204 let meth = MethodSerializer -- MethodSerializer Nonce8 NodeInfo RelayPacket meth AnnounceRequest (Either String AnnounceResponse) 208 let meth :: MethodSerializer
209 Nonce8
210 a -- NodeInfo
211 (Bool, RelayPacket)
212 PacketNumber
213 AnnounceRequest
214 (Either String AnnounceResponse)
215 meth = MethodSerializer
205 { methodTimeout = \tid addr -> return (addr,12000000) -- 12 second timeout 216 { methodTimeout = \tid addr -> return (addr,12000000) -- 12 second timeout
206 , method = OnionPacketID -- meth 217 , method = OnionPacketID -- meth
207 , wrapQuery = \n8 src gateway x -> (,) True $ 218 , wrapQuery = \n8 src gateway x -> (,) True $
@@ -271,6 +282,7 @@ newClient crypto store load = do
271 OnionPacketResponse (OnionAnnounceResponse n8 n24 ciphered) -> IsResponse n8 282 OnionPacketResponse (OnionAnnounceResponse n8 n24 ciphered) -> IsResponse n8
272 OnionPacketResponse o@(OnionToRouteResponse _) -> IsUnsolicited $ handle2route o 283 OnionPacketResponse o@(OnionToRouteResponse _) -> IsUnsolicited $ handle2route o
273 OOBRecv k bs -> IsUnsolicited $ handleOOB k bs 284 OOBRecv k bs -> IsUnsolicited $ handleOOB k bs
285 wut -> IsUnknown (show wut)
274 , lookupHandler = \case 286 , lookupHandler = \case
275 PingPacket -> Just MethodHandler 287 PingPacket -> Just MethodHandler
276 { methodParse = \(_,RelayPing n8) -> Right () 288 { methodParse = \(_,RelayPing n8) -> Right ()