diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Control/Concurrent/Async/Lifted/Instrument.hs | 5 | ||||
-rw-r--r-- | src/Control/Concurrent/Lifted/Instrument.hs | 78 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Query.hs | 16 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Routing.hs | 10 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Session.hs | 9 | ||||
-rw-r--r-- | src/Network/KRPC/Manager.hs | 11 | ||||
-rw-r--r-- | src/Network/StreamServer.hs | 16 |
7 files changed, 129 insertions, 16 deletions
diff --git a/src/Control/Concurrent/Async/Lifted/Instrument.hs b/src/Control/Concurrent/Async/Lifted/Instrument.hs new file mode 100644 index 00000000..eab0fadc --- /dev/null +++ b/src/Control/Concurrent/Async/Lifted/Instrument.hs | |||
@@ -0,0 +1,5 @@ | |||
1 | module Control.Concurrent.Async.Lifted.Instrument | ||
2 | ( module Control.Concurrent.Async.Lifted | ||
3 | ) where | ||
4 | |||
5 | import Control.Concurrent.Async.Lifted | ||
diff --git a/src/Control/Concurrent/Lifted/Instrument.hs b/src/Control/Concurrent/Lifted/Instrument.hs new file mode 100644 index 00000000..9ec5deef --- /dev/null +++ b/src/Control/Concurrent/Lifted/Instrument.hs | |||
@@ -0,0 +1,78 @@ | |||
1 | {-# LANGUAGE FlexibleContexts #-} | ||
2 | module Control.Concurrent.Lifted.Instrument | ||
3 | ( module Control.Concurrent.Lifted | ||
4 | , forkIO | ||
5 | , fork | ||
6 | , labelThread | ||
7 | , threadsInformation | ||
8 | , PerThread(..) | ||
9 | ) where | ||
10 | |||
11 | import qualified Control.Concurrent.Lifted as Raw | ||
12 | import Control.Concurrent.Lifted hiding (fork) | ||
13 | import Control.Monad.Trans.Control | ||
14 | import System.IO.Unsafe | ||
15 | import System.Mem.Weak | ||
16 | import qualified Data.Map as Map | ||
17 | import qualified Data.IntMap as IntMap | ||
18 | import Control.Exception.Lifted | ||
19 | import Control.Monad.Base | ||
20 | -- import Control.Monad.IO.Class | ||
21 | import qualified GHC.Conc as GHC | ||
22 | import Data.Time.Clock | ||
23 | |||
24 | data PerThread = PerThread | ||
25 | { -- wkid :: Weak ThreadId | ||
26 | lbl :: String | ||
27 | , startTime :: UTCTime | ||
28 | } | ||
29 | deriving (Eq,Ord,Show) -- ,Data,Generic) | ||
30 | |||
31 | data GlobalState = GlobalState | ||
32 | { threads :: Map.Map ThreadId PerThread -- IntMap.IntMap PerThread | ||
33 | -- , uniqSource :: Int | ||
34 | } | ||
35 | |||
36 | globals :: MVar GlobalState | ||
37 | globals = unsafePerformIO $ newMVar $ GlobalState | ||
38 | { threads = Map.empty | ||
39 | -- , uniqSource = 0 | ||
40 | } | ||
41 | {-# NOINLINE globals #-} | ||
42 | |||
43 | |||
44 | forkIO :: IO () -> IO ThreadId | ||
45 | forkIO = fork | ||
46 | |||
47 | fork :: MonadBaseControl IO m => m () -> m ThreadId | ||
48 | fork action = do | ||
49 | t <- Raw.fork $ do | ||
50 | -- wkid <- myThreadId >>= liftBase . mkWeakThreadId | ||
51 | -- tid <- newUniq | ||
52 | tid <- myThreadId | ||
53 | tm <- liftBase getCurrentTime | ||
54 | bracket_ (modifyThreads $ Map.insert tid (PerThread "" tm)) | ||
55 | (modifyThreads $ Map.delete tid) | ||
56 | action | ||
57 | return t | ||
58 | |||
59 | labelThread :: ThreadId -> String -> IO () | ||
60 | labelThread tid s = do | ||
61 | GHC.labelThread tid s | ||
62 | putStrLn $ "labelThread "++s++" "++show tid | ||
63 | modifyThreads $ Map.adjust (\pt -> pt { lbl = s }) tid | ||
64 | |||
65 | threadsInformation :: IO [PerThread] | ||
66 | threadsInformation = do | ||
67 | m <- threads <$> readMVar globals | ||
68 | return $ Map.elems m | ||
69 | |||
70 | -- newUniq :: MonadBaseControl IO m => m Int | ||
71 | -- newUniq = do | ||
72 | -- modifyMVar globals (\st -> return (st { uniqSource = succ (uniqSource st) }, uniqSource st)) | ||
73 | |||
74 | modifyThreads :: MonadBase IO m => (Map.Map ThreadId PerThread -> Map.Map ThreadId PerThread) -> m () | ||
75 | modifyThreads f = do | ||
76 | g <- takeMVar globals | ||
77 | let f' st = st { threads = f (threads st) } | ||
78 | putMVar globals (f' g) | ||
diff --git a/src/Network/BitTorrent/DHT/Query.hs b/src/Network/BitTorrent/DHT/Query.hs index 73b3d492..533068c6 100644 --- a/src/Network/BitTorrent/DHT/Query.hs +++ b/src/Network/BitTorrent/DHT/Query.hs | |||
@@ -9,6 +9,7 @@ | |||
9 | -- Normally, you don't need to import this module, use | 9 | -- Normally, you don't need to import this module, use |
10 | -- "Network.BitTorrent.DHT" instead. | 10 | -- "Network.BitTorrent.DHT" instead. |
11 | -- | 11 | -- |
12 | {-# LANGUAGE CPP #-} | ||
12 | {-# LANGUAGE FlexibleContexts #-} | 13 | {-# LANGUAGE FlexibleContexts #-} |
13 | {-# LANGUAGE ScopedTypeVariables #-} | 14 | {-# LANGUAGE ScopedTypeVariables #-} |
14 | {-# LANGUAGE TemplateHaskell #-} | 15 | {-# LANGUAGE TemplateHaskell #-} |
@@ -49,7 +50,12 @@ module Network.BitTorrent.DHT.Query | |||
49 | , (<@>) | 50 | , (<@>) |
50 | ) where | 51 | ) where |
51 | 52 | ||
53 | #ifdef THREAD_DEBUG | ||
54 | import Control.Concurrent.Lifted.Instrument hiding (yield) | ||
55 | #else | ||
56 | import GHC.Conc (labelThread) | ||
52 | import Control.Concurrent.Lifted hiding (yield) | 57 | import Control.Concurrent.Lifted hiding (yield) |
58 | #endif | ||
53 | import Control.Exception.Lifted hiding (Handler) | 59 | import Control.Exception.Lifted hiding (Handler) |
54 | import Control.Monad.Reader | 60 | import Control.Monad.Reader |
55 | import Control.Monad.Logger | 61 | import Control.Monad.Logger |
@@ -225,8 +231,8 @@ refreshNodes nid = do | |||
225 | 231 | ||
226 | -- | This operation do not block but acquire exclusive access to | 232 | -- | This operation do not block but acquire exclusive access to |
227 | -- routing table. | 233 | -- routing table. |
228 | insertNode :: forall ip. Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip ThreadId | 234 | insertNode :: forall ip. Address ip => NodeInfo ip -> Maybe ReflectedIP -> DHT ip () |
229 | insertNode info witnessed_ip0 = fork $ do | 235 | insertNode info witnessed_ip0 = do |
230 | var <- asks routingInfo | 236 | var <- asks routingInfo |
231 | tm <- getTimestamp | 237 | tm <- getTimestamp |
232 | let showTable = do | 238 | let showTable = do |
@@ -286,8 +292,10 @@ insertNode info witnessed_ip0 = fork $ do | |||
286 | return ps | 292 | return ps |
287 | ps <- join $ liftIO $ atomically $ atomicInsert arrival0 witnessed_ip0 | 293 | ps <- join $ liftIO $ atomically $ atomicInsert arrival0 witnessed_ip0 |
288 | showTable | 294 | showTable |
289 | _ <- fork $ forM_ ps $ \(CheckPing ns)-> do | 295 | _ <- fork $ do |
290 | forM_ ns $ \n -> do | 296 | myThreadId >>= liftIO . flip labelThread "DHT.insertNode.pingResults" |
297 | forM_ ps $ \(CheckPing ns)-> do | ||
298 | forM_ ns $ \n -> do | ||
291 | (b,mip) <- probeNode (nodeAddr n) | 299 | (b,mip) <- probeNode (nodeAddr n) |
292 | let alive = PingResult n b | 300 | let alive = PingResult n b |
293 | $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b)) | 301 | $(logDebugS) "insertNode" $ T.pack ("PingResult "++show (nodeId n,b)) |
diff --git a/src/Network/BitTorrent/DHT/Routing.hs b/src/Network/BitTorrent/DHT/Routing.hs index 8a6849a1..d64e415e 100644 --- a/src/Network/BitTorrent/DHT/Routing.hs +++ b/src/Network/BitTorrent/DHT/Routing.hs | |||
@@ -529,11 +529,11 @@ splitTip nid n i bucket | |||
529 | -- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia | 529 | -- k nodes in them. Which subtrees I mean is illustrated in Fig 1. of Kademlia |
530 | -- paper. The rule requiring additional splits is in section 2.4. | 530 | -- paper. The rule requiring additional splits is in section 2.4. |
531 | modifyBucket | 531 | modifyBucket |
532 | :: forall f ip xs. (Alternative f, Eq ip) => | 532 | :: forall ip xs. (Eq ip) => |
533 | NodeId -> (Bucket ip -> f (xs, Bucket ip)) -> Table ip -> f (xs,Table ip) | 533 | NodeId -> (Bucket ip -> Maybe (xs, Bucket ip)) -> Table ip -> Maybe (xs,Table ip) |
534 | modifyBucket nodeId f = go (0 :: BitIx) | 534 | modifyBucket nodeId f = go (0 :: BitIx) |
535 | where | 535 | where |
536 | go :: BitIx -> Table ip -> f (xs, Table ip) | 536 | go :: BitIx -> Table ip -> Maybe (xs, Table ip) |
537 | go i (Zero table bucket) | 537 | go i (Zero table bucket) |
538 | | testIdBit nodeId i = second (Zero table) <$> f bucket | 538 | | testIdBit nodeId i = second (Zero table) <$> f bucket |
539 | | otherwise = second (`Zero` bucket) <$> go (succ i) table | 539 | | otherwise = second (`Zero` bucket) <$> go (succ i) table |
@@ -562,8 +562,8 @@ data CheckPing ip = CheckPing [NodeInfo ip] | |||
562 | 562 | ||
563 | 563 | ||
564 | -- | Atomic 'Table' update | 564 | -- | Atomic 'Table' update |
565 | insert :: (Alternative m, Eq ip) => Timestamp -> Event ip -> Table ip -> m ([CheckPing ip], Table ip) | 565 | insert :: (Eq ip, Applicative m) => Timestamp -> Event ip -> Table ip -> m ([CheckPing ip], Table ip) |
566 | insert tm event = modifyBucket (eventId event) (insertBucket tm event) | 566 | insert tm event tbl = pure $ fromMaybe ([],tbl) $ modifyBucket (eventId event) (insertBucket tm event) tbl |
567 | 567 | ||
568 | 568 | ||
569 | {----------------------------------------------------------------------- | 569 | {----------------------------------------------------------------------- |
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index c08021c7..bad783a5 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs | |||
@@ -72,7 +72,11 @@ module Network.BitTorrent.DHT.Session | |||
72 | import Prelude hiding (ioError) | 72 | import Prelude hiding (ioError) |
73 | 73 | ||
74 | import Control.Concurrent.STM | 74 | import Control.Concurrent.STM |
75 | #ifdef THREAD_DEBUG | ||
76 | import Control.Concurrent.Async.Lifted.Instrument | ||
77 | #else | ||
75 | import Control.Concurrent.Async.Lifted | 78 | import Control.Concurrent.Async.Lifted |
79 | #endif | ||
76 | import Control.Exception.Lifted hiding (Handler) | 80 | import Control.Exception.Lifted hiding (Handler) |
77 | import Control.Monad.Base | 81 | import Control.Monad.Base |
78 | import Control.Monad.Logger | 82 | import Control.Monad.Logger |
@@ -395,8 +399,9 @@ routableAddress = do | |||
395 | myNodeIdAccordingTo :: NodeAddr ip -> DHT ip NodeId | 399 | myNodeIdAccordingTo :: NodeAddr ip -> DHT ip NodeId |
396 | myNodeIdAccordingTo _ = do | 400 | myNodeIdAccordingTo _ = do |
397 | info <- asks routingInfo >>= liftIO . atomically . readTVar | 401 | info <- asks routingInfo >>= liftIO . atomically . readTVar |
398 | fallback <- asks tentativeNodeId | 402 | maybe (asks tentativeNodeId) |
399 | return $ maybe fallback myNodeId info | 403 | (return . myNodeId) |
404 | info | ||
400 | 405 | ||
401 | -- | Get current routing table. Normally you don't need to use this | 406 | -- | Get current routing table. Normally you don't need to use this |
402 | -- function, but it can be usefull for debugging and profiling purposes. | 407 | -- function, but it can be usefull for debugging and profiling purposes. |
diff --git a/src/Network/KRPC/Manager.hs b/src/Network/KRPC/Manager.hs index 4852eb38..22d111e2 100644 --- a/src/Network/KRPC/Manager.hs +++ b/src/Network/KRPC/Manager.hs | |||
@@ -7,6 +7,7 @@ | |||
7 | -- | 7 | -- |
8 | -- Normally, you don't need to import this module. | 8 | -- Normally, you don't need to import this module. |
9 | -- | 9 | -- |
10 | {-# LANGUAGE CPP #-} | ||
10 | {-# LANGUAGE OverloadedStrings #-} | 11 | {-# LANGUAGE OverloadedStrings #-} |
11 | {-# LANGUAGE FlexibleInstances #-} | 12 | {-# LANGUAGE FlexibleInstances #-} |
12 | {-# LANGUAGE FlexibleContexts #-} | 13 | {-# LANGUAGE FlexibleContexts #-} |
@@ -41,8 +42,12 @@ module Network.KRPC.Manager | |||
41 | ) where | 42 | ) where |
42 | 43 | ||
43 | import Control.Applicative | 44 | import Control.Applicative |
44 | import Control.Concurrent | 45 | #ifdef THREAD_DEBUG |
45 | import Control.Concurrent.Lifted (fork) | 46 | import Control.Concurrent.Lifted.Instrument |
47 | #else | ||
48 | import GHC.Conc (labelThread) | ||
49 | import Control.Concurrent.Lifted | ||
50 | #endif | ||
46 | import Control.Exception hiding (Handler) | 51 | import Control.Exception hiding (Handler) |
47 | import qualified Control.Exception.Lifted as E (Handler (..)) | 52 | import qualified Control.Exception.Lifted as E (Handler (..)) |
48 | import Control.Exception.Lifted as Lifted (catches, finally) | 53 | import Control.Exception.Lifted as Lifted (catches, finally) |
@@ -432,6 +437,7 @@ dispatchHandler q @ KQuery {..} addr = do | |||
432 | -- | 437 | -- |
433 | handleQuery :: MonadKRPC h m => BValue -> KQuery -> SockAddr -> m () | 438 | handleQuery :: MonadKRPC h m => BValue -> KQuery -> SockAddr -> m () |
434 | handleQuery raw q addr = void $ fork $ do | 439 | handleQuery raw q addr = void $ fork $ do |
440 | myThreadId >>= liftIO . flip labelThread "KRPC.handleQuery" | ||
435 | Manager {..} <- getManager | 441 | Manager {..} <- getManager |
436 | res <- dispatchHandler q addr | 442 | res <- dispatchHandler q addr |
437 | let resbe = either toBEncode toBEncode res | 443 | let resbe = either toBEncode toBEncode res |
@@ -481,6 +487,7 @@ listen :: MonadKRPC h m => m () | |||
481 | listen = do | 487 | listen = do |
482 | Manager {..} <- getManager | 488 | Manager {..} <- getManager |
483 | tid <- fork $ do | 489 | tid <- fork $ do |
490 | myThreadId >>= liftIO . flip labelThread "KRPC.listen" | ||
484 | listener `Lifted.finally` | 491 | listener `Lifted.finally` |
485 | liftIO (takeMVar listenerThread) | 492 | liftIO (takeMVar listenerThread) |
486 | liftIO $ putMVar listenerThread tid | 493 | liftIO $ putMVar listenerThread tid |
diff --git a/src/Network/StreamServer.hs b/src/Network/StreamServer.hs index a6cead0e..34b9388e 100644 --- a/src/Network/StreamServer.hs +++ b/src/Network/StreamServer.hs | |||
@@ -1,4 +1,5 @@ | |||
1 | -- | This module implements a bare-bones TCP or Unix socket server. | 1 | -- | This module implements a bare-bones TCP or Unix socket server. |
2 | {-# LANGUAGE CPP #-} | ||
2 | {-# LANGUAGE TypeFamilies #-} | 3 | {-# LANGUAGE TypeFamilies #-} |
3 | {-# LANGUAGE TypeOperators #-} | 4 | {-# LANGUAGE TypeOperators #-} |
4 | {-# LANGUAGE OverloadedStrings #-} | 5 | {-# LANGUAGE OverloadedStrings #-} |
@@ -34,7 +35,12 @@ import System.IO | |||
34 | ) | 35 | ) |
35 | import Control.Monad | 36 | import Control.Monad |
36 | import Control.Monad.Fix (fix) | 37 | import Control.Monad.Fix (fix) |
37 | import Control.Concurrent (forkIO, threadDelay, ThreadId, mkWeakThreadId) | 38 | #ifdef THREAD_DEBUG |
39 | import Control.Concurrent.Lifted.Instrument (forkIO, threadDelay, ThreadId, mkWeakThreadId, labelThread, myThreadId) | ||
40 | #else | ||
41 | import GHC.Conc (labelThread) | ||
42 | import Control.Concurrent (forkIO, threadDelay, ThreadId, mkWeakThreadId, myThreadId) | ||
43 | #endif | ||
38 | import Control.Exception (catch,handle,try,finally) | 44 | import Control.Exception (catch,handle,try,finally) |
39 | import System.IO.Error (tryIOError) | 45 | import System.IO.Error (tryIOError) |
40 | import System.Mem.Weak | 46 | import System.Mem.Weak |
@@ -113,7 +119,9 @@ streamServer cfg addr = do | |||
113 | threadDelay 5000000 | 119 | threadDelay 5000000 |
114 | loop | 120 | loop |
115 | listen sock maxListenQueue | 121 | listen sock maxListenQueue |
116 | thread <- mkWeakThreadId <=< forkIO $ acceptLoop cfg sock 0 | 122 | thread <- mkWeakThreadId <=< forkIO $ do |
123 | myThreadId >>= flip labelThread "StreamServer.acceptLoop" | ||
124 | acceptLoop cfg sock 0 | ||
117 | return (ServerHandle sock thread) | 125 | return (ServerHandle sock thread) |
118 | 126 | ||
119 | -- | Not exported. This, combined with 'acceptException' form a mutually recursive | 127 | -- | Not exported. This, combined with 'acceptException' form a mutually recursive |
@@ -124,7 +132,9 @@ acceptLoop cfg sock n = handle (acceptException cfg n sock) $ do | |||
124 | con <- accept sock | 132 | con <- accept sock |
125 | let conkey = n + 1 | 133 | let conkey = n + 1 |
126 | h <- socketToHandle (fst con) ReadWriteMode | 134 | h <- socketToHandle (fst con) ReadWriteMode |
127 | forkIO $ serverSession cfg (restrictHandleSocket h (fst con)) conkey h | 135 | forkIO $ do |
136 | myThreadId >>= flip labelThread "StreamServer.session" | ||
137 | serverSession cfg (restrictHandleSocket h (fst con)) conkey h | ||
128 | acceptLoop cfg sock (n + 1) | 138 | acceptLoop cfg sock (n + 1) |
129 | 139 | ||
130 | acceptException :: ServerConfig -> Int -> Socket -> IOError -> IO () | 140 | acceptException :: ServerConfig -> Int -> Socket -> IOError -> IO () |