summaryrefslogtreecommitdiff
path: root/src/Control/Concurrent
diff options
context:
space:
mode:
Diffstat (limited to 'src/Control/Concurrent')
-rw-r--r--src/Control/Concurrent/Tasks.hs44
1 files changed, 44 insertions, 0 deletions
diff --git a/src/Control/Concurrent/Tasks.hs b/src/Control/Concurrent/Tasks.hs
new file mode 100644
index 00000000..287542ee
--- /dev/null
+++ b/src/Control/Concurrent/Tasks.hs
@@ -0,0 +1,44 @@
1{-# LANGUAGE CPP #-}
2module Control.Concurrent.Tasks where
3
4import Control.Concurrent.STM
5import Control.Exception
6import Data.Function
7import Data.List
8#ifdef THREAD_DEBUG
9import Control.Concurrent.Lifted.Instrument
10#else
11import Control.Concurrent.Lifted
12import GHC.Conc (labelThread)
13#endif
14
15newtype TaskGroup = TaskGroup
16 { taskQueue :: TChan (String,IO ())
17 }
18
19withTaskGroup :: String -> Int -> (TaskGroup -> IO ()) -> IO ()
20withTaskGroup glabel numslots action = do
21 tg <- atomically $ newTChan
22 cnt <- atomically $ newTVar 0
23 thread <- fork $ do
24 myThreadId >>= flip labelThread glabel
25 fix $ \again -> do
26 (slot, (lbl,task)) <- atomically $ do
27 slot <- readTVar cnt
28 check (slot < numslots)
29 writeTVar cnt (succ slot)
30 t <- readTChan tg
31 return (slot,t)
32 _ <- fork $ do
33 myThreadId >>= flip labelThread (intercalate "." [glabel,show slot,lbl])
34 task `catch` (\(SomeException e) -> return ())
35 atomically $ modifyTVar' cnt pred
36 again
37 action (TaskGroup tg) `onException` killThread thread
38 atomically $ do
39 isEmptyTChan tg >>= check
40 readTVar cnt >>= check . (== 0)
41 killThread thread
42
43forkTask :: TaskGroup -> String -> IO () -> IO ()
44forkTask (TaskGroup q) lbl action = atomically $ writeTChan q (lbl,action)