From 058ccb22f43e9053fa37ed719d31c72dd6dac27c Mon Sep 17 00:00:00 2001 From: joe Date: Mon, 23 Jan 2017 22:32:17 -0500 Subject: Added thread-debug flag and "threads" command. --- bittorrent.cabal | 13 +++- examples/dhtd.hs | 22 ++++++- src/Control/Concurrent/Async/Lifted/Instrument.hs | 5 ++ src/Control/Concurrent/Lifted/Instrument.hs | 78 +++++++++++++++++++++++ src/Network/BitTorrent/DHT/Query.hs | 13 +++- src/Network/BitTorrent/DHT/Session.hs | 4 ++ src/Network/KRPC/Manager.hs | 11 +++- src/Network/StreamServer.hs | 16 ++++- 8 files changed, 151 insertions(+), 11 deletions(-) create mode 100644 src/Control/Concurrent/Async/Lifted/Instrument.hs create mode 100644 src/Control/Concurrent/Lifted/Instrument.hs diff --git a/bittorrent.cabal b/bittorrent.cabal index b3165137..90053932 100644 --- a/bittorrent.cabal +++ b/bittorrent.cabal @@ -62,6 +62,10 @@ flag aeson description: Use aeson for pretty-printing bencoded data. default: True +flag thread-debug + description: Add instrumentation to threads. + default: True + library default-language: Haskell2010 default-extensions: PatternGuards @@ -122,7 +126,6 @@ library Network.BitTorrent.Internal.Types System.Torrent.FileMap System.Torrent.Tree - build-depends: lifted-base , convertible >= 1.0 , pretty >= 1.1 @@ -216,6 +219,11 @@ library if flag(aeson) build-depends: aeson, aeson-pretty, unordered-containers, vector cpp-options: -DBENCODE_AESON + if flag(thread-debug) + exposed-modules: Control.Concurrent.Lifted.Instrument + Control.Concurrent.Async.Lifted.Instrument + cpp-options: -DTHREAD_DEBUG + if flag(builder) build-depends: bytestring >= 0.9, bytestring-builder else @@ -363,6 +371,9 @@ executable dhtd , monad-logger , bittorrent , unix + if flag(thread-debug) + build-depends: time + cpp-options: -DTHREAD_DEBUG -- Utility to work with torrent files. executable mktorrent diff --git a/examples/dhtd.hs b/examples/dhtd.hs index 4afceb50..0345e1ee 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs @@ -4,10 +4,10 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TupleSections #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE CPP #-} import Control.Arrow; -import Control.Concurrent -import Control.Exception.Lifted as Lifted import Control.Monad import Control.Monad.Logger import Control.Monad.Reader @@ -32,6 +32,13 @@ import qualified Network.BitTorrent.DHT.Routing as R import Network.BitTorrent.DHT.Session import Network.SocketLike import Network.StreamServer +import Control.Exception.Lifted as Lifted +#ifdef THREAD_DEBUG +import Control.Concurrent.Lifted.Instrument +import Data.Time.Clock +#else +import Control.Concurrent +#endif mkNodeAddr :: SockAddr -> NodeAddr IPv4 mkNodeAddr addr = NodeAddr (fromJust $ fromSockAddr addr) @@ -84,6 +91,8 @@ noDebugPrints _ = \case LevelDebug -> False noLogging :: LogSource -> LogLevel -> Bool noLogging _ _ = False +allNoise :: LogSource -> LogLevel -> Bool +allNoise _ _ = True resume :: DHT IPv4 (Maybe B.ByteString) resume = do @@ -98,7 +107,7 @@ resume = do godht :: forall b. (NodeAddr IPv4 -> NodeId -> DHT IPv4 b) -> IO b godht f = do a <- btBindAddr "8008" False - dht def { optTimeout = 5 } a noDebugPrints $ do + dht def { optTimeout = 5 } a allNoise $ do me0 <- asks tentativeNodeId printReport [("tentative node-id",show $ pPrint me0) ,("listen-address", show a) @@ -163,6 +172,13 @@ clientSession st signalQuit sock n h = do ("pid", _) -> cmd $ return $ do pid <- getProcessID hPutClient h (show pid) +#ifdef THREAD_DEBUG + ("threads", _) -> cmd $ return $ do + ts <- threadsInformation + tm <- getCurrentTime + let r = map (\PerThread{..} -> (show lbl,show (diffUTCTime tm startTime))) ts + hPutClient h $ showReport r +#endif _ -> cmd0 $ hPutClient h "error." diff --git a/src/Control/Concurrent/Async/Lifted/Instrument.hs b/src/Control/Concurrent/Async/Lifted/Instrument.hs new file mode 100644 index 00000000..eab0fadc --- /dev/null +++ b/src/Control/Concurrent/Async/Lifted/Instrument.hs @@ -0,0 +1,5 @@ +module Control.Concurrent.Async.Lifted.Instrument + ( module Control.Concurrent.Async.Lifted + ) where + +import Control.Concurrent.Async.Lifted diff --git a/src/Control/Concurrent/Lifted/Instrument.hs b/src/Control/Concurrent/Lifted/Instrument.hs new file mode 100644 index 00000000..9ec5deef --- /dev/null +++ b/src/Control/Concurrent/Lifted/Instrument.hs @@ -0,0 +1,78 @@ +{-# LANGUAGE FlexibleContexts #-} +module Control.Concurrent.Lifted.Instrument + ( module Control.Concurrent.Lifted + , forkIO + , fork + , labelThread + , threadsInformation + , PerThread(..) + ) where + +import qualified Control.Concurrent.Lifted as Raw +import Control.Concurrent.Lifted hiding (fork) +import Control.Monad.Trans.Control +import System.IO.Unsafe +import System.Mem.Weak +import qualified Data.Map as Map +import qualified Data.IntMap as IntMap +import Control.Exception.Lifted +import Control.Monad.Base +-- import Control.Monad.IO.Class +import qualified GHC.Conc as GHC +import Data.Time.Clock + +data PerThread = PerThread + { -- wkid :: Weak ThreadId + lbl :: String + , startTime :: UTCTime + } + deriving (Eq,Ord,Show) -- ,Data,Generic) + +data GlobalState = GlobalState + { threads :: Map.Map ThreadId PerThread -- IntMap.IntMap PerThread + -- , uniqSource :: Int + } + +globals :: MVar GlobalState +globals = unsafePerformIO $ newMVar $ GlobalState + { threads = Map.empty + -- , uniqSource = 0 + } +{-# NOINLINE globals #-} + + +forkIO :: IO () -> IO ThreadId +forkIO = fork + +fork :: MonadBaseControl IO m => m () -> m ThreadId +fork action = do + t <- Raw.fork $ do + -- wkid <- myThreadId >>= liftBase . mkWeakThreadId + -- tid <- newUniq + tid <- myThreadId + tm <- liftBase getCurrentTime + bracket_ (modifyThreads $ Map.insert tid (PerThread "" tm)) + (modifyThreads $ Map.delete tid) + action + return t + +labelThread :: ThreadId -> String -> IO () +labelThread tid s = do + GHC.labelThread tid s + putStrLn $ "labelThread "++s++" "++show tid + modifyThreads $ Map.adjust (\pt -> pt { lbl = s }) tid + +threadsInformation :: IO [PerThread] +threadsInformation = do + m <- threads <$> readMVar globals + return $ Map.elems m + +-- newUniq :: MonadBaseControl IO m => m Int +-- newUniq = do +-- modifyMVar globals (\st -> return (st { uniqSource = succ (uniqSource st) }, uniqSource st)) + +modifyThreads :: MonadBase IO m => (Map.Map ThreadId PerThread -> Map.Map ThreadId PerThread) -> m () +modifyThreads f = do + g <- takeMVar globals + let f' st = st { threads = f (threads st) } + putMVar globals (f' g) diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs index 73b3d492..47d17622 100644 --- a/src/Network/BitTorrent/DHT/Query.hs +++ b/src/Network/BitTorrent/DHT/Query.hs @@ -9,6 +9,7 @@ -- Normally, you don't need to import this module, use -- "Network.BitTorrent.DHT" instead. -- +{-# LANGUAGE CPP #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TemplateHaskell #-} @@ -49,7 +50,12 @@ module Network.BitTorrent.DHT.Query , (<@>) ) where +#ifdef THREAD_DEBUG +import Control.Concurrent.Lifted.Instrument hiding (yield) +#else +import GHC.Conc (labelThread) import Control.Concurrent.Lifted hiding (yield) +#endif import Control.Exception.Lifted hiding (Handler) import Control.Monad.Reader import Control.Monad.Logger @@ -227,6 +233,7 @@ refreshNodes nid = do -- routing table. insertNode :: forall ip. Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip ThreadId insertNode info witnessed_ip0 = fork $ do + -- myThreadId >>= liftIO . flip labelThread "DHT.insertNode" var <- asks routingInfo tm <- getTimestamp let showTable = do @@ -286,8 +293,10 @@ insertNode info witnessed_ip0 = fork $ do return ps ps <- join $ liftIO $ atomically $ atomicInsert arrival0 witnessed_ip0 showTable - _ <- fork $ forM_ ps $ \(CheckPing ns)-> do - forM_ ns $ \n -> do + _ <- fork $ do + myThreadId >>= liftIO . flip labelThread "DHT.insertNode.pingResults" + forM_ ps $ \(CheckPing ns)-> do + forM_ ns $ \n -> do (b,mip) <- probeNode (nodeAddr n) let alive = PingResult n b $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b)) diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index 8dc3f7ac..bad783a5 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs @@ -72,7 +72,11 @@ module Network.BitTorrent.DHT.Session import Prelude hiding (ioError) import Control.Concurrent.STM +#ifdef THREAD_DEBUG +import Control.Concurrent.Async.Lifted.Instrument +#else import Control.Concurrent.Async.Lifted +#endif import Control.Exception.Lifted hiding (Handler) import Control.Monad.Base import Control.Monad.Logger diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index 4852eb38..22d111e2 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs @@ -7,6 +7,7 @@ -- -- Normally, you don't need to import this module. -- +{-# LANGUAGE CPP #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE FlexibleContexts #-} @@ -41,8 +42,12 @@ module Network.KRPC.Manager ) where import Control.Applicative -import Control.Concurrent -import Control.Concurrent.Lifted (fork) +#ifdef THREAD_DEBUG +import Control.Concurrent.Lifted.Instrument +#else +import GHC.Conc (labelThread) +import Control.Concurrent.Lifted +#endif import Control.Exception hiding (Handler) import qualified Control.Exception.Lifted as E (Handler (..)) import Control.Exception.Lifted as Lifted (catches, finally) @@ -432,6 +437,7 @@ dispatchHandler q @ KQuery {..} addr = do -- handleQuery :: MonadKRPC h m => BValue -> KQuery -> SockAddr -> m () handleQuery raw q addr = void $ fork $ do + myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery" Manager {..} <- getManager res <- dispatchHandler q addr let resbe = either toBEncode toBEncode res @@ -481,6 +487,7 @@ listen :: MonadKRPC h m => m () listen = do Manager {..} <- getManager tid <- fork $ do + myThreadId >>= liftIO . flip labelThread "KRPC.listen" listener `Lifted.finally` liftIO (takeMVar listenerThread) liftIO $ putMVar listenerThread tid diff --git a/src/Network/StreamServer.hs b/src/Network/StreamServer.hs index a6cead0e..34b9388e 100644 --- a/src/Network/StreamServer.hs +++ b/src/Network/StreamServer.hs @@ -1,4 +1,5 @@ -- | This module implements a bare-bones TCP or Unix socket server. +{-# LANGUAGE CPP #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE TypeOperators #-} {-# LANGUAGE OverloadedStrings #-} @@ -34,7 +35,12 @@ import System.IO ) import Control.Monad import Control.Monad.Fix (fix) -import Control.Concurrent (forkIO, threadDelay, ThreadId, mkWeakThreadId) +#ifdef THREAD_DEBUG +import Control.Concurrent.Lifted.Instrument (forkIO, threadDelay, ThreadId, mkWeakThreadId, labelThread, myThreadId) +#else +import GHC.Conc (labelThread) +import Control.Concurrent (forkIO, threadDelay, ThreadId, mkWeakThreadId, myThreadId) +#endif import Control.Exception (catch,handle,try,finally) import System.IO.Error (tryIOError) import System.Mem.Weak @@ -113,7 +119,9 @@ streamServer cfg addr = do threadDelay 5000000 loop listen sock maxListenQueue - thread <- mkWeakThreadId <=< forkIO $ acceptLoop cfg sock 0 + thread <- mkWeakThreadId <=< forkIO $ do + myThreadId >>= flip labelThread "StreamServer.acceptLoop" + acceptLoop cfg sock 0 return (ServerHandle sock thread) -- | Not exported. This, combined with 'acceptException' form a mutually recursive @@ -124,7 +132,9 @@ acceptLoop cfg sock n = handle (acceptException cfg n sock) $ do con <- accept sock let conkey = n + 1 h <- socketToHandle (fst con) ReadWriteMode - forkIO $ serverSession cfg (restrictHandleSocket h (fst con)) conkey h + forkIO $ do + myThreadId >>= flip labelThread "StreamServer.session" + serverSession cfg (restrictHandleSocket h (fst con)) conkey h acceptLoop cfg sock (n + 1) acceptException :: ServerConfig -> Int -> Socket -> IOError -> IO () -- cgit v1.2.3