{-# LANGUAGE CPP #-}
-- | A small bounded task pool.
--
-- Tasks are queued on a 'TaskGroup' with 'forkTask' and executed by
-- 'withTaskGroup', which runs at most a fixed number of them at a time,
-- each in its own labelled thread.
module Control.Concurrent.Tasks where

import Control.Concurrent.STM
import Control.Exception
import Data.Function
import Data.List

#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
import GHC.Conc (labelThread)
#endif

-- | Handle used to submit work to the pool created by 'withTaskGroup'.
newtype TaskGroup = TaskGroup
    { taskQueue :: TChan (String, IO ())
      -- ^ Pending tasks, each paired with the label given to 'forkTask'.
    }

-- | Run @action@ with a fresh 'TaskGroup' and return once every task queued
-- on it has finished.
--
-- * @glabel@ labels the dispatcher thread and prefixes every worker label.
-- * @numslots@ is the maximum number of tasks allowed to run at the same
--   time; the dispatcher only takes a task off the queue while fewer than
--   @numslots@ workers are busy.
-- * @action@ receives the group and typically calls 'forkTask' on it.
--
-- Worker threads are labelled @glabel.N.lbl@, where @N@ is the number of
-- workers that were already busy when the task was dispatched.
--
-- Any exception raised by a task is deliberately discarded so that one
-- failing task cannot take the pool down.
--
-- If @action@ throws, the dispatcher is killed and the exception propagates
-- without waiting for the queue to drain; workers that are already running
-- are not cancelled.
withTaskGroup :: String -> Int -> (TaskGroup -> IO ()) -> IO ()
withTaskGroup glabel numslots action = do
    tg  <- newTChanIO
    cnt <- newTVarIO 0
    dispatcher <- fork $ do
        myThreadId >>= flip labelThread glabel
        fix $ \again -> do
            -- Claim a slot and dequeue a task in one transaction, so the
            -- busy counter never disagrees with what was taken off the queue.
            (slot, (lbl, task)) <- atomically $ do
                slot <- readTVar cnt
                check (slot < numslots)
                writeTVar cnt (succ slot)
                t <- readTChan tg
                return (slot, t)
            _ <- fork $ do
                myThreadId >>= flip labelThread
                    (intercalate "." [glabel, show slot, lbl])
                -- Release the slot with finally rather than a trailing
                -- statement: an asynchronous exception arriving after the
                -- handler returns must not leak the slot, otherwise the
                -- drain check below could block forever.
                (task `catch` (\(SomeException _) -> return ()))
                    `finally` atomically (modifyTVar' cnt pred)
            again
    -- Block until nothing is queued and no worker is busy.
    let waitIdle = atomically $ do
            isEmptyTChan tg >>= check
            readTVar cnt >>= check . (== 0)
    -- Stop the dispatcher on every exit path, including an exception that
    -- interrupts the wait itself.
    (action (TaskGroup tg) >> waitIdle) `finally` killThread dispatcher

-- | Queue @action@ on the group under the label @lbl@.
--
-- This only enqueues; the task is started later by the dispatcher thread
-- of the enclosing 'withTaskGroup' once a slot is free.
forkTask :: TaskGroup -> String -> IO () -> IO ()
forkTask (TaskGroup q) lbl action = atomically $ writeTChan q (lbl, action)