diff options
author | Joe Crayne <joe@jerkface.net> | 2018-12-27 19:52:08 -0500 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2018-12-27 19:52:08 -0500 |
commit | 3dae31030c20ed9ad831dfba88db781ebe71ca54 (patch) | |
tree | 8bb99468709d7787c6dceef524d024b1c833c0f1 /src/Network | |
parent | 37eb345d1ffb40e814766b6df8134ca21e6987a7 (diff) |
TCP-related Debug info.
Diffstat (limited to 'src/Network')
-rw-r--r-- | src/Network/QueryResponse.hs | 1 | ||||
-rw-r--r-- | src/Network/QueryResponse/TCP.hs | 36 | ||||
-rw-r--r-- | src/Network/Tox/TCP.hs | 24 |
3 files changed, 39 insertions, 22 deletions
diff --git a/src/Network/QueryResponse.hs b/src/Network/QueryResponse.hs index 0fbbc929..13160d31 100644 --- a/src/Network/QueryResponse.hs +++ b/src/Network/QueryResponse.hs | |||
@@ -198,6 +198,7 @@ forkListener name client = do | |||
198 | thread_id <- forkIO $ do | 198 | thread_id <- forkIO $ do |
199 | myThreadId >>= flip labelThread ("listener."++name) | 199 | myThreadId >>= flip labelThread ("listener."++name) |
200 | fix $ awaitMessage client . const | 200 | fix $ awaitMessage client . const |
201 | dput XMisc $ "Listener died: " ++ name | ||
201 | return $ do | 202 | return $ do |
202 | closeTransport client | 203 | closeTransport client |
203 | killThread thread_id | 204 | killThread thread_id |
diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs index a606c51d..bad61727 100644 --- a/src/Network/QueryResponse/TCP.hs +++ b/src/Network/QueryResponse/TCP.hs | |||
@@ -9,6 +9,7 @@ import Control.Concurrent.Lifted | |||
9 | import GHC.Conc (labelThread) | 9 | import GHC.Conc (labelThread) |
10 | #endif | 10 | #endif |
11 | 11 | ||
12 | import Control.Arrow | ||
12 | import Control.Concurrent.STM | 13 | import Control.Concurrent.STM |
13 | import Control.Monad | 14 | import Control.Monad |
14 | import Data.ByteString (ByteString,hPut) | 15 | import Data.ByteString (ByteString,hPut) |
@@ -39,7 +40,7 @@ data TCPSession st | |||
39 | } | 40 | } |
40 | 41 | ||
41 | newtype TCPAddress = TCPAddress SockAddr | 42 | newtype TCPAddress = TCPAddress SockAddr |
42 | deriving (Eq,Ord) | 43 | deriving (Eq,Ord,Show) |
43 | 44 | ||
44 | instance Hashable TCPAddress where | 45 | instance Hashable TCPAddress where |
45 | hashWithSalt salt (TCPAddress x) = case x of | 46 | hashWithSalt salt (TCPAddress x) = case x of |
@@ -67,6 +68,9 @@ killSession :: TCPSession st -> IO () | |||
67 | killSession PendingTCPSession = return () | 68 | killSession PendingTCPSession = return () |
68 | killSession TCPSession{tcpThread=t} = killThread t | 69 | killSession TCPSession{tcpThread=t} = killThread t |
69 | 70 | ||
71 | showStat r = case r of PendingTCPSession -> "pending." | ||
72 | TCPSession {} -> "established." | ||
73 | |||
70 | acquireConnection :: MVar (Maybe (Either a (x, addr))) | 74 | acquireConnection :: MVar (Maybe (Either a (x, addr))) |
71 | -> TCPCache (SessionProtocol x y) | 75 | -> TCPCache (SessionProtocol x y) |
72 | -> StreamHandshake addr x y | 76 | -> StreamHandshake addr x y |
@@ -75,6 +79,7 @@ acquireConnection :: MVar (Maybe (Either a (x, addr))) | |||
75 | -> IO (Maybe (y -> IO ())) | 79 | -> IO (Maybe (y -> IO ())) |
76 | acquireConnection mvar tcpcache stream addr bDoCon = do | 80 | acquireConnection mvar tcpcache stream addr bDoCon = do |
77 | now <- getPOSIXTime | 81 | now <- getPOSIXTime |
82 | -- dput XTCP $ "acquireConnection 0 " ++ show (streamAddr stream addr) | ||
78 | entry <- atomically $ do | 83 | entry <- atomically $ do |
79 | c <- readTVar (lru tcpcache) | 84 | c <- readTVar (lru tcpcache) |
80 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c | 85 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c |
@@ -84,6 +89,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
84 | | otherwise -> return () | 89 | | otherwise -> return () |
85 | Just (tm, v) -> modifyTVar' (lru tcpcache) $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now) | 90 | Just (tm, v) -> modifyTVar' (lru tcpcache) $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now) |
86 | return v | 91 | return v |
92 | -- dput XTCP $ "acquireConnection 1 " ++ show (streamAddr stream addr, fmap (second showStat) entry) | ||
87 | case entry of | 93 | case entry of |
88 | Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do | 94 | Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do |
89 | proto <- getProtocolNumber "tcp" | 95 | proto <- getProtocolNumber "tcp" |
@@ -98,26 +104,28 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
98 | ret <- fmap join $ forM mh $ \h -> do | 104 | ret <- fmap join $ forM mh $ \h -> do |
99 | st <- streamHello stream addr h | 105 | st <- streamHello stream addr h |
100 | dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr) | 106 | dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr) |
101 | rthread <- forkIO $ fix $ \loop -> do | 107 | signal <- newTVarIO False |
108 | rthread <- forkIO $ do | ||
109 | atomically (readTVar signal >>= check) | ||
110 | fix $ \loop -> do | ||
102 | x <- streamDecode st | 111 | x <- streamDecode st |
103 | dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x | 112 | dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x |
104 | case x of | 113 | case x of |
105 | Just u -> do | 114 | Just u -> do |
106 | m <- timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr) | 115 | m <- timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr) |
107 | when (isNothing m) $ do | 116 | when (isNothing m) $ do |
108 | dput XTCP $ "TCP "++show (streamAddr stream addr) ++ "dropped packet." | 117 | dput XTCP $ "TCP "++show (streamAddr stream addr) ++ " dropped packet." |
118 | tryTakeMVar mvar | ||
119 | return () | ||
109 | loop | 120 | loop |
110 | Nothing -> do | 121 | Nothing -> do |
111 | dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) | 122 | dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) |
112 | atomically $ modifyTVar' (lru tcpcache) | 123 | do atomically $ modifyTVar' (lru tcpcache) |
113 | $ MM.delete (TCPAddress $ streamAddr stream addr) | 124 | $ MM.delete (TCPAddress $ streamAddr stream addr) |
114 | c <- atomically $ readTVar (lru tcpcache) | 125 | c <- atomically $ readTVar (lru tcpcache) |
115 | now <- getPOSIXTime | 126 | now <- getPOSIXTime |
116 | forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do | 127 | forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do |
117 | let stat = case r of | 128 | dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r] |
118 | PendingTCPSession -> "pending." | ||
119 | TCPSession {} -> "established." | ||
120 | dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), stat] | ||
121 | hClose h | 129 | hClose h |
122 | let showAddr a = show (streamAddr stream a) | 130 | let showAddr a = show (streamAddr stream a) |
123 | labelThread rthread ("tcp:"++showAddr addr) | 131 | labelThread rthread ("tcp:"++showAddr addr) |
@@ -132,6 +140,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
132 | let (rs,c') = MM.takeView (tcpMax tcpcache) | 140 | let (rs,c') = MM.takeView (tcpMax tcpcache) |
133 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c | 141 | $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c |
134 | writeTVar (lru tcpcache) c' | 142 | writeTVar (lru tcpcache) c' |
143 | writeTVar signal True | ||
135 | return rs | 144 | return rs |
136 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do | 145 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do |
137 | myThreadId >>= flip labelThread ("tcp-close:"++show k) | 146 | myThreadId >>= flip labelThread ("tcp-close:"++show k) |
@@ -146,8 +155,9 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
146 | when (isNothing ret) $ do | 155 | when (isNothing ret) $ do |
147 | atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) | 156 | atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) |
148 | return ret | 157 | return ret |
149 | Just (tm, PendingTCPSession) -> do | 158 | Just (tm, PendingTCPSession) |
150 | fmap join $ timeout 10000000 $ atomically $ do | 159 | | not bDoCon -> return Nothing |
160 | | otherwise -> fmap join $ timeout 10000000 $ atomically $ do | ||
151 | c <- readTVar (lru tcpcache) | 161 | c <- readTVar (lru tcpcache) |
152 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c | 162 | let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c |
153 | case v of | 163 | case v of |
@@ -166,17 +176,17 @@ closeAll tcpcache stream = do | |||
166 | 176 | ||
167 | tcpTransport :: Int -- ^ maximum number of TCP links to maintain. | 177 | tcpTransport :: Int -- ^ maximum number of TCP links to maintain. |
168 | -> StreamHandshake addr x y | 178 | -> StreamHandshake addr x y |
169 | -> IO (TransportA err addr x (Bool,y)) | 179 | -> IO (TCPCache (SessionProtocol x y), TransportA err addr x (Bool,y)) |
170 | tcpTransport maxcon stream = do | 180 | tcpTransport maxcon stream = do |
171 | msgvar <- newEmptyMVar | 181 | msgvar <- newEmptyMVar |
172 | tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) | 182 | tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) |
173 | return Transport | 183 | return $ (,) tcpcache Transport |
174 | { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing) | 184 | { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing) |
175 | , sendMessage = \addr (bDoCon,y) -> do | 185 | , sendMessage = \addr (bDoCon,y) -> do |
176 | t <- forkIO $ do | 186 | t <- forkIO $ do |
177 | msock <- acquireConnection msgvar tcpcache stream addr bDoCon | 187 | msock <- acquireConnection msgvar tcpcache stream addr bDoCon |
178 | mapM_ ($ y) msock | 188 | mapM_ ($ y) msock |
179 | `catchIOError` \e -> return () | 189 | `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e |
180 | labelThread t "tcp-send" | 190 | labelThread t "tcp-send" |
181 | , closeTransport = closeAll tcpcache stream | 191 | , closeTransport = closeAll tcpcache stream |
182 | } | 192 | } |
diff --git a/src/Network/Tox/TCP.hs b/src/Network/Tox/TCP.hs index 36200586..adb42514 100644 --- a/src/Network/Tox/TCP.hs +++ b/src/Network/Tox/TCP.hs | |||
@@ -132,18 +132,22 @@ tcpStream crypto = StreamHandshake | |||
132 | dput XTCP $ "TCP exception: " ++ show e | 132 | dput XTCP $ "TCP exception: " ++ show e |
133 | return Nothing | 133 | return Nothing |
134 | , streamEncode = \y -> do | 134 | , streamEncode = \y -> do |
135 | dput XTCP $ "TCP(acquire nonce):" ++ show addr ++ " <-- " ++ show y | ||
135 | n24 <- takeMVar nsend | 136 | n24 <- takeMVar nsend |
136 | dput XTCP $ "TCP:" ++ show addr ++ " <-- " ++ show y | 137 | dput XTCP $ "TCP(got nonce):" ++ show addr ++ " <-- " ++ show y |
137 | let bs = encode $ encrypt (noncef' n24) $ encodePlain y | 138 | let bs = encode $ encrypt (noncef' n24) $ encodePlain y |
138 | ($ h) -- bracket (takeMVar hvar) (putMVar hvar) | 139 | ($ h) -- bracket (takeMVar hvar) (putMVar hvar) |
139 | $ \h -> hPut h (encode (fromIntegral $ Data.ByteString.length bs :: Word16) <> bs) | 140 | $ \h -> hPut h (encode (fromIntegral $ Data.ByteString.length bs :: Word16) <> bs) |
140 | `catchIOError` \e -> dput XTCP $ "TCP write exception: " ++ show e | 141 | `catchIOError` \e -> dput XTCP $ "TCP write exception: " ++ show e |
142 | dput XTCP $ "TCP(incrementing nonce): " ++ show addr ++ " <-- " ++ show y | ||
141 | putMVar nsend (incrementNonce24 n24) | 143 | putMVar nsend (incrementNonce24 n24) |
144 | dput XTCP $ "TCP(finished): " ++ show addr ++ " <-- " ++ show y | ||
142 | } | 145 | } |
143 | , streamAddr = nodeAddr | 146 | , streamAddr = nodeAddr |
144 | } | 147 | } |
145 | 148 | ||
146 | toxTCP :: TransportCrypto -> IO (TransportA err NodeInfo RelayPacket (Bool,RelayPacket)) | 149 | toxTCP :: TransportCrypto -> IO ( TCPCache (SessionProtocol RelayPacket RelayPacket) |
150 | , TransportA err NodeInfo RelayPacket (Bool,RelayPacket) ) | ||
147 | toxTCP crypto = tcpTransport 30 (tcpStream crypto) | 151 | toxTCP crypto = tcpTransport 30 (tcpStream crypto) |
148 | 152 | ||
149 | tcpSpace :: KademliaSpace NodeId NodeInfo | 153 | tcpSpace :: KademliaSpace NodeId NodeInfo |
@@ -267,13 +271,14 @@ type RelayClient = Client String PacketNumber Nonce8 NodeInfo (Bool,RelayPacket) | |||
267 | newClient :: TransportCrypto | 271 | newClient :: TransportCrypto |
268 | -> (MVar (Bool,RelayPacket) -> a) -- ^ store mvar for query | 272 | -> (MVar (Bool,RelayPacket) -> a) -- ^ store mvar for query |
269 | -> (a -> RelayPacket -> IO void) -- ^ load mvar for query | 273 | -> (a -> RelayPacket -> IO void) -- ^ load mvar for query |
270 | -> IO ( TVar (ChaChaDRG, Data.Word64Map.Word64Map a) | 274 | -> IO ( ( TVar (ChaChaDRG, Data.Word64Map.Word64Map a) |
275 | , TCPCache (SessionProtocol RelayPacket RelayPacket) ) | ||
271 | , Client String PacketNumber Nonce8 NodeInfo (Bool,RelayPacket)) | 276 | , Client String PacketNumber Nonce8 NodeInfo (Bool,RelayPacket)) |
272 | newClient crypto store load = do | 277 | newClient crypto store load = do |
273 | net <- toxTCP crypto | 278 | (tcpcache,net) <- toxTCP crypto |
274 | drg <- drgNew | 279 | drg <- drgNew |
275 | map_var <- atomically $ newTVar (drg, Data.Word64Map.empty) | 280 | map_var <- atomically $ newTVar (drg, Data.Word64Map.empty) |
276 | return $ (,) map_var Client | 281 | return $ (,) (map_var,tcpcache) Client |
277 | { clientNet = {- XXX: Client type forces this pointless layering. -} layerTransport ((Right .) . (,) . (,) False) (,) net | 282 | { clientNet = {- XXX: Client type forces this pointless layering. -} layerTransport ((Right .) . (,) . (,) False) (,) net |
278 | , clientDispatcher = DispatchMethods | 283 | , clientDispatcher = DispatchMethods |
279 | { classifyInbound = (. snd) $ \case | 284 | { classifyInbound = (. snd) $ \case |
@@ -284,10 +289,11 @@ newClient crypto store load = do | |||
284 | OOBRecv k bs -> IsUnsolicited $ handleOOB k bs | 289 | OOBRecv k bs -> IsUnsolicited $ handleOOB k bs |
285 | wut -> IsUnknown (show wut) | 290 | wut -> IsUnknown (show wut) |
286 | , lookupHandler = \case | 291 | , lookupHandler = \case |
287 | PingPacket -> Just MethodHandler | 292 | PingPacket -> trace ("tcp-received-ping") $ Just MethodHandler |
288 | { methodParse = \(_,RelayPing n8) -> Right () | 293 | { methodParse = \case (_,RelayPing n8) -> Right () |
289 | , methodSerialize = \n8 src dst () -> (False, RelayPong n8) | 294 | _ -> trace ("tcp-non-ping") $ Left "TCP: Non-ping?" |
290 | , methodAction = \src () -> return () | 295 | , methodSerialize = \n8 src dst () -> trace ("tcp-made-pong-"++show n8) (False, RelayPong n8) |
296 | , methodAction = \src () -> dput XTCP $ "TCP pinged by "++show src | ||
291 | } | 297 | } |
292 | w -> trace ("tcp-lookupHandler: "++show w) $ Just NoReply | 298 | w -> trace ("tcp-lookupHandler: "++show w) $ Just NoReply |
293 | { methodParse = \x -> Left "tcp-lookuphandler?" -- :: x -> Either err a | 299 | { methodParse = \x -> Left "tcp-lookuphandler?" -- :: x -> Either err a |