From 2fb1f3507075c4cce4f33096ce0080bb14fd2704 Mon Sep 17 00:00:00 2001 From: joe Date: Fri, 15 Sep 2017 03:33:47 -0400 Subject: Moved Tasks to hierarchical location. --- src/Control/Concurrent/Tasks.hs | 44 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 src/Control/Concurrent/Tasks.hs (limited to 'src/Control/Concurrent/Tasks.hs') diff --git a/src/Control/Concurrent/Tasks.hs b/src/Control/Concurrent/Tasks.hs new file mode 100644 index 00000000..287542ee --- /dev/null +++ b/src/Control/Concurrent/Tasks.hs @@ -0,0 +1,44 @@ +{-# LANGUAGE CPP #-} +module Control.Concurrent.Tasks where + +import Control.Concurrent.STM +import Control.Exception +import Data.Function +import Data.List +#ifdef THREAD_DEBUG +import Control.Concurrent.Lifted.Instrument +#else +import Control.Concurrent.Lifted +import GHC.Conc (labelThread) +#endif + +newtype TaskGroup = TaskGroup + { taskQueue :: TChan (String,IO ()) + } + +withTaskGroup :: String -> Int -> (TaskGroup -> IO ()) -> IO () +withTaskGroup glabel numslots action = do + tg <- atomically $ newTChan + cnt <- atomically $ newTVar 0 + thread <- fork $ do + myThreadId >>= flip labelThread glabel + fix $ \again -> do + (slot, (lbl,task)) <- atomically $ do + slot <- readTVar cnt + check (slot < numslots) + writeTVar cnt (succ slot) + t <- readTChan tg + return (slot,t) + _ <- fork $ do + myThreadId >>= flip labelThread (intercalate "." [glabel,show slot,lbl]) + task `catch` (\(SomeException e) -> return ()) + atomically $ modifyTVar' cnt pred + again + action (TaskGroup tg) `onException` killThread thread + atomically $ do + isEmptyTChan tg >>= check + readTVar cnt >>= check . (== 0) + killThread thread + +forkTask :: TaskGroup -> String -> IO () -> IO () +forkTask (TaskGroup q) lbl action = atomically $ writeTChan q (lbl,action) -- cgit v1.2.3