From c34bbed3738b8ffec822abec5c2fd1b2cec8102a Mon Sep 17 00:00:00 2001 From: Sam Truzjan Date: Wed, 19 Feb 2014 04:07:35 +0400 Subject: Hide ResIO, add stopNode function --- src/Network/BitTorrent/Client.hs | 2 +- src/Network/BitTorrent/Client/Types.hs | 2 +- src/Network/BitTorrent/DHT.hs | 6 ++--- src/Network/BitTorrent/DHT/Session.hs | 44 ++++++++++++++++++++++------------ 4 files changed, 34 insertions(+), 20 deletions(-) (limited to 'src/Network') diff --git a/src/Network/BitTorrent/Client.hs b/src/Network/BitTorrent/Client.hs index e1a84939..0d2e14ca 100644 --- a/src/Network/BitTorrent/Client.hs +++ b/src/Network/BitTorrent/Client.hs @@ -93,7 +93,7 @@ newClient opts @ Options {..} logger = do tmap <- newMVar HM.empty tmgr <- Tracker.newManager def (PeerInfo pid Nothing optPort) emgr <- Exchange.newManager (exchangeOptions pid opts) connHandler - node <- runResourceT $ do + node <- do node <- startNode handlers def optNodeAddr logger runDHT node $ bootstrap (maybeToList optBootNode) return node diff --git a/src/Network/BitTorrent/Client/Types.hs b/src/Network/BitTorrent/Client/Types.hs index e80578a3..63971518 100644 --- a/src/Network/BitTorrent/Client/Types.hs +++ b/src/Network/BitTorrent/Client/Types.hs @@ -80,7 +80,7 @@ instance MonadBitTorrent BitTorrent where instance MonadDHT BitTorrent where liftDHT action = BitTorrent $ do node <- asks clientNode - liftIO $ runResourceT $ runDHT node action + liftIO $ runDHT node action instance MonadLogger BitTorrent where monadLoggerLog loc src lvl msg = BitTorrent $ do diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs index 8b212ee8..6c78d992 100644 --- a/src/Network/BitTorrent/DHT.hs +++ b/src/Network/BitTorrent/DHT.hs @@ -55,6 +55,7 @@ import Control.Applicative import Control.Monad.Logger import Control.Monad.Reader import Control.Monad.Trans +import Control.Exception import Data.ByteString as BS import Data.Conduit as C import Data.Conduit.List as C @@ -84,9 +85,8 @@ dht :: Address ip -> DHT ip a -- ^ actions to run: 'bootstrap', 'lookup', etc; -> IO a -- ^ result. dht opts addr action = do - runResourceT $ do - runStderrLoggingT $ LoggingT $ \ logger -> do - node <- startNode handlers opts addr logger + runStderrLoggingT $ LoggingT $ \ logger -> do + bracket (startNode handlers opts addr logger) stopNode $ \ node -> runDHT node action {-# INLINE dht #-} diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index 1da40a2d..1bc9e697 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs @@ -29,6 +29,7 @@ module Network.BitTorrent.DHT.Session , LogFun , NodeHandler , startNode + , stopNode -- * DHT -- | Use @asks options@ to get options passed to 'startNode' @@ -233,6 +234,8 @@ data Node ip = Node -- | Pseudo-unique self-assigned session identifier. This value is -- constant during DHT session and (optionally) between sessions. , thisNodeId :: !NodeId + + , resources :: !InternalState , manager :: !(Manager (DHT ip)) -- ^ RPC manager; , routingTable :: !(MVar (Table ip)) -- ^ search table; , contactInfo :: !(TVar (PeerStore ip)) -- ^ published by other nodes; @@ -243,7 +246,7 @@ data Node ip = Node -- | DHT keep track current session and proper resource allocation for -- safe multithreading. -newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) (ResourceT IO) a } +newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) IO a } deriving ( Functor, Applicative, Monad , MonadIO, MonadBase IO , MonadReader (Node ip) @@ -251,7 +254,7 @@ newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) (ResourceT IO) a } instance MonadBaseControl IO (DHT ip) where newtype StM (DHT ip) a = StM { - unSt :: StM (ReaderT (Node ip) (ResourceT IO)) a + unSt :: StM (ReaderT (Node ip) IO) a } liftBaseWith cc = DHT $ liftBaseWith $ \ cc' -> cc $ \ (DHT m) -> StM <$> cc' m @@ -270,31 +273,42 @@ instance MonadLogger (DHT ip) where type NodeHandler ip = Handler (DHT ip) --- | Run DHT session. Some resources like listener thread may live for --- some short period of time right after this DHT session closed. +-- | Run DHT session. You /must/ properly close session using +-- 'stopNode' function, otherwise socket or other scarce resources may +-- leak. startNode :: Address ip => [NodeHandler ip] -- ^ handlers to run on accepted queries; -> Options -- ^ various dht options; -> NodeAddr ip -- ^ node address to bind; -> LogFun -- ^ - -> ResIO (Node ip) -- ^ a new DHT node running at given address. + -> IO (Node ip) -- ^ a new DHT node running at given address. startNode hs opts naddr logger = do --- (_, m) <- allocate (newManager rpcOpts nodeAddr hs) closeManager - m <- liftIO $ newManager rpcOpts nodeAddr hs - myId <- liftIO genNodeId - node <- liftIO $ Node opts myId m + s <- createInternalState + runInternalState initNode s + `onException` closeInternalState s + where + rpcOpts = KRPC.def { optQueryTimeout = seconds (optTimeout opts) } + nodeAddr = toSockAddr naddr + initNode = do + s <- getInternalState + (_, m) <- allocate (newManager rpcOpts nodeAddr hs) closeManager + liftIO $ do + myId <- genNodeId + node <- Node opts myId s m <$> newMVar (nullTable myId (optBucketCount opts)) <*> newTVarIO def <*> newTVarIO S.empty <*> (newTVarIO =<< nullSessionTokens) <*> pure logger - runReaderT (unDHT listen) node - return node - where - rpcOpts = KRPC.def { optQueryTimeout = seconds (optTimeout opts) } - nodeAddr = toSockAddr naddr + runReaderT (unDHT listen) node + return node + +-- | Some resources like listener thread may live for +-- some short period of time right after this DHT session closed. +stopNode :: Node ip -> IO () +stopNode Node {..} = closeInternalState resources -runDHT :: Node ip -> DHT ip a -> ResIO a +runDHT :: Node ip -> DHT ip a -> IO a runDHT node action = runReaderT (unDHT action) node {-# INLINE runDHT #-} -- cgit v1.2.3