diff options
-rw-r--r-- | dht/Presence/DNSCache.hs | 16 | ||||
-rw-r--r-- | dht/src/Control/Concurrent/ThreadUtil.hs | 7 | ||||
-rw-r--r-- | dht/src/Network/QueryResponse/TCP.hs | 13 |
3 files changed, 18 insertions, 18 deletions
diff --git a/dht/Presence/DNSCache.hs b/dht/Presence/DNSCache.hs index e28655c5..14581fee 100644 --- a/dht/Presence/DNSCache.hs +++ b/dht/Presence/DNSCache.hs | |||
@@ -22,12 +22,7 @@ module DNSCache | |||
22 | , withPort | 22 | , withPort |
23 | ) where | 23 | ) where |
24 | 24 | ||
25 | #ifdef THREAD_DEBUG | 25 | import Control.Concurrent.ThreadUtil |
26 | import Control.Concurrent.Lifted.Instrument | ||
27 | #else | ||
28 | import Control.Concurrent.Lifted | ||
29 | import GHC.Conc (labelThread) | ||
30 | #endif | ||
31 | import Control.Arrow | 26 | import Control.Arrow |
32 | import Control.Concurrent.STM | 27 | import Control.Concurrent.STM |
33 | import Data.Text ( Text ) | 28 | import Data.Text ( Text ) |
@@ -108,10 +103,10 @@ make6mapped4 :: SockAddr -> SockAddr | |||
108 | make6mapped4 addr@(SockAddrInet6 {}) = addr | 103 | make6mapped4 addr@(SockAddrInet6 {}) = addr |
109 | make6mapped4 addr@(SockAddrInet port a) = SockAddrInet6 port 0 (0,0,0xFFFF,fromBE32 a) 0 | 104 | make6mapped4 addr@(SockAddrInet port a) = SockAddrInet6 port 0 (0,0,0xFFFF,fromBE32 a) 0 |
110 | 105 | ||
111 | tryForkOS :: IO () -> IO ThreadId | 106 | tryForkOS :: String -> IO () -> IO ThreadId |
112 | tryForkOS action = catchIOError (forkOS action) $ \e -> do | 107 | tryForkOS lbl action = catchIOError (forkOSLabeled lbl action) $ \e -> do |
113 | dput XMisc $ "DNSCache: Link with -threaded to avoid excessively long time-out." | 108 | dput XMisc $ "DNSCache: Link with -threaded to avoid excessively long time-out." |
114 | forkIO action | 109 | forkLabeled lbl action |
115 | 110 | ||
116 | 111 | ||
117 | -- Attempt to resolve the given domain name. Returns an empty list if the | 112 | -- Attempt to resolve the given domain name. Returns an empty list if the |
@@ -126,8 +121,7 @@ rawForwardResolve :: | |||
126 | rawForwardResolve dns onFail timeout addrtext = do | 121 | rawForwardResolve dns onFail timeout addrtext = do |
127 | r <- atomically newEmptyTMVar | 122 | r <- atomically newEmptyTMVar |
128 | mvar <- interruptibleDelay | 123 | mvar <- interruptibleDelay |
129 | rt <- tryForkOS $ do | 124 | rt <- tryForkOS ("resolve."++show addrtext) $ do |
130 | myThreadId >>= flip labelThread ("resolve."++show addrtext) | ||
131 | resolver r mvar | 125 | resolver r mvar |
132 | startDelay mvar timeout | 126 | startDelay mvar timeout |
133 | did <- atomically $ tryPutTMVar r [] | 127 | did <- atomically $ tryPutTMVar r [] |
diff --git a/dht/src/Control/Concurrent/ThreadUtil.hs b/dht/src/Control/Concurrent/ThreadUtil.hs index 2888e899..a258d933 100644 --- a/dht/src/Control/Concurrent/ThreadUtil.hs +++ b/dht/src/Control/Concurrent/ThreadUtil.hs | |||
@@ -21,4 +21,11 @@ forkLabeled lbl action = do | |||
21 | labelThread t lbl | 21 | labelThread t lbl |
22 | return t | 22 | return t |
23 | {-# INLINE forkLabeled #-} | 23 | {-# INLINE forkLabeled #-} |
24 | |||
25 | forkOSLabeled :: String -> IO () -> IO ThreadId | ||
26 | forkOSLabeled lbl action = do | ||
27 | t <- forkOS action | ||
28 | labelThread t lbl | ||
29 | return t | ||
30 | {-# INLINE forkOSLabeled #-} | ||
24 | #endif | 31 | #endif |
diff --git a/dht/src/Network/QueryResponse/TCP.hs b/dht/src/Network/QueryResponse/TCP.hs index de6e1eb7..e4831fb2 100644 --- a/dht/src/Network/QueryResponse/TCP.hs +++ b/dht/src/Network/QueryResponse/TCP.hs | |||
@@ -19,6 +19,7 @@ import Data.Maybe | |||
19 | import Data.Ord | 19 | import Data.Ord |
20 | import Data.Time.Clock.POSIX | 20 | import Data.Time.Clock.POSIX |
21 | import Data.Word | 21 | import Data.Word |
22 | import Data.String (IsString(..)) | ||
22 | import Network.BSD | 23 | import Network.BSD |
23 | import Network.Socket | 24 | import Network.Socket |
24 | import System.Timeout | 25 | import System.Timeout |
@@ -68,6 +69,7 @@ killSession :: TCPSession st -> IO () | |||
68 | killSession PendingTCPSession = return () | 69 | killSession PendingTCPSession = return () |
69 | killSession TCPSession{tcpThread=t} = killThread t | 70 | killSession TCPSession{tcpThread=t} = killThread t |
70 | 71 | ||
72 | showStat :: IsString p => TCPSession st -> p | ||
71 | showStat r = case r of PendingTCPSession -> "pending." | 73 | showStat r = case r of PendingTCPSession -> "pending." |
72 | TCPSession {} -> "established." | 74 | TCPSession {} -> "established." |
73 | 75 | ||
@@ -105,7 +107,8 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
105 | st <- streamHello stream addr h | 107 | st <- streamHello stream addr h |
106 | dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr) | 108 | dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr) |
107 | signal <- newTVarIO False | 109 | signal <- newTVarIO False |
108 | rthread <- forkIO $ do | 110 | let showAddr a = show (streamAddr stream a) |
111 | rthread <- forkLabeled ("tcp:"++showAddr addr) $ do | ||
109 | atomically (readTVar signal >>= check) | 112 | atomically (readTVar signal >>= check) |
110 | fix $ \loop -> do | 113 | fix $ \loop -> do |
111 | x <- streamDecode st | 114 | x <- streamDecode st |
@@ -127,8 +130,6 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
127 | forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do | 130 | forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do |
128 | dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r] | 131 | dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r] |
129 | hClose h | 132 | hClose h |
130 | let showAddr a = show (streamAddr stream a) | ||
131 | labelThread rthread ("tcp:"++showAddr addr) | ||
132 | let v = TCPSession | 133 | let v = TCPSession |
133 | { tcpHandle = h | 134 | { tcpHandle = h |
134 | , tcpState = st | 135 | , tcpState = st |
@@ -142,8 +143,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do | |||
142 | writeTVar (lru tcpcache) c' | 143 | writeTVar (lru tcpcache) c' |
143 | writeTVar signal True | 144 | writeTVar signal True |
144 | return rs | 145 | return rs |
145 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do | 146 | forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkLabeled ("tcp-close:"++show k) $ do |
146 | myThreadId >>= flip labelThread ("tcp-close:"++show k) | ||
147 | dput XTCP $ "TCP dropped: " ++ show k | 147 | dput XTCP $ "TCP dropped: " ++ show k |
148 | killSession r | 148 | killSession r |
149 | case r of TCPSession {tcpState=st,tcpHandle=h} -> do | 149 | case r of TCPSession {tcpState=st,tcpHandle=h} -> do |
@@ -184,10 +184,9 @@ tcpTransport maxcon stream = do | |||
184 | return $ (,) tcpcache Transport | 184 | return $ (,) tcpcache Transport |
185 | { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing) | 185 | { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing) |
186 | , sendMessage = \addr (bDoCon,y) -> do | 186 | , sendMessage = \addr (bDoCon,y) -> do |
187 | t <- forkIO $ do | 187 | void . forkLabeled "tcp-send" $ do |
188 | msock <- acquireConnection msgvar tcpcache stream addr bDoCon | 188 | msock <- acquireConnection msgvar tcpcache stream addr bDoCon |
189 | mapM_ ($ y) msock | 189 | mapM_ ($ y) msock |
190 | `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e | 190 | `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e |
191 | labelThread t "tcp-send" | ||
192 | , closeTransport = closeAll tcpcache stream >> putMVar msgvar Nothing | 191 | , closeTransport = closeAll tcpcache stream >> putMVar msgvar Nothing |
193 | } | 192 | } |