diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Control/Concurrent/Tasks.hs | 44 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Search.hs | 2 | ||||
-rw-r--r-- | src/Network/Kademlia.hs | 2 |
3 files changed, 46 insertions, 2 deletions
diff --git a/src/Control/Concurrent/Tasks.hs b/src/Control/Concurrent/Tasks.hs new file mode 100644 index 00000000..287542ee --- /dev/null +++ b/src/Control/Concurrent/Tasks.hs | |||
@@ -0,0 +1,44 @@ | |||
1 | {-# LANGUAGE CPP #-} | ||
2 | module Control.Concurrent.Tasks where | ||
3 | |||
4 | import Control.Concurrent.STM | ||
5 | import Control.Exception | ||
6 | import Data.Function | ||
7 | import Data.List | ||
8 | #ifdef THREAD_DEBUG | ||
9 | import Control.Concurrent.Lifted.Instrument | ||
10 | #else | ||
11 | import Control.Concurrent.Lifted | ||
12 | import GHC.Conc (labelThread) | ||
13 | #endif | ||
14 | |||
15 | newtype TaskGroup = TaskGroup | ||
16 | { taskQueue :: TChan (String,IO ()) | ||
17 | } | ||
18 | |||
19 | withTaskGroup :: String -> Int -> (TaskGroup -> IO ()) -> IO () | ||
20 | withTaskGroup glabel numslots action = do | ||
21 | tg <- atomically $ newTChan | ||
22 | cnt <- atomically $ newTVar 0 | ||
23 | thread <- fork $ do | ||
24 | myThreadId >>= flip labelThread glabel | ||
25 | fix $ \again -> do | ||
26 | (slot, (lbl,task)) <- atomically $ do | ||
27 | slot <- readTVar cnt | ||
28 | check (slot < numslots) | ||
29 | writeTVar cnt (succ slot) | ||
30 | t <- readTChan tg | ||
31 | return (slot,t) | ||
32 | _ <- fork $ do | ||
33 | myThreadId >>= flip labelThread (intercalate "." [glabel,show slot,lbl]) | ||
34 | task `catch` (\(SomeException e) -> return ()) | ||
35 | atomically $ modifyTVar' cnt pred | ||
36 | again | ||
37 | action (TaskGroup tg) `onException` killThread thread | ||
38 | atomically $ do | ||
39 | isEmptyTChan tg >>= check | ||
40 | readTVar cnt >>= check . (== 0) | ||
41 | killThread thread | ||
42 | |||
43 | forkTask :: TaskGroup -> String -> IO () -> IO () | ||
44 | forkTask (TaskGroup q) lbl action = atomically $ writeTChan q (lbl,action) | ||
diff --git a/src/Network/BitTorrent/DHT/Search.hs b/src/Network/BitTorrent/DHT/Search.hs index 16a20620..29e91633 100644 --- a/src/Network/BitTorrent/DHT/Search.hs +++ b/src/Network/BitTorrent/DHT/Search.hs | |||
@@ -6,7 +6,7 @@ | |||
6 | {-# LANGUAGE LambdaCase #-} | 6 | {-# LANGUAGE LambdaCase #-} |
7 | module Network.BitTorrent.DHT.Search where | 7 | module Network.BitTorrent.DHT.Search where |
8 | 8 | ||
9 | import Tasks | 9 | import Control.Concurrent.Tasks |
10 | import Control.Concurrent.STM | 10 | import Control.Concurrent.STM |
11 | import Control.Exception | 11 | import Control.Exception |
12 | import Control.Monad | 12 | import Control.Monad |
diff --git a/src/Network/Kademlia.hs b/src/Network/Kademlia.hs index c6c59ae6..53c37175 100644 --- a/src/Network/Kademlia.hs +++ b/src/Network/Kademlia.hs | |||
@@ -35,7 +35,7 @@ import System.Timeout | |||
35 | import Text.PrettyPrint as PP hiding (($$), (<>)) | 35 | import Text.PrettyPrint as PP hiding (($$), (<>)) |
36 | import Text.PrettyPrint.HughesPJClass hiding (($$), (<>)) | 36 | import Text.PrettyPrint.HughesPJClass hiding (($$), (<>)) |
37 | import System.IO | 37 | import System.IO |
38 | import Tasks | 38 | import Control.Concurrent.Tasks |
39 | 39 | ||
40 | -- | The status of a given node with respect to a given routint table. | 40 | -- | The status of a given node with respect to a given routint table. |
41 | data RoutingStatus | 41 | data RoutingStatus |