summaryrefslogtreecommitdiff
path: root/dht
diff options
context:
space:
mode:
authorJames Crayne <jim.crayne@gmail.com>2019-10-18 10:35:34 +0000
committerJoe Crayne <joe@jerkface.net>2020-01-01 19:53:46 -0500
commitad35d7aa97d2fad2615f5d0dd4aee4e984d403f6 (patch)
treef8ad06edb3499ff1f801846db99b4b7419f5e55a /dht
parentc479c2dd58c12d159c05040a08da6c4c7730c407 (diff)
more forkLabeled, and now forkOSLabeled
Diffstat (limited to 'dht')
-rw-r--r--dht/Presence/DNSCache.hs16
-rw-r--r--dht/src/Control/Concurrent/ThreadUtil.hs7
-rw-r--r--dht/src/Network/QueryResponse/TCP.hs13
3 files changed, 18 insertions, 18 deletions
diff --git a/dht/Presence/DNSCache.hs b/dht/Presence/DNSCache.hs
index e28655c5..14581fee 100644
--- a/dht/Presence/DNSCache.hs
+++ b/dht/Presence/DNSCache.hs
@@ -22,12 +22,7 @@ module DNSCache
22 , withPort 22 , withPort
23 ) where 23 ) where
24 24
25#ifdef THREAD_DEBUG 25import Control.Concurrent.ThreadUtil
26import Control.Concurrent.Lifted.Instrument
27#else
28import Control.Concurrent.Lifted
29import GHC.Conc (labelThread)
30#endif
31import Control.Arrow 26import Control.Arrow
32import Control.Concurrent.STM 27import Control.Concurrent.STM
33import Data.Text ( Text ) 28import Data.Text ( Text )
@@ -108,10 +103,10 @@ make6mapped4 :: SockAddr -> SockAddr
108make6mapped4 addr@(SockAddrInet6 {}) = addr 103make6mapped4 addr@(SockAddrInet6 {}) = addr
109make6mapped4 addr@(SockAddrInet port a) = SockAddrInet6 port 0 (0,0,0xFFFF,fromBE32 a) 0 104make6mapped4 addr@(SockAddrInet port a) = SockAddrInet6 port 0 (0,0,0xFFFF,fromBE32 a) 0
110 105
111tryForkOS :: IO () -> IO ThreadId 106tryForkOS :: String -> IO () -> IO ThreadId
112tryForkOS action = catchIOError (forkOS action) $ \e -> do 107tryForkOS lbl action = catchIOError (forkOSLabeled lbl action) $ \e -> do
113 dput XMisc $ "DNSCache: Link with -threaded to avoid excessively long time-out." 108 dput XMisc $ "DNSCache: Link with -threaded to avoid excessively long time-out."
114 forkIO action 109 forkLabeled lbl action
115 110
116 111
117-- Attempt to resolve the given domain name. Returns an empty list if the 112-- Attempt to resolve the given domain name. Returns an empty list if the
@@ -126,8 +121,7 @@ rawForwardResolve ::
126rawForwardResolve dns onFail timeout addrtext = do 121rawForwardResolve dns onFail timeout addrtext = do
127 r <- atomically newEmptyTMVar 122 r <- atomically newEmptyTMVar
128 mvar <- interruptibleDelay 123 mvar <- interruptibleDelay
129 rt <- tryForkOS $ do 124 rt <- tryForkOS ("resolve."++show addrtext) $ do
130 myThreadId >>= flip labelThread ("resolve."++show addrtext)
131 resolver r mvar 125 resolver r mvar
132 startDelay mvar timeout 126 startDelay mvar timeout
133 did <- atomically $ tryPutTMVar r [] 127 did <- atomically $ tryPutTMVar r []
diff --git a/dht/src/Control/Concurrent/ThreadUtil.hs b/dht/src/Control/Concurrent/ThreadUtil.hs
index 2888e899..a258d933 100644
--- a/dht/src/Control/Concurrent/ThreadUtil.hs
+++ b/dht/src/Control/Concurrent/ThreadUtil.hs
@@ -21,4 +21,11 @@ forkLabeled lbl action = do
21 labelThread t lbl 21 labelThread t lbl
22 return t 22 return t
23{-# INLINE forkLabeled #-} 23{-# INLINE forkLabeled #-}
24
25forkOSLabeled :: String -> IO () -> IO ThreadId
26forkOSLabeled lbl action = do
27 t <- forkOS action
28 labelThread t lbl
29 return t
30{-# INLINE forkOSLabeled #-}
24#endif 31#endif
diff --git a/dht/src/Network/QueryResponse/TCP.hs b/dht/src/Network/QueryResponse/TCP.hs
index de6e1eb7..e4831fb2 100644
--- a/dht/src/Network/QueryResponse/TCP.hs
+++ b/dht/src/Network/QueryResponse/TCP.hs
@@ -19,6 +19,7 @@ import Data.Maybe
19import Data.Ord 19import Data.Ord
20import Data.Time.Clock.POSIX 20import Data.Time.Clock.POSIX
21import Data.Word 21import Data.Word
22import Data.String (IsString(..))
22import Network.BSD 23import Network.BSD
23import Network.Socket 24import Network.Socket
24import System.Timeout 25import System.Timeout
@@ -68,6 +69,7 @@ killSession :: TCPSession st -> IO ()
68killSession PendingTCPSession = return () 69killSession PendingTCPSession = return ()
69killSession TCPSession{tcpThread=t} = killThread t 70killSession TCPSession{tcpThread=t} = killThread t
70 71
72showStat :: IsString p => TCPSession st -> p
71showStat r = case r of PendingTCPSession -> "pending." 73showStat r = case r of PendingTCPSession -> "pending."
72 TCPSession {} -> "established." 74 TCPSession {} -> "established."
73 75
@@ -105,7 +107,8 @@ acquireConnection mvar tcpcache stream addr bDoCon = do
105 st <- streamHello stream addr h 107 st <- streamHello stream addr h
106 dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr) 108 dput XTCP $ "TCP Connected! " ++ show (streamAddr stream addr)
107 signal <- newTVarIO False 109 signal <- newTVarIO False
108 rthread <- forkIO $ do 110 let showAddr a = show (streamAddr stream a)
111 rthread <- forkLabeled ("tcp:"++showAddr addr) $ do
109 atomically (readTVar signal >>= check) 112 atomically (readTVar signal >>= check)
110 fix $ \loop -> do 113 fix $ \loop -> do
111 x <- streamDecode st 114 x <- streamDecode st
@@ -127,8 +130,6 @@ acquireConnection mvar tcpcache stream addr bDoCon = do
127 forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do 130 forM_ (zip [1..] $ MM.toList c) $ \(i,MM.Binding (TCPAddress addr) r (Down tm)) -> do
128 dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r] 131 dput XTCP $ unwords [show i ++ ".", "Still connected:", show addr, show (now - tm), showStat r]
129 hClose h 132 hClose h
130 let showAddr a = show (streamAddr stream a)
131 labelThread rthread ("tcp:"++showAddr addr)
132 let v = TCPSession 133 let v = TCPSession
133 { tcpHandle = h 134 { tcpHandle = h
134 , tcpState = st 135 , tcpState = st
@@ -142,8 +143,7 @@ acquireConnection mvar tcpcache stream addr bDoCon = do
142 writeTVar (lru tcpcache) c' 143 writeTVar (lru tcpcache) c'
143 writeTVar signal True 144 writeTVar signal True
144 return rs 145 return rs
145 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkIO $ do 146 forM_ retires $ \(MM.Binding (TCPAddress k) r _) -> void $ forkLabeled ("tcp-close:"++show k) $ do
146 myThreadId >>= flip labelThread ("tcp-close:"++show k)
147 dput XTCP $ "TCP dropped: " ++ show k 147 dput XTCP $ "TCP dropped: " ++ show k
148 killSession r 148 killSession r
149 case r of TCPSession {tcpState=st,tcpHandle=h} -> do 149 case r of TCPSession {tcpState=st,tcpHandle=h} -> do
@@ -184,10 +184,9 @@ tcpTransport maxcon stream = do
184 return $ (,) tcpcache Transport 184 return $ (,) tcpcache Transport
185 { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing) 185 { awaitMessage = \f -> takeMVar msgvar >>= \x -> f x `catchIOError` (\e -> dput XTCP ("TCP transport stopped. " ++ show e) >> f Nothing)
186 , sendMessage = \addr (bDoCon,y) -> do 186 , sendMessage = \addr (bDoCon,y) -> do
187 t <- forkIO $ do 187 void . forkLabeled "tcp-send" $ do
188 msock <- acquireConnection msgvar tcpcache stream addr bDoCon 188 msock <- acquireConnection msgvar tcpcache stream addr bDoCon
189 mapM_ ($ y) msock 189 mapM_ ($ y) msock
190 `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e 190 `catchIOError` \e -> dput XTCP $ "TCP-send: " ++ show e
191 labelThread t "tcp-send"
192 , closeTransport = closeAll tcpcache stream >> putMVar msgvar Nothing 191 , closeTransport = closeAll tcpcache stream >> putMVar msgvar Nothing
193 } 192 }