diff options
-rw-r--r-- | OnionRouter.hs | 5 | ||||
-rw-r--r-- | TCPProber.hs | 2 | ||||
-rw-r--r-- | examples/dhtd.hs | 12 | ||||
-rw-r--r-- | src/Network/QueryResponse.hs | 1 | ||||
-rw-r--r-- | src/Network/QueryResponse/TCP.hs | 36 | ||||
-rw-r--r-- | src/Network/Tox/TCP.hs | 24 |
6 files changed, 56 insertions, 24 deletions
diff --git a/OnionRouter.hs b/OnionRouter.hs index 0bbcd20d..55a08d48 100644 --- a/OnionRouter.hs +++ b/OnionRouter.hs | |||
@@ -11,6 +11,7 @@ import Network.Kademlia.Bootstrap | |||
11 | import Network.Kademlia.Routing as R | 11 | import Network.Kademlia.Routing as R |
12 | import Network.Kademlia.Search | 12 | import Network.Kademlia.Search |
13 | import Network.QueryResponse | 13 | import Network.QueryResponse |
14 | import Network.QueryResponse.TCP | ||
14 | import Network.Tox.NodeId | 15 | import Network.Tox.NodeId |
15 | import Network.Tox.Onion.Transport as Onion | 16 | import Network.Tox.Onion.Transport as Onion |
16 | import qualified Data.Tox.Relay as TCP | 17 | import qualified Data.Tox.Relay as TCP |
@@ -82,6 +83,7 @@ data OnionRouter = OnionRouter | |||
82 | , tcpKademliaClient :: TCP.TCPClient String Nonce8 | 83 | , tcpKademliaClient :: TCP.TCPClient String Nonce8 |
83 | -- | This thread maintains the TCP relay table. | 84 | -- | This thread maintains the TCP relay table. |
84 | , tcpKademliaThread :: ThreadId | 85 | , tcpKademliaThread :: ThreadId |
86 | , tcpProberState :: TCPCache (SessionProtocol TCP.RelayPacket TCP.RelayPacket) | ||
85 | , tcpProber :: TCP.TCPProber | 87 | , tcpProber :: TCP.TCPProber |
86 | , tcpProberThread :: ThreadId | 88 | , tcpProberThread :: ThreadId |
87 | -- | Kademlia table of TCP relays. | 89 | -- | Kademlia table of TCP relays. |
@@ -162,7 +164,7 @@ newOnionRouter crypto perror = do | |||
162 | pq <- newTVar W64.empty | 164 | pq <- newTVar W64.empty |
163 | rm <- newArray (0,11) Nothing | 165 | rm <- newArray (0,11) Nothing |
164 | return (rlog,pq,rm) | 166 | return (rlog,pq,rm) |
165 | ((tbl,tcptbl),tcp) <- do | 167 | ((tbl,(tcptbl,tcpcons)),tcp) <- do |
166 | (tcptbl, client) <- TCP.newClient crypto Left $ \case | 168 | (tcptbl, client) <- TCP.newClient crypto Left $ \case |
167 | Left v -> void . tryPutMVar v . (,) False | 169 | Left v -> void . tryPutMVar v . (,) False |
168 | Right v -> \case | 170 | Right v -> \case |
@@ -249,6 +251,7 @@ newOnionRouter crypto perror = do | |||
249 | , routeLog = rlog | 251 | , routeLog = rlog |
250 | , routeThread = error "forkRouteBuilder not invoked (missing onion route builder thread)." | 252 | , routeThread = error "forkRouteBuilder not invoked (missing onion route builder thread)." |
251 | , tcpKademliaThread = error "forkRouteBuilder not invoked (missing TCP bucket maintenance thread)." | 253 | , tcpKademliaThread = error "forkRouteBuilder not invoked (missing TCP bucket maintenance thread)." |
254 | , tcpProberState = tcpcons | ||
252 | , tcpProber = prober | 255 | , tcpProber = prober |
253 | , tcpProberThread = error "forkRouteBuilder not invoked (missing TCP probe thread)." | 256 | , tcpProberThread = error "forkRouteBuilder not invoked (missing TCP probe thread)." |
254 | , routeLogger = perror | 257 | , routeLogger = perror |
diff --git a/TCPProber.hs b/TCPProber.hs index 506ef142..baddb39e 100644 --- a/TCPProber.hs +++ b/TCPProber.hs | |||
@@ -168,6 +168,6 @@ nodeSearch prober tcp = Search | |||
168 | { searchSpace = TCP.tcpSpace | 168 | { searchSpace = TCP.tcpSpace |
169 | , searchNodeAddress = TCP.nodeIP &&& TCP.tcpPort | 169 | , searchNodeAddress = TCP.nodeIP &&& TCP.tcpPort |
170 | , searchQuery = getNodes prober tcp | 170 | , searchQuery = getNodes prober tcp |
171 | , searchAlpha = 1 | 171 | , searchAlpha = 8 |
172 | , searchK = 16 | 172 | , searchK = 16 |
173 | } | 173 | } |
diff --git a/examples/dhtd.hs b/examples/dhtd.hs index 96cfbe0e..b4198c1d 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs | |||
@@ -72,6 +72,7 @@ import DebugUtil | |||
72 | import Network.UPNP as UPNP | 72 | import Network.UPNP as UPNP |
73 | import Network.Address hiding (NodeId, NodeInfo(..)) | 73 | import Network.Address hiding (NodeId, NodeInfo(..)) |
74 | import Network.QueryResponse | 74 | import Network.QueryResponse |
75 | import qualified Network.QueryResponse.TCP as TCP | ||
75 | import Network.StreamServer | 76 | import Network.StreamServer |
76 | import Network.Kademlia.Bootstrap (refreshBuckets,bootstrap) | 77 | import Network.Kademlia.Bootstrap (refreshBuckets,bootstrap) |
77 | import Network.Kademlia.CommonAPI | 78 | import Network.Kademlia.CommonAPI |
@@ -399,6 +400,7 @@ clientSession s@Session{..} sock cnum h = do | |||
399 | , ["sessions"] | 400 | , ["sessions"] |
400 | , ["session"] | 401 | , ["session"] |
401 | , ["netcrypto"] | 402 | , ["netcrypto"] |
403 | , ["tcp"] | ||
402 | , ["onion"] | 404 | , ["onion"] |
403 | , ["g"] | 405 | , ["g"] |
404 | , ["p"] | 406 | , ["p"] |
@@ -673,6 +675,14 @@ clientSession s@Session{..} sock cnum h = do | |||
673 | setVerbose tag | 675 | setVerbose tag |
674 | hPutClient h $ "Showing " ++ show tag ++ " messages." | 676 | hPutClient h $ "Showing " ++ show tag ++ " messages." |
675 | 677 | ||
678 | ("tcp",s) | "" <- strp s | ||
679 | -> cmd0 $ join $ atomically $ do | ||
680 | tcps <- readTVar (TCP.lru $ tcpProberState onionRouter) | ||
681 | return $ do | ||
682 | now <- getPOSIXTime | ||
683 | forM (MM.toList tcps) $ \(MM.Binding (TCP.TCPAddress addr) tcp (Down tm)) -> do | ||
684 | hPutClientChunk h $ unwords [show addr, show (now - tm), TCP.showStat tcp] ++ "\n" | ||
685 | hPutClient h $ show (MM.size tcps) ++ " active or pending connections.\n" | ||
676 | 686 | ||
677 | ("onion", s) | "" <- strp $ map toLower s | 687 | ("onion", s) | "" <- strp $ map toLower s |
678 | -> cmd0 $ do | 688 | -> cmd0 $ do |
@@ -692,6 +702,7 @@ clientSession s@Session{..} sock cnum h = do | |||
692 | tcp_cache <- readTVar (TCP.probeCache $ tcpProber onionRouter) | 702 | tcp_cache <- readTVar (TCP.probeCache $ tcpProber onionRouter) |
693 | tcp_queue <- readTVar (TCP.probeQueue $ tcpProber onionRouter) | 703 | tcp_queue <- readTVar (TCP.probeQueue $ tcpProber onionRouter) |
694 | tcpmode <- readTVar (tcpMode onionRouter) | 704 | tcpmode <- readTVar (tcpMode onionRouter) |
705 | tcps <- readTVar (TCP.lru $ tcpProberState onionRouter) | ||
695 | let showRecord :: Int -> Int -> [String] | 706 | let showRecord :: Int -> Int -> [String] |
696 | showRecord n wanted_ver | 707 | showRecord n wanted_ver |
697 | | Just RouteRecord{responseCount,timeoutCount,routeVersion,routeBirthTime | 708 | | Just RouteRecord{responseCount,timeoutCount,routeVersion,routeBirthTime |
@@ -709,6 +720,7 @@ clientSession s@Session{..} sock cnum h = do | |||
709 | ++ if tcpmode then "" else " *" | 720 | ++ if tcpmode then "" else " *" |
710 | , "trampolines(TCP): " ++ show (IntMap.size tts,ttcnt,ticnt) | 721 | , "trampolines(TCP): " ++ show (IntMap.size tts,ttcnt,ticnt) |
711 | ++ if tcpmode then " *" else "" | 722 | ++ if tcpmode then " *" else "" |
723 | , "active TCP: " ++ show (MM.size tcps) | ||
712 | , "pending: " ++ show (W64.size pqs) | 724 | , "pending: " ++ show (W64.size pqs) |
713 | , "TCP spill,cache,queue: " | 725 | , "TCP spill,cache,queue: " |
714 | ++ show (PSQ.size tcp_spill, PSQ.size tcp_cache, PSQ.size tcp_queue)] | 726 | ++ show (PSQ.size tcp_spill, PSQ.size tcp_cache, PSQ.size tcp_queue)] |
diff --git a/src/Network/QueryResponse.hs b/src/Network/QueryResponse.hs index 0fbbc929..13160d31 100644 --- a/src/Network/QueryResponse.hs +++ b/src/Network/QueryResponse.hs | |||
@@ -198,6 +198,7 @@ forkListener name client = do | |||
198 | thread_id <- forkIO $ do | 198 | thread_id <- forkIO $ do |
199 | myThreadId >>= flip labelThread ("listener."++name) | 199 | myThreadId >>= flip labelThread ("listener."++name) |
200 | fix $ awaitMessage client . const | 200 | fix $ awaitMessage client . const |
201 | dput XMisc $ "Listener died: " ++ name | ||
201 | return $ do | 202 | return $ do |
202 | closeTransport client | 203 | closeTransport client |
203 | killThread thread_id | 204 | killThread thread_id |
diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs index a606c51d..bad61727 100644 --- a/src/Network/QueryResponse/TCP.hs +++ b/src/Network/QueryResponse/TCP.hs | |||
@@ -9,6 +9,7 @@ import Control.Concurrent.Lifted | |||
9 | import GHC.Conc (labelThread) | 9 | import GHC.Conc (labelThread) |
10 | #endif | 10 | #endif |
11 | 11 | ||
12 | import Control.Arrow | ||
12 | import Control.Concurrent.STM | 13 | import Control.Concurrent.STM |
13 | import Control.Monad | 14 | import Control.Monad |
14 | import Data.ByteString (ByteString,hPut) | 15 | import Data.ByteString (ByteString,hPut) |
@@ -39,7 +40,7 @@ data TCPSession st | |||
39 | } | 40 | } |
40 | 41 | ||
41 | newtype TCPAddress = TCPAddress SockAddr | 42 | newtype TCPAddress = TCPAddress SockAddr |
42 | deriving (Eq,Ord) | 43 | deriving (Eq,Ord,Show) |
43 | 44 | ||
44 | instance Hashable TCPAddress where | 45 | instance Hashable TCPAddress where |
45 | hashWithSalt salt (TCPAddress x) = case x of | 46 | hashWithSalt salt (TCPAddress x) = case x of |
@@ -67,6 +68,9 @@ killSession :: TCPSession st -> IO () | |||
67 | killSession PendingTCPSession = return () | 68 | killSession PendingTCPSession = return () |
68 | killSession TCPSession{tcpThread=t} = killThread t | 69 | killSession TCPSession{tcpThread=t} = killThread t |
69 | 70 | ||
71 | showStat r = case r of PendingTCPSession -> "pending." | ||
72 | TCPSession {} -> "established." | ||
73 | |||
70 | acquireConnection :: MVar (Maybe (Either a (x, addr))) | 74 | acquireConnection :: MVar (Maybe (Either a (x, addr))) |
71 | -> TCPCache (SessionProtocol x y) | 75 | -> TCPCache (SessionProtocol x y) |
72 | -> StreamHandshake addr x y | 76 | -> StreamHandshake addr x y |
@@ -75,6 +79,7 @@ acquireConnection :: MVar (Maybe (Either a (x, addr))) | |||
75 | -> IO (Maybe (y -> IO ())) | 79 | -> IO (Maybe (y -> IO ())) |
76 | acquireConnection mvar tcpcache stream addr bDoCon = do | 80 | acquireConnection mvar tcpcache stream addr bDoCon = do |
77 | now <- getPOSIXTime | 81 | now <- getPOSIXTime |
82 | -- dput XTCP $ "acquireConnection 0 " ++ show (streamAddr stream addr) | ||
78 | entry <- atomically $ do | 83 | entry <- atomically $ do |
79 | c <- readTVar (lru tcpcache) | 84 | c <- readTVar (lru tcpcache) |
80 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c | 85 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c |
@@ -84,6 +89,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
84 | | otherwise -> return () | 89 | | otherwise -> return () |
85 | Just (tm, v) -> modifyTVar' (lru tcpcache) $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now) | 90 | Just (tm, v) -> modifyTVar' (lru tcpcache) $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now) |
86 | return v | 91 | return v |
92 | -- dput XTCP $ "acquireConnection 1 " ++ show (streamAddr stream addr, fmap (second showStat) entry) | ||
87 | case entry of | 93 | case entry of |
88 | Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do | 94 | Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do |
89 | proto <- getProtocolNumber "tcp" | 95 | proto <- getProtocolNumber "tcp" |
@@ -98,26 +104,28 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
98 | ret <- fmap join $ forM mh $ \h -> do | 104 | ret <- fmap join $ forM mh $ \h -> do |
99 | st <- streamHello stream addr h | 105 | st <- streamHello stream addr h |
100 | dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr) | 106 | dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr) |
101 | rthread <- forkIO $ fix $ \loop -> do | 107 | signal <- newTVarIO False |
108 | rthread <- forkIO $ do | ||
109 | atomically (readTVar signal >>= check) | ||
110 | fix $ \loop -> do | ||
102 | x <- streamDecode st | 111 | x <- streamDecode st |
103 | dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x | 112 | dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x |
104 | case x of | 113 | case x of |
105 | Just u -> do | 114 | Just u -> do |
106 | m <- timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr) | 115 | m <- timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr) |
107 | when (isNothing m) $ do | 116 | when (isNothing m) $ do |
108 | dput XTCP $ "TCP "++show (streamAddr stream addr) ++ "dropped packet." | 117 | dput XTCP $ "TCP "++show (streamAddr stream addr) ++ " dropped packet." |
118 | tryTakeMVar mvar | ||
119 | return () | ||
109 | loop | 120 | loop |
110 | Nothing -> do | 121 | Nothing -> do |
111 | dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) | 122 | dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) |
112 | atomically $ modifyTVar' (lru tcpcache) | 123 | do atomically $ modifyTVar' (lru tcpcache) |
113 | $ MM.delete (TCPAddress $ streamAddr stream addr) | 124 | $ MM.delete (TCPAddress $ streamAddr stream addr) |
114 | c <- atomically $ readTVar (lru tcpcache) | 125 | c <- atomically $ readTVar (lru tcpcache) |
115 | now <- getPOSIXTime | 126 | now <- getPOSIXTime |
116 | forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do | 127 | forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do |
117 | let stat = case r of | 128 | dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r] |
118 | PendingTCPSession -> "pending." | ||
119 | TCPSession {} -> "established." | ||
120 | dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), stat] | ||
121 | hClose h | 129 | hClose h |
122 | let showAddr a = show (streamAddr stream a) | 130 | let showAddr a = show (streamAddr stream a) |
123 | labelThread rthread ("tcp:"++showAddr addr) | 131 | labelThread rthread ("tcp:"++showAddr addr) |
@@ -132,6 +140,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
132 | let (rs,c') = MM.takeView (tcpMax tcpcache) | 140 | let (rs,c') = MM.takeView (tcpMax tcpcache) |
133 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c | 141 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c |
134 | writeTVar (lru tcpcache) c' | 142 | writeTVar (lru tcpcache) c' |
143 | writeTVar signal True | ||
135 | return rs | 144 | return rs |
136 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do | 145 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do |
137 | myThreadId >>= flip labelThread ("tcp-close:"++show k) | 146 | myThreadId >>= flip labelThread ("tcp-close:"++show k) |
@@ -146,8 +155,9 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
146 | when (isNothing ret) $ do | 155 | when (isNothing ret) $ do |
147 | atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) | 156 | atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) |
148 | return ret | 157 | return ret |
149 | Just (tm, PendingTCPSession) -> do | 158 | Just (tm, PendingTCPSession) |
150 | fmap join $ timeout 10000000 $ atomically $ do | 159 | | not bDoCon -> return Nothing |
160 | | otherwise -> fmap join $ timeout 10000000 $ atomically $ do | ||
151 | c <- readTVar (lru tcpcache) | 161 | c <- readTVar (lru tcpcache) |
152 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c | 162 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c |
153 | case v of | 163 | case v of |
@@ -166,17 +176,17 @@ closeAll tcpcache stream = do | |||
166 | 176 | ||
167 | tcpTransport :: Int -- ^ maximum number of TCP links to maintain. | 177 | tcpTransport :: Int -- ^ maximum number of TCP links to maintain. |
168 | -> StreamHandshake addr x y | 178 | -> StreamHandshake addr x y |
169 | -> IO (TransportA err addr x (Bool,y)) | 179 | -> IO (TCPCache (SessionProtocol x y), TransportA err addr x (Bool,y)) |
170 | tcpTransport maxcon stream = do | 180 | tcpTransport maxcon stream = do |
171 | msgvar <- newEmptyMVar | 181 | msgvar <- newEmptyMVar |
172 | tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) | 182 | tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) |
173 | return Transport | 183 | return $ (,) tcpcache Transport |
174 | { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing) | 184 | { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing) |
175 | , sendMessage = \addr (bDoCon,y) -> do | 185 | , sendMessage = \addr (bDoCon,y) -> do |
176 | t <- forkIO $ do | 186 | t <- forkIO $ do |
177 | msock <- acquireConnection msgvar tcpcache stream addr bDoCon | 187 | msock <- acquireConnection msgvar tcpcache stream addr bDoCon |
178 | mapM_ ($ y) msock | 188 | mapM_ ($ y) msock |
179 | `catchIOError` \e -> return () | 189 | `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e |
180 | labelThread t "tcp-send" | 190 | labelThread t "tcp-send" |
181 | , closeTransport = closeAll tcpcache stream | 191 | , closeTransport = closeAll tcpcache stream |
182 | } | 192 | } |
diff --git a/src/Network/Tox/TCP.hs b/src/Network/Tox/TCP.hs index 36200586..adb42514 100644 --- a/src/Network/Tox/TCP.hs +++ b/src/Network/Tox/TCP.hs | |||
@@ -132,18 +132,22 @@ tcpStream crypto = StreamHandshake | |||
132 | dput XTCP $ "TCP exception: " ++ show e | 132 | dput XTCP $ "TCP exception: " ++ show e |
133 | return Nothing | 133 | return Nothing |
134 | , streamEncode = \y -> do | 134 | , streamEncode = \y -> do |
135 | dput XTCP $ "TCP(acquire nonce):" ++ show addr ++ " <-- " ++ show y | ||
135 | n24 <- takeMVar nsend | 136 | n24 <- takeMVar nsend |
136 | dput XTCP $ "TCP:" ++ show addr ++ " <-- " ++ show y | 137 | dput XTCP $ "TCP(got nonce):" ++ show addr ++ " <-- " ++ show y |
137 | let bs = encode $ encrypt (noncef' n24) $ encodePlain y | 138 | let bs = encode $ encrypt (noncef' n24) $ encodePlain y |
138 | ($ h) -- bracket (takeMVar hvar) (putMVar hvar) | 139 | ($ h) -- bracket (takeMVar hvar) (putMVar hvar) |
139 | $ \h -> hPut h (encode (fromIntegral $ Data.ByteString.length bs :: Word16) <> bs) | 140 | $ \h -> hPut h (encode (fromIntegral $ Data.ByteString.length bs :: Word16) <> bs) |
140 | `catchIOError` \e -> dput XTCP $ "TCP write exception: " ++ show e | 141 | `catchIOError` \e -> dput XTCP $ "TCP write exception: " ++ show e |
142 | dput XTCP $ "TCP(incrementing nonce): " ++ show addr ++ " <-- " ++ show y | ||
141 | putMVar nsend (incrementNonce24 n24) | 143 | putMVar nsend (incrementNonce24 n24) |
144 | dput XTCP $ "TCP(finished): " ++ show addr ++ " <-- " ++ show y | ||
142 | } | 145 | } |
143 | , streamAddr = nodeAddr | 146 | , streamAddr = nodeAddr |
144 | } | 147 | } |
145 | 148 | ||
146 | toxTCP :: TransportCrypto -> IO (TransportA err NodeInfo RelayPacket (Bool,RelayPacket)) | 149 | toxTCP :: TransportCrypto -> IO ( TCPCache (SessionProtocol RelayPacket RelayPacket) |
150 | , TransportA err NodeInfo RelayPacket (Bool,RelayPacket) ) | ||
147 | toxTCP crypto = tcpTransport 30 (tcpStream crypto) | 151 | toxTCP crypto = tcpTransport 30 (tcpStream crypto) |
148 | 152 | ||
149 | tcpSpace :: KademliaSpace NodeId NodeInfo | 153 | tcpSpace :: KademliaSpace NodeId NodeInfo |
@@ -267,13 +271,14 @@ type RelayClient = Client String PacketNumber Nonce8 NodeInfo (Bool,RelayPacket) | |||
267 | newClient :: TransportCrypto | 271 | newClient :: TransportCrypto |
268 | -> (MVar (Bool,RelayPacket) -> a) -- ^ store mvar for query | 272 | -> (MVar (Bool,RelayPacket) -> a) -- ^ store mvar for query |
269 | -> (a -> RelayPacket -> IO void) -- ^ load mvar for query | 273 | -> (a -> RelayPacket -> IO void) -- ^ load mvar for query |
270 | -> IO ( TVar (ChaChaDRG, Data.Word64Map.Word64Map a) | 274 | -> IO ( ( TVar (ChaChaDRG, Data.Word64Map.Word64Map a) |
275 | , TCPCache (SessionProtocol RelayPacket RelayPacket) ) | ||
271 | , Client String PacketNumber Nonce8 NodeInfo (Bool,RelayPacket)) | 276 | , Client String PacketNumber Nonce8 NodeInfo (Bool,RelayPacket)) |
272 | newClient crypto store load = do | 277 | newClient crypto store load = do |
273 | net <- toxTCP crypto | 278 | (tcpcache,net) <- toxTCP crypto |
274 | drg <- drgNew | 279 | drg <- drgNew |
275 | map_var <- atomically $ newTVar (drg, Data.Word64Map.empty) | 280 | map_var <- atomically $ newTVar (drg, Data.Word64Map.empty) |
276 | return $ (,) map_var Client | 281 | return $ (,) (map_var,tcpcache) Client |
277 | { clientNet = {- XXX: Client type forces this pointless layering. -} layerTransport ((Right .) . (,) . (,) False) (,) net | 282 | { clientNet = {- XXX: Client type forces this pointless layering. -} layerTransport ((Right .) . (,) . (,) False) (,) net |
278 | , clientDispatcher = DispatchMethods | 283 | , clientDispatcher = DispatchMethods |
279 | { classifyInbound = (. snd) $ \case | 284 | { classifyInbound = (. snd) $ \case |
@@ -284,10 +289,11 @@ newClient crypto store load = do | |||
284 | OOBRecv k bs -> IsUnsolicited $ handleOOB k bs | 289 | OOBRecv k bs -> IsUnsolicited $ handleOOB k bs |
285 | wut -> IsUnknown (show wut) | 290 | wut -> IsUnknown (show wut) |
286 | , lookupHandler = \case | 291 | , lookupHandler = \case |
287 | PingPacket -> Just MethodHandler | 292 | PingPacket -> trace ("tcp-received-ping") $ Just MethodHandler |
288 | { methodParse = \(_,RelayPing n8) -> Right () | 293 | { methodParse = \case (_,RelayPing n8) -> Right () |
289 | , methodSerialize = \n8 src dst () -> (False, RelayPong n8) | 294 | _ -> trace ("tcp-non-ping") $ Left "TCP: Non-ping?" |
290 | , methodAction = \src () -> return () | 295 | , methodSerialize = \n8 src dst () -> trace ("tcp-made-pong-"++show n8) (False, RelayPong n8) |
296 | , methodAction = \src () -> dput XTCP $ "TCP pinged by "++show src | ||
291 | } | 297 | } |
292 | w -> trace ("tcp-lookupHandler: "++show w) $ Just NoReply | 298 | w -> trace ("tcp-lookupHandler: "++show w) $ Just NoReply |
293 | { methodParse = \x -> Left "tcp-lookuphandler?" -- :: x -> Either err a | 299 | { methodParse = \x -> Left "tcp-lookuphandler?" -- :: x -> Either err a |