diff options
author | James Crayne <jim.crayne@gmail.com> | 2017-01-24 05:53:02 +0000 |
---|---|---|
committer | James Crayne <jim.crayne@gmail.com> | 2017-01-24 05:53:02 +0000 |
commit | 01a455efc105feb2f76820ca5cc2a4f74f40b2d7 (patch) | |
tree | a0c9fd48f5b0a4aa8b9b1da1f9c2f2f381863a3a | |
parent | 0d9d130d864394d75d08f3396c62fa5b8176573f (diff) | |
parent | 1c01fae3a00942fd0d42f8b8e832e2665a679213 (diff) |
Merge branch 'dht-only' of 10.0.0.137:bittorrent into dht-only
-rw-r--r-- | bittorrent.cabal | 14 | ||||
-rw-r--r-- | examples/dhtd.hs | 39 | ||||
-rw-r--r-- | src/Control/Concurrent/Async/Lifted/Instrument.hs | 5 | ||||
-rw-r--r-- | src/Control/Concurrent/Lifted/Instrument.hs | 78 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Query.hs | 16 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Routing.hs | 10 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Session.hs | 9 | ||||
-rw-r--r-- | src/Network/KRPC/Manager.hs | 11 | ||||
-rw-r--r-- | src/Network/StreamServer.hs | 16 |
9 files changed, 173 insertions, 25 deletions
diff --git a/bittorrent.cabal b/bittorrent.cabal index 2dee30ee..90053932 100644 --- a/bittorrent.cabal +++ b/bittorrent.cabal | |||
@@ -62,6 +62,10 @@ flag aeson | |||
62 | description: Use aeson for pretty-printing bencoded data. | 62 | description: Use aeson for pretty-printing bencoded data. |
63 | default: True | 63 | default: True |
64 | 64 | ||
65 | flag thread-debug | ||
66 | description: Add instrumentation to threads. | ||
67 | default: True | ||
68 | |||
65 | library | 69 | library |
66 | default-language: Haskell2010 | 70 | default-language: Haskell2010 |
67 | default-extensions: PatternGuards | 71 | default-extensions: PatternGuards |
@@ -122,7 +126,6 @@ library | |||
122 | Network.BitTorrent.Internal.Types | 126 | Network.BitTorrent.Internal.Types |
123 | System.Torrent.FileMap | 127 | System.Torrent.FileMap |
124 | System.Torrent.Tree | 128 | System.Torrent.Tree |
125 | |||
126 | build-depends: lifted-base | 129 | build-depends: lifted-base |
127 | , convertible >= 1.0 | 130 | , convertible >= 1.0 |
128 | , pretty >= 1.1 | 131 | , pretty >= 1.1 |
@@ -216,6 +219,11 @@ library | |||
216 | if flag(aeson) | 219 | if flag(aeson) |
217 | build-depends: aeson, aeson-pretty, unordered-containers, vector | 220 | build-depends: aeson, aeson-pretty, unordered-containers, vector |
218 | cpp-options: -DBENCODE_AESON | 221 | cpp-options: -DBENCODE_AESON |
222 | if flag(thread-debug) | ||
223 | exposed-modules: Control.Concurrent.Lifted.Instrument | ||
224 | Control.Concurrent.Async.Lifted.Instrument | ||
225 | cpp-options: -DTHREAD_DEBUG | ||
226 | |||
219 | if flag(builder) | 227 | if flag(builder) |
220 | build-depends: bytestring >= 0.9, bytestring-builder | 228 | build-depends: bytestring >= 0.9, bytestring-builder |
221 | else | 229 | else |
@@ -362,6 +370,10 @@ executable dhtd | |||
362 | , data-default | 370 | , data-default |
363 | , monad-logger | 371 | , monad-logger |
364 | , bittorrent | 372 | , bittorrent |
373 | , unix | ||
374 | if flag(thread-debug) | ||
375 | build-depends: time | ||
376 | cpp-options: -DTHREAD_DEBUG | ||
365 | 377 | ||
366 | -- Utility to work with torrent files. | 378 | -- Utility to work with torrent files. |
367 | executable mktorrent | 379 | executable mktorrent |
diff --git a/examples/dhtd.hs b/examples/dhtd.hs index 19b45acb..569e1d3d 100644 --- a/examples/dhtd.hs +++ b/examples/dhtd.hs | |||
@@ -4,10 +4,10 @@ | |||
4 | {-# LANGUAGE OverloadedStrings #-} | 4 | {-# LANGUAGE OverloadedStrings #-} |
5 | {-# LANGUAGE ScopedTypeVariables #-} | 5 | {-# LANGUAGE ScopedTypeVariables #-} |
6 | {-# LANGUAGE TupleSections #-} | 6 | {-# LANGUAGE TupleSections #-} |
7 | {-# LANGUAGE RecordWildCards #-} | ||
8 | {-# LANGUAGE CPP #-} | ||
7 | 9 | ||
8 | import Control.Arrow; | 10 | import Control.Arrow; |
9 | import Control.Concurrent | ||
10 | import Control.Exception.Lifted as Lifted | ||
11 | import Control.Monad | 11 | import Control.Monad |
12 | import Control.Monad.Logger | 12 | import Control.Monad.Logger |
13 | import Control.Monad.Reader | 13 | import Control.Monad.Reader |
@@ -23,7 +23,9 @@ import System.IO | |||
23 | import System.IO.Error | 23 | import System.IO.Error |
24 | import Text.PrettyPrint.HughesPJClass | 24 | import Text.PrettyPrint.HughesPJClass |
25 | import Text.Printf | 25 | import Text.Printf |
26 | import Text.Read | ||
26 | import Control.Monad.Reader.Class | 27 | import Control.Monad.Reader.Class |
28 | import System.Posix.Process (getProcessID) | ||
27 | 29 | ||
28 | import Network.BitTorrent.Address | 30 | import Network.BitTorrent.Address |
29 | import Network.BitTorrent.DHT | 31 | import Network.BitTorrent.DHT |
@@ -31,6 +33,13 @@ import qualified Network.BitTorrent.DHT.Routing as R | |||
31 | import Network.BitTorrent.DHT.Session | 33 | import Network.BitTorrent.DHT.Session |
32 | import Network.SocketLike | 34 | import Network.SocketLike |
33 | import Network.StreamServer | 35 | import Network.StreamServer |
36 | import Control.Exception.Lifted as Lifted | ||
37 | #ifdef THREAD_DEBUG | ||
38 | import Control.Concurrent.Lifted.Instrument | ||
39 | import Data.Time.Clock | ||
40 | #else | ||
41 | import Control.Concurrent | ||
42 | #endif | ||
34 | 43 | ||
35 | mkNodeAddr :: SockAddr -> NodeAddr IPv4 | 44 | mkNodeAddr :: SockAddr -> NodeAddr IPv4 |
36 | mkNodeAddr addr = NodeAddr (fromJust $ fromSockAddr addr) | 45 | mkNodeAddr addr = NodeAddr (fromJust $ fromSockAddr addr) |
@@ -83,6 +92,8 @@ noDebugPrints _ = \case LevelDebug -> False | |||
83 | noLogging :: LogSource -> LogLevel -> Bool | 92 | noLogging :: LogSource -> LogLevel -> Bool |
84 | noLogging _ _ = False | 93 | noLogging _ _ = False |
85 | 94 | ||
95 | allNoise :: LogSource -> LogLevel -> Bool | ||
96 | allNoise _ _ = True | ||
86 | 97 | ||
87 | resume :: DHT IPv4 (Maybe B.ByteString) | 98 | resume :: DHT IPv4 (Maybe B.ByteString) |
88 | resume = do | 99 | resume = do |
@@ -97,7 +108,7 @@ resume = do | |||
97 | godht :: forall b. (NodeAddr IPv4 -> NodeId -> DHT IPv4 b) -> IO b | 108 | godht :: forall b. (NodeAddr IPv4 -> NodeId -> DHT IPv4 b) -> IO b |
98 | godht f = do | 109 | godht f = do |
99 | a <- btBindAddr "8008" False | 110 | a <- btBindAddr "8008" False |
100 | dht def { optTimeout = 5 } a noDebugPrints $ do | 111 | dht def { optTimeout = 5 } a allNoise $ do |
101 | me0 <- asks tentativeNodeId | 112 | me0 <- asks tentativeNodeId |
102 | printReport [("tentative node-id",show $ pPrint me0) | 113 | printReport [("tentative node-id",show $ pPrint me0) |
103 | ,("listen-address", show a) | 114 | ,("listen-address", show a) |
@@ -153,11 +164,23 @@ clientSession st signalQuit sock n h = do | |||
153 | return $ do | 164 | return $ do |
154 | hPutClient h $ showReport r | 165 | hPutClient h $ showReport r |
155 | 166 | ||
156 | ("peers ", s) -> cmd $ do | 167 | ("peers", s) -> cmd $ case readEither s of |
157 | let ih = fromString s | 168 | Right ih -> do |
158 | ps <- allPeers ih | 169 | ps <- allPeers ih |
159 | return $ do | 170 | seq ih $ return $ do |
160 | hPutClient h $ showReport $ map (((,) "") . show . pPrint) ps | 171 | hPutClient h $ showReport $ map (((,) "") . show . pPrint) ps |
172 | Left er -> return $ hPutClient h er | ||
173 | |||
174 | ("pid", _) -> cmd $ return $ do | ||
175 | pid <- getProcessID | ||
176 | hPutClient h (show pid) | ||
177 | #ifdef THREAD_DEBUG | ||
178 | ("threads", _) -> cmd $ return $ do | ||
179 | ts <- threadsInformation | ||
180 | tm <- getCurrentTime | ||
181 | let r = map (\PerThread{..} -> (show lbl,show (diffUTCTime tm startTime))) ts | ||
182 | hPutClient h $ showReport r | ||
183 | #endif | ||
161 | 184 | ||
162 | _ -> cmd0 $ hPutClient h "error." | 185 | _ -> cmd0 $ hPutClient h "error." |
163 | 186 | ||
diff --git a/src/Control/Concurrent/Async/Lifted/Instrument.hs b/src/Control/Concurrent/Async/Lifted/Instrument.hs new file mode 100644 index 00000000..eab0fadc --- /dev/null +++ b/src/Control/Concurrent/Async/Lifted/Instrument.hs | |||
@@ -0,0 +1,5 @@ | |||
1 | module Control.Concurrent.Async.Lifted.Instrument | ||
2 | ( module Control.Concurrent.Async.Lifted | ||
3 | ) where | ||
4 | |||
5 | import Control.Concurrent.Async.Lifted | ||
diff --git a/src/Control/Concurrent/Lifted/Instrument.hs b/src/Control/Concurrent/Lifted/Instrument.hs new file mode 100644 index 00000000..9ec5deef --- /dev/null +++ b/src/Control/Concurrent/Lifted/Instrument.hs | |||
@@ -0,0 +1,78 @@ | |||
1 | {-# LANGUAGE FlexibleContexts #-} | ||
2 | module Control.Concurrent.Lifted.Instrument | ||
3 | ( module Control.Concurrent.Lifted | ||
4 | , forkIO | ||
5 | , fork | ||
6 | , labelThread | ||
7 | , threadsInformation | ||
8 | , PerThread(..) | ||
9 | ) where | ||
10 | |||
11 | import qualified Control.Concurrent.Lifted as Raw | ||
12 | import Control.Concurrent.Lifted hiding (fork) | ||
13 | import Control.Monad.Trans.Control | ||
14 | import System.IO.Unsafe | ||
15 | import System.Mem.Weak | ||
16 | import qualified Data.Map as Map | ||
17 | import qualified Data.IntMap as IntMap | ||
18 | import Control.Exception.Lifted | ||
19 | import Control.Monad.Base | ||
20 | -- import Control.Monad.IO.Class | ||
21 | import qualified GHC.Conc as GHC | ||
22 | import Data.Time.Clock | ||
23 | |||
24 | data PerThread = PerThread | ||
25 | { -- wkid :: Weak ThreadId | ||
26 | lbl :: String | ||
27 | , startTime :: UTCTime | ||
28 | } | ||
29 | deriving (Eq,Ord,Show) -- ,Data,Generic) | ||
30 | |||
31 | data GlobalState = GlobalState | ||
32 | { threads :: Map.Map ThreadId PerThread -- IntMap.IntMap PerThread | ||
33 | -- , uniqSource :: Int | ||
34 | } | ||
35 | |||
36 | globals :: MVar GlobalState | ||
37 | globals = unsafePerformIO $ newMVar $ GlobalState | ||
38 | { threads = Map.empty | ||
39 | -- , uniqSource = 0 | ||
40 | } | ||
41 | {-# NOINLINE globals #-} | ||
42 | |||
43 | |||
44 | forkIO :: IO () -> IO ThreadId | ||
45 | forkIO = fork | ||
46 | |||
47 | fork :: MonadBaseControl IO m => m () -> m ThreadId | ||
48 | fork action = do | ||
49 | t <- Raw.fork $ do | ||
50 | -- wkid <- myThreadId >>= liftBase . mkWeakThreadId | ||
51 | -- tid <- newUniq | ||
52 | tid <- myThreadId | ||
53 | tm <- liftBase getCurrentTime | ||
54 | bracket_ (modifyThreads $ Map.insert tid (PerThread "" tm)) | ||
55 | (modifyThreads $ Map.delete tid) | ||
56 | action | ||
57 | return t | ||
58 | |||
59 | labelThread :: ThreadId -> String -> IO () | ||
60 | labelThread tid s = do | ||
61 | GHC.labelThread tid s | ||
62 | putStrLn $ "labelThread "++s++" "++show tid | ||
63 | modifyThreads $ Map.adjust (\pt -> pt { lbl = s }) tid | ||
64 | |||
65 | threadsInformation :: IO [PerThread] | ||
66 | threadsInformation = do | ||
67 | m <- threads <$> readMVar globals | ||
68 | return $ Map.elems m | ||
69 | |||
70 | -- newUniq :: MonadBaseControl IO m => m Int | ||
71 | -- newUniq = do | ||
72 | -- modifyMVar globals (\st -> return (st { uniqSource = succ (uniqSource st) }, uniqSource st)) | ||
73 | |||
74 | modifyThreads :: MonadBase IO m => (Map.Map ThreadId PerThread -> Map.Map ThreadId PerThread) -> m () | ||
75 | modifyThreads f = do | ||
76 | g <- takeMVar globals | ||
77 | let f' st = st { threads = f (threads st) } | ||
78 | putMVar globals (f' g) | ||
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs index 73b3d492..533068c6 100644 --- a/src/Network/BitTorrent/DHT/Query.hs +++ b/src/Network/BitTorrent/DHT/Query.hs | |||
@@ -9,6 +9,7 @@ | |||
9 | -- Normally, you don't need to import this module, use | 9 | -- Normally, you don't need to import this module, use |
10 | -- "Network.BitTorrent.DHT" instead. | 10 | -- "Network.BitTorrent.DHT" instead. |
11 | -- | 11 | -- |
12 | {-# LANGUAGE CPP #-} | ||
12 | {-# LANGUAGE FlexibleContexts #-} | 13 | {-# LANGUAGE FlexibleContexts #-} |
13 | {-# LANGUAGE ScopedTypeVariables #-} | 14 | {-# LANGUAGE ScopedTypeVariables #-} |
14 | {-# LANGUAGE TemplateHaskell #-} | 15 | {-# LANGUAGE TemplateHaskell #-} |
@@ -49,7 +50,12 @@ module Network.BitTorrent.DHT.Query | |||
49 | , (<@>) | 50 | , (<@>) |
50 | ) where | 51 | ) where |
51 | 52 | ||
53 | #ifdef THREAD_DEBUG | ||
54 | import Control.Concurrent.Lifted.Instrument hiding (yield) | ||
55 | #else | ||
56 | import GHC.Conc (labelThread) | ||
52 | import Control.Concurrent.Lifted hiding (yield) | 57 | import Control.Concurrent.Lifted hiding (yield) |
58 | #endif | ||
53 | import Control.Exception.Lifted hiding (Handler) | 59 | import Control.Exception.Lifted hiding (Handler) |
54 | import Control.Monad.Reader | 60 | import Control.Monad.Reader |
55 | import Control.Monad.Logger | 61 | import Control.Monad.Logger |
@@ -225,8 +231,8 @@ refreshNodes nid = do | |||
225 | 231 | ||
226 | -- | This operation do not block but acquire exclusive access to | 232 | -- | This operation do not block but acquire exclusive access to |
227 | -- routing table. | 233 | -- routing table. |
228 | insertNode :: forall ip. Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip ThreadId | 234 | insertNode :: forall ip. Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip () |
229 | insertNode info witnessed_ip0 = fork $ do | 235 | insertNode info witnessed_ip0 = do |
230 | var <- asks routingInfo | 236 | var <- asks routingInfo |
231 | tm <- getTimestamp | 237 | tm <- getTimestamp |
232 | let showTable = do | 238 | let showTable = do |
@@ -286,8 +292,10 @@ insertNode info witnessed_ip0 = fork $ do | |||
286 | return ps | 292 | return ps |
287 | ps <- join $ liftIO $ atomically $ atomicInsert arrival0 witnessed_ip0 | 293 | ps <- join $ liftIO $ atomically $ atomicInsert arrival0 witnessed_ip0 |
288 | showTable | 294 | showTable |
289 | _ <- fork $ forM_ ps $ \(CheckPing ns)-> do | 295 | _ <- fork $ do |
290 | forM_ ns $ \n -> do | 296 | myThreadId >>= liftIO . flip labelThread "DHT.insertNode.pingResults" |
297 | forM_ ps $ \(CheckPing ns)-> do | ||
298 | forM_ ns $ \n -> do | ||
291 | (b,mip) <- probeNode (nodeAddr n) | 299 | (b,mip) <- probeNode (nodeAddr n) |
292 | let alive = PingResult n b | 300 | let alive = PingResult n b |
293 | $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b)) | 301 | $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b)) |
diff --git a/src/Network/BitTorrent/DHT/Routing.hs b/src/Network/BitTorrent/DHT/Routing.hs index 8a6849a1..d64e415e 100644 --- a/src/Network/BitTorrent/DHT/Routing.hs +++ b/src/Network/BitTorrent/DHT/Routing.hs | |||
@@ -529,11 +529,11 @@ splitTip nid n i bucket | |||
529 | -- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia | 529 | -- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia |
530 | -- paper. The rule requiring additional splits is in section 2.4. | 530 | -- paper. The rule requiring additional splits is in section 2.4. |
531 | modifyBucket | 531 | modifyBucket |
532 | :: forall f ip xs. (Alternative f, Eq ip) => | 532 | :: forall ip xs. (Eq ip) => |
533 | NodeId -> (Bucket ip -> f (xs, Bucket ip)) -> Table ip -> f (xs,Table ip) | 533 | NodeId -> (Bucket ip -> Maybe (xs, Bucket ip)) -> Table ip -> Maybe (xs,Table ip) |
534 | modifyBucket nodeId f = go (0 :: BitIx) | 534 | modifyBucket nodeId f = go (0 :: BitIx) |
535 | where | 535 | where |
536 | go :: BitIx -> Table ip -> f (xs, Table ip) | 536 | go :: BitIx -> Table ip -> Maybe (xs, Table ip) |
537 | go i (Zero table bucket) | 537 | go i (Zero table bucket) |
538 | | testIdBit nodeId i = second (Zero table) <$> f bucket | 538 | | testIdBit nodeId i = second (Zero table) <$> f bucket |
539 | | otherwise = second (`Zero` bucket) <$> go (succ i) table | 539 | | otherwise = second (`Zero` bucket) <$> go (succ i) table |
@@ -562,8 +562,8 @@ data CheckPing ip = CheckPing [NodeInfo ip] | |||
562 | 562 | ||
563 | 563 | ||
564 | -- | Atomic 'Table' update | 564 | -- | Atomic 'Table' update |
565 | insert :: (Alternative m, Eq ip) => Timestamp -> Event ip -> Table ip -> m ([CheckPing ip], Table ip) | 565 | insert :: (Eq ip, Applicative m) => Timestamp -> Event ip -> Table ip -> m ([CheckPing ip], Table ip) |
566 | insert tm event = modifyBucket (eventId event) (insertBucket tm event) | 566 | insert tm event tbl = pure $ fromMaybe ([],tbl) $ modifyBucket (eventId event) (insertBucket tm event) tbl |
567 | 567 | ||
568 | 568 | ||
569 | {----------------------------------------------------------------------- | 569 | {----------------------------------------------------------------------- |
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index c08021c7..bad783a5 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs | |||
@@ -72,7 +72,11 @@ module Network.BitTorrent.DHT.Session | |||
72 | import Prelude hiding (ioError) | 72 | import Prelude hiding (ioError) |
73 | 73 | ||
74 | import Control.Concurrent.STM | 74 | import Control.Concurrent.STM |
75 | #ifdef THREAD_DEBUG | ||
76 | import Control.Concurrent.Async.Lifted.Instrument | ||
77 | #else | ||
75 | import Control.Concurrent.Async.Lifted | 78 | import Control.Concurrent.Async.Lifted |
79 | #endif | ||
76 | import Control.Exception.Lifted hiding (Handler) | 80 | import Control.Exception.Lifted hiding (Handler) |
77 | import Control.Monad.Base | 81 | import Control.Monad.Base |
78 | import Control.Monad.Logger | 82 | import Control.Monad.Logger |
@@ -395,8 +399,9 @@ routableAddress = do | |||
395 | myNodeIdAccordingTo :: NodeAddr ip -> DHT ip NodeId | 399 | myNodeIdAccordingTo :: NodeAddr ip -> DHT ip NodeId |
396 | myNodeIdAccordingTo _ = do | 400 | myNodeIdAccordingTo _ = do |
397 | info <- asks routingInfo >>= liftIO . atomically . readTVar | 401 | info <- asks routingInfo >>= liftIO . atomically . readTVar |
398 | fallback <- asks tentativeNodeId | 402 | maybe (asks tentativeNodeId) |
399 | return $ maybe fallback myNodeId info | 403 | (return . myNodeId) |
404 | info | ||
400 | 405 | ||
401 | -- | Get current routing table. Normally you don't need to use this | 406 | -- | Get current routing table. Normally you don't need to use this |
402 | -- function, but it can be usefull for debugging and profiling purposes. | 407 | -- function, but it can be usefull for debugging and profiling purposes. |
diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index 4852eb38..22d111e2 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs | |||
@@ -7,6 +7,7 @@ | |||
7 | -- | 7 | -- |
8 | -- Normally, you don't need to import this module. | 8 | -- Normally, you don't need to import this module. |
9 | -- | 9 | -- |
10 | {-# LANGUAGE CPP #-} | ||
10 | {-# LANGUAGE OverloadedStrings #-} | 11 | {-# LANGUAGE OverloadedStrings #-} |
11 | {-# LANGUAGE FlexibleInstances #-} | 12 | {-# LANGUAGE FlexibleInstances #-} |
12 | {-# LANGUAGE FlexibleContexts #-} | 13 | {-# LANGUAGE FlexibleContexts #-} |
@@ -41,8 +42,12 @@ module Network.KRPC.Manager | |||
41 | ) where | 42 | ) where |
42 | 43 | ||
43 | import Control.Applicative | 44 | import Control.Applicative |
44 | import Control.Concurrent | 45 | #ifdef THREAD_DEBUG |
45 | import Control.Concurrent.Lifted (fork) | 46 | import Control.Concurrent.Lifted.Instrument |
47 | #else | ||
48 | import GHC.Conc (labelThread) | ||
49 | import Control.Concurrent.Lifted | ||
50 | #endif | ||
46 | import Control.Exception hiding (Handler) | 51 | import Control.Exception hiding (Handler) |
47 | import qualified Control.Exception.Lifted as E (Handler (..)) | 52 | import qualified Control.Exception.Lifted as E (Handler (..)) |
48 | import Control.Exception.Lifted as Lifted (catches, finally) | 53 | import Control.Exception.Lifted as Lifted (catches, finally) |
@@ -432,6 +437,7 @@ dispatchHandler q @ KQuery {..} addr = do | |||
432 | -- | 437 | -- |
433 | handleQuery :: MonadKRPC h m => BValue -> KQuery -> SockAddr -> m () | 438 | handleQuery :: MonadKRPC h m => BValue -> KQuery -> SockAddr -> m () |
434 | handleQuery raw q addr = void $ fork $ do | 439 | handleQuery raw q addr = void $ fork $ do |
440 | myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery" | ||
435 | Manager {..} <- getManager | 441 | Manager {..} <- getManager |
436 | res <- dispatchHandler q addr | 442 | res <- dispatchHandler q addr |
437 | let resbe = either toBEncode toBEncode res | 443 | let resbe = either toBEncode toBEncode res |
@@ -481,6 +487,7 @@ listen :: MonadKRPC h m => m () | |||
481 | listen = do | 487 | listen = do |
482 | Manager {..} <- getManager | 488 | Manager {..} <- getManager |
483 | tid <- fork $ do | 489 | tid <- fork $ do |
490 | myThreadId >>= liftIO . flip labelThread "KRPC.listen" | ||
484 | listener `Lifted.finally` | 491 | listener `Lifted.finally` |
485 | liftIO (takeMVar listenerThread) | 492 | liftIO (takeMVar listenerThread) |
486 | liftIO $ putMVar listenerThread tid | 493 | liftIO $ putMVar listenerThread tid |
diff --git a/src/Network/StreamServer.hs b/src/Network/StreamServer.hs index a6cead0e..34b9388e 100644 --- a/src/Network/StreamServer.hs +++ b/src/Network/StreamServer.hs | |||
@@ -1,4 +1,5 @@ | |||
1 | -- | This module implements a bare-bones TCP or Unix socket server. | 1 | -- | This module implements a bare-bones TCP or Unix socket server. |
2 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE TypeFamilies #-} | 3 | {-# LANGUAGE TypeFamilies #-} |
3 | {-# LANGUAGE TypeOperators #-} | 4 | {-# LANGUAGE TypeOperators #-} |
4 | {-# LANGUAGE OverloadedStrings #-} | 5 | {-# LANGUAGE OverloadedStrings #-} |
@@ -34,7 +35,12 @@ import System.IO | |||
34 | ) | 35 | ) |
35 | import Control.Monad | 36 | import Control.Monad |
36 | import Control.Monad.Fix (fix) | 37 | import Control.Monad.Fix (fix) |
37 | import Control.Concurrent (forkIO, threadDelay, ThreadId, mkWeakThreadId) | 38 | #ifdef THREAD_DEBUG |
39 | import Control.Concurrent.Lifted.Instrument (forkIO, threadDelay, ThreadId, mkWeakThreadId, labelThread, myThreadId) | ||
40 | #else | ||
41 | import GHC.Conc (labelThread) | ||
42 | import Control.Concurrent (forkIO, threadDelay, ThreadId, mkWeakThreadId, myThreadId) | ||
43 | #endif | ||
38 | import Control.Exception (catch,handle,try,finally) | 44 | import Control.Exception (catch,handle,try,finally) |
39 | import System.IO.Error (tryIOError) | 45 | import System.IO.Error (tryIOError) |
40 | import System.Mem.Weak | 46 | import System.Mem.Weak |
@@ -113,7 +119,9 @@ streamServer cfg addr = do | |||
113 | threadDelay 5000000 | 119 | threadDelay 5000000 |
114 | loop | 120 | loop |
115 | listen sock maxListenQueue | 121 | listen sock maxListenQueue |
116 | thread <- mkWeakThreadId <=< forkIO $ acceptLoop cfg sock 0 | 122 | thread <- mkWeakThreadId <=< forkIO $ do |
123 | myThreadId >>= flip labelThread "StreamServer.acceptLoop" | ||
124 | acceptLoop cfg sock 0 | ||
117 | return (ServerHandle sock thread) | 125 | return (ServerHandle sock thread) |
118 | 126 | ||
119 | -- | Not exported. This, combined with 'acceptException' form a mutually recursive | 127 | -- | Not exported. This, combined with 'acceptException' form a mutually recursive |
@@ -124,7 +132,9 @@ acceptLoop cfg sock n = handle (acceptException cfg n sock) $ do | |||
124 | con <- accept sock | 132 | con <- accept sock |
125 | let conkey = n + 1 | 133 | let conkey = n + 1 |
126 | h <- socketToHandle (fst con) ReadWriteMode | 134 | h <- socketToHandle (fst con) ReadWriteMode |
127 | forkIO $ serverSession cfg (restrictHandleSocket h (fst con)) conkey h | 135 | forkIO $ do |
136 | myThreadId >>= flip labelThread "StreamServer.session" | ||
137 | serverSession cfg (restrictHandleSocket h (fst con)) conkey h | ||
128 | acceptLoop cfg sock (n + 1) | 138 | acceptLoop cfg sock (n + 1) |
129 | 139 | ||
130 | acceptException :: ServerConfig -> Int -> Socket -> IOError -> IO () | 140 | acceptException :: ServerConfig -> Int -> Socket -> IOError -> IO () |