diff options
author | James Crayne <jim.crayne@gmail.com> | 2019-09-28 13:43:29 -0400 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-01 19:27:53 -0500 |
commit | 11987749fc6e6d3e53ea737d46d5ab13a16faeb8 (patch) | |
tree | 5716463275c2d3e902889db619908ded2a73971c /src/Control/Concurrent | |
parent | add2c76bced51fde5e9917e7449ef52be70faf87 (diff) |
Factor out some new libraries
word64-map:
Data.Word64Map
network-addr:
Network.Address
tox-crypto:
Crypto.Tox
lifted-concurrent:
Control.Concurrent.Lifted.Instrument
Control.Concurrent.Async.Lifted.Instrument
psq-wrap:
Data.Wrapper.PSQInt
Data.Wrapper.PSQ
minmax-psq:
Data.MinMaxPSQ
tasks:
Control.Concurrent.Tasks
kad:
Network.Kademlia
Network.Kademlia.Bootstrap
Network.Kademlia.Routing
Network.Kademlia.CommonAPI
Network.Kademlia.Persistence
Network.Kademlia.Search
Diffstat (limited to 'src/Control/Concurrent')
-rw-r--r-- | src/Control/Concurrent/Async/Lifted/Instrument.hs | 5 | ||||
-rw-r--r-- | src/Control/Concurrent/Lifted/Instrument.hs | 98 | ||||
-rw-r--r-- | src/Control/Concurrent/Tasks.hs | 44 |
3 files changed, 0 insertions, 147 deletions
diff --git a/src/Control/Concurrent/Async/Lifted/Instrument.hs b/src/Control/Concurrent/Async/Lifted/Instrument.hs deleted file mode 100644 index eab0fadc..00000000 --- a/src/Control/Concurrent/Async/Lifted/Instrument.hs +++ /dev/null | |||
@@ -1,5 +0,0 @@ | |||
1 | module Control.Concurrent.Async.Lifted.Instrument | ||
2 | ( module Control.Concurrent.Async.Lifted | ||
3 | ) where | ||
4 | |||
5 | import Control.Concurrent.Async.Lifted | ||
diff --git a/src/Control/Concurrent/Lifted/Instrument.hs b/src/Control/Concurrent/Lifted/Instrument.hs deleted file mode 100644 index fc3b6369..00000000 --- a/src/Control/Concurrent/Lifted/Instrument.hs +++ /dev/null | |||
@@ -1,98 +0,0 @@ | |||
1 | {-# LANGUAGE FlexibleContexts #-} | ||
2 | module Control.Concurrent.Lifted.Instrument | ||
3 | ( module Control.Concurrent.Lifted | ||
4 | , forkIO | ||
5 | , forkOS | ||
6 | , fork | ||
7 | , labelThread | ||
8 | , threadsInformation | ||
9 | , PerThread(..) | ||
10 | ) where | ||
11 | |||
12 | import qualified Control.Concurrent.Lifted as Raw | ||
13 | import Control.Concurrent.Lifted hiding (fork,forkOS) | ||
14 | import Control.Exception (fromException) | ||
15 | import Control.Monad.Trans.Control | ||
16 | import System.IO.Unsafe | ||
17 | import qualified Data.Map.Strict as Map | ||
18 | import Control.Exception.Lifted | ||
19 | import Control.Monad.Base | ||
20 | import qualified GHC.Conc as GHC | ||
21 | import Data.Time() | ||
22 | import Data.Time.Clock | ||
23 | import DPut | ||
24 | import DebugTag | ||
25 | |||
26 | |||
27 | data PerThread = PerThread | ||
28 | { lbl :: String | ||
29 | , startTime :: UTCTime | ||
30 | } | ||
31 | deriving (Eq,Ord,Show) | ||
32 | |||
33 | data GlobalState = GlobalState | ||
34 | { threads :: !(Map.Map ThreadId PerThread) | ||
35 | , reportException :: String -> IO () | ||
36 | } | ||
37 | |||
38 | globals :: MVar GlobalState | ||
39 | globals = unsafePerformIO $ newMVar $ GlobalState | ||
40 | { threads = Map.empty | ||
41 | , reportException = dput XMisc | ||
42 | } | ||
43 | {-# NOINLINE globals #-} | ||
44 | |||
45 | |||
46 | forkIO :: IO () -> IO ThreadId | ||
47 | forkIO = instrumented GHC.forkIO | ||
48 | {-# INLINE forkIO #-} | ||
49 | |||
50 | forkOS :: MonadBaseControl IO m => m () -> m ThreadId | ||
51 | forkOS = instrumented Raw.forkOS | ||
52 | {-# INLINE forkOS #-} | ||
53 | |||
54 | fork :: MonadBaseControl IO m => m () -> m ThreadId | ||
55 | fork = instrumented Raw.fork | ||
56 | {-# INLINE fork #-} | ||
57 | |||
58 | instrumented :: MonadBaseControl IO m => | ||
59 | (m () -> m ThreadId) -> m () -> m ThreadId | ||
60 | instrumented rawFork action = do | ||
61 | t <- rawFork $ do | ||
62 | tid <- myThreadId | ||
63 | tm <- liftBase getCurrentTime | ||
64 | bracket_ (modifyThreads $! Map.insert tid (PerThread "" tm)) | ||
65 | (return ()) | ||
66 | $ do catch action $ \e -> case fromException e of | ||
67 | Just ThreadKilled -> return () | ||
68 | Nothing -> liftBase $ do | ||
69 | g <- takeMVar globals | ||
70 | let l = concat [ show e | ||
71 | , " (" | ||
72 | , maybe "" lbl $ Map.lookup tid (threads g) | ||
73 | , ")" | ||
74 | ] | ||
75 | reportException g l | ||
76 | putMVar globals $! g { threads = Map.insert tid (PerThread l tm) $ threads g } | ||
77 | throwIO e | ||
78 | -- Remove the thread only if it terminated normally or was killed. | ||
79 | modifyThreads $! Map.delete tid | ||
80 | return t | ||
81 | |||
82 | labelThread :: ThreadId -> String -> IO () | ||
83 | labelThread tid s = do | ||
84 | GHC.labelThread tid s | ||
85 | modifyThreads $! Map.adjust (\pt -> pt { lbl = s }) tid | ||
86 | {-# INLINE labelThread #-} | ||
87 | |||
88 | threadsInformation :: IO [(ThreadId,PerThread)] | ||
89 | threadsInformation = do | ||
90 | m <- threads <$> readMVar globals | ||
91 | return $ Map.toList m | ||
92 | |||
93 | |||
94 | modifyThreads :: MonadBase IO m => (Map.Map ThreadId PerThread -> Map.Map ThreadId PerThread) -> m () | ||
95 | modifyThreads f = do | ||
96 | g <- takeMVar globals | ||
97 | let f' st = st { threads = f (threads st) } | ||
98 | putMVar globals $! f' g | ||
diff --git a/src/Control/Concurrent/Tasks.hs b/src/Control/Concurrent/Tasks.hs deleted file mode 100644 index da2e589e..00000000 --- a/src/Control/Concurrent/Tasks.hs +++ /dev/null | |||
@@ -1,44 +0,0 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | module Control.Concurrent.Tasks where | ||
3 | |||
4 | import Control.Concurrent.STM | ||
5 | import Control.Exception | ||
6 | import Data.Function | ||
7 | import Data.List | ||
8 | #ifdef THREAD_DEBUG | ||
9 | import Control.Concurrent.Lifted.Instrument | ||
10 | #else | ||
11 | import Control.Concurrent.Lifted | ||
12 | import GHC.Conc (labelThread) | ||
13 | #endif | ||
14 | |||
15 | newtype TaskGroup = TaskGroup | ||
16 | { taskQueue :: TChan (String,IO ()) | ||
17 | } | ||
18 | |||
19 | withTaskGroup :: String -> Int -> (TaskGroup -> IO ()) -> IO () | ||
20 | withTaskGroup glabel numslots action = do | ||
21 | tg <- atomically $ newTChan | ||
22 | cnt <- atomically $ newTVar 0 | ||
23 | thread <- forkIO $ do | ||
24 | myThreadId >>= flip labelThread glabel | ||
25 | fix $ \again -> do | ||
26 | (slot, (lbl,task)) <- atomically $ do | ||
27 | slot <- readTVar cnt | ||
28 | check (slot < numslots) | ||
29 | writeTVar cnt (succ slot) | ||
30 | t <- readTChan tg | ||
31 | return (slot,t) | ||
32 | _ <- fork $ do | ||
33 | myThreadId >>= flip labelThread (intercalate "." [glabel,show slot,lbl]) | ||
34 | task `catch` (\(SomeException e) -> return ()) | ||
35 | atomically $ modifyTVar' cnt pred | ||
36 | again | ||
37 | action (TaskGroup tg) `onException` killThread thread | ||
38 | atomically $ do | ||
39 | isEmptyTChan tg >>= check | ||
40 | readTVar cnt >>= check . (== 0) | ||
41 | killThread thread | ||
42 | |||
43 | forkTask :: TaskGroup -> String -> IO () -> IO () | ||
44 | forkTask (TaskGroup q) lbl action = atomically $ writeTChan q (lbl,action) | ||