Diffstat (limited to 'src/Network/BitTorrent/DHT/Session.hs')
-rw-r--r--  src/Network/BitTorrent/DHT/Session.hs  44
1 file changed, 29 insertions(+), 15 deletions(-)
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs
index 1da40a2d..1bc9e697 100644
--- a/src/Network/BitTorrent/DHT/Session.hs
+++ b/src/Network/BitTorrent/DHT/Session.hs
@@ -29,6 +29,7 @@ module Network.BitTorrent.DHT.Session
   , LogFun
   , NodeHandler
   , startNode
+  , stopNode

     -- * DHT
     -- | Use @asks options@ to get options passed to 'startNode'
@@ -233,6 +234,8 @@ data Node ip = Node
     -- | Pseudo-unique self-assigned session identifier. This value is
     -- constant during DHT session and (optionally) between sessions.
   , thisNodeId   :: !NodeId
+
+  , resources    :: !InternalState
   , manager      :: !(Manager (DHT ip))    -- ^ RPC manager;
   , routingTable :: !(MVar (Table ip))     -- ^ search table;
   , contactInfo  :: !(TVar (PeerStore ip)) -- ^ published by other nodes;
@@ -243,7 +246,7 @@ data Node ip = Node

 -- | DHT keep track current session and proper resource allocation for
 -- safe multithreading.
-newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) (ResourceT IO) a }
+newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) IO a }
   deriving ( Functor, Applicative, Monad
            , MonadIO, MonadBase IO
            , MonadReader (Node ip)
@@ -251,7 +254,7 @@ newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) (ResourceT IO) a }

 instance MonadBaseControl IO (DHT ip) where
   newtype StM (DHT ip) a = StM {
-      unSt :: StM (ReaderT (Node ip) (ResourceT IO)) a
+      unSt :: StM (ReaderT (Node ip) IO) a
     }
   liftBaseWith cc = DHT $ liftBaseWith $ \ cc' ->
       cc $ \ (DHT m) -> StM <$> cc' m
@@ -270,31 +273,42 @@ instance MonadLogger (DHT ip) where

 type NodeHandler ip = Handler (DHT ip)

--- | Run DHT session. Some resources like listener thread may live for
--- some short period of time right after this DHT session closed.
+-- | Run DHT session. You /must/ properly close session using
+-- 'stopNode' function, otherwise socket or other scarce resources may
+-- leak.
 startNode :: Address ip
           => [NodeHandler ip]  -- ^ handlers to run on accepted queries;
           -> Options           -- ^ various dht options;
           -> NodeAddr ip       -- ^ node address to bind;
           -> LogFun            -- ^
-          -> ResIO (Node ip)   -- ^ a new DHT node running at given address.
+          -> IO (Node ip)      -- ^ a new DHT node running at given address.
 startNode hs opts naddr logger = do
---  (_, m) <- allocate (newManager rpcOpts nodeAddr hs) closeManager
-  m    <- liftIO $ newManager rpcOpts nodeAddr hs
-  myId <- liftIO genNodeId
-  node <- liftIO $ Node opts myId m
+    s <- createInternalState
+    runInternalState initNode s
+      `onException` closeInternalState s
+  where
+    rpcOpts  = KRPC.def { optQueryTimeout = seconds (optTimeout opts) }
+    nodeAddr = toSockAddr naddr
+    initNode = do
+      s <- getInternalState
+      (_, m) <- allocate (newManager rpcOpts nodeAddr hs) closeManager
+      liftIO $ do
+        myId <- genNodeId
+        node <- Node opts myId s m
             <$> newMVar (nullTable myId (optBucketCount opts))
            <*> newTVarIO def
            <*> newTVarIO S.empty
            <*> (newTVarIO =<< nullSessionTokens)
            <*> pure logger
        runReaderT (unDHT listen) node
        return node
-  where
-    rpcOpts  = KRPC.def { optQueryTimeout = seconds (optTimeout opts) }
-    nodeAddr = toSockAddr naddr
+
+-- | Some resources like listener thread may live for
+-- some short period of time right after this DHT session closed.
+stopNode :: Node ip -> IO ()
+stopNode Node {..} = closeInternalState resources

-runDHT :: Node ip -> DHT ip a -> ResIO a
+runDHT :: Node ip -> DHT ip a -> IO a
 runDHT node action = runReaderT (unDHT action) node
 {-# INLINE runDHT #-}

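The usage pattern implied by the new Haddock comment on 'startNode' is an ordinary acquire/release bracket: create the node, run the session body with 'runDHT', and make sure 'stopNode' runs even if the body throws. A minimal sketch, assuming a handler list, an Options value, a bind address and a LogFun are already in scope; the helper name 'withNode' is hypothetical and not part of this patch:

-- Sketch only: 'withNode' is a hypothetical wrapper around the new
-- startNode/stopNode pair; it is not defined in this module.
import Control.Exception (bracket)
import Network.BitTorrent.DHT.Session

withNode :: Address ip
         => [NodeHandler ip] -> Options -> NodeAddr ip -> LogFun
         -> DHT ip a   -- ^ session body
         -> IO a
withNode handlers opts addr logFun body =
  bracket (startNode handlers opts addr logFun) -- acquire: bind socket, start listener
          stopNode                              -- release: closes the node's InternalState
          (\node -> runDHT node body)           -- run DHT actions against the node

With such a bracket the resource-leak warning in the new docstring is handled for the common case; callers that keep a node alive across an application's lifetime can still call stopNode explicitly at shutdown.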