diff options
Diffstat (limited to 'src/Network/BitTorrent/DHT/Session.hs')
-rw-r--r-- | src/Network/BitTorrent/DHT/Session.hs | 44 |
1 files changed, 29 insertions, 15 deletions
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index 1da40a2d..1bc9e697 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs | |||
@@ -29,6 +29,7 @@ module Network.BitTorrent.DHT.Session | |||
29 | , LogFun | 29 | , LogFun |
30 | , NodeHandler | 30 | , NodeHandler |
31 | , startNode | 31 | , startNode |
32 | , stopNode | ||
32 | 33 | ||
33 | -- * DHT | 34 | -- * DHT |
34 | -- | Use @asks options@ to get options passed to 'startNode' | 35 | -- | Use @asks options@ to get options passed to 'startNode' |
@@ -233,6 +234,8 @@ data Node ip = Node | |||
233 | -- | Pseudo-unique self-assigned session identifier. This value is | 234 | -- | Pseudo-unique self-assigned session identifier. This value is |
234 | -- constant during DHT session and (optionally) between sessions. | 235 | -- constant during DHT session and (optionally) between sessions. |
235 | , thisNodeId :: !NodeId | 236 | , thisNodeId :: !NodeId |
237 | |||
238 | , resources :: !InternalState | ||
236 | , manager :: !(Manager (DHT ip)) -- ^ RPC manager; | 239 | , manager :: !(Manager (DHT ip)) -- ^ RPC manager; |
237 | , routingTable :: !(MVar (Table ip)) -- ^ search table; | 240 | , routingTable :: !(MVar (Table ip)) -- ^ search table; |
238 | , contactInfo :: !(TVar (PeerStore ip)) -- ^ published by other nodes; | 241 | , contactInfo :: !(TVar (PeerStore ip)) -- ^ published by other nodes; |
@@ -243,7 +246,7 @@ data Node ip = Node | |||
243 | 246 | ||
244 | -- | DHT keep track current session and proper resource allocation for | 247 | -- | DHT keep track current session and proper resource allocation for |
245 | -- safe multithreading. | 248 | -- safe multithreading. |
246 | newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) (ResourceT IO) a } | 249 | newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) IO a } |
247 | deriving ( Functor, Applicative, Monad | 250 | deriving ( Functor, Applicative, Monad |
248 | , MonadIO, MonadBase IO | 251 | , MonadIO, MonadBase IO |
249 | , MonadReader (Node ip) | 252 | , MonadReader (Node ip) |
@@ -251,7 +254,7 @@ newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) (ResourceT IO) a } | |||
251 | 254 | ||
252 | instance MonadBaseControl IO (DHT ip) where | 255 | instance MonadBaseControl IO (DHT ip) where |
253 | newtype StM (DHT ip) a = StM { | 256 | newtype StM (DHT ip) a = StM { |
254 | unSt :: StM (ReaderT (Node ip) (ResourceT IO)) a | 257 | unSt :: StM (ReaderT (Node ip) IO) a |
255 | } | 258 | } |
256 | liftBaseWith cc = DHT $ liftBaseWith $ \ cc' -> | 259 | liftBaseWith cc = DHT $ liftBaseWith $ \ cc' -> |
257 | cc $ \ (DHT m) -> StM <$> cc' m | 260 | cc $ \ (DHT m) -> StM <$> cc' m |
@@ -270,31 +273,42 @@ instance MonadLogger (DHT ip) where | |||
270 | 273 | ||
271 | type NodeHandler ip = Handler (DHT ip) | 274 | type NodeHandler ip = Handler (DHT ip) |
272 | 275 | ||
273 | -- | Run DHT session. Some resources like listener thread may live for | 276 | -- | Run DHT session. You /must/ properly close session using |
274 | -- some short period of time right after this DHT session closed. | 277 | -- 'stopNode' function, otherwise socket or other scarce resources may |
278 | -- leak. | ||
275 | startNode :: Address ip | 279 | startNode :: Address ip |
276 | => [NodeHandler ip] -- ^ handlers to run on accepted queries; | 280 | => [NodeHandler ip] -- ^ handlers to run on accepted queries; |
277 | -> Options -- ^ various dht options; | 281 | -> Options -- ^ various dht options; |
278 | -> NodeAddr ip -- ^ node address to bind; | 282 | -> NodeAddr ip -- ^ node address to bind; |
279 | -> LogFun -- ^ | 283 | -> LogFun -- ^ |
280 | -> ResIO (Node ip) -- ^ a new DHT node running at given address. | 284 | -> IO (Node ip) -- ^ a new DHT node running at given address. |
281 | startNode hs opts naddr logger = do | 285 | startNode hs opts naddr logger = do |
282 | -- (_, m) <- allocate (newManager rpcOpts nodeAddr hs) closeManager | 286 | s <- createInternalState |
283 | m <- liftIO $ newManager rpcOpts nodeAddr hs | 287 | runInternalState initNode s |
284 | myId <- liftIO genNodeId | 288 | `onException` closeInternalState s |
285 | node <- liftIO $ Node opts myId m | 289 | where |
290 | rpcOpts = KRPC.def { optQueryTimeout = seconds (optTimeout opts) } | ||
291 | nodeAddr = toSockAddr naddr | ||
292 | initNode = do | ||
293 | s <- getInternalState | ||
294 | (_, m) <- allocate (newManager rpcOpts nodeAddr hs) closeManager | ||
295 | liftIO $ do | ||
296 | myId <- genNodeId | ||
297 | node <- Node opts myId s m | ||
286 | <$> newMVar (nullTable myId (optBucketCount opts)) | 298 | <$> newMVar (nullTable myId (optBucketCount opts)) |
287 | <*> newTVarIO def | 299 | <*> newTVarIO def |
288 | <*> newTVarIO S.empty | 300 | <*> newTVarIO S.empty |
289 | <*> (newTVarIO =<< nullSessionTokens) | 301 | <*> (newTVarIO =<< nullSessionTokens) |
290 | <*> pure logger | 302 | <*> pure logger |
291 | runReaderT (unDHT listen) node | 303 | runReaderT (unDHT listen) node |
292 | return node | 304 | return node |
293 | where | 305 | |
294 | rpcOpts = KRPC.def { optQueryTimeout = seconds (optTimeout opts) } | 306 | -- | Some resources like listener thread may live for |
295 | nodeAddr = toSockAddr naddr | 307 | -- some short period of time right after this DHT session closed. |
308 | stopNode :: Node ip -> IO () | ||
309 | stopNode Node {..} = closeInternalState resources | ||
296 | 310 | ||
297 | runDHT :: Node ip -> DHT ip a -> ResIO a | 311 | runDHT :: Node ip -> DHT ip a -> IO a |
298 | runDHT node action = runReaderT (unDHT action) node | 312 | runDHT node action = runReaderT (unDHT action) node |
299 | {-# INLINE runDHT #-} | 313 | {-# INLINE runDHT #-} |
300 | 314 | ||