diff options
author | James Crayne <jim.crayne@gmail.com> | 2019-10-18 10:35:34 +0000 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-01 19:53:46 -0500 |
commit | ad35d7aa97d2fad2615f5d0dd4aee4e984d403f6 (patch) | |
tree | f8ad06edb3499ff1f801846db99b4b7419f5e55a /dht/src/Network/QueryResponse | |
parent | c479c2dd58c12d159c05040a08da6c4c7730c407 (diff) |
more forkLabeled, and now forkOSLabeled
Diffstat (limited to 'dht/src/Network/QueryResponse')
-rw-r--r-- | dht/src/Network/QueryResponse/TCP.hs | 13 |
1 files changed, 6 insertions, 7 deletions
diff --git a/dht/src/Network/QueryResponse/TCP.hs b/dht/src/Network/QueryResponse/TCP.hs index de6e1eb7..e4831fb2 100644 --- a/dht/src/Network/QueryResponse/TCP.hs +++ b/dht/src/Network/QueryResponse/TCP.hs | |||
@@ -19,6 +19,7 @@ import Data.Maybe | |||
19 | import Data.Ord | 19 | import Data.Ord |
20 | import Data.Time.Clock.POSIX | 20 | import Data.Time.Clock.POSIX |
21 | import Data.Word | 21 | import Data.Word |
22 | import Data.String (IsString(..)) | ||
22 | import Network.BSD | 23 | import Network.BSD |
23 | import Network.Socket | 24 | import Network.Socket |
24 | import System.Timeout | 25 | import System.Timeout |
@@ -68,6 +69,7 @@ killSession :: TCPSession st -> IO () | |||
68 | killSession PendingTCPSession = return () | 69 | killSession PendingTCPSession = return () |
69 | killSession TCPSession{tcpThread=t} = killThread t | 70 | killSession TCPSession{tcpThread=t} = killThread t |
70 | 71 | ||
72 | showStat :: IsString p => TCPSession st -> p | ||
71 | showStat r = case r of PendingTCPSession -> "pending." | 73 | showStat r = case r of PendingTCPSession -> "pending." |
72 | TCPSession {} -> "established." | 74 | TCPSession {} -> "established." |
73 | 75 | ||
@@ -105,7 +107,8 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
105 | st <- streamHello stream addr h | 107 | st <- streamHello stream addr h |
106 | dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr) | 108 | dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr) |
107 | signal <- newTVarIO False | 109 | signal <- newTVarIO False |
108 | rthread <- forkIO $ do | 110 | let showAddr a = show (streamAddr stream a) |
111 | rthread <- forkLabeled ("tcp:"++showAddr addr) $ do | ||
109 | atomically (readTVar signal >>= check) | 112 | atomically (readTVar signal >>= check) |
110 | fix $ \loop -> do | 113 | fix $ \loop -> do |
111 | x <- streamDecode st | 114 | x <- streamDecode st |
@@ -127,8 +130,6 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
127 | forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do | 130 | forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do |
128 | dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r] | 131 | dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r] |
129 | hClose h | 132 | hClose h |
130 | let showAddr a = show (streamAddr stream a) | ||
131 | labelThread rthread ("tcp:"++showAddr addr) | ||
132 | let v = TCPSession | 133 | let v = TCPSession |
133 | { tcpHandle = h | 134 | { tcpHandle = h |
134 | , tcpState = st | 135 | , tcpState = st |
@@ -142,8 +143,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
142 | writeTVar (lru tcpcache) c' | 143 | writeTVar (lru tcpcache) c' |
143 | writeTVar signal True | 144 | writeTVar signal True |
144 | return rs | 145 | return rs |
145 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do | 146 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkLabeled ("tcp-close:"++show k) $ do |
146 | myThreadId >>= flip labelThread ("tcp-close:"++show k) | ||
147 | dput XTCP $ "TCP dropped: " ++ show k | 147 | dput XTCP $ "TCP dropped: " ++ show k |
148 | killSession r | 148 | killSession r |
149 | case r of TCPSession {tcpState=st,tcpHandle=h} -> do | 149 | case r of TCPSession {tcpState=st,tcpHandle=h} -> do |
@@ -184,10 +184,9 @@ tcpTransport maxcon stream = do | |||
184 | return $ (,) tcpcache Transport | 184 | return $ (,) tcpcache Transport |
185 | { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing) | 185 | { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing) |
186 | , sendMessage = \addr (bDoCon,y) -> do | 186 | , sendMessage = \addr (bDoCon,y) -> do |
187 | t <- forkIO $ do | 187 | void . forkLabeled "tcp-send" $ do |
188 | msock <- acquireConnection msgvar tcpcache stream addr bDoCon | 188 | msock <- acquireConnection msgvar tcpcache stream addr bDoCon |
189 | mapM_ ($ y) msock | 189 | mapM_ ($ y) msock |
190 | `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e | 190 | `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e |
191 | labelThread t "tcp-send" | ||
192 | , closeTransport = closeAll tcpcache stream >> putMVar msgvar Nothing | 191 | , closeTransport = closeAll tcpcache stream >> putMVar msgvar Nothing |
193 | } | 192 | } |