blob: 287542eed5d193a7756ea98271a1b8c9ecd7763f (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
{-# LANGUAGE CPP #-}
module Control.Concurrent.Tasks where
import Control.Concurrent.STM
import Control.Exception
import Data.Function
import Data.List
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
import GHC.Conc (labelThread)
#endif
newtype TaskGroup = TaskGroup
{ taskQueue :: TChan (String,IO ())
}
withTaskGroup :: String -> Int -> (TaskGroup -> IO ()) -> IO ()
withTaskGroup glabel numslots action = do
tg <- atomically $ newTChan
cnt <- atomically $ newTVar 0
thread <- fork $ do
myThreadId >>= flip labelThread glabel
fix $ \again -> do
(slot, (lbl,task)) <- atomically $ do
slot <- readTVar cnt
check (slot < numslots)
writeTVar cnt (succ slot)
t <- readTChan tg
return (slot,t)
_ <- fork $ do
myThreadId >>= flip labelThread (intercalate "." [glabel,show slot,lbl])
task `catch` (\(SomeException e) -> return ())
atomically $ modifyTVar' cnt pred
again
action (TaskGroup tg) `onException` killThread thread
atomically $ do
isEmptyTChan tg >>= check
readTVar cnt >>= check . (== 0)
killThread thread
forkTask :: TaskGroup -> String -> IO () -> IO ()
forkTask (TaskGroup q) lbl action = atomically $ writeTChan q (lbl,action)
|