diff options
Diffstat (limited to 'src/Control/Concurrent')
-rw-r--r-- | src/Control/Concurrent/Tasks.hs | 44 |
1 files changed, 44 insertions, 0 deletions
diff --git a/src/Control/Concurrent/Tasks.hs b/src/Control/Concurrent/Tasks.hs new file mode 100644 index 00000000..287542ee --- /dev/null +++ b/src/Control/Concurrent/Tasks.hs | |||
@@ -0,0 +1,44 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | module Control.Concurrent.Tasks where | ||
3 | |||
4 | import Control.Concurrent.STM | ||
5 | import Control.Exception | ||
6 | import Data.Function | ||
7 | import Data.List | ||
8 | #ifdef THREAD_DEBUG | ||
9 | import Control.Concurrent.Lifted.Instrument | ||
10 | #else | ||
11 | import Control.Concurrent.Lifted | ||
12 | import GHC.Conc (labelThread) | ||
13 | #endif | ||
14 | |||
15 | newtype TaskGroup = TaskGroup | ||
16 | { taskQueue :: TChan (String,IO ()) | ||
17 | } | ||
18 | |||
19 | withTaskGroup :: String -> Int -> (TaskGroup -> IO ()) -> IO () | ||
20 | withTaskGroup glabel numslots action = do | ||
21 | tg <- atomically $ newTChan | ||
22 | cnt <- atomically $ newTVar 0 | ||
23 | thread <- fork $ do | ||
24 | myThreadId >>= flip labelThread glabel | ||
25 | fix $ \again -> do | ||
26 | (slot, (lbl,task)) <- atomically $ do | ||
27 | slot <- readTVar cnt | ||
28 | check (slot < numslots) | ||
29 | writeTVar cnt (succ slot) | ||
30 | t <- readTChan tg | ||
31 | return (slot,t) | ||
32 | _ <- fork $ do | ||
33 | myThreadId >>= flip labelThread (intercalate "." [glabel,show slot,lbl]) | ||
34 | task `catch` (\(SomeException e) -> return ()) | ||
35 | atomically $ modifyTVar' cnt pred | ||
36 | again | ||
37 | action (TaskGroup tg) `onException` killThread thread | ||
38 | atomically $ do | ||
39 | isEmptyTChan tg >>= check | ||
40 | readTVar cnt >>= check . (== 0) | ||
41 | killThread thread | ||
42 | |||
43 | forkTask :: TaskGroup -> String -> IO () -> IO () | ||
44 | forkTask (TaskGroup q) lbl action = atomically $ writeTChan q (lbl,action) | ||