summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjoe <joe@jerkface.net>2017-01-23 22:32:17 -0500
committerjoe <joe@jerkface.net>2017-01-23 22:33:08 -0500
commit058ccb22f43e9053fa37ed719d31c72dd6dac27c (patch)
treef6faea43c0b4cc9428e0b8cb8d0b836a9ec13107
parent6a2506745dd06ad0849a1b0d440ad9751a69cf81 (diff)
Added thread-debug flag and "threads" command.
-rw-r--r--bittorrent.cabal13
-rw-r--r--examples/dhtd.hs22
-rw-r--r--src/Control/Concurrent/Async/Lifted/Instrument.hs5
-rw-r--r--src/Control/Concurrent/Lifted/Instrument.hs78
-rw-r--r--src/Network/BitTorrent/DHT/Query.hs13
-rw-r--r--src/Network/BitTorrent/DHT/Session.hs4
-rw-r--r--src/Network/KRPC/Manager.hs11
-rw-r--r--src/Network/StreamServer.hs16
8 files changed, 151 insertions, 11 deletions
diff --git a/bittorrent.cabal b/bittorrent.cabal
index b3165137..90053932 100644
--- a/bittorrent.cabal
+++ b/bittorrent.cabal
@@ -62,6 +62,10 @@ flag aeson
62 description: Use aeson for pretty-printing bencoded data. 62 description: Use aeson for pretty-printing bencoded data.
63 default: True 63 default: True
64 64
65flag thread-debug
66 description: Add instrumentation to threads.
67 default: True
68
65library 69library
66 default-language: Haskell2010 70 default-language: Haskell2010
67 default-extensions: PatternGuards 71 default-extensions: PatternGuards
@@ -122,7 +126,6 @@ library
122 Network.BitTorrent.Internal.Types 126 Network.BitTorrent.Internal.Types
123 System.Torrent.FileMap 127 System.Torrent.FileMap
124 System.Torrent.Tree 128 System.Torrent.Tree
125
126 build-depends: lifted-base 129 build-depends: lifted-base
127 , convertible >= 1.0 130 , convertible >= 1.0
128 , pretty >= 1.1 131 , pretty >= 1.1
@@ -216,6 +219,11 @@ library
216 if flag(aeson) 219 if flag(aeson)
217 build-depends: aeson, aeson-pretty, unordered-containers, vector 220 build-depends: aeson, aeson-pretty, unordered-containers, vector
218 cpp-options: -DBENCODE_AESON 221 cpp-options: -DBENCODE_AESON
222 if flag(thread-debug)
223 exposed-modules: Control.Concurrent.Lifted.Instrument
224 Control.Concurrent.Async.Lifted.Instrument
225 cpp-options: -DTHREAD_DEBUG
226
219 if flag(builder) 227 if flag(builder)
220 build-depends: bytestring >= 0.9, bytestring-builder 228 build-depends: bytestring >= 0.9, bytestring-builder
221 else 229 else
@@ -363,6 +371,9 @@ executable dhtd
363 , monad-logger 371 , monad-logger
364 , bittorrent 372 , bittorrent
365 , unix 373 , unix
374 if flag(thread-debug)
375 build-depends: time
376 cpp-options: -DTHREAD_DEBUG
366 377
367-- Utility to work with torrent files. 378-- Utility to work with torrent files.
368executable mktorrent 379executable mktorrent
diff --git a/examples/dhtd.hs b/examples/dhtd.hs
index 4afceb50..0345e1ee 100644
--- a/examples/dhtd.hs
+++ b/examples/dhtd.hs
@@ -4,10 +4,10 @@
4{-# LANGUAGE OverloadedStrings #-} 4{-# LANGUAGE OverloadedStrings #-}
5{-# LANGUAGE ScopedTypeVariables #-} 5{-# LANGUAGE ScopedTypeVariables #-}
6{-# LANGUAGE TupleSections #-} 6{-# LANGUAGE TupleSections #-}
7{-# LANGUAGE RecordWildCards #-}
8{-# LANGUAGE CPP #-}
7 9
8import Control.Arrow; 10import Control.Arrow;
9import Control.Concurrent
10import Control.Exception.Lifted as Lifted
11import Control.Monad 11import Control.Monad
12import Control.Monad.Logger 12import Control.Monad.Logger
13import Control.Monad.Reader 13import Control.Monad.Reader
@@ -32,6 +32,13 @@ import qualified Network.BitTorrent.DHT.Routing as R
32import Network.BitTorrent.DHT.Session 32import Network.BitTorrent.DHT.Session
33import Network.SocketLike 33import Network.SocketLike
34import Network.StreamServer 34import Network.StreamServer
35import Control.Exception.Lifted as Lifted
36#ifdef THREAD_DEBUG
37import Control.Concurrent.Lifted.Instrument
38import Data.Time.Clock
39#else
40import Control.Concurrent
41#endif
35 42
36mkNodeAddr :: SockAddr -> NodeAddr IPv4 43mkNodeAddr :: SockAddr -> NodeAddr IPv4
37mkNodeAddr addr = NodeAddr (fromJust $ fromSockAddr addr) 44mkNodeAddr addr = NodeAddr (fromJust $ fromSockAddr addr)
@@ -84,6 +91,8 @@ noDebugPrints _ = \case LevelDebug -> False
84noLogging :: LogSource -> LogLevel -> Bool 91noLogging :: LogSource -> LogLevel -> Bool
85noLogging _ _ = False 92noLogging _ _ = False
86 93
94allNoise :: LogSource -> LogLevel -> Bool
95allNoise _ _ = True
87 96
88resume :: DHT IPv4 (Maybe B.ByteString) 97resume :: DHT IPv4 (Maybe B.ByteString)
89resume = do 98resume = do
@@ -98,7 +107,7 @@ resume = do
98godht :: forall b. (NodeAddr IPv4 -> NodeId -> DHT IPv4 b) -> IO b 107godht :: forall b. (NodeAddr IPv4 -> NodeId -> DHT IPv4 b) -> IO b
99godht f = do 108godht f = do
100 a <- btBindAddr "8008" False 109 a <- btBindAddr "8008" False
101 dht def { optTimeout = 5 } a noDebugPrints $ do 110 dht def { optTimeout = 5 } a allNoise $ do
102 me0 <- asks tentativeNodeId 111 me0 <- asks tentativeNodeId
103 printReport [("tentative node-id",show $ pPrint me0) 112 printReport [("tentative node-id",show $ pPrint me0)
104 ,("listen-address", show a) 113 ,("listen-address", show a)
@@ -163,6 +172,13 @@ clientSession st signalQuit sock n h = do
163 ("pid", _) -> cmd $ return $ do 172 ("pid", _) -> cmd $ return $ do
164 pid <- getProcessID 173 pid <- getProcessID
165 hPutClient h (show pid) 174 hPutClient h (show pid)
175#ifdef THREAD_DEBUG
176 ("threads", _) -> cmd $ return $ do
177 ts <- threadsInformation
178 tm <- getCurrentTime
179 let r = map (\PerThread{..} -> (show lbl,show (diffUTCTime tm startTime))) ts
180 hPutClient h $ showReport r
181#endif
166 182
167 _ -> cmd0 $ hPutClient h "error." 183 _ -> cmd0 $ hPutClient h "error."
168 184
diff --git a/src/Control/Concurrent/Async/Lifted/Instrument.hs b/src/Control/Concurrent/Async/Lifted/Instrument.hs
new file mode 100644
index 00000000..eab0fadc
--- /dev/null
+++ b/src/Control/Concurrent/Async/Lifted/Instrument.hs
@@ -0,0 +1,5 @@
1module Control.Concurrent.Async.Lifted.Instrument
2 ( module Control.Concurrent.Async.Lifted
3 ) where
4
5import Control.Concurrent.Async.Lifted
diff --git a/src/Control/Concurrent/Lifted/Instrument.hs b/src/Control/Concurrent/Lifted/Instrument.hs
new file mode 100644
index 00000000..9ec5deef
--- /dev/null
+++ b/src/Control/Concurrent/Lifted/Instrument.hs
@@ -0,0 +1,78 @@
1{-# LANGUAGE FlexibleContexts #-}
2module Control.Concurrent.Lifted.Instrument
3 ( module Control.Concurrent.Lifted
4 , forkIO
5 , fork
6 , labelThread
7 , threadsInformation
8 , PerThread(..)
9 ) where
10
11import qualified Control.Concurrent.Lifted as Raw
12import Control.Concurrent.Lifted hiding (fork)
13import Control.Monad.Trans.Control
14import System.IO.Unsafe
15import System.Mem.Weak
16import qualified Data.Map as Map
17import qualified Data.IntMap as IntMap
18import Control.Exception.Lifted
19import Control.Monad.Base
20-- import Control.Monad.IO.Class
21import qualified GHC.Conc as GHC
22import Data.Time.Clock
23
24data PerThread = PerThread
25 { -- wkid :: Weak ThreadId
26 lbl :: String
27 , startTime :: UTCTime
28 }
29 deriving (Eq,Ord,Show) -- ,Data,Generic)
30
31data GlobalState = GlobalState
32 { threads :: Map.Map ThreadId PerThread -- IntMap.IntMap PerThread
33 -- , uniqSource :: Int
34 }
35
36globals :: MVar GlobalState
37globals = unsafePerformIO $ newMVar $ GlobalState
38 { threads = Map.empty
39 -- , uniqSource = 0
40 }
41{-# NOINLINE globals #-}
42
43
44forkIO :: IO () -> IO ThreadId
45forkIO = fork
46
47fork :: MonadBaseControl IO m => m () -> m ThreadId
48fork action = do
49 t <- Raw.fork $ do
50 -- wkid <- myThreadId >>= liftBase . mkWeakThreadId
51 -- tid <- newUniq
52 tid <- myThreadId
53 tm <- liftBase getCurrentTime
54 bracket_ (modifyThreads $ Map.insert tid (PerThread "" tm))
55 (modifyThreads $ Map.delete tid)
56 action
57 return t
58
59labelThread :: ThreadId -> String -> IO ()
60labelThread tid s = do
61 GHC.labelThread tid s
62 putStrLn $ "labelThread "++s++" "++show tid
63 modifyThreads $ Map.adjust (\pt -> pt { lbl = s }) tid
64
65threadsInformation :: IO [PerThread]
66threadsInformation = do
67 m <- threads <$> readMVar globals
68 return $ Map.elems m
69
70-- newUniq :: MonadBaseControl IO m => m Int
71-- newUniq = do
72-- modifyMVar globals (\st -> return (st { uniqSource = succ (uniqSource st) }, uniqSource st))
73
74modifyThreads :: MonadBase IO m => (Map.Map ThreadId PerThread -> Map.Map ThreadId PerThread) -> m ()
75modifyThreads f = do
76 g <- takeMVar globals
77 let f' st = st { threads = f (threads st) }
78 putMVar globals (f' g)
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
index 73b3d492..47d17622 100644
--- a/src/Network/BitTorrent/DHT/Query.hs
+++ b/src/Network/BitTorrent/DHT/Query.hs
@@ -9,6 +9,7 @@
9-- Normally, you don't need to import this module, use 9-- Normally, you don't need to import this module, use
10-- "Network.BitTorrent.DHT" instead. 10-- "Network.BitTorrent.DHT" instead.
11-- 11--
12{-# LANGUAGE CPP #-}
12{-# LANGUAGE FlexibleContexts #-} 13{-# LANGUAGE FlexibleContexts #-}
13{-# LANGUAGE ScopedTypeVariables #-} 14{-# LANGUAGE ScopedTypeVariables #-}
14{-# LANGUAGE TemplateHaskell #-} 15{-# LANGUAGE TemplateHaskell #-}
@@ -49,7 +50,12 @@ module Network.BitTorrent.DHT.Query
49 , (<@>) 50 , (<@>)
50 ) where 51 ) where
51 52
53#ifdef THREAD_DEBUG
54import Control.Concurrent.Lifted.Instrument hiding (yield)
55#else
56import GHC.Conc (labelThread)
52import Control.Concurrent.Lifted hiding (yield) 57import Control.Concurrent.Lifted hiding (yield)
58#endif
53import Control.Exception.Lifted hiding (Handler) 59import Control.Exception.Lifted hiding (Handler)
54import Control.Monad.Reader 60import Control.Monad.Reader
55import Control.Monad.Logger 61import Control.Monad.Logger
@@ -227,6 +233,7 @@ refreshNodes nid = do
227-- routing table. 233-- routing table.
228insertNode :: forall ip. Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip ThreadId 234insertNode :: forall ip. Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip ThreadId
229insertNode info witnessed_ip0 = fork $ do 235insertNode info witnessed_ip0 = fork $ do
236 -- myThreadId >>= liftIO . flip labelThread "DHT.insertNode"
230 var <- asks routingInfo 237 var <- asks routingInfo
231 tm <- getTimestamp 238 tm <- getTimestamp
232 let showTable = do 239 let showTable = do
@@ -286,8 +293,10 @@ insertNode info witnessed_ip0 = fork $ do
286 return ps 293 return ps
287 ps <- join $ liftIO $ atomically $ atomicInsert arrival0 witnessed_ip0 294 ps <- join $ liftIO $ atomically $ atomicInsert arrival0 witnessed_ip0
288 showTable 295 showTable
289 _ <- fork $ forM_ ps $ \(CheckPing ns)-> do 296 _ <- fork $ do
290 forM_ ns $ \n -> do 297 myThreadId >>= liftIO . flip labelThread "DHT.insertNode.pingResults"
298 forM_ ps $ \(CheckPing ns)-> do
299 forM_ ns $ \n -> do
291 (b,mip) <- probeNode (nodeAddr n) 300 (b,mip) <- probeNode (nodeAddr n)
292 let alive = PingResult n b 301 let alive = PingResult n b
293 $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b)) 302 $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b))
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs
index 8dc3f7ac..bad783a5 100644
--- a/src/Network/BitTorrent/DHT/Session.hs
+++ b/src/Network/BitTorrent/DHT/Session.hs
@@ -72,7 +72,11 @@ module Network.BitTorrent.DHT.Session
72import Prelude hiding (ioError) 72import Prelude hiding (ioError)
73 73
74import Control.Concurrent.STM 74import Control.Concurrent.STM
75#ifdef THREAD_DEBUG
76import Control.Concurrent.Async.Lifted.Instrument
77#else
75import Control.Concurrent.Async.Lifted 78import Control.Concurrent.Async.Lifted
79#endif
76import Control.Exception.Lifted hiding (Handler) 80import Control.Exception.Lifted hiding (Handler)
77import Control.Monad.Base 81import Control.Monad.Base
78import Control.Monad.Logger 82import Control.Monad.Logger
diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs
index 4852eb38..22d111e2 100644
--- a/src/Network/KRPC/Manager.hs
+++ b/src/Network/KRPC/Manager.hs
@@ -7,6 +7,7 @@
7-- 7--
8-- Normally, you don't need to import this module. 8-- Normally, you don't need to import this module.
9-- 9--
10{-# LANGUAGE CPP #-}
10{-# LANGUAGE OverloadedStrings #-} 11{-# LANGUAGE OverloadedStrings #-}
11{-# LANGUAGE FlexibleInstances #-} 12{-# LANGUAGE FlexibleInstances #-}
12{-# LANGUAGE FlexibleContexts #-} 13{-# LANGUAGE FlexibleContexts #-}
@@ -41,8 +42,12 @@ module Network.KRPC.Manager
41 ) where 42 ) where
42 43
43import Control.Applicative 44import Control.Applicative
44import Control.Concurrent 45#ifdef THREAD_DEBUG
45import Control.Concurrent.Lifted (fork) 46import Control.Concurrent.Lifted.Instrument
47#else
48import GHC.Conc (labelThread)
49import Control.Concurrent.Lifted
50#endif
46import Control.Exception hiding (Handler) 51import Control.Exception hiding (Handler)
47import qualified Control.Exception.Lifted as E (Handler (..)) 52import qualified Control.Exception.Lifted as E (Handler (..))
48import Control.Exception.Lifted as Lifted (catches, finally) 53import Control.Exception.Lifted as Lifted (catches, finally)
@@ -432,6 +437,7 @@ dispatchHandler q @ KQuery {..} addr = do
432-- 437--
433handleQuery :: MonadKRPC h m => BValue -> KQuery -> SockAddr -> m () 438handleQuery :: MonadKRPC h m => BValue -> KQuery -> SockAddr -> m ()
434handleQuery raw q addr = void $ fork $ do 439handleQuery raw q addr = void $ fork $ do
440 myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery"
435 Manager {..} <- getManager 441 Manager {..} <- getManager
436 res <- dispatchHandler q addr 442 res <- dispatchHandler q addr
437 let resbe = either toBEncode toBEncode res 443 let resbe = either toBEncode toBEncode res
@@ -481,6 +487,7 @@ listen :: MonadKRPC h m => m ()
481listen = do 487listen = do
482 Manager {..} <- getManager 488 Manager {..} <- getManager
483 tid <- fork $ do 489 tid <- fork $ do
490 myThreadId >>= liftIO . flip labelThread "KRPC.listen"
484 listener `Lifted.finally` 491 listener `Lifted.finally`
485 liftIO (takeMVar listenerThread) 492 liftIO (takeMVar listenerThread)
486 liftIO $ putMVar listenerThread tid 493 liftIO $ putMVar listenerThread tid
diff --git a/src/Network/StreamServer.hs b/src/Network/StreamServer.hs
index a6cead0e..34b9388e 100644
--- a/src/Network/StreamServer.hs
+++ b/src/Network/StreamServer.hs
@@ -1,4 +1,5 @@
1-- | This module implements a bare-bones TCP or Unix socket server. 1-- | This module implements a bare-bones TCP or Unix socket server.
2{-# LANGUAGE CPP #-}
2{-# LANGUAGE TypeFamilies #-} 3{-# LANGUAGE TypeFamilies #-}
3{-# LANGUAGE TypeOperators #-} 4{-# LANGUAGE TypeOperators #-}
4{-# LANGUAGE OverloadedStrings #-} 5{-# LANGUAGE OverloadedStrings #-}
@@ -34,7 +35,12 @@ import System.IO
34 ) 35 )
35import Control.Monad 36import Control.Monad
36import Control.Monad.Fix (fix) 37import Control.Monad.Fix (fix)
37import Control.Concurrent (forkIO, threadDelay, ThreadId, mkWeakThreadId) 38#ifdef THREAD_DEBUG
39import Control.Concurrent.Lifted.Instrument (forkIO, threadDelay, ThreadId, mkWeakThreadId, labelThread, myThreadId)
40#else
41import GHC.Conc (labelThread)
42import Control.Concurrent (forkIO, threadDelay, ThreadId, mkWeakThreadId, myThreadId)
43#endif
38import Control.Exception (catch,handle,try,finally) 44import Control.Exception (catch,handle,try,finally)
39import System.IO.Error (tryIOError) 45import System.IO.Error (tryIOError)
40import System.Mem.Weak 46import System.Mem.Weak
@@ -113,7 +119,9 @@ streamServer cfg addr = do
113 threadDelay 5000000 119 threadDelay 5000000
114 loop 120 loop
115 listen sock maxListenQueue 121 listen sock maxListenQueue
116 thread <- mkWeakThreadId <=< forkIO $ acceptLoop cfg sock 0 122 thread <- mkWeakThreadId <=< forkIO $ do
123 myThreadId >>= flip labelThread "StreamServer.acceptLoop"
124 acceptLoop cfg sock 0
117 return (ServerHandle sock thread) 125 return (ServerHandle sock thread)
118 126
119-- | Not exported. This, combined with 'acceptException' form a mutually recursive 127-- | Not exported. This, combined with 'acceptException' form a mutually recursive
@@ -124,7 +132,9 @@ acceptLoop cfg sock n = handle (acceptException cfg n sock) $ do
124 con <- accept sock 132 con <- accept sock
125 let conkey = n + 1 133 let conkey = n + 1
126 h <- socketToHandle (fst con) ReadWriteMode 134 h <- socketToHandle (fst con) ReadWriteMode
127 forkIO $ serverSession cfg (restrictHandleSocket h (fst con)) conkey h 135 forkIO $ do
136 myThreadId >>= flip labelThread "StreamServer.session"
137 serverSession cfg (restrictHandleSocket h (fst con)) conkey h
128 acceptLoop cfg sock (n + 1) 138 acceptLoop cfg sock (n + 1)
129 139
130acceptException :: ServerConfig -> Int -> Socket -> IOError -> IO () 140acceptException :: ServerConfig -> Int -> Socket -> IOError -> IO ()