diff options
Diffstat (limited to 'src/Control')
-rw-r--r-- | src/Control/Concurrent/Async/Lifted/Instrument.hs | 5 | ||||
-rw-r--r-- | src/Control/Concurrent/Lifted/Instrument.hs | 98 | ||||
-rw-r--r-- | src/Control/Concurrent/Tasks.hs | 44 | ||||
-rw-r--r-- | src/Control/TriadCommittee.hs | 89 |
4 files changed, 0 insertions, 236 deletions
diff --git a/src/Control/Concurrent/Async/Lifted/Instrument.hs b/src/Control/Concurrent/Async/Lifted/Instrument.hs deleted file mode 100644 index eab0fadc..00000000 --- a/src/Control/Concurrent/Async/Lifted/Instrument.hs +++ /dev/null | |||
@@ -1,5 +0,0 @@ | |||
1 | module Control.Concurrent.Async.Lifted.Instrument | ||
2 | ( module Control.Concurrent.Async.Lifted | ||
3 | ) where | ||
4 | |||
5 | import Control.Concurrent.Async.Lifted | ||
diff --git a/src/Control/Concurrent/Lifted/Instrument.hs b/src/Control/Concurrent/Lifted/Instrument.hs deleted file mode 100644 index fc3b6369..00000000 --- a/src/Control/Concurrent/Lifted/Instrument.hs +++ /dev/null | |||
@@ -1,98 +0,0 @@ | |||
1 | {-# LANGUAGE FlexibleContexts #-} | ||
2 | module Control.Concurrent.Lifted.Instrument | ||
3 | ( module Control.Concurrent.Lifted | ||
4 | , forkIO | ||
5 | , forkOS | ||
6 | , fork | ||
7 | , labelThread | ||
8 | , threadsInformation | ||
9 | , PerThread(..) | ||
10 | ) where | ||
11 | |||
12 | import qualified Control.Concurrent.Lifted as Raw | ||
13 | import Control.Concurrent.Lifted hiding (fork,forkOS) | ||
14 | import Control.Exception (fromException) | ||
15 | import Control.Monad.Trans.Control | ||
16 | import System.IO.Unsafe | ||
17 | import qualified Data.Map.Strict as Map | ||
18 | import Control.Exception.Lifted | ||
19 | import Control.Monad.Base | ||
20 | import qualified GHC.Conc as GHC | ||
21 | import Data.Time() | ||
22 | import Data.Time.Clock | ||
23 | import DPut | ||
24 | import DebugTag | ||
25 | |||
26 | |||
27 | data PerThread = PerThread | ||
28 | { lbl :: String | ||
29 | , startTime :: UTCTime | ||
30 | } | ||
31 | deriving (Eq,Ord,Show) | ||
32 | |||
33 | data GlobalState = GlobalState | ||
34 | { threads :: !(Map.Map ThreadId PerThread) | ||
35 | , reportException :: String -> IO () | ||
36 | } | ||
37 | |||
38 | globals :: MVar GlobalState | ||
39 | globals = unsafePerformIO $ newMVar $ GlobalState | ||
40 | { threads = Map.empty | ||
41 | , reportException = dput XMisc | ||
42 | } | ||
43 | {-# NOINLINE globals #-} | ||
44 | |||
45 | |||
46 | forkIO :: IO () -> IO ThreadId | ||
47 | forkIO = instrumented GHC.forkIO | ||
48 | {-# INLINE forkIO #-} | ||
49 | |||
50 | forkOS :: MonadBaseControl IO m => m () -> m ThreadId | ||
51 | forkOS = instrumented Raw.forkOS | ||
52 | {-# INLINE forkOS #-} | ||
53 | |||
54 | fork :: MonadBaseControl IO m => m () -> m ThreadId | ||
55 | fork = instrumented Raw.fork | ||
56 | {-# INLINE fork #-} | ||
57 | |||
58 | instrumented :: MonadBaseControl IO m => | ||
59 | (m () -> m ThreadId) -> m () -> m ThreadId | ||
60 | instrumented rawFork action = do | ||
61 | t <- rawFork $ do | ||
62 | tid <- myThreadId | ||
63 | tm <- liftBase getCurrentTime | ||
64 | bracket_ (modifyThreads $! Map.insert tid (PerThread "" tm)) | ||
65 | (return ()) | ||
66 | $ do catch action $ \e -> case fromException e of | ||
67 | Just ThreadKilled -> return () | ||
68 | Nothing -> liftBase $ do | ||
69 | g <- takeMVar globals | ||
70 | let l = concat [ show e | ||
71 | , " (" | ||
72 | , maybe "" lbl $ Map.lookup tid (threads g) | ||
73 | , ")" | ||
74 | ] | ||
75 | reportException g l | ||
76 | putMVar globals $! g { threads = Map.insert tid (PerThread l tm) $ threads g } | ||
77 | throwIO e | ||
78 | -- Remove the thread only if it terminated normally or was killed. | ||
79 | modifyThreads $! Map.delete tid | ||
80 | return t | ||
81 | |||
82 | labelThread :: ThreadId -> String -> IO () | ||
83 | labelThread tid s = do | ||
84 | GHC.labelThread tid s | ||
85 | modifyThreads $! Map.adjust (\pt -> pt { lbl = s }) tid | ||
86 | {-# INLINE labelThread #-} | ||
87 | |||
88 | threadsInformation :: IO [(ThreadId,PerThread)] | ||
89 | threadsInformation = do | ||
90 | m <- threads <$> readMVar globals | ||
91 | return $ Map.toList m | ||
92 | |||
93 | |||
94 | modifyThreads :: MonadBase IO m => (Map.Map ThreadId PerThread -> Map.Map ThreadId PerThread) -> m () | ||
95 | modifyThreads f = do | ||
96 | g <- takeMVar globals | ||
97 | let f' st = st { threads = f (threads st) } | ||
98 | putMVar globals $! f' g | ||
diff --git a/src/Control/Concurrent/Tasks.hs b/src/Control/Concurrent/Tasks.hs deleted file mode 100644 index da2e589e..00000000 --- a/src/Control/Concurrent/Tasks.hs +++ /dev/null | |||
@@ -1,44 +0,0 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | module Control.Concurrent.Tasks where | ||
3 | |||
4 | import Control.Concurrent.STM | ||
5 | import Control.Exception | ||
6 | import Data.Function | ||
7 | import Data.List | ||
8 | #ifdef THREAD_DEBUG | ||
9 | import Control.Concurrent.Lifted.Instrument | ||
10 | #else | ||
11 | import Control.Concurrent.Lifted | ||
12 | import GHC.Conc (labelThread) | ||
13 | #endif | ||
14 | |||
15 | newtype TaskGroup = TaskGroup | ||
16 | { taskQueue :: TChan (String,IO ()) | ||
17 | } | ||
18 | |||
19 | withTaskGroup :: String -> Int -> (TaskGroup -> IO ()) -> IO () | ||
20 | withTaskGroup glabel numslots action = do | ||
21 | tg <- atomically $ newTChan | ||
22 | cnt <- atomically $ newTVar 0 | ||
23 | thread <- forkIO $ do | ||
24 | myThreadId >>= flip labelThread glabel | ||
25 | fix $ \again -> do | ||
26 | (slot, (lbl,task)) <- atomically $ do | ||
27 | slot <- readTVar cnt | ||
28 | check (slot < numslots) | ||
29 | writeTVar cnt (succ slot) | ||
30 | t <- readTChan tg | ||
31 | return (slot,t) | ||
32 | _ <- fork $ do | ||
33 | myThreadId >>= flip labelThread (intercalate "." [glabel,show slot,lbl]) | ||
34 | task `catch` (\(SomeException e) -> return ()) | ||
35 | atomically $ modifyTVar' cnt pred | ||
36 | again | ||
37 | action (TaskGroup tg) `onException` killThread thread | ||
38 | atomically $ do | ||
39 | isEmptyTChan tg >>= check | ||
40 | readTVar cnt >>= check . (== 0) | ||
41 | killThread thread | ||
42 | |||
43 | forkTask :: TaskGroup -> String -> IO () -> IO () | ||
44 | forkTask (TaskGroup q) lbl action = atomically $ writeTChan q (lbl,action) | ||
diff --git a/src/Control/TriadCommittee.hs b/src/Control/TriadCommittee.hs deleted file mode 100644 index 88e665b6..00000000 --- a/src/Control/TriadCommittee.hs +++ /dev/null | |||
@@ -1,89 +0,0 @@ | |||
1 | {-# LANGUAGE TupleSections #-} | ||
2 | module Control.TriadCommittee where | ||
3 | |||
4 | import Control.Concurrent.STM | ||
5 | import Control.Monad | ||
6 | import Data.Maybe | ||
7 | |||
8 | |||
9 | data TriadSlot = SlotA | SlotB | SlotC | ||
10 | deriving (Eq,Ord,Enum,Show,Read) | ||
11 | |||
12 | data TriadCommittee voter a = TriadCommittee | ||
13 | { triadDecider :: TVar TriadSlot | ||
14 | , triadA :: TVar (Maybe (voter,a)) | ||
15 | , triadB :: TVar (Maybe (voter,a)) | ||
16 | , triadC :: TVar (Maybe (voter,a)) | ||
17 | , triadNewDecision :: a -> STM () | ||
18 | } | ||
19 | |||
20 | triadSlot :: TriadSlot -> TriadCommittee voter a -> TVar (Maybe (voter,a)) | ||
21 | triadSlot SlotA = triadA | ||
22 | triadSlot SlotB = triadB | ||
23 | triadSlot SlotC = triadC | ||
24 | |||
25 | triadDecision :: a -> TriadCommittee voter a -> STM a | ||
26 | triadDecision fallback triad = do | ||
27 | slot <- readTVar (triadDecider triad) | ||
28 | maybe fallback snd <$> readTVar (triadSlot slot triad) | ||
29 | |||
30 | |||
31 | newTriadCommittee :: (a -> STM ()) -> STM (TriadCommittee voter a) | ||
32 | newTriadCommittee onChange = | ||
33 | TriadCommittee <$> newTVar SlotA | ||
34 | <*> newTVar Nothing | ||
35 | <*> newTVar Nothing | ||
36 | <*> newTVar Nothing | ||
37 | <*> pure onChange | ||
38 | |||
39 | |||
40 | triadCountVotes :: Eq a => Maybe a -> TriadCommittee voter a -> STM () | ||
41 | triadCountVotes prior triad = do | ||
42 | a <- fmap ((SlotA,) . snd) <$> readTVar (triadA triad) | ||
43 | b <- fmap ((SlotB,) . snd) <$> readTVar (triadB triad) | ||
44 | c <- fmap ((SlotC,) . snd) <$> readTVar (triadC triad) | ||
45 | let (slot,vote) = case catMaybes [a,b,c] of | ||
46 | [ (x,xvote) | ||
47 | , (y,yvote) | ||
48 | , (z,zvote) ] -> if xvote == yvote then (x,Just xvote) | ||
49 | else (z,Just zvote) | ||
50 | [] -> (SlotA,Nothing) | ||
51 | ((slot,vote):_) -> (slot, Just vote) | ||
52 | writeTVar (triadDecider triad) slot | ||
53 | case vote of | ||
54 | Just v | vote /= prior -> triadNewDecision triad v | ||
55 | _ -> return () | ||
56 | |||
57 | |||
58 | addVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> a -> STM () | ||
59 | addVote triad voter vote = do | ||
60 | a <- (SlotA,) . fmap fst <$> readTVar (triadA triad) | ||
61 | b <- (SlotB,) . fmap fst <$> readTVar (triadB triad) | ||
62 | c <- (SlotC,) . fmap fst <$> readTVar (triadC triad) | ||
63 | let avail (_,Nothing) = True | ||
64 | avail (_,Just x ) = (x == voter) | ||
65 | slots = filter avail [a,b,c] | ||
66 | forM_ (take 1 slots) $ \(slot,_) -> do | ||
67 | prior <- do | ||
68 | slotp <- readTVar (triadDecider triad) | ||
69 | fmap snd <$> readTVar (triadSlot slotp triad) | ||
70 | writeTVar (triadSlot slot triad) | ||
71 | (Just (voter,vote)) | ||
72 | triadCountVotes prior triad | ||
73 | |||
74 | |||
75 | delVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> STM () | ||
76 | delVote triad voter = do | ||
77 | a <- (SlotA,) . fmap fst <$> readTVar (triadA triad) | ||
78 | b <- (SlotB,) . fmap fst <$> readTVar (triadB triad) | ||
79 | c <- (SlotC,) . fmap fst <$> readTVar (triadC triad) | ||
80 | let match (_,Just x ) = (x == voter) | ||
81 | match _ = False | ||
82 | slots = filter match [a,b,c] | ||
83 | forM_ (take 1 slots) $ \(slot,_) -> do | ||
84 | prior <- do | ||
85 | slotp <- readTVar (triadDecider triad) | ||
86 | fmap snd <$> readTVar (triadSlot slotp triad) | ||
87 | writeTVar (triadSlot slot triad) Nothing | ||
88 | triadCountVotes prior triad | ||
89 | |||