From 11987749fc6e6d3e53ea737d46d5ab13a16faeb8 Mon Sep 17 00:00:00 2001 From: James Crayne Date: Sat, 28 Sep 2019 13:43:29 -0400 Subject: Factor out some new libraries word64-map: Data.Word64Map network-addr: Network.Address tox-crypto: Crypto.Tox lifted-concurrent: Control.Concurrent.Lifted.Instrument Control.Concurrent.Async.Lifted.Instrument psq-wrap: Data.Wrapper.PSQInt Data.Wrapper.PSQ minmax-psq: Data.MinMaxPSQ tasks: Control.Concurrent.Tasks kad: Network.Kademlia Network.Kademlia.Bootstrap Network.Kademlia.Routing Network.Kademlia.CommonAPI Network.Kademlia.Persistence Network.Kademlia.Search --- tasks/src/Control/Concurrent/Tasks.hs | 44 +++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 tasks/src/Control/Concurrent/Tasks.hs (limited to 'tasks/src/Control/Concurrent/Tasks.hs') diff --git a/tasks/src/Control/Concurrent/Tasks.hs b/tasks/src/Control/Concurrent/Tasks.hs new file mode 100644 index 00000000..da2e589e --- /dev/null +++ b/tasks/src/Control/Concurrent/Tasks.hs @@ -0,0 +1,44 @@ +{-# LANGUAGE CPP #-} +module Control.Concurrent.Tasks where + +import Control.Concurrent.STM +import Control.Exception +import Data.Function +import Data.List +#ifdef THREAD_DEBUG +import Control.Concurrent.Lifted.Instrument +#else +import Control.Concurrent.Lifted +import GHC.Conc (labelThread) +#endif + +newtype TaskGroup = TaskGroup + { taskQueue :: TChan (String,IO ()) + } + +withTaskGroup :: String -> Int -> (TaskGroup -> IO ()) -> IO () +withTaskGroup glabel numslots action = do + tg <- atomically $ newTChan + cnt <- atomically $ newTVar 0 + thread <- forkIO $ do + myThreadId >>= flip labelThread glabel + fix $ \again -> do + (slot, (lbl,task)) <- atomically $ do + slot <- readTVar cnt + check (slot < numslots) + writeTVar cnt (succ slot) + t <- readTChan tg + return (slot,t) + _ <- fork $ do + myThreadId >>= flip labelThread (intercalate "." [glabel,show slot,lbl]) + task `catch` (\(SomeException e) -> return ()) + atomically $ modifyTVar' cnt pred + again + action (TaskGroup tg) `onException` killThread thread + atomically $ do + isEmptyTChan tg >>= check + readTVar cnt >>= check . (== 0) + killThread thread + +forkTask :: TaskGroup -> String -> IO () -> IO () +forkTask (TaskGroup q) lbl action = atomically $ writeTChan q (lbl,action) -- cgit v1.2.3