summaryrefslogtreecommitdiff
path: root/src/Network
diff options
context:
space:
mode:
authorJoe Crayne <joe@jerkface.net>2018-12-27 19:52:08 -0500
committerJoe Crayne <joe@jerkface.net>2018-12-27 19:52:08 -0500
commit3dae31030c20ed9ad831dfba88db781ebe71ca54 (patch)
tree8bb99468709d7787c6dceef524d024b1c833c0f1 /src/Network
parent37eb345d1ffb40e814766b6df8134ca21e6987a7 (diff)
TCP-related Debug info.
Diffstat (limited to 'src/Network')
-rw-r--r--src/Network/QueryResponse.hs1
-rw-r--r--src/Network/QueryResponse/TCP.hs36
-rw-r--r--src/Network/Tox/TCP.hs24
3 files changed, 39 insertions, 22 deletions
diff --git a/src/Network/QueryResponse.hs b/src/Network/QueryResponse.hs
index 0fbbc929..13160d31 100644
--- a/src/Network/QueryResponse.hs
+++ b/src/Network/QueryResponse.hs
@@ -198,6 +198,7 @@ forkListener name client = do
198 thread_id <- forkIO $ do 198 thread_id <- forkIO $ do
199 myThreadId >>= flip labelThread ("listener."++name) 199 myThreadId >>= flip labelThread ("listener."++name)
200 fix $ awaitMessage client . const 200 fix $ awaitMessage client . const
201 dput XMisc $ "Listener died: " ++ name
201 return $ do 202 return $ do
202 closeTransport client 203 closeTransport client
203 killThread thread_id 204 killThread thread_id
diff --git a/src/Network/QueryResponse/TCP.hs b/src/Network/QueryResponse/TCP.hs
index a606c51d..bad61727 100644
--- a/src/Network/QueryResponse/TCP.hs
+++ b/src/Network/QueryResponse/TCP.hs
@@ -9,6 +9,7 @@ import Control.Concurrent.Lifted
9import GHC.Conc (labelThread) 9import GHC.Conc (labelThread)
10#endif 10#endif
11 11
12import Control.Arrow
12import Control.Concurrent.STM 13import Control.Concurrent.STM
13import Control.Monad 14import Control.Monad
14import Data.ByteString (ByteString,hPut) 15import Data.ByteString (ByteString,hPut)
@@ -39,7 +40,7 @@ data TCPSession st
39 } 40 }
40 41
41newtype TCPAddress = TCPAddress SockAddr 42newtype TCPAddress = TCPAddress SockAddr
42 deriving (Eq,Ord) 43 deriving (Eq,Ord,Show)
43 44
44instance Hashable TCPAddress where 45instance Hashable TCPAddress where
45 hashWithSalt salt (TCPAddress x) = case x of 46 hashWithSalt salt (TCPAddress x) = case x of
@@ -67,6 +68,9 @@ killSession :: TCPSession st -> IO ()
67killSession PendingTCPSession = return () 68killSession PendingTCPSession = return ()
68killSession TCPSession{tcpThread=t} = killThread t 69killSession TCPSession{tcpThread=t} = killThread t
69 70
71showStat r = case r of PendingTCPSession -> "pending."
72 TCPSession {} -> "established."
73
70acquireConnection :: MVar (Maybe (Either a (x, addr))) 74acquireConnection :: MVar (Maybe (Either a (x, addr)))
71 -> TCPCache (SessionProtocol x y) 75 -> TCPCache (SessionProtocol x y)
72 -> StreamHandshake addr x y 76 -> StreamHandshake addr x y
@@ -75,6 +79,7 @@ acquireConnection :: MVar (Maybe (Either a (x, addr)))
75 -> IO (Maybe (y -> IO ())) 79 -> IO (Maybe (y -> IO ()))
76acquireConnection mvar tcpcache stream addr bDoCon = do 80acquireConnection mvar tcpcache stream addr bDoCon = do
77 now <- getPOSIXTime 81 now <- getPOSIXTime
82 -- dput XTCP $ "acquireConnection 0 " ++ show (streamAddr stream addr)
78 entry <- atomically $ do 83 entry <- atomically $ do
79 c <- readTVar (lru tcpcache) 84 c <- readTVar (lru tcpcache)
80 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c 85 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c
@@ -84,6 +89,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do
84 | otherwise -> return () 89 | otherwise -> return ()
85 Just (tm, v) -> modifyTVar' (lru tcpcache) $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now) 90 Just (tm, v) -> modifyTVar' (lru tcpcache) $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down now)
86 return v 91 return v
92 -- dput XTCP $ "acquireConnection 1 " ++ show (streamAddr stream addr, fmap (second showStat) entry)
87 case entry of 93 case entry of
88 Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do 94 Nothing -> fmap join $ forM (guard bDoCon) $ \() -> do
89 proto <- getProtocolNumber "tcp" 95 proto <- getProtocolNumber "tcp"
@@ -98,26 +104,28 @@ acquireConnection mvar tcpcache stream addr bDoCon = do
98 ret <- fmap join $ forM mh $ \h -> do 104 ret <- fmap join $ forM mh $ \h -> do
99 st <- streamHello stream addr h 105 st <- streamHello stream addr h
100 dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr) 106 dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr)
101 rthread <- forkIO $ fix $ \loop -> do 107 signal <- newTVarIO False
108 rthread <- forkIO $ do
109 atomically (readTVar signal >>= check)
110 fix $ \loop -> do
102 x <- streamDecode st 111 x <- streamDecode st
103 dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x 112 dput XTCP $ "TCP streamDecode " ++ show (streamAddr stream addr) ++ " --> " ++ maybe "Nothing" (const "got") x
104 case x of 113 case x of
105 Just u -> do 114 Just u -> do
106 m <- timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr) 115 m <- timeout (1000000) $ putMVar mvar $ Just $ Right (u, addr)
107 when (isNothing m) $ do 116 when (isNothing m) $ do
108 dput XTCP $ "TCP "++show (streamAddr stream addr) ++ "dropped packet." 117 dput XTCP $ "TCP "++show (streamAddr stream addr) ++ " dropped packet."
118 tryTakeMVar mvar
119 return ()
109 loop 120 loop
110 Nothing -> do 121 Nothing -> do
111 dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr) 122 dput XTCP $ "TCP disconnected: " ++ show (streamAddr stream addr)
112 atomically $ modifyTVar' (lru tcpcache) 123 do atomically $ modifyTVar' (lru tcpcache)
113 $ MM.delete (TCPAddress $ streamAddr stream addr) 124 $ MM.delete (TCPAddress $ streamAddr stream addr)
114 c <- atomically $ readTVar (lru tcpcache) 125 c <- atomically $ readTVar (lru tcpcache)
115 now <- getPOSIXTime 126 now <- getPOSIXTime
116 forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do 127 forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do
117 let stat = case r of 128 dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r]
118 PendingTCPSession -> "pending."
119 TCPSession {} -> "established."
120 dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), stat]
121 hClose h 129 hClose h
122 let showAddr a = show (streamAddr stream a) 130 let showAddr a = show (streamAddr stream a)
123 labelThread rthread ("tcp:"++showAddr addr) 131 labelThread rthread ("tcp:"++showAddr addr)
@@ -132,6 +140,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do
132 let (rs,c') = MM.takeView (tcpMax tcpcache) 140 let (rs,c') = MM.takeView (tcpMax tcpcache)
133 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c 141 $ MM.insert' (TCPAddress $ streamAddr stream addr) v (Down t) c
134 writeTVar (lru tcpcache) c' 142 writeTVar (lru tcpcache) c'
143 writeTVar signal True
135 return rs 144 return rs
136 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do 145 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do
137 myThreadId >>= flip labelThread ("tcp-close:"++show k) 146 myThreadId >>= flip labelThread ("tcp-close:"++show k)
@@ -146,8 +155,9 @@ acquireConnection mvar tcpcache stream addr bDoCon = do
146 when (isNothing ret) $ do 155 when (isNothing ret) $ do
147 atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr) 156 atomically $ modifyTVar' (lru tcpcache) $ MM.delete (TCPAddress $ streamAddr stream addr)
148 return ret 157 return ret
149 Just (tm, PendingTCPSession) -> do 158 Just (tm, PendingTCPSession)
150 fmap join $ timeout 10000000 $ atomically $ do 159 | not bDoCon -> return Nothing
160 | otherwise -> fmap join $ timeout 10000000 $ atomically $ do
151 c <- readTVar (lru tcpcache) 161 c <- readTVar (lru tcpcache)
152 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c 162 let v = MM.lookup' (TCPAddress $ streamAddr stream addr) c
153 case v of 163 case v of
@@ -166,17 +176,17 @@ closeAll tcpcache stream = do
166 176
167tcpTransport :: Int -- ^ maximum number of TCP links to maintain. 177tcpTransport :: Int -- ^ maximum number of TCP links to maintain.
168 -> StreamHandshake addr x y 178 -> StreamHandshake addr x y
169 -> IO (TransportA err addr x (Bool,y)) 179 -> IO (TCPCache (SessionProtocol x y), TransportA err addr x (Bool,y))
170tcpTransport maxcon stream = do 180tcpTransport maxcon stream = do
171 msgvar <- newEmptyMVar 181 msgvar <- newEmptyMVar
172 tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty) 182 tcpcache <- atomically $ (`TCPCache` maxcon) <$> newTVar (MM.empty)
173 return Transport 183 return $ (,) tcpcache Transport
174 { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing) 184 { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing)
175 , sendMessage = \addr (bDoCon,y) -> do 185 , sendMessage = \addr (bDoCon,y) -> do
176 t <- forkIO $ do 186 t <- forkIO $ do
177 msock <- acquireConnection msgvar tcpcache stream addr bDoCon 187 msock <- acquireConnection msgvar tcpcache stream addr bDoCon
178 mapM_ ($ y) msock 188 mapM_ ($ y) msock
179 `catchIOError` \e -> return () 189 `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e
180 labelThread t "tcp-send" 190 labelThread t "tcp-send"
181 , closeTransport = closeAll tcpcache stream 191 , closeTransport = closeAll tcpcache stream
182 } 192 }
diff --git a/src/Network/Tox/TCP.hs b/src/Network/Tox/TCP.hs
index 36200586..adb42514 100644
--- a/src/Network/Tox/TCP.hs
+++ b/src/Network/Tox/TCP.hs
@@ -132,18 +132,22 @@ tcpStream crypto = StreamHandshake
132 dput XTCP $ "TCP exception: " ++ show e 132 dput XTCP $ "TCP exception: " ++ show e
133 return Nothing 133 return Nothing
134 , streamEncode = \y -> do 134 , streamEncode = \y -> do
135 dput XTCP $ "TCP(acquire nonce):" ++ show addr ++ " <-- " ++ show y
135 n24 <- takeMVar nsend 136 n24 <- takeMVar nsend
136 dput XTCP $ "TCP:" ++ show addr ++ " <-- " ++ show y 137 dput XTCP $ "TCP(got nonce):" ++ show addr ++ " <-- " ++ show y
137 let bs = encode $ encrypt (noncef' n24) $ encodePlain y 138 let bs = encode $ encrypt (noncef' n24) $ encodePlain y
138 ($ h) -- bracket (takeMVar hvar) (putMVar hvar) 139 ($ h) -- bracket (takeMVar hvar) (putMVar hvar)
139 $ \h -> hPut h (encode (fromIntegral $ Data.ByteString.length bs :: Word16) <> bs) 140 $ \h -> hPut h (encode (fromIntegral $ Data.ByteString.length bs :: Word16) <> bs)
140 `catchIOError` \e -> dput XTCP $ "TCP write exception: " ++ show e 141 `catchIOError` \e -> dput XTCP $ "TCP write exception: " ++ show e
142 dput XTCP $ "TCP(incrementing nonce): " ++ show addr ++ " <-- " ++ show y
141 putMVar nsend (incrementNonce24 n24) 143 putMVar nsend (incrementNonce24 n24)
144 dput XTCP $ "TCP(finished): " ++ show addr ++ " <-- " ++ show y
142 } 145 }
143 , streamAddr = nodeAddr 146 , streamAddr = nodeAddr
144 } 147 }
145 148
146toxTCP :: TransportCrypto -> IO (TransportA err NodeInfo RelayPacket (Bool,RelayPacket)) 149toxTCP :: TransportCrypto -> IO ( TCPCache (SessionProtocol RelayPacket RelayPacket)
150 , TransportA err NodeInfo RelayPacket (Bool,RelayPacket) )
147toxTCP crypto = tcpTransport 30 (tcpStream crypto) 151toxTCP crypto = tcpTransport 30 (tcpStream crypto)
148 152
149tcpSpace :: KademliaSpace NodeId NodeInfo 153tcpSpace :: KademliaSpace NodeId NodeInfo
@@ -267,13 +271,14 @@ type RelayClient = Client String PacketNumber Nonce8 NodeInfo (Bool,RelayPacket)
267newClient :: TransportCrypto 271newClient :: TransportCrypto
268 -> (MVar (Bool,RelayPacket) -> a) -- ^ store mvar for query 272 -> (MVar (Bool,RelayPacket) -> a) -- ^ store mvar for query
269 -> (a -> RelayPacket -> IO void) -- ^ load mvar for query 273 -> (a -> RelayPacket -> IO void) -- ^ load mvar for query
270 -> IO ( TVar (ChaChaDRG, Data.Word64Map.Word64Map a) 274 -> IO ( ( TVar (ChaChaDRG, Data.Word64Map.Word64Map a)
275 , TCPCache (SessionProtocol RelayPacket RelayPacket) )
271 , Client String PacketNumber Nonce8 NodeInfo (Bool,RelayPacket)) 276 , Client String PacketNumber Nonce8 NodeInfo (Bool,RelayPacket))
272newClient crypto store load = do 277newClient crypto store load = do
273 net <- toxTCP crypto 278 (tcpcache,net) <- toxTCP crypto
274 drg <- drgNew 279 drg <- drgNew
275 map_var <- atomically $ newTVar (drg, Data.Word64Map.empty) 280 map_var <- atomically $ newTVar (drg, Data.Word64Map.empty)
276 return $ (,) map_var Client 281 return $ (,) (map_var,tcpcache) Client
277 { clientNet = {- XXX: Client type forces this pointless layering. -} layerTransport ((Right .) . (,) . (,) False) (,) net 282 { clientNet = {- XXX: Client type forces this pointless layering. -} layerTransport ((Right .) . (,) . (,) False) (,) net
278 , clientDispatcher = DispatchMethods 283 , clientDispatcher = DispatchMethods
279 { classifyInbound = (. snd) $ \case 284 { classifyInbound = (. snd) $ \case
@@ -284,10 +289,11 @@ newClient crypto store load = do
284 OOBRecv k bs -> IsUnsolicited $ handleOOB k bs 289 OOBRecv k bs -> IsUnsolicited $ handleOOB k bs
285 wut -> IsUnknown (show wut) 290 wut -> IsUnknown (show wut)
286 , lookupHandler = \case 291 , lookupHandler = \case
287 PingPacket -> Just MethodHandler 292 PingPacket -> trace ("tcp-received-ping") $ Just MethodHandler
288 { methodParse = \(_,RelayPing n8) -> Right () 293 { methodParse = \case (_,RelayPing n8) -> Right ()
289 , methodSerialize = \n8 src dst () -> (False, RelayPong n8) 294 _ -> trace ("tcp-non-ping") $ Left "TCP: Non-ping?"
290 , methodAction = \src () -> return () 295 , methodSerialize = \n8 src dst () -> trace ("tcp-made-pong-"++show n8) (False, RelayPong n8)
296 , methodAction = \src () -> dput XTCP $ "TCP pinged by "++show src
291 } 297 }
292 w -> trace ("tcp-lookupHandler: "++show w) $ Just NoReply 298 w -> trace ("tcp-lookupHandler: "++show w) $ Just NoReply
293 { methodParse = \x -> Left "tcp-lookuphandler?" -- :: x -> Either err a 299 { methodParse = \x -> Left "tcp-lookuphandler?" -- :: x -> Either err a