summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Control/Concurrent/Async/Lifted/Instrument.hs5
-rw-r--r--src/Control/Concurrent/Lifted/Instrument.hs78
-rw-r--r--src/Network/BitTorrent/DHT/Query.hs16
-rw-r--r--src/Network/BitTorrent/DHT/Routing.hs10
-rw-r--r--src/Network/BitTorrent/DHT/Session.hs9
-rw-r--r--src/Network/KRPC/Manager.hs11
-rw-r--r--src/Network/StreamServer.hs16
7 files changed, 129 insertions, 16 deletions
diff --git a/src/Control/Concurrent/Async/Lifted/Instrument.hs b/src/Control/Concurrent/Async/Lifted/Instrument.hs
new file mode 100644
index 00000000..eab0fadc
--- /dev/null
+++ b/src/Control/Concurrent/Async/Lifted/Instrument.hs
@@ -0,0 +1,5 @@
1module Control.Concurrent.Async.Lifted.Instrument
2 ( module Control.Concurrent.Async.Lifted
3 ) where
4
5import Control.Concurrent.Async.Lifted
diff --git a/src/Control/Concurrent/Lifted/Instrument.hs b/src/Control/Concurrent/Lifted/Instrument.hs
new file mode 100644
index 00000000..9ec5deef
--- /dev/null
+++ b/src/Control/Concurrent/Lifted/Instrument.hs
@@ -0,0 +1,78 @@
1{-# LANGUAGE FlexibleContexts #-}
2module Control.Concurrent.Lifted.Instrument
3 ( module Control.Concurrent.Lifted
4 , forkIO
5 , fork
6 , labelThread
7 , threadsInformation
8 , PerThread(..)
9 ) where
10
11import qualified Control.Concurrent.Lifted as Raw
12import Control.Concurrent.Lifted hiding (fork)
13import Control.Monad.Trans.Control
14import System.IO.Unsafe
15import System.Mem.Weak
16import qualified Data.Map as Map
17import qualified Data.IntMap as IntMap
18import Control.Exception.Lifted
19import Control.Monad.Base
20-- import Control.Monad.IO.Class
21import qualified GHC.Conc as GHC
22import Data.Time.Clock
23
24data PerThread = PerThread
25 { -- wkid :: Weak ThreadId
26 lbl :: String
27 , startTime :: UTCTime
28 }
29 deriving (Eq,Ord,Show) -- ,Data,Generic)
30
31data GlobalState = GlobalState
32 { threads :: Map.Map ThreadId PerThread -- IntMap.IntMap PerThread
33 -- , uniqSource :: Int
34 }
35
36globals :: MVar GlobalState
37globals = unsafePerformIO $ newMVar $ GlobalState
38 { threads = Map.empty
39 -- , uniqSource = 0
40 }
41{-# NOINLINE globals #-}
42
43
44forkIO :: IO () -> IO ThreadId
45forkIO = fork
46
47fork :: MonadBaseControl IO m => m () -> m ThreadId
48fork action = do
49 t <- Raw.fork $ do
50 -- wkid <- myThreadId >>= liftBase . mkWeakThreadId
51 -- tid <- newUniq
52 tid <- myThreadId
53 tm <- liftBase getCurrentTime
54 bracket_ (modifyThreads $ Map.insert tid (PerThread "" tm))
55 (modifyThreads $ Map.delete tid)
56 action
57 return t
58
59labelThread :: ThreadId -> String -> IO ()
60labelThread tid s = do
61 GHC.labelThread tid s
62 putStrLn $ "labelThread "++s++" "++show tid
63 modifyThreads $ Map.adjust (\pt -> pt { lbl = s }) tid
64
65threadsInformation :: IO [PerThread]
66threadsInformation = do
67 m <- threads <$> readMVar globals
68 return $ Map.elems m
69
70-- newUniq :: MonadBaseControl IO m => m Int
71-- newUniq = do
72-- modifyMVar globals (\st -> return (st { uniqSource = succ (uniqSource st) }, uniqSource st))
73
74modifyThreads :: MonadBase IO m => (Map.Map ThreadId PerThread -> Map.Map ThreadId PerThread) -> m ()
75modifyThreads f = do
76 g <- takeMVar globals
77 let f' st = st { threads = f (threads st) }
78 putMVar globals (f' g)
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs
index 73b3d492..533068c6 100644
--- a/src/Network/BitTorrent/DHT/Query.hs
+++ b/src/Network/BitTorrent/DHT/Query.hs
@@ -9,6 +9,7 @@
9-- Normally, you don't need to import this module, use 9-- Normally, you don't need to import this module, use
10-- "Network.BitTorrent.DHT" instead. 10-- "Network.BitTorrent.DHT" instead.
11-- 11--
12{-# LANGUAGE CPP #-}
12{-# LANGUAGE FlexibleContexts #-} 13{-# LANGUAGE FlexibleContexts #-}
13{-# LANGUAGE ScopedTypeVariables #-} 14{-# LANGUAGE ScopedTypeVariables #-}
14{-# LANGUAGE TemplateHaskell #-} 15{-# LANGUAGE TemplateHaskell #-}
@@ -49,7 +50,12 @@ module Network.BitTorrent.DHT.Query
49 , (<@>) 50 , (<@>)
50 ) where 51 ) where
51 52
53#ifdef THREAD_DEBUG
54import Control.Concurrent.Lifted.Instrument hiding (yield)
55#else
56import GHC.Conc (labelThread)
52import Control.Concurrent.Lifted hiding (yield) 57import Control.Concurrent.Lifted hiding (yield)
58#endif
53import Control.Exception.Lifted hiding (Handler) 59import Control.Exception.Lifted hiding (Handler)
54import Control.Monad.Reader 60import Control.Monad.Reader
55import Control.Monad.Logger 61import Control.Monad.Logger
@@ -225,8 +231,8 @@ refreshNodes nid = do
225 231
226-- | This operation do not block but acquire exclusive access to 232-- | This operation do not block but acquire exclusive access to
227-- routing table. 233-- routing table.
228insertNode :: forall ip. Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip ThreadId 234insertNode :: forall ip. Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip ()
229insertNode info witnessed_ip0 = fork $ do 235insertNode info witnessed_ip0 = do
230 var <- asks routingInfo 236 var <- asks routingInfo
231 tm <- getTimestamp 237 tm <- getTimestamp
232 let showTable = do 238 let showTable = do
@@ -286,8 +292,10 @@ insertNode info witnessed_ip0 = fork $ do
286 return ps 292 return ps
287 ps <- join $ liftIO $ atomically $ atomicInsert arrival0 witnessed_ip0 293 ps <- join $ liftIO $ atomically $ atomicInsert arrival0 witnessed_ip0
288 showTable 294 showTable
289 _ <- fork $ forM_ ps $ \(CheckPing ns)-> do 295 _ <- fork $ do
290 forM_ ns $ \n -> do 296 myThreadId >>= liftIO . flip labelThread "DHT.insertNode.pingResults"
297 forM_ ps $ \(CheckPing ns)-> do
298 forM_ ns $ \n -> do
291 (b,mip) <- probeNode (nodeAddr n) 299 (b,mip) <- probeNode (nodeAddr n)
292 let alive = PingResult n b 300 let alive = PingResult n b
293 $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b)) 301 $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b))
diff --git a/src/Network/BitTorrent/DHT/Routing.hs b/src/Network/BitTorrent/DHT/Routing.hs
index 8a6849a1..d64e415e 100644
--- a/src/Network/BitTorrent/DHT/Routing.hs
+++ b/src/Network/BitTorrent/DHT/Routing.hs
@@ -529,11 +529,11 @@ splitTip nid n i bucket
529-- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia 529-- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia
530-- paper. The rule requiring additional splits is in section 2.4. 530-- paper. The rule requiring additional splits is in section 2.4.
531modifyBucket 531modifyBucket
532 :: forall f ip xs. (Alternative f, Eq ip) => 532 :: forall ip xs. (Eq ip) =>
533 NodeId -> (Bucket ip -> f (xs, Bucket ip)) -> Table ip -> f (xs,Table ip) 533 NodeId -> (Bucket ip -> Maybe (xs, Bucket ip)) -> Table ip -> Maybe (xs,Table ip)
534modifyBucket nodeId f = go (0 :: BitIx) 534modifyBucket nodeId f = go (0 :: BitIx)
535 where 535 where
536 go :: BitIx -> Table ip -> f (xs, Table ip) 536 go :: BitIx -> Table ip -> Maybe (xs, Table ip)
537 go i (Zero table bucket) 537 go i (Zero table bucket)
538 | testIdBit nodeId i = second (Zero table) <$> f bucket 538 | testIdBit nodeId i = second (Zero table) <$> f bucket
539 | otherwise = second (`Zero` bucket) <$> go (succ i) table 539 | otherwise = second (`Zero` bucket) <$> go (succ i) table
@@ -562,8 +562,8 @@ data CheckPing ip = CheckPing [NodeInfo ip]
562 562
563 563
564-- | Atomic 'Table' update 564-- | Atomic 'Table' update
565insert :: (Alternative m, Eq ip) => Timestamp -> Event ip -> Table ip -> m ([CheckPing ip], Table ip) 565insert :: (Eq ip, Applicative m) => Timestamp -> Event ip -> Table ip -> m ([CheckPing ip], Table ip)
566insert tm event = modifyBucket (eventId event) (insertBucket tm event) 566insert tm event tbl = pure $ fromMaybe ([],tbl) $ modifyBucket (eventId event) (insertBucket tm event) tbl
567 567
568 568
569{----------------------------------------------------------------------- 569{-----------------------------------------------------------------------
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs
index c08021c7..bad783a5 100644
--- a/src/Network/BitTorrent/DHT/Session.hs
+++ b/src/Network/BitTorrent/DHT/Session.hs
@@ -72,7 +72,11 @@ module Network.BitTorrent.DHT.Session
72import Prelude hiding (ioError) 72import Prelude hiding (ioError)
73 73
74import Control.Concurrent.STM 74import Control.Concurrent.STM
75#ifdef THREAD_DEBUG
76import Control.Concurrent.Async.Lifted.Instrument
77#else
75import Control.Concurrent.Async.Lifted 78import Control.Concurrent.Async.Lifted
79#endif
76import Control.Exception.Lifted hiding (Handler) 80import Control.Exception.Lifted hiding (Handler)
77import Control.Monad.Base 81import Control.Monad.Base
78import Control.Monad.Logger 82import Control.Monad.Logger
@@ -395,8 +399,9 @@ routableAddress = do
395myNodeIdAccordingTo :: NodeAddr ip -> DHT ip NodeId 399myNodeIdAccordingTo :: NodeAddr ip -> DHT ip NodeId
396myNodeIdAccordingTo _ = do 400myNodeIdAccordingTo _ = do
397 info <- asks routingInfo >>= liftIO . atomically . readTVar 401 info <- asks routingInfo >>= liftIO . atomically . readTVar
398 fallback <- asks tentativeNodeId 402 maybe (asks tentativeNodeId)
399 return $ maybe fallback myNodeId info 403 (return . myNodeId)
404 info
400 405
401-- | Get current routing table. Normally you don't need to use this 406-- | Get current routing table. Normally you don't need to use this
402-- function, but it can be usefull for debugging and profiling purposes. 407-- function, but it can be usefull for debugging and profiling purposes.
diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs
index 4852eb38..22d111e2 100644
--- a/src/Network/KRPC/Manager.hs
+++ b/src/Network/KRPC/Manager.hs
@@ -7,6 +7,7 @@
7-- 7--
8-- Normally, you don't need to import this module. 8-- Normally, you don't need to import this module.
9-- 9--
10{-# LANGUAGE CPP #-}
10{-# LANGUAGE OverloadedStrings #-} 11{-# LANGUAGE OverloadedStrings #-}
11{-# LANGUAGE FlexibleInstances #-} 12{-# LANGUAGE FlexibleInstances #-}
12{-# LANGUAGE FlexibleContexts #-} 13{-# LANGUAGE FlexibleContexts #-}
@@ -41,8 +42,12 @@ module Network.KRPC.Manager
41 ) where 42 ) where
42 43
43import Control.Applicative 44import Control.Applicative
44import Control.Concurrent 45#ifdef THREAD_DEBUG
45import Control.Concurrent.Lifted (fork) 46import Control.Concurrent.Lifted.Instrument
47#else
48import GHC.Conc (labelThread)
49import Control.Concurrent.Lifted
50#endif
46import Control.Exception hiding (Handler) 51import Control.Exception hiding (Handler)
47import qualified Control.Exception.Lifted as E (Handler (..)) 52import qualified Control.Exception.Lifted as E (Handler (..))
48import Control.Exception.Lifted as Lifted (catches, finally) 53import Control.Exception.Lifted as Lifted (catches, finally)
@@ -432,6 +437,7 @@ dispatchHandler q @ KQuery {..} addr = do
432-- 437--
433handleQuery :: MonadKRPC h m => BValue -> KQuery -> SockAddr -> m () 438handleQuery :: MonadKRPC h m => BValue -> KQuery -> SockAddr -> m ()
434handleQuery raw q addr = void $ fork $ do 439handleQuery raw q addr = void $ fork $ do
440 myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery"
435 Manager {..} <- getManager 441 Manager {..} <- getManager
436 res <- dispatchHandler q addr 442 res <- dispatchHandler q addr
437 let resbe = either toBEncode toBEncode res 443 let resbe = either toBEncode toBEncode res
@@ -481,6 +487,7 @@ listen :: MonadKRPC h m => m ()
481listen = do 487listen = do
482 Manager {..} <- getManager 488 Manager {..} <- getManager
483 tid <- fork $ do 489 tid <- fork $ do
490 myThreadId >>= liftIO . flip labelThread "KRPC.listen"
484 listener `Lifted.finally` 491 listener `Lifted.finally`
485 liftIO (takeMVar listenerThread) 492 liftIO (takeMVar listenerThread)
486 liftIO $ putMVar listenerThread tid 493 liftIO $ putMVar listenerThread tid
diff --git a/src/Network/StreamServer.hs b/src/Network/StreamServer.hs
index a6cead0e..34b9388e 100644
--- a/src/Network/StreamServer.hs
+++ b/src/Network/StreamServer.hs
@@ -1,4 +1,5 @@
1-- | This module implements a bare-bones TCP or Unix socket server. 1-- | This module implements a bare-bones TCP or Unix socket server.
2{-# LANGUAGE CPP #-}
2{-# LANGUAGE TypeFamilies #-} 3{-# LANGUAGE TypeFamilies #-}
3{-# LANGUAGE TypeOperators #-} 4{-# LANGUAGE TypeOperators #-}
4{-# LANGUAGE OverloadedStrings #-} 5{-# LANGUAGE OverloadedStrings #-}
@@ -34,7 +35,12 @@ import System.IO
34 ) 35 )
35import Control.Monad 36import Control.Monad
36import Control.Monad.Fix (fix) 37import Control.Monad.Fix (fix)
37import Control.Concurrent (forkIO, threadDelay, ThreadId, mkWeakThreadId) 38#ifdef THREAD_DEBUG
39import Control.Concurrent.Lifted.Instrument (forkIO, threadDelay, ThreadId, mkWeakThreadId, labelThread, myThreadId)
40#else
41import GHC.Conc (labelThread)
42import Control.Concurrent (forkIO, threadDelay, ThreadId, mkWeakThreadId, myThreadId)
43#endif
38import Control.Exception (catch,handle,try,finally) 44import Control.Exception (catch,handle,try,finally)
39import System.IO.Error (tryIOError) 45import System.IO.Error (tryIOError)
40import System.Mem.Weak 46import System.Mem.Weak
@@ -113,7 +119,9 @@ streamServer cfg addr = do
113 threadDelay 5000000 119 threadDelay 5000000
114 loop 120 loop
115 listen sock maxListenQueue 121 listen sock maxListenQueue
116 thread <- mkWeakThreadId <=< forkIO $ acceptLoop cfg sock 0 122 thread <- mkWeakThreadId <=< forkIO $ do
123 myThreadId >>= flip labelThread "StreamServer.acceptLoop"
124 acceptLoop cfg sock 0
117 return (ServerHandle sock thread) 125 return (ServerHandle sock thread)
118 126
119-- | Not exported. This, combined with 'acceptException' form a mutually recursive 127-- | Not exported. This, combined with 'acceptException' form a mutually recursive
@@ -124,7 +132,9 @@ acceptLoop cfg sock n = handle (acceptException cfg n sock) $ do
124 con <- accept sock 132 con <- accept sock
125 let conkey = n + 1 133 let conkey = n + 1
126 h <- socketToHandle (fst con) ReadWriteMode 134 h <- socketToHandle (fst con) ReadWriteMode
127 forkIO $ serverSession cfg (restrictHandleSocket h (fst con)) conkey h 135 forkIO $ do
136 myThreadId >>= flip labelThread "StreamServer.session"
137 serverSession cfg (restrictHandleSocket h (fst con)) conkey h
128 acceptLoop cfg sock (n + 1) 138 acceptLoop cfg sock (n + 1)
129 139
130acceptException :: ServerConfig -> Int -> Socket -> IOError -> IO () 140acceptException :: ServerConfig -> Int -> Socket -> IOError -> IO ()