summaryrefslogtreecommitdiff
path: root/src/Control
diff options
context:
space:
mode:
Diffstat (limited to 'src/Control')
-rw-r--r--src/Control/Concurrent/Async/Lifted/Instrument.hs5
-rw-r--r--src/Control/Concurrent/Lifted/Instrument.hs98
-rw-r--r--src/Control/Concurrent/Tasks.hs44
-rw-r--r--src/Control/TriadCommittee.hs89
4 files changed, 0 insertions, 236 deletions
diff --git a/src/Control/Concurrent/Async/Lifted/Instrument.hs b/src/Control/Concurrent/Async/Lifted/Instrument.hs
deleted file mode 100644
index eab0fadc..00000000
--- a/src/Control/Concurrent/Async/Lifted/Instrument.hs
+++ /dev/null
@@ -1,5 +0,0 @@
1module Control.Concurrent.Async.Lifted.Instrument
2 ( module Control.Concurrent.Async.Lifted
3 ) where
4
5import Control.Concurrent.Async.Lifted
diff --git a/src/Control/Concurrent/Lifted/Instrument.hs b/src/Control/Concurrent/Lifted/Instrument.hs
deleted file mode 100644
index fc3b6369..00000000
--- a/src/Control/Concurrent/Lifted/Instrument.hs
+++ /dev/null
@@ -1,98 +0,0 @@
1{-# LANGUAGE FlexibleContexts #-}
2module Control.Concurrent.Lifted.Instrument
3 ( module Control.Concurrent.Lifted
4 , forkIO
5 , forkOS
6 , fork
7 , labelThread
8 , threadsInformation
9 , PerThread(..)
10 ) where
11
12import qualified Control.Concurrent.Lifted as Raw
13import Control.Concurrent.Lifted hiding (fork,forkOS)
14import Control.Exception (fromException)
15import Control.Monad.Trans.Control
16import System.IO.Unsafe
17import qualified Data.Map.Strict as Map
18import Control.Exception.Lifted
19import Control.Monad.Base
20import qualified GHC.Conc as GHC
21import Data.Time()
22import Data.Time.Clock
23import DPut
24import DebugTag
25
26
27data PerThread = PerThread
28 { lbl :: String
29 , startTime :: UTCTime
30 }
31 deriving (Eq,Ord,Show)
32
33data GlobalState = GlobalState
34 { threads :: !(Map.Map ThreadId PerThread)
35 , reportException :: String -> IO ()
36 }
37
38globals :: MVar GlobalState
39globals = unsafePerformIO $ newMVar $ GlobalState
40 { threads = Map.empty
41 , reportException = dput XMisc
42 }
43{-# NOINLINE globals #-}
44
45
46forkIO :: IO () -> IO ThreadId
47forkIO = instrumented GHC.forkIO
48{-# INLINE forkIO #-}
49
50forkOS :: MonadBaseControl IO m => m () -> m ThreadId
51forkOS = instrumented Raw.forkOS
52{-# INLINE forkOS #-}
53
54fork :: MonadBaseControl IO m => m () -> m ThreadId
55fork = instrumented Raw.fork
56{-# INLINE fork #-}
57
58instrumented :: MonadBaseControl IO m =>
59 (m () -> m ThreadId) -> m () -> m ThreadId
60instrumented rawFork action = do
61 t <- rawFork $ do
62 tid <- myThreadId
63 tm <- liftBase getCurrentTime
64 bracket_ (modifyThreads $! Map.insert tid (PerThread "" tm))
65 (return ())
66 $ do catch action $ \e -> case fromException e of
67 Just ThreadKilled -> return ()
68 Nothing -> liftBase $ do
69 g <- takeMVar globals
70 let l = concat [ show e
71 , " ("
72 , maybe "" lbl $ Map.lookup tid (threads g)
73 , ")"
74 ]
75 reportException g l
76 putMVar globals $! g { threads = Map.insert tid (PerThread l tm) $ threads g }
77 throwIO e
78 -- Remove the thread only if it terminated normally or was killed.
79 modifyThreads $! Map.delete tid
80 return t
81
82labelThread :: ThreadId -> String -> IO ()
83labelThread tid s = do
84 GHC.labelThread tid s
85 modifyThreads $! Map.adjust (\pt -> pt { lbl = s }) tid
86{-# INLINE labelThread #-}
87
88threadsInformation :: IO [(ThreadId,PerThread)]
89threadsInformation = do
90 m <- threads <$> readMVar globals
91 return $ Map.toList m
92
93
94modifyThreads :: MonadBase IO m => (Map.Map ThreadId PerThread -> Map.Map ThreadId PerThread) -> m ()
95modifyThreads f = do
96 g <- takeMVar globals
97 let f' st = st { threads = f (threads st) }
98 putMVar globals $! f' g
diff --git a/src/Control/Concurrent/Tasks.hs b/src/Control/Concurrent/Tasks.hs
deleted file mode 100644
index da2e589e..00000000
--- a/src/Control/Concurrent/Tasks.hs
+++ /dev/null
@@ -1,44 +0,0 @@
1{-# LANGUAGE CPP #-}
2module Control.Concurrent.Tasks where
3
4import Control.Concurrent.STM
5import Control.Exception
6import Data.Function
7import Data.List
8#ifdef THREAD_DEBUG
9import Control.Concurrent.Lifted.Instrument
10#else
11import Control.Concurrent.Lifted
12import GHC.Conc (labelThread)
13#endif
14
15newtype TaskGroup = TaskGroup
16 { taskQueue :: TChan (String,IO ())
17 }
18
19withTaskGroup :: String -> Int -> (TaskGroup -> IO ()) -> IO ()
20withTaskGroup glabel numslots action = do
21 tg <- atomically $ newTChan
22 cnt <- atomically $ newTVar 0
23 thread <- forkIO $ do
24 myThreadId >>= flip labelThread glabel
25 fix $ \again -> do
26 (slot, (lbl,task)) <- atomically $ do
27 slot <- readTVar cnt
28 check (slot < numslots)
29 writeTVar cnt (succ slot)
30 t <- readTChan tg
31 return (slot,t)
32 _ <- fork $ do
33 myThreadId >>= flip labelThread (intercalate "." [glabel,show slot,lbl])
34 task `catch` (\(SomeException e) -> return ())
35 atomically $ modifyTVar' cnt pred
36 again
37 action (TaskGroup tg) `onException` killThread thread
38 atomically $ do
39 isEmptyTChan tg >>= check
40 readTVar cnt >>= check . (== 0)
41 killThread thread
42
43forkTask :: TaskGroup -> String -> IO () -> IO ()
44forkTask (TaskGroup q) lbl action = atomically $ writeTChan q (lbl,action)
diff --git a/src/Control/TriadCommittee.hs b/src/Control/TriadCommittee.hs
deleted file mode 100644
index 88e665b6..00000000
--- a/src/Control/TriadCommittee.hs
+++ /dev/null
@@ -1,89 +0,0 @@
1{-# LANGUAGE TupleSections #-}
2module Control.TriadCommittee where
3
4import Control.Concurrent.STM
5import Control.Monad
6import Data.Maybe
7
8
9data TriadSlot = SlotA | SlotB | SlotC
10 deriving (Eq,Ord,Enum,Show,Read)
11
12data TriadCommittee voter a = TriadCommittee
13 { triadDecider :: TVar TriadSlot
14 , triadA :: TVar (Maybe (voter,a))
15 , triadB :: TVar (Maybe (voter,a))
16 , triadC :: TVar (Maybe (voter,a))
17 , triadNewDecision :: a -> STM ()
18 }
19
20triadSlot :: TriadSlot -> TriadCommittee voter a -> TVar (Maybe (voter,a))
21triadSlot SlotA = triadA
22triadSlot SlotB = triadB
23triadSlot SlotC = triadC
24
25triadDecision :: a -> TriadCommittee voter a -> STM a
26triadDecision fallback triad = do
27 slot <- readTVar (triadDecider triad)
28 maybe fallback snd <$> readTVar (triadSlot slot triad)
29
30
31newTriadCommittee :: (a -> STM ()) -> STM (TriadCommittee voter a)
32newTriadCommittee onChange =
33 TriadCommittee <$> newTVar SlotA
34 <*> newTVar Nothing
35 <*> newTVar Nothing
36 <*> newTVar Nothing
37 <*> pure onChange
38
39
40triadCountVotes :: Eq a => Maybe a -> TriadCommittee voter a -> STM ()
41triadCountVotes prior triad = do
42 a <- fmap ((SlotA,) . snd) <$> readTVar (triadA triad)
43 b <- fmap ((SlotB,) . snd) <$> readTVar (triadB triad)
44 c <- fmap ((SlotC,) . snd) <$> readTVar (triadC triad)
45 let (slot,vote) = case catMaybes [a,b,c] of
46 [ (x,xvote)
47 , (y,yvote)
48 , (z,zvote) ] -> if xvote == yvote then (x,Just xvote)
49 else (z,Just zvote)
50 [] -> (SlotA,Nothing)
51 ((slot,vote):_) -> (slot, Just vote)
52 writeTVar (triadDecider triad) slot
53 case vote of
54 Just v | vote /= prior -> triadNewDecision triad v
55 _ -> return ()
56
57
58addVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> a -> STM ()
59addVote triad voter vote = do
60 a <- (SlotA,) . fmap fst <$> readTVar (triadA triad)
61 b <- (SlotB,) . fmap fst <$> readTVar (triadB triad)
62 c <- (SlotC,) . fmap fst <$> readTVar (triadC triad)
63 let avail (_,Nothing) = True
64 avail (_,Just x ) = (x == voter)
65 slots = filter avail [a,b,c]
66 forM_ (take 1 slots) $ \(slot,_) -> do
67 prior <- do
68 slotp <- readTVar (triadDecider triad)
69 fmap snd <$> readTVar (triadSlot slotp triad)
70 writeTVar (triadSlot slot triad)
71 (Just (voter,vote))
72 triadCountVotes prior triad
73
74
75delVote :: (Eq voter, Eq a) => TriadCommittee voter a -> voter -> STM ()
76delVote triad voter = do
77 a <- (SlotA,) . fmap fst <$> readTVar (triadA triad)
78 b <- (SlotB,) . fmap fst <$> readTVar (triadB triad)
79 c <- (SlotC,) . fmap fst <$> readTVar (triadC triad)
80 let match (_,Just x ) = (x == voter)
81 match _ = False
82 slots = filter match [a,b,c]
83 forM_ (take 1 slots) $ \(slot,_) -> do
84 prior <- do
85 slotp <- readTVar (triadDecider triad)
86 fmap snd <$> readTVar (triadSlot slotp triad)
87 writeTVar (triadSlot slot triad) Nothing
88 triadCountVotes prior triad
89