summaryrefslogtreecommitdiff
path: root/src/Control/Concurrent/Tasks.hs
blob: da2e589eaa057d492f8b1fdabf960c6d061e42b8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
{-# LANGUAGE CPP #-}
module Control.Concurrent.Tasks where

import Control.Concurrent.STM
import Control.Exception
import Data.Function
import Data.List
#ifdef THREAD_DEBUG
import Control.Concurrent.Lifted.Instrument
#else
import Control.Concurrent.Lifted
import GHC.Conc                  (labelThread)
#endif

newtype TaskGroup = TaskGroup
    { taskQueue :: TChan (String,IO ())
    }

withTaskGroup :: String -> Int -> (TaskGroup -> IO ()) -> IO ()
withTaskGroup glabel numslots action = do
    tg <- atomically $ newTChan
    cnt <- atomically $ newTVar 0
    thread <- forkIO $ do
        myThreadId >>= flip labelThread glabel
        fix $ \again -> do
            (slot, (lbl,task)) <- atomically $ do
                slot <- readTVar cnt
                check (slot < numslots)
                writeTVar cnt (succ slot)
                t <- readTChan tg
                return (slot,t)
            _ <- fork $ do
                myThreadId >>= flip labelThread (intercalate "." [glabel,show slot,lbl])
                task `catch` (\(SomeException e) -> return ())
                atomically $ modifyTVar' cnt pred
            again
    action (TaskGroup tg) `onException` killThread thread
    atomically $ do
        isEmptyTChan tg >>= check
        readTVar cnt >>= check . (== 0)
    killThread thread

forkTask :: TaskGroup -> String -> IO () -> IO ()
forkTask (TaskGroup q) lbl action = atomically $ writeTChan q (lbl,action)