summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Control/Concurrent/Async/Lifted/Instrument.hs5
-rw-r--r--src/Control/Concurrent/Lifted/Instrument.hs78
-rw-r--r--src/Network/BitTorrent/DHT/Query.hs13
-rw-r--r--src/Network/BitTorrent/DHT/Session.hs4
-rw-r--r--src/Network/KRPC/Manager.hs11
-rw-r--r--src/Network/StreamServer.hs16
6 files changed, 120 insertions, 7 deletions
diff --git a/src/Control/Concurrent/Async/Lifted/Instrument.hs b/src/Control/Concurrent/Async/Lifted/Instrument.hs
new file mode 100644
index 00000000..eab0fadc
--- /dev/null
+++ b/src/Control/Concurrent/Async/Lifted/Instrument.hs
@@ -0,0 +1,5 @@
1module Control.Concurrent.Async.Lifted.Instrument
2 ( module Control.Concurrent.Async.Lifted
3 ) where
4
5import Control.Concurrent.Async.Lifted
diff --git a/src/Control/Concurrent/Lifted/Instrument.hs b/src/Control/Concurrent/Lifted/Instrument.hs
new file mode 100644
index 00000000..9ec5deef
--- /dev/null
+++ b/src/Control/Concurrent/Lifted/Instrument.hs
@@ -0,0 +1,78 @@
1{-# LANGUAGE FlexibleContexts #-}
2module Control.Concurrent.Lifted.Instrument
3 ( module Control.Concurrent.Lifted
4 , forkIO
5 , fork
6 , labelThread
7 , threadsInformation
8 , PerThread(..)
9 ) where
10
11import qualified Control.Concurrent.Lifted as Raw
12import Control.Concurrent.Lifted hiding (fork)
13import Control.Monad.Trans.Control
14import System.IO.Unsafe
15import System.Mem.Weak
16import qualified Data.Map as Map
17import qualified Data.IntMap as IntMap
18import Control.Exception.Lifted
19import Control.Monad.Base
20-- import Control.Monad.IO.Class
21import qualified GHC.Conc as GHC
22import Data.Time.Clock
23
24data PerThread = PerThread
25 { -- wkid :: Weak ThreadId
26 lbl :: String
27 , startTime :: UTCTime
28 }
29 deriving (Eq,Ord,Show) -- ,Data,Generic)
30
31data GlobalState = GlobalState
32 { threads :: Map.Map ThreadId PerThread -- IntMap.IntMap PerThread
33 -- , uniqSource :: Int
34 }
35
36globals :: MVar GlobalState
37globals = unsafePerformIO $ newMVar $ GlobalState
38 { threads = Map.empty
39 -- , uniqSource = 0
40 }
41{-# NOINLINE globals #-}
42
43
44forkIO :: IO () -> IO ThreadId
45forkIO = fork
46
47fork :: MonadBaseControl IO m => m () -> m ThreadId
48fork action = do
49 t <- Raw.fork $ do
50 -- wkid <- myThreadId >>= liftBase . mkWeakThreadId
51 -- tid <- newUniq
52 tid <- myThreadId
53 tm <- liftBase getCurrentTime
54 bracket_ (modifyThreads $ Map.insert tid (PerThread "" tm))
55 (modifyThreads $ Map.delete tid)
56 action
57 return t
58
59labelThread :: ThreadId -> String -> IO ()
60labelThread tid s = do
61 GHC.labelThread tid s
62 putStrLn $ "labelThread "++s++" "++show tid
63 modifyThreads $ Map.adjust (\pt -> pt { lbl = s }) tid
64
65threadsInformation :: IO [PerThread]
66threadsInformation = do
67 m <- threads <$> readMVar globals
68 return $ Map.elems m
69
70-- newUniq :: MonadBaseControl IO m => m Int
71-- newUniq = do
72-- modifyMVar globals (\st -> return (st { uniqSource = succ (uniqSource st) }, uniqSource st))
73
74modifyThreads :: MonadBase IO m => (Map.Map ThreadId PerThread -> Map.Map ThreadId PerThread) -> m ()
75modifyThreads f = do
76 g <- takeMVar globals
77 let f' st = st { threads = f (threads st) }
78 putMVar globals (f' g)
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
index 73b3d492..47d17622 100644
--- a/src/Network/BitTorrent/DHT/Query.hs
+++ b/src/Network/BitTorrent/DHT/Query.hs
@@ -9,6 +9,7 @@
9-- Normally, you don't need to import this module, use 9-- Normally, you don't need to import this module, use
10-- "Network.BitTorrent.DHT" instead. 10-- "Network.BitTorrent.DHT" instead.
11-- 11--
12{-# LANGUAGE CPP #-}
12{-# LANGUAGE FlexibleContexts #-} 13{-# LANGUAGE FlexibleContexts #-}
13{-# LANGUAGE ScopedTypeVariables #-} 14{-# LANGUAGE ScopedTypeVariables #-}
14{-# LANGUAGE TemplateHaskell #-} 15{-# LANGUAGE TemplateHaskell #-}
@@ -49,7 +50,12 @@ module Network.BitTorrent.DHT.Query
49 , (<@>) 50 , (<@>)
50 ) where 51 ) where
51 52
53#ifdef THREAD_DEBUG
54import Control.Concurrent.Lifted.Instrument hiding (yield)
55#else
56import GHC.Conc (labelThread)
52import Control.Concurrent.Lifted hiding (yield) 57import Control.Concurrent.Lifted hiding (yield)
58#endif
53import Control.Exception.Lifted hiding (Handler) 59import Control.Exception.Lifted hiding (Handler)
54import Control.Monad.Reader 60import Control.Monad.Reader
55import Control.Monad.Logger 61import Control.Monad.Logger
@@ -227,6 +233,7 @@ refreshNodes nid = do
227-- routing table. 233-- routing table.
228insertNode :: forall ip. Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip ThreadId 234insertNode :: forall ip. Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip ThreadId
229insertNode info witnessed_ip0 = fork $ do 235insertNode info witnessed_ip0 = fork $ do
236 -- myThreadId >>= liftIO . flip labelThread "DHT.insertNode"
230 var <- asks routingInfo 237 var <- asks routingInfo
231 tm <- getTimestamp 238 tm <- getTimestamp
232 let showTable = do 239 let showTable = do
@@ -286,8 +293,10 @@ insertNode info witnessed_ip0 = fork $ do
286 return ps 293 return ps
287 ps <- join $ liftIO $ atomically $ atomicInsert arrival0 witnessed_ip0 294 ps <- join $ liftIO $ atomically $ atomicInsert arrival0 witnessed_ip0
288 showTable 295 showTable
289 _ <- fork $ forM_ ps $ \(CheckPing ns)-> do 296 _ <- fork $ do
290 forM_ ns $ \n -> do 297 myThreadId >>= liftIO . flip labelThread "DHT.insertNode.pingResults"
298 forM_ ps $ \(CheckPing ns)-> do
299 forM_ ns $ \n -> do
291 (b,mip) <- probeNode (nodeAddr n) 300 (b,mip) <- probeNode (nodeAddr n)
292 let alive = PingResult n b 301 let alive = PingResult n b
293 $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b)) 302 $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b))
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs
index 8dc3f7ac..bad783a5 100644
--- a/src/Network/BitTorrent/DHT/Session.hs
+++ b/src/Network/BitTorrent/DHT/Session.hs
@@ -72,7 +72,11 @@ module Network.BitTorrent.DHT.Session
72import Prelude hiding (ioError) 72import Prelude hiding (ioError)
73 73
74import Control.Concurrent.STM 74import Control.Concurrent.STM
75#ifdef THREAD_DEBUG
76import Control.Concurrent.Async.Lifted.Instrument
77#else
75import Control.Concurrent.Async.Lifted 78import Control.Concurrent.Async.Lifted
79#endif
76import Control.Exception.Lifted hiding (Handler) 80import Control.Exception.Lifted hiding (Handler)
77import Control.Monad.Base 81import Control.Monad.Base
78import Control.Monad.Logger 82import Control.Monad.Logger
diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs
index 4852eb38..22d111e2 100644
--- a/src/Network/KRPC/Manager.hs
+++ b/src/Network/KRPC/Manager.hs
@@ -7,6 +7,7 @@
7-- 7--
8-- Normally, you don't need to import this module. 8-- Normally, you don't need to import this module.
9-- 9--
10{-# LANGUAGE CPP #-}
10{-# LANGUAGE OverloadedStrings #-} 11{-# LANGUAGE OverloadedStrings #-}
11{-# LANGUAGE FlexibleInstances #-} 12{-# LANGUAGE FlexibleInstances #-}
12{-# LANGUAGE FlexibleContexts #-} 13{-# LANGUAGE FlexibleContexts #-}
@@ -41,8 +42,12 @@ module Network.KRPC.Manager
41 ) where 42 ) where
42 43
43import Control.Applicative 44import Control.Applicative
44import Control.Concurrent 45#ifdef THREAD_DEBUG
45import Control.Concurrent.Lifted (fork) 46import Control.Concurrent.Lifted.Instrument
47#else
48import GHC.Conc (labelThread)
49import Control.Concurrent.Lifted
50#endif
46import Control.Exception hiding (Handler) 51import Control.Exception hiding (Handler)
47import qualified Control.Exception.Lifted as E (Handler (..)) 52import qualified Control.Exception.Lifted as E (Handler (..))
48import Control.Exception.Lifted as Lifted (catches, finally) 53import Control.Exception.Lifted as Lifted (catches, finally)
@@ -432,6 +437,7 @@ dispatchHandler q @ KQuery {..} addr = do
432-- 437--
433handleQuery :: MonadKRPC h m => BValue -> KQuery -> SockAddr -> m () 438handleQuery :: MonadKRPC h m => BValue -> KQuery -> SockAddr -> m ()
434handleQuery raw q addr = void $ fork $ do 439handleQuery raw q addr = void $ fork $ do
440 myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery"
435 Manager {..} <- getManager 441 Manager {..} <- getManager
436 res <- dispatchHandler q addr 442 res <- dispatchHandler q addr
437 let resbe = either toBEncode toBEncode res 443 let resbe = either toBEncode toBEncode res
@@ -481,6 +487,7 @@ listen :: MonadKRPC h m => m ()
481listen = do 487listen = do
482 Manager {..} <- getManager 488 Manager {..} <- getManager
483 tid <- fork $ do 489 tid <- fork $ do
490 myThreadId >>= liftIO . flip labelThread "KRPC.listen"
484 listener `Lifted.finally` 491 listener `Lifted.finally`
485 liftIO (takeMVar listenerThread) 492 liftIO (takeMVar listenerThread)
486 liftIO $ putMVar listenerThread tid 493 liftIO $ putMVar listenerThread tid
diff --git a/src/Network/StreamServer.hs b/src/Network/StreamServer.hs
index a6cead0e..34b9388e 100644
--- a/src/Network/StreamServer.hs
+++ b/src/Network/StreamServer.hs
@@ -1,4 +1,5 @@
1-- | This module implements a bare-bones TCP or Unix socket server. 1-- | This module implements a bare-bones TCP or Unix socket server.
2{-# LANGUAGE CPP #-}
2{-# LANGUAGE TypeFamilies #-} 3{-# LANGUAGE TypeFamilies #-}
3{-# LANGUAGE TypeOperators #-} 4{-# LANGUAGE TypeOperators #-}
4{-# LANGUAGE OverloadedStrings #-} 5{-# LANGUAGE OverloadedStrings #-}
@@ -34,7 +35,12 @@ import System.IO
34 ) 35 )
35import Control.Monad 36import Control.Monad
36import Control.Monad.Fix (fix) 37import Control.Monad.Fix (fix)
37import Control.Concurrent (forkIO, threadDelay, ThreadId, mkWeakThreadId) 38#ifdef THREAD_DEBUG
39import Control.Concurrent.Lifted.Instrument (forkIO, threadDelay, ThreadId, mkWeakThreadId, labelThread, myThreadId)
40#else
41import GHC.Conc (labelThread)
42import Control.Concurrent (forkIO, threadDelay, ThreadId, mkWeakThreadId, myThreadId)
43#endif
38import Control.Exception (catch,handle,try,finally) 44import Control.Exception (catch,handle,try,finally)
39import System.IO.Error (tryIOError) 45import System.IO.Error (tryIOError)
40import System.Mem.Weak 46import System.Mem.Weak
@@ -113,7 +119,9 @@ streamServer cfg addr = do
113 threadDelay 5000000 119 threadDelay 5000000
114 loop 120 loop
115 listen sock maxListenQueue 121 listen sock maxListenQueue
116 thread <- mkWeakThreadId <=< forkIO $ acceptLoop cfg sock 0 122 thread <- mkWeakThreadId <=< forkIO $ do
123 myThreadId >>= flip labelThread "StreamServer.acceptLoop"
124 acceptLoop cfg sock 0
117 return (ServerHandle sock thread) 125 return (ServerHandle sock thread)
118 126
119-- | Not exported. This, combined with 'acceptException' form a mutually recursive 127-- | Not exported. This, combined with 'acceptException' form a mutually recursive
@@ -124,7 +132,9 @@ acceptLoop cfg sock n = handle (acceptException cfg n sock) $ do
124 con <- accept sock 132 con <- accept sock
125 let conkey = n + 1 133 let conkey = n + 1
126 h <- socketToHandle (fst con) ReadWriteMode 134 h <- socketToHandle (fst con) ReadWriteMode
127 forkIO $ serverSession cfg (restrictHandleSocket h (fst con)) conkey h 135 forkIO $ do
136 myThreadId >>= flip labelThread "StreamServer.session"
137 serverSession cfg (restrictHandleSocket h (fst con)) conkey h
128 acceptLoop cfg sock (n + 1) 138 acceptLoop cfg sock (n + 1)
129 139
130acceptException :: ServerConfig -> Int -> Socket -> IOError -> IO () 140acceptException :: ServerConfig -> Int -> Socket -> IOError -> IO ()