diff options
author | James Crayne <jim.crayne@gmail.com> | 2019-10-18 10:13:55 +0000 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-01 19:53:31 -0500 |
commit | c479c2dd58c12d159c05040a08da6c4c7730c407 (patch) | |
tree | 8f675cba6fc0fcb318078863589a083d2146caf4 /dht | |
parent | c25b96d0665f9bd6c28245c811cbc7c57d0b9694 (diff) |
convert forkIO to forkLabeled (wip)
Diffstat (limited to 'dht')
-rw-r--r-- | dht/Connection/Tcp.hs | 28 | ||||
-rw-r--r-- | dht/Presence/XMPPServer.hs | 18 | ||||
-rw-r--r-- | dht/examples/dhtd.hs | 10 |
3 files changed, 18 insertions, 38 deletions
diff --git a/dht/Connection/Tcp.hs b/dht/Connection/Tcp.hs index fd5d333b..2572eba6 100644 --- a/dht/Connection/Tcp.hs +++ b/dht/Connection/Tcp.hs | |||
@@ -35,12 +35,7 @@ import qualified Data.Map as Map | |||
35 | import Data.Map (Map) | 35 | import Data.Map (Map) |
36 | #endif | 36 | #endif |
37 | import Data.Monoid ( (<>) ) | 37 | import Data.Monoid ( (<>) ) |
38 | #ifdef THREAD_DEBUG | 38 | import Control.Concurrent.ThreadUtil |
39 | import Control.Concurrent.Lifted.Instrument | ||
40 | #else | ||
41 | import Control.Concurrent.Lifted | ||
42 | import GHC.Conc (labelThread) | ||
43 | #endif | ||
44 | 39 | ||
45 | import Control.Concurrent.STM | 40 | import Control.Concurrent.STM |
46 | -- import Control.Concurrent.STM.TMVar | 41 | -- import Control.Concurrent.STM.TMVar |
@@ -268,7 +263,7 @@ server allocate sessionConduits = do | |||
268 | , retrymap = retrymap | 263 | , retrymap = retrymap |
269 | } | 264 | } |
270 | liftIO $ do | 265 | liftIO $ do |
271 | tid <- forkIO $ fix $ \loop -> do | 266 | forkLabeled "server" $ fix $ \loop -> do |
272 | instr <- atomically $ takeTMVar cmds | 267 | instr <- atomically $ takeTMVar cmds |
273 | -- warn $ "instr = " <> bshow instr | 268 | -- warn $ "instr = " <> bshow instr |
274 | let again = do doit server instr | 269 | let again = do doit server instr |
@@ -276,7 +271,6 @@ server allocate sessionConduits = do | |||
276 | loop | 271 | loop |
277 | case instr of Quit -> closeAll server | 272 | case instr of Quit -> closeAll server |
278 | _ -> again | 273 | _ -> again |
279 | labelThread tid "server" | ||
280 | return server | 274 | return server |
281 | where | 275 | where |
282 | closeAll server = do | 276 | closeAll server = do |
@@ -337,8 +331,7 @@ server allocate sessionConduits = do | |||
337 | interruptDelay d | 331 | interruptDelay d |
338 | when (not b) forkit | 332 | when (not b) forkit |
339 | where | 333 | where |
340 | forkit = void . forkIO $ do | 334 | forkit = void . forkLabeled ( "Connect." ++ show addr ) $ do |
341 | myThreadId >>= flip labelThread ( "Connect." ++ show addr ) | ||
342 | proto <- getProtocolNumber "tcp" | 335 | proto <- getProtocolNumber "tcp" |
343 | sock <- socket (socketFamily addr) Stream proto | 336 | sock <- socket (socketFamily addr) Stream proto |
344 | handle (\e -> do -- let t = ioeGetErrorType e | 337 | handle (\e -> do -- let t = ioeGetErrorType e |
@@ -358,8 +351,7 @@ server allocate sessionConduits = do | |||
358 | 351 | ||
359 | doit server (ConnectWithEndlessRetry addr params interval) = do | 352 | doit server (ConnectWithEndlessRetry addr params interval) = do |
360 | proto <- getProtocolNumber "tcp" | 353 | proto <- getProtocolNumber "tcp" |
361 | void . forkIO $ do | 354 | void . forkLabeled ("ConnectWithEndlessRetry." ++ show addr) $ do |
362 | myThreadId >>= flip labelThread ("ConnectWithEndlessRetry." ++ show addr) | ||
363 | timer <- interruptibleDelay | 355 | timer <- interruptibleDelay |
364 | (retryVar,action) <- atomically $ do | 356 | (retryVar,action) <- atomically $ do |
365 | map <- readTVar (retrymap server) | 357 | map <- readTVar (retrymap server) |
@@ -470,8 +462,7 @@ newConnection server sessionConduits params conkey u h inout = do | |||
470 | kontvar <- atomically newEmptyTMVar | 462 | kontvar <- atomically newEmptyTMVar |
471 | -- XXX: Why does kontvar store STM (IO ()) instead of just IO () ? | 463 | -- XXX: Why does kontvar store STM (IO ()) instead of just IO () ? |
472 | let _ = kontvar :: TMVar (STM (IO ())) | 464 | let _ = kontvar :: TMVar (STM (IO ())) |
473 | forkIO $ do | 465 | forkLabeled ("connecting...") $ do |
474 | myThreadId >>= flip labelThread ("connecting...") | ||
475 | getkont <- atomically $ takeTMVar kontvar | 466 | getkont <- atomically $ takeTMVar kontvar |
476 | kont <- atomically getkont | 467 | kont <- atomically getkont |
477 | kont | 468 | kont |
@@ -605,8 +596,7 @@ connectionThreads h pinglogic = do | |||
605 | (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar | 596 | (donew,outs) <- atomically $ liftM2 (,) newEmptyTMVar newEmptyTMVar |
606 | 597 | ||
607 | (doner,incomming) <- atomically $ liftM2 (,) newEmptyTMVar newTChan | 598 | (doner,incomming) <- atomically $ liftM2 (,) newEmptyTMVar newTChan |
608 | readerThread <- forkIO $ do | 599 | readerThread <- forkLabeled "readerThread" $ do |
609 | myThreadId >>= flip labelThread ("readerThread") | ||
610 | let finished e = do | 600 | let finished e = do |
611 | hClose h | 601 | hClose h |
612 | -- warn $ "finished read: " <> bshow (fmap ioeGetErrorType e) | 602 | -- warn $ "finished read: " <> bshow (fmap ioeGetErrorType e) |
@@ -626,8 +616,7 @@ connectionThreads h pinglogic = do | |||
626 | isEof <- hIsEOF h | 616 | isEof <- hIsEOF h |
627 | if isEof then finished Nothing else loop | 617 | if isEof then finished Nothing else loop |
628 | 618 | ||
629 | writerThread <- forkIO . fix $ \loop -> do | 619 | writerThread <- forkLabeled "writerThread" . fix $ \loop -> do |
630 | myThreadId >>= flip labelThread ("writerThread") | ||
631 | let finished = do -- warn $ "finished write" | 620 | let finished = do -- warn $ "finished write" |
632 | -- hClose h -- quit reader | 621 | -- hClose h -- quit reader |
633 | throwTo readerThread (ErrorCall "EOF") | 622 | throwTo readerThread (ErrorCall "EOF") |
@@ -792,8 +781,7 @@ tcpManager grokKey sv = do | |||
792 | Just {} -> return $ return () -- Connection already in progress. | 781 | Just {} -> return $ return () -- Connection already in progress. |
793 | Nothing -> do | 782 | Nothing -> do |
794 | modifyTVar' rmap $ Map.insert k Nothing | 783 | modifyTVar' rmap $ Map.insert k Nothing |
795 | return $ void $ forkIO $ do | 784 | return $ void $ forkLabeled ("resolve."++show k) $ do |
796 | myThreadId >>= flip labelThread ("resolve."++show k) | ||
797 | mconkey <- listToMaybe <$> rslv k | 785 | mconkey <- listToMaybe <$> rslv k |
798 | case mconkey of | 786 | case mconkey of |
799 | Nothing -> atomically $ modifyTVar' rmap $ Map.delete k | 787 | Nothing -> atomically $ modifyTVar' rmap $ Map.delete k |
diff --git a/dht/Presence/XMPPServer.hs b/dht/Presence/XMPPServer.hs index de2dd5d3..272f6efe 100644 --- a/dht/Presence/XMPPServer.hs +++ b/dht/Presence/XMPPServer.hs | |||
@@ -954,7 +954,7 @@ forkConnection sv xmpp saddr cdta pingflag src snk stanzas pp_mvar = do | |||
954 | "." -> show saddr | 954 | "." -> show saddr |
955 | mytoxname -> show saddr {- TODO: remote tox peer name? -} ] | 955 | mytoxname -> show saddr {- TODO: remote tox peer name? -} ] |
956 | 956 | ||
957 | forkIO $ do myThreadId >>= flip labelThread (lbl "xmpp-post.") | 957 | forkLabeled (lbl "xmpp-post.") $ do |
958 | -- This thread handles messages after they are pulled out of | 958 | -- This thread handles messages after they are pulled out of |
959 | -- the slots-queue. Hence, xmpp-post, for post- slots-queue. | 959 | -- the slots-queue. Hence, xmpp-post, for post- slots-queue. |
960 | 960 | ||
@@ -991,12 +991,11 @@ forkConnection sv xmpp saddr cdta pingflag src snk stanzas pp_mvar = do | |||
991 | output <- atomically newTChan | 991 | output <- atomically newTChan |
992 | hacks <- atomically $ newTVar Map.empty | 992 | hacks <- atomically $ newTVar Map.empty |
993 | msgids <- atomically $ newTVar [] | 993 | msgids <- atomically $ newTVar [] |
994 | forkIO $ do | 994 | forkLabeled (lbl "xmpp-pre.") $ do |
995 | -- Here is the pre- slots-queue thread which handles messages as they | 995 | -- Here is the pre- slots-queue thread which handles messages as they |
996 | -- arrive and assigns slots to them if that is appropriate. | 996 | -- arrive and assigns slots to them if that is appropriate. |
997 | 997 | ||
998 | -- mapM_ (atomically . Slotted.push slots Nothing) greetPeer | 998 | -- mapM_ (atomically . Slotted.push slots Nothing) greetPeer |
999 | myThreadId >>= flip labelThread (lbl "xmpp-pre.") | ||
1000 | 999 | ||
1001 | verbosity <- xmppVerbosity xmpp | 1000 | verbosity <- xmppVerbosity xmpp |
1002 | fix $ \loop -> do | 1001 | fix $ \loop -> do |
@@ -1073,8 +1072,7 @@ forkConnection sv xmpp saddr cdta pingflag src snk stanzas pp_mvar = do | |||
1073 | ] | 1072 | ] |
1074 | what | 1073 | what |
1075 | wlog $ "end xmpp-pre fork: " ++ show (lbl "") | 1074 | wlog $ "end xmpp-pre fork: " ++ show (lbl "") |
1076 | forkIO $ do | 1075 | forkLabeled (lbl "xmpp-reader.") $ do |
1077 | myThreadId >>= flip labelThread (lbl "xmpp-reader.") | ||
1078 | -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show) | 1076 | -- src $$ awaitForever (lift . putStrLn . takeWhile (/=' ') . show) |
1079 | runConduit $ src .| xmppInbound cdta clientOrServer pingflag stanzas output rdone | 1077 | runConduit $ src .| xmppInbound cdta clientOrServer pingflag stanzas output rdone |
1080 | atomically $ putTMVar rdone () | 1078 | atomically $ putTMVar rdone () |
@@ -1319,9 +1317,9 @@ monitor sv params xmpp = do | |||
1319 | -} | 1317 | -} |
1320 | dup <- cloneStanza stanza | 1318 | dup <- cloneStanza stanza |
1321 | 1319 | ||
1322 | t <- forkIO $ do applyStanza sv joined_rooms quitVar xmpp stanza | 1320 | forkLabeled ("process." ++ stanzaTypeString stanza) $ do |
1323 | forwardStanza quitVar xmpp stanza | 1321 | applyStanza sv joined_rooms quitVar xmpp stanza |
1324 | labelThread t $ "process." ++ stanzaTypeString stanza | 1322 | forwardStanza quitVar xmpp stanza |
1325 | 1323 | ||
1326 | -- We need to clone in the case the stanza is passed on as for Message. | 1324 | -- We need to clone in the case the stanza is passed on as for Message. |
1327 | wantStanzas <- getVerbose XJabber | 1325 | wantStanzas <- getVerbose XJabber |
@@ -1795,8 +1793,8 @@ forkXmpp XMPPServer { _xmpp_sv = sv | |||
1795 | { pingInterval = 0 | 1793 | { pingInterval = 0 |
1796 | , timeout = 0 | 1794 | , timeout = 0 |
1797 | } | 1795 | } |
1798 | mt <- forkIO $ do myThreadId >>= flip labelThread ("XMPP.monitor") | 1796 | mt <- forkLabeled "XMPP.monitor" $ do |
1799 | monitor sv peer_params xmpp | 1797 | monitor sv peer_params xmpp |
1800 | dput XMisc $ "Starting peer listen" | 1798 | dput XMisc $ "Starting peer listen" |
1801 | control sv (Listen peer_bind peer_params) | 1799 | control sv (Listen peer_bind peer_params) |
1802 | dput XMisc $ "Starting client listen" | 1800 | dput XMisc $ "Starting client listen" |
diff --git a/dht/examples/dhtd.hs b/dht/examples/dhtd.hs index eb41c598..c7fd4f06 100644 --- a/dht/examples/dhtd.hs +++ b/dht/examples/dhtd.hs | |||
@@ -54,12 +54,7 @@ import System.Posix.Process | |||
54 | import Text.PrettyPrint.HughesPJClass | 54 | import Text.PrettyPrint.HughesPJClass |
55 | import Text.Printf | 55 | import Text.Printf |
56 | import Text.Read | 56 | import Text.Read |
57 | #ifdef THREAD_DEBUG | 57 | import Control.Concurrent.ThreadUtil |
58 | import Control.Concurrent.Lifted.Instrument | ||
59 | #else | ||
60 | import Control.Concurrent.Lifted | ||
61 | import GHC.Conc (labelThread) | ||
62 | #endif | ||
63 | import qualified Data.HashMap.Strict as HashMap | 58 | import qualified Data.HashMap.Strict as HashMap |
64 | import qualified Data.Text as T | 59 | import qualified Data.Text as T |
65 | import qualified Data.Text.Encoding as T | 60 | import qualified Data.Text.Encoding as T |
@@ -1826,8 +1821,7 @@ main = do | |||
1826 | bootstrap btSaved fallbackNodes | 1821 | bootstrap btSaved fallbackNodes |
1827 | return () | 1822 | return () |
1828 | 1823 | ||
1829 | forkIO $ do | 1824 | forkLabeled "XMPP.stanzas" $ do |
1830 | myThreadId >>= flip labelThread "XMPP.stanzas" | ||
1831 | let console = cwPresenceChan <$> (mstate >>= consoleWriter) | 1825 | let console = cwPresenceChan <$> (mstate >>= consoleWriter) |
1832 | fix $ \loop -> do | 1826 | fix $ \loop -> do |
1833 | what <- atomically | 1827 | what <- atomically |