summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Data/Tox/Relay.hs15
-rw-r--r--src/Network/QueryResponse/TCP.hs32
-rw-r--r--src/Network/Tox.hs2
-rw-r--r--src/Network/Tox/TCP.hs76
4 files changed, 76 insertions, 49 deletions
diff --git a/src/Data/Tox/Relay.hs b/src/Data/Tox/Relay.hs
index d1e9fb99..c563db8d 100644
--- a/src/Data/Tox/Relay.hs
+++ b/src/Data/Tox/Relay.hs
@@ -4,6 +4,7 @@
4{-# LANGUAGE GeneralizedNewtypeDeriving #-} 4{-# LANGUAGE GeneralizedNewtypeDeriving #-}
5{-# LANGUAGE KindSignatures #-} 5{-# LANGUAGE KindSignatures #-}
6{-# LANGUAGE MultiParamTypeClasses #-} 6{-# LANGUAGE MultiParamTypeClasses #-}
7{-# LANGUAGE PatternSynonyms #-}
7{-# LANGUAGE StandaloneDeriving #-} 8{-# LANGUAGE StandaloneDeriving #-}
8{-# LANGUAGE UndecidableInstances #-} 9{-# LANGUAGE UndecidableInstances #-}
9module Data.Tox.Relay where 10module Data.Tox.Relay where
@@ -61,9 +62,15 @@ data RelayPacket
61 | RelayData ByteString ConId 62 | RelayData ByteString ConId
62 deriving (Eq,Ord,Show,Data) 63 deriving (Eq,Ord,Show,Data)
63 64
64packetNumber :: RelayPacket -> Word8 65newtype PacketNumber = PacketNumber { packetNumberToWord8 :: Word8 }
65packetNumber (RelayData _ (ConId conid)) = conid -- 0 to 15 not allowed. 66 deriving (Eq,Ord,Show)
66packetNumber rp = fromIntegral $ pred $ constrIndex $ toConstr rp 67
68pattern PingPacket = PacketNumber 4
69pattern OnionPacketID = PacketNumber 8
70
71packetNumber :: RelayPacket -> PacketNumber
72packetNumber (RelayData _ (ConId conid)) = PacketNumber $ conid -- 0 to 15 not allowed.
73packetNumber rp = PacketNumber $ fromIntegral $ pred $ constrIndex $ toConstr rp
67 74
68instance Sized RelayPacket where 75instance Sized RelayPacket where
69 size = mappend (ConstSize 1) $ VarSize $ \x -> case x of 76 size = mappend (ConstSize 1) $ VarSize $ \x -> case x of
@@ -101,7 +108,7 @@ instance Serialize RelayPacket where
101 conid -> (`RelayData` ConId conid) <$> (remaining >>= getBytes) 108 conid -> (`RelayData` ConId conid) <$> (remaining >>= getBytes)
102 109
103 put rp = do 110 put rp = do
104 putWord8 $ packetNumber rp 111 putWord8 $ packetNumberToWord8 $ packetNumber rp
105 case rp of 112 case rp of
106 RoutingRequest k -> putPublicKey k 113 RoutingRequest k -> putPublicKey k
107 RoutingResponse rpid k -> put rpid >> putPublicKey k 114 RoutingResponse rpid k -> put rpid >> putPublicKey k
diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs
index 328a19e1..efeab305 100644
--- a/src/Network/QueryResponse/TCP.hs
+++ b/src/Network/QueryResponse/TCP.hs
@@ -19,9 +19,12 @@ import Data.Time.Clock.POSIX
19import Data.Word 19import Data.Word
20import Network.BSD 20import Network.BSD
21import Network.Socket 21import Network.Socket
22import System.Timeout
22import System.IO 23import System.IO
23import System.IO.Error 24import System.IO.Error
24 25
26import DebugTag
27import DPut
25import Connection.Tcp (socketFamily) 28import Connection.Tcp (socketFamily)
26import qualified Data.MinMaxPSQ as MM 29import qualified Data.MinMaxPSQ as MM
27import Network.QueryResponse 30import Network.QueryResponse
@@ -61,11 +64,12 @@ acquireConnection :: MVar (Maybe (Either a (x, addr)))
61 -> TCPCache (SessionProtocol x y) 64 -> TCPCache (SessionProtocol x y)
62 -> StreamHandshake addr x y 65 -> StreamHandshake addr x y
63 -> addr 66 -> addr
67 -> Bool
64 -> IO (Maybe (y -> IO ())) 68 -> IO (Maybe (y -> IO ()))
65acquireConnection mvar tcpcache stream addr = do 69acquireConnection mvar tcpcache stream addr bDoCon = do
66 cache <- atomically $ readTVar (lru tcpcache) 70 cache <- atomically $ readTVar (lru tcpcache)
67 case MM.lookup' (TCPAddress $ streamAddr stream addr) cache of 71 case MM.lookup' (TCPAddress $ streamAddr stream addr) cache of
68 Nothing -> do 72 Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do
69 proto <- getProtocolNumber "tcp" 73 proto <- getProtocolNumber "tcp"
70 mh <- catchIOError (do sock <- socket (socketFamily $ streamAddr stream addr) Stream proto 74 mh <- catchIOError (do sock <- socket (socketFamily $ streamAddr stream addr) Stream proto
71 connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock) 75 connect sock (streamAddr stream addr) `catchIOError` (\e -> close sock)
@@ -77,10 +81,12 @@ acquireConnection mvar tcpcache stream addr = do
77 t <- getPOSIXTime 81 t <- getPOSIXTime
78 rthread <- forkIO $ fix $ \loop -> do 82 rthread <- forkIO $ fix $ \loop -> do
79 x <- streamDecode st 83 x <- streamDecode st
80 putMVar mvar $ fmap (\u -> Right (u, addr)) x
81 case x of 84 case x of
82 Just _ -> loop 85 Just u -> do
86 timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr)
87 loop
83 Nothing -> do 88 Nothing -> do
89 dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr)
84 atomically $ modifyTVar' (lru tcpcache) 90 atomically $ modifyTVar' (lru tcpcache)
85 $ MM.delete (TCPAddress $ streamAddr stream addr) 91 $ MM.delete (TCPAddress $ streamAddr stream addr)
86 hClose h 92 hClose h
@@ -91,14 +97,18 @@ acquireConnection mvar tcpcache stream addr = do
91 , tcpState = st 97 , tcpState = st
92 , tcpThread = rthread 98 , tcpThread = rthread
93 } 99 }
94 let (retires,cache') = MM.takeView (tcpMax tcpcache) 100 retires <- atomically $ do
95 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) cache 101 c <- readTVar (lru tcpcache)
102 let (rs,c') = MM.takeView (tcpMax tcpcache)
103 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c
104 writeTVar (lru tcpcache) c'
105 return rs
96 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do 106 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do
97 myThreadId >>= flip labelThread ("tcp-close:"++show k) 107 myThreadId >>= flip labelThread ("tcp-close:"++show k)
108 dput XTCP $ "TCP dropped: " ++ show k
98 killThread (tcpThread r) 109 killThread (tcpThread r)
99 streamGoodbye st 110 streamGoodbye st
100 hClose (tcpHandle r) 111 hClose (tcpHandle r)
101 atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress k)
102 112
103 return $ Just $ streamEncode st 113 return $ Just $ streamEncode st
104 Just (tm,v) -> do 114 Just (tm,v) -> do
@@ -119,15 +129,15 @@ closeAll tcpcache stream = do
119 129
120tcpTransport :: Int -- ^ maximum number of TCP links to maintain. 130tcpTransport :: Int -- ^ maximum number of TCP links to maintain.
121 -> StreamHandshake addr x y 131 -> StreamHandshake addr x y
122 -> IO (TransportA err addr x y) 132 -> IO (TransportA err addr x (Bool,y))
123tcpTransport maxcon stream = do 133tcpTransport maxcon stream = do
124 msgvar <- newEmptyMVar 134 msgvar <- newEmptyMVar
125 tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) 135 tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty)
126 return Transport 136 return Transport
127 { awaitMessage = (takeMVar msgvar >>=) 137 { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing)
128 , sendMessage = \addr y -> do 138 , sendMessage = \addr (bDoCon,y) -> do
129 t <- forkIO $ do 139 t <- forkIO $ do
130 msock <- acquireConnection msgvar tcpcache stream addr 140 msock <- acquireConnection msgvar tcpcache stream addr bDoCon
131 mapM_ ($ y) msock 141 mapM_ ($ y) msock
132 `catchIOError` \e -> return () 142 `catchIOError` \e -> return ()
133 labelThread t "tcp-send" 143 labelThread t "tcp-send"
diff --git a/src/Network/Tox.hs b/src/Network/Tox.hs
index d0304170..c14339e4 100644
--- a/src/Network/Tox.hs
+++ b/src/Network/Tox.hs
@@ -328,7 +328,7 @@ newToxOverTransport keydb addr onNewSession suppliedDHTKey udp tcp = do
328 (orouter,otbl) <- newOnionRouter crypto (dput XRoutes) 328 (orouter,otbl) <- newOnionRouter crypto (dput XRoutes)
329 (cryptonet,dhtcrypt,onioncrypt,dtacrypt,handshakes) 329 (cryptonet,dhtcrypt,onioncrypt,dtacrypt,handshakes)
330 <- toxTransport crypto orouter lookupClose udp 330 <- toxTransport crypto orouter lookupClose udp
331 (sendMessage (clientNet $ tcpClient $ tcpKademliaClient orouter)) 331 (\dst x -> sendMessage (clientNet $ tcpClient $ tcpKademliaClient orouter) dst (True,x))
332 tcp 332 tcp
333 sessions <- initSessions (sendMessage cryptonet) 333 sessions <- initSessions (sendMessage cryptonet)
334 334
diff --git a/src/Network/Tox/TCP.hs b/src/Network/Tox/TCP.hs
index a7881c24..71050fe8 100644
--- a/src/Network/Tox/TCP.hs
+++ b/src/Network/Tox/TCP.hs
@@ -7,6 +7,7 @@ module Network.Tox.TCP
7 , NodeInfo(..) 7 , NodeInfo(..)
8 ) where 8 ) where
9 9
10import Debug.Trace
10import Control.Arrow 11import Control.Arrow
11import Control.Concurrent 12import Control.Concurrent
12import Control.Concurrent.STM 13import Control.Concurrent.STM
@@ -27,6 +28,7 @@ import qualified Data.Vector as Vector
27import Network.Socket (SockAddr(..)) 28import Network.Socket (SockAddr(..))
28import qualified Text.ParserCombinators.ReadP as RP 29import qualified Text.ParserCombinators.ReadP as RP
29import System.IO.Error 30import System.IO.Error
31import System.Timeout
30 32
31import ControlMaybe 33import ControlMaybe
32import Crypto.Tox 34import Crypto.Tox
@@ -113,15 +115,16 @@ tcpStream crypto = StreamHandshake
113 dput XTCP $ "TCP: Failed to decode packet." 115 dput XTCP $ "TCP: Failed to decode packet."
114 return Nothing 116 return Nothing
115 Right x -> do 117 Right x -> do
116 n24 <- takeMVar nread 118 m24 <- timeout 100000 (takeMVar nread)
117 let r = decrypt (noncef' n24) x >>= decodePlain 119 fmap join $ forM m24 $ \n24 -> do
118 putMVar nread (incrementNonce24 n24) 120 let r = decrypt (noncef' n24) x >>= decodePlain
119 either (dput XTCP) 121 putMVar nread (incrementNonce24 n24)
120 (\x' -> do 122 either (dput XTCP)
121 dput XTCP $ "TCP:" ++ show addr ++ " --> " ++ show x' 123 (\x' -> do
122 return ()) 124 dput XTCP $ "TCP:" ++ show addr ++ " --> " ++ show x'
123 r 125 return ())
124 return $ either (const Nothing) Just r 126 r
127 return $ either (const Nothing) Just r
125 in go `catchIOError` \e -> do 128 in go `catchIOError` \e -> do
126 dput XTCP $ "TCP exception: " ++ show e 129 dput XTCP $ "TCP exception: " ++ show e
127 return Nothing 130 return Nothing
@@ -129,14 +132,14 @@ tcpStream crypto = StreamHandshake
129 n24 <- takeMVar nsend 132 n24 <- takeMVar nsend
130 dput XTCP $ "TCP:" ++ show addr ++ " <-- " ++ show y 133 dput XTCP $ "TCP:" ++ show addr ++ " <-- " ++ show y
131 let bs = encode $ encrypt (noncef' n24) $ encodePlain y 134 let bs = encode $ encrypt (noncef' n24) $ encodePlain y
132 hPut h $ encode (fromIntegral $ Data.ByteString.length bs :: Word16) 135 hPut h (encode (fromIntegral $ Data.ByteString.length bs :: Word16) <> bs)
133 <> bs 136 `catchIOError` \e -> dput XTCP $ "TCP write exception: " ++ show e
134 putMVar nsend (incrementNonce24 n24) 137 putMVar nsend (incrementNonce24 n24)
135 } 138 }
136 , streamAddr = nodeAddr 139 , streamAddr = nodeAddr
137 } 140 }
138 141
139toxTCP :: TransportCrypto -> IO (Transport err NodeInfo RelayPacket) 142toxTCP :: TransportCrypto -> IO (TransportA err NodeInfo RelayPacket (Bool,RelayPacket))
140toxTCP crypto = tcpTransport 30 (tcpStream crypto) 143toxTCP crypto = tcpTransport 30 (tcpStream crypto)
141 144
142tcpSpace :: KademliaSpace NodeId NodeInfo 145tcpSpace :: KademliaSpace NodeId NodeInfo
@@ -153,7 +156,7 @@ nodeSearch tcp = Search
153 156
154data TCPClient err meth tid = TCPClient 157data TCPClient err meth tid = TCPClient
155 { tcpCrypto :: TransportCrypto 158 { tcpCrypto :: TransportCrypto
156 , tcpClient :: Client err () tid NodeInfo RelayPacket 159 , tcpClient :: Client err PacketNumber tid NodeInfo (Bool,RelayPacket)
157 , tcpGetGateway :: UDP.NodeInfo -> STM (Maybe NodeInfo) 160 , tcpGetGateway :: UDP.NodeInfo -> STM (Maybe NodeInfo)
158 } 161 }
159 162
@@ -200,8 +203,8 @@ getUDPNodes' tcp seeking dst0 = do
200 wrap0 <- lookupNonceFunction (tcpCrypto tcp) (transportSecret $ tcpCrypto tcp) (UDP.id2key $ UDP.nodeId dst) 203 wrap0 <- lookupNonceFunction (tcpCrypto tcp) (transportSecret $ tcpCrypto tcp) (UDP.id2key $ UDP.nodeId dst)
201 let meth = MethodSerializer -- MethodSerializer Nonce8 NodeInfo RelayPacket meth AnnounceRequest (Either String AnnounceResponse) 204 let meth = MethodSerializer -- MethodSerializer Nonce8 NodeInfo RelayPacket meth AnnounceRequest (Either String AnnounceResponse)
202 { methodTimeout = \tid addr -> return (addr,12000000) -- 12 second timeout 205 { methodTimeout = \tid addr -> return (addr,12000000) -- 12 second timeout
203 , method = () -- meth 206 , method = OnionPacketID -- meth
204 , wrapQuery = \n8 src gateway x -> 207 , wrapQuery = \n8 src gateway x -> (,) True $
205 OnionPacket n24 $ Addressed (UDP.nodeAddr dst) 208 OnionPacket n24 $ Addressed (UDP.nodeAddr dst)
206 $ wrapOnionPure b (wrap2 n24) (nodeAddr gateway') 209 $ wrapOnionPure b (wrap2 n24) (nodeAddr gateway')
207 $ wrapOnionPure c (wrap1 n24) (UDP.nodeAddr dst) 210 $ wrapOnionPure c (wrap1 n24) (UDP.nodeAddr dst)
@@ -212,7 +215,7 @@ getUDPNodes' tcp seeking dst0 = do
212 , asymmData = pure (x,n8) 215 , asymmData = pure (x,n8)
213 } 216 }
214 , unwrapResponse = \case 217 , unwrapResponse = \case
215 OnionPacketResponse (OnionAnnounceResponse _ n24' r) 218 (_,OnionPacketResponse (OnionAnnounceResponse _ n24' r))
216 -> decrypt (wrap0 n24') r >>= decodePlain 219 -> decrypt (wrap0 n24') r >>= decodePlain
217 x -> Left $ "getUDPNodes: unwrapResponse fail " ++ show x 220 x -> Left $ "getUDPNodes: unwrapResponse fail " ++ show x
218 } 221 }
@@ -222,26 +225,28 @@ getUDPNodes' tcp seeking dst0 = do
222 return ( (ns,ns, const () <$> mb), gateway ) 225 return ( (ns,ns, const () <$> mb), gateway )
223 226
224 227
225handleOOB :: PublicKey -> ByteString -> NodeInfo -> NodeInfo -> IO (Maybe (RelayPacket -> RelayPacket)) 228handleOOB :: PublicKey -> ByteString -> NodeInfo -> NodeInfo -> IO (Maybe (x -> x))
226handleOOB k bs src dst = do 229handleOOB k bs src dst = do
227 dput XMisc $ "TODO: handleOOB " ++ show src 230 dput XMisc $ "TODO: handleOOB " ++ show src
228 return Nothing 231 return Nothing
229 232
230handle2route :: OnionMessage Encrypted -> NodeInfo -> NodeInfo -> IO (Maybe (RelayPacket -> RelayPacket)) 233handle2route :: OnionMessage Encrypted -> NodeInfo -> NodeInfo -> IO (Maybe (x -> x))
231handle2route o src dst = do 234handle2route o src dst = do
232 dput XMisc $ "TODO: handle2route " ++ show src 235 dput XMisc $ "TODO: handle2route " ++ show src
233 return Nothing 236 return Nothing
234 237
235tcpPing :: Client err () Nonce8 addr RelayPacket -> addr -> IO (Maybe ()) 238tcpPing :: Show addr => Client err PacketNumber Nonce8 addr (Bool,RelayPacket) -> addr -> IO (Maybe ())
236tcpPing client dst = sendQuery client meth () dst 239tcpPing client dst = do
240 dput XTCP $ "tcpPing " ++ show dst
241 sendQuery client meth () dst
237 where meth = MethodSerializer 242 where meth = MethodSerializer
238 { wrapQuery = \n8 src dst () -> RelayPing n8 243 { wrapQuery = \n8 src dst () -> (True,RelayPing n8)
239 , unwrapResponse = \_ -> () 244 , unwrapResponse = \_ -> ()
240 , methodTimeout = \n8 dst -> return (dst,5000000) 245 , methodTimeout = \n8 dst -> return (dst,5000000)
241 , method = () 246 , method = PingPacket
242 } 247 }
243 248
244type RelayClient = Client String () Nonce8 NodeInfo RelayPacket 249type RelayClient = Client String PacketNumber Nonce8 NodeInfo (Bool,RelayPacket)
245 250
246-- | Create a new TCP relay client. Because polymorphic existential record 251-- | Create a new TCP relay client. Because polymorphic existential record
247-- updates are currently hard with GHC, this function accepts parameters for 252-- updates are currently hard with GHC, this function accepts parameters for
@@ -249,29 +254,34 @@ type RelayClient = Client String () Nonce8 NodeInfo RelayPacket
249-- defaults are 'id' and 'tryPutMVar'. The resulting customized table state 254-- defaults are 'id' and 'tryPutMVar'. The resulting customized table state
250-- will be returned to the caller along with the new client. 255-- will be returned to the caller along with the new client.
251newClient :: TransportCrypto 256newClient :: TransportCrypto
252 -> (MVar RelayPacket -> a) -- ^ store mvar for query 257 -> (MVar (Bool,RelayPacket) -> a) -- ^ store mvar for query
253 -> (a -> RelayPacket -> IO void) -- ^ load mvar for query 258 -> (a -> RelayPacket -> IO void) -- ^ load mvar for query
254 -> IO ( TVar (ChaChaDRG, Data.Word64Map.Word64Map a) 259 -> IO ( TVar (ChaChaDRG, Data.Word64Map.Word64Map a)
255 , Client String () Nonce8 NodeInfo RelayPacket) 260 , Client String PacketNumber Nonce8 NodeInfo (Bool,RelayPacket))
256newClient crypto store load = do 261newClient crypto store load = do
257 net <- toxTCP crypto 262 net <- toxTCP crypto
258 drg <- drgNew 263 drg <- drgNew
259 map_var <- atomically $ newTVar (drg, Data.Word64Map.empty) 264 map_var <- atomically $ newTVar (drg, Data.Word64Map.empty)
260 return $ (,) map_var Client 265 return $ (,) map_var Client
261 { clientNet = net 266 { clientNet = {- XXX: Client type forces this pointless layering. -} layerTransport ((Right .) . (,) . (,) False) (,) net
262 , clientDispatcher = DispatchMethods 267 , clientDispatcher = DispatchMethods
263 { classifyInbound = \case 268 { classifyInbound = (. snd) $ \case
264 RelayPing n -> IsQuery () n 269 RelayPing n -> IsQuery PingPacket n
265 RelayPong n -> IsResponse n 270 RelayPong n -> IsResponse n
266 OnionPacketResponse (OnionAnnounceResponse n8 n24 ciphered) -> IsResponse n8 271 OnionPacketResponse (OnionAnnounceResponse n8 n24 ciphered) -> IsResponse n8
267 OnionPacketResponse o@(OnionToRouteResponse _) -> IsUnsolicited $ handle2route o 272 OnionPacketResponse o@(OnionToRouteResponse _) -> IsUnsolicited $ handle2route o
268 OOBRecv k bs -> IsUnsolicited $ handleOOB k bs 273 OOBRecv k bs -> IsUnsolicited $ handleOOB k bs
269 , lookupHandler = \() -> Just MethodHandler 274 , lookupHandler = \case
270 { methodParse = \(RelayPing n8) -> Right () 275 PingPacket -> Just MethodHandler
271 , methodSerialize = \n8 src dst () -> RelayPong n8 276 { methodParse = \(_,RelayPing n8) -> Right ()
277 , methodSerialize = \n8 src dst () -> (False, RelayPong n8)
272 , methodAction = \src () -> return () 278 , methodAction = \src () -> return ()
273 } 279 }
274 , tableMethods = transactionMethods' store load (contramap (\(Nonce8 w64) -> w64) w64MapMethods) 280 w -> trace ("tcp-lookupHandler: "++show w) $ Just NoReply
281 { methodParse = \x -> Left "tcp-lookuphandler?" -- :: x -> Either err a
282 , noreplyAction = \addr a -> dput XTCP $ "tcp-lookupHandler: "++show w
283 }
284 , tableMethods = transactionMethods' store (\x -> load x . snd) (contramap (\(Nonce8 w64) -> w64) w64MapMethods)
275 $ first (either error Nonce8 . decode) . randomBytesGenerate 8 285 $ first (either error Nonce8 . decode) . randomBytesGenerate 8
276 } 286 }
277 , clientErrorReporter = logErrors { reportTimeout = reportTimeout ignoreErrors } 287 , clientErrorReporter = logErrors { reportTimeout = reportTimeout ignoreErrors }