diff options
author | Sam Truzjan <pxqr.sta@gmail.com> | 2014-02-19 04:07:35 +0400 |
---|---|---|
committer | Sam Truzjan <pxqr.sta@gmail.com> | 2014-02-19 04:07:35 +0400 |
commit | c34bbed3738b8ffec822abec5c2fd1b2cec8102a (patch) | |
tree | 47de369814a27389d7ce52fdf1b4bb6abc64dfa9 | |
parent | 1194d9ec25ca821274362f1795564c655b01196a (diff) |
Hide ResIO, add stopNode function
-rw-r--r-- | src/Network/BitTorrent/Client.hs | 2 | ||||
-rw-r--r-- | src/Network/BitTorrent/Client/Types.hs | 2 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT.hs | 6 | ||||
-rw-r--r-- | src/Network/BitTorrent/DHT/Session.hs | 44 |
4 files changed, 34 insertions, 20 deletions
diff --git a/src/Network/BitTorrent/Client.hs b/src/Network/BitTorrent/Client.hs index e1a84939..0d2e14ca 100644 --- a/src/Network/BitTorrent/Client.hs +++ b/src/Network/BitTorrent/Client.hs | |||
@@ -93,7 +93,7 @@ newClient opts @ Options {..} logger = do | |||
93 | tmap <- newMVar HM.empty | 93 | tmap <- newMVar HM.empty |
94 | tmgr <- Tracker.newManager def (PeerInfo pid Nothing optPort) | 94 | tmgr <- Tracker.newManager def (PeerInfo pid Nothing optPort) |
95 | emgr <- Exchange.newManager (exchangeOptions pid opts) connHandler | 95 | emgr <- Exchange.newManager (exchangeOptions pid opts) connHandler |
96 | node <- runResourceT $ do | 96 | node <- do |
97 | node <- startNode handlers def optNodeAddr logger | 97 | node <- startNode handlers def optNodeAddr logger |
98 | runDHT node $ bootstrap (maybeToList optBootNode) | 98 | runDHT node $ bootstrap (maybeToList optBootNode) |
99 | return node | 99 | return node |
diff --git a/src/Network/BitTorrent/Client/Types.hs b/src/Network/BitTorrent/Client/Types.hs index e80578a3..63971518 100644 --- a/src/Network/BitTorrent/Client/Types.hs +++ b/src/Network/BitTorrent/Client/Types.hs | |||
@@ -80,7 +80,7 @@ instance MonadBitTorrent BitTorrent where | |||
80 | instance MonadDHT BitTorrent where | 80 | instance MonadDHT BitTorrent where |
81 | liftDHT action = BitTorrent $ do | 81 | liftDHT action = BitTorrent $ do |
82 | node <- asks clientNode | 82 | node <- asks clientNode |
83 | liftIO $ runResourceT $ runDHT node action | 83 | liftIO $ runDHT node action |
84 | 84 | ||
85 | instance MonadLogger BitTorrent where | 85 | instance MonadLogger BitTorrent where |
86 | monadLoggerLog loc src lvl msg = BitTorrent $ do | 86 | monadLoggerLog loc src lvl msg = BitTorrent $ do |
diff --git a/src/Network/BitTorrent/DHT.hs b/src/Network/BitTorrent/DHT.hs index 8b212ee8..6c78d992 100644 --- a/src/Network/BitTorrent/DHT.hs +++ b/src/Network/BitTorrent/DHT.hs | |||
@@ -55,6 +55,7 @@ import Control.Applicative | |||
55 | import Control.Monad.Logger | 55 | import Control.Monad.Logger |
56 | import Control.Monad.Reader | 56 | import Control.Monad.Reader |
57 | import Control.Monad.Trans | 57 | import Control.Monad.Trans |
58 | import Control.Exception | ||
58 | import Data.ByteString as BS | 59 | import Data.ByteString as BS |
59 | import Data.Conduit as C | 60 | import Data.Conduit as C |
60 | import Data.Conduit.List as C | 61 | import Data.Conduit.List as C |
@@ -84,9 +85,8 @@ dht :: Address ip | |||
84 | -> DHT ip a -- ^ actions to run: 'bootstrap', 'lookup', etc; | 85 | -> DHT ip a -- ^ actions to run: 'bootstrap', 'lookup', etc; |
85 | -> IO a -- ^ result. | 86 | -> IO a -- ^ result. |
86 | dht opts addr action = do | 87 | dht opts addr action = do |
87 | runResourceT $ do | 88 | runStderrLoggingT $ LoggingT $ \ logger -> do |
88 | runStderrLoggingT $ LoggingT $ \ logger -> do | 89 | bracket (startNode handlers opts addr logger) stopNode $ \ node -> |
89 | node <- startNode handlers opts addr logger | ||
90 | runDHT node action | 90 | runDHT node action |
91 | {-# INLINE dht #-} | 91 | {-# INLINE dht #-} |
92 | 92 | ||
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index 1da40a2d..1bc9e697 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs | |||
@@ -29,6 +29,7 @@ module Network.BitTorrent.DHT.Session | |||
29 | , LogFun | 29 | , LogFun |
30 | , NodeHandler | 30 | , NodeHandler |
31 | , startNode | 31 | , startNode |
32 | , stopNode | ||
32 | 33 | ||
33 | -- * DHT | 34 | -- * DHT |
34 | -- | Use @asks options@ to get options passed to 'startNode' | 35 | -- | Use @asks options@ to get options passed to 'startNode' |
@@ -233,6 +234,8 @@ data Node ip = Node | |||
233 | -- | Pseudo-unique self-assigned session identifier. This value is | 234 | -- | Pseudo-unique self-assigned session identifier. This value is |
234 | -- constant during DHT session and (optionally) between sessions. | 235 | -- constant during DHT session and (optionally) between sessions. |
235 | , thisNodeId :: !NodeId | 236 | , thisNodeId :: !NodeId |
237 | |||
238 | , resources :: !InternalState | ||
236 | , manager :: !(Manager (DHT ip)) -- ^ RPC manager; | 239 | , manager :: !(Manager (DHT ip)) -- ^ RPC manager; |
237 | , routingTable :: !(MVar (Table ip)) -- ^ search table; | 240 | , routingTable :: !(MVar (Table ip)) -- ^ search table; |
238 | , contactInfo :: !(TVar (PeerStore ip)) -- ^ published by other nodes; | 241 | , contactInfo :: !(TVar (PeerStore ip)) -- ^ published by other nodes; |
@@ -243,7 +246,7 @@ data Node ip = Node | |||
243 | 246 | ||
244 | -- | DHT keep track current session and proper resource allocation for | 247 | -- | DHT keep track current session and proper resource allocation for |
245 | -- safe multithreading. | 248 | -- safe multithreading. |
246 | newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) (ResourceT IO) a } | 249 | newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) IO a } |
247 | deriving ( Functor, Applicative, Monad | 250 | deriving ( Functor, Applicative, Monad |
248 | , MonadIO, MonadBase IO | 251 | , MonadIO, MonadBase IO |
249 | , MonadReader (Node ip) | 252 | , MonadReader (Node ip) |
@@ -251,7 +254,7 @@ newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) (ResourceT IO) a } | |||
251 | 254 | ||
252 | instance MonadBaseControl IO (DHT ip) where | 255 | instance MonadBaseControl IO (DHT ip) where |
253 | newtype StM (DHT ip) a = StM { | 256 | newtype StM (DHT ip) a = StM { |
254 | unSt :: StM (ReaderT (Node ip) (ResourceT IO)) a | 257 | unSt :: StM (ReaderT (Node ip) IO) a |
255 | } | 258 | } |
256 | liftBaseWith cc = DHT $ liftBaseWith $ \ cc' -> | 259 | liftBaseWith cc = DHT $ liftBaseWith $ \ cc' -> |
257 | cc $ \ (DHT m) -> StM <$> cc' m | 260 | cc $ \ (DHT m) -> StM <$> cc' m |
@@ -270,31 +273,42 @@ instance MonadLogger (DHT ip) where | |||
270 | 273 | ||
271 | type NodeHandler ip = Handler (DHT ip) | 274 | type NodeHandler ip = Handler (DHT ip) |
272 | 275 | ||
273 | -- | Run DHT session. Some resources like listener thread may live for | 276 | -- | Run DHT session. You /must/ properly close session using |
274 | -- some short period of time right after this DHT session closed. | 277 | -- 'stopNode' function, otherwise socket or other scarce resources may |
278 | -- leak. | ||
275 | startNode :: Address ip | 279 | startNode :: Address ip |
276 | => [NodeHandler ip] -- ^ handlers to run on accepted queries; | 280 | => [NodeHandler ip] -- ^ handlers to run on accepted queries; |
277 | -> Options -- ^ various dht options; | 281 | -> Options -- ^ various dht options; |
278 | -> NodeAddr ip -- ^ node address to bind; | 282 | -> NodeAddr ip -- ^ node address to bind; |
279 | -> LogFun -- ^ | 283 | -> LogFun -- ^ |
280 | -> ResIO (Node ip) -- ^ a new DHT node running at given address. | 284 | -> IO (Node ip) -- ^ a new DHT node running at given address. |
281 | startNode hs opts naddr logger = do | 285 | startNode hs opts naddr logger = do |
282 | -- (_, m) <- allocate (newManager rpcOpts nodeAddr hs) closeManager | 286 | s <- createInternalState |
283 | m <- liftIO $ newManager rpcOpts nodeAddr hs | 287 | runInternalState initNode s |
284 | myId <- liftIO genNodeId | 288 | `onException` closeInternalState s |
285 | node <- liftIO $ Node opts myId m | 289 | where |
290 | rpcOpts = KRPC.def { optQueryTimeout = seconds (optTimeout opts) } | ||
291 | nodeAddr = toSockAddr naddr | ||
292 | initNode = do | ||
293 | s <- getInternalState | ||
294 | (_, m) <- allocate (newManager rpcOpts nodeAddr hs) closeManager | ||
295 | liftIO $ do | ||
296 | myId <- genNodeId | ||
297 | node <- Node opts myId s m | ||
286 | <$> newMVar (nullTable myId (optBucketCount opts)) | 298 | <$> newMVar (nullTable myId (optBucketCount opts)) |
287 | <*> newTVarIO def | 299 | <*> newTVarIO def |
288 | <*> newTVarIO S.empty | 300 | <*> newTVarIO S.empty |
289 | <*> (newTVarIO =<< nullSessionTokens) | 301 | <*> (newTVarIO =<< nullSessionTokens) |
290 | <*> pure logger | 302 | <*> pure logger |
291 | runReaderT (unDHT listen) node | 303 | runReaderT (unDHT listen) node |
292 | return node | 304 | return node |
293 | where | 305 | |
294 | rpcOpts = KRPC.def { optQueryTimeout = seconds (optTimeout opts) } | 306 | -- | Some resources like listener thread may live for |
295 | nodeAddr = toSockAddr naddr | 307 | -- some short period of time right after this DHT session closed. |
308 | stopNode :: Node ip -> IO () | ||
309 | stopNode Node {..} = closeInternalState resources | ||
296 | 310 | ||
297 | runDHT :: Node ip -> DHT ip a -> ResIO a | 311 | runDHT :: Node ip -> DHT ip a -> IO a |
298 | runDHT node action = runReaderT (unDHT action) node | 312 | runDHT node action = runReaderT (unDHT action) node |
299 | {-# INLINE runDHT #-} | 313 | {-# INLINE runDHT #-} |
300 | 314 | ||