diff options
Diffstat (limited to 'src/Network/BitTorrent/DHT/Session.hs')
-rw-r--r-- | src/Network/BitTorrent/DHT/Session.hs | 114 |
1 files changed, 49 insertions, 65 deletions
diff --git a/src/Network/BitTorrent/DHT/Session.hs b/src/Network/BitTorrent/DHT/Session.hs index d4794038..f96ba707 100644 --- a/src/Network/BitTorrent/DHT/Session.hs +++ b/src/Network/BitTorrent/DHT/Session.hs | |||
@@ -96,6 +96,7 @@ import Control.Monad.Trans.Control | |||
96 | import Control.Monad.Trans.Resource | 96 | import Control.Monad.Trans.Resource |
97 | import Data.Typeable | 97 | import Data.Typeable |
98 | import Data.String | 98 | import Data.String |
99 | import Data.Bits | ||
99 | import Data.ByteString | 100 | import Data.ByteString |
100 | import Data.Conduit.Lazy | 101 | import Data.Conduit.Lazy |
101 | import Data.Default | 102 | import Data.Default |
@@ -265,26 +266,17 @@ type AnnounceSet = Set (InfoHash, PortNumber) | |||
265 | type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () | 266 | type LogFun = Loc -> LogSource -> LogLevel -> LogStr -> IO () |
266 | 267 | ||
267 | -- | DHT session keep track state of /this/ node. | 268 | -- | DHT session keep track state of /this/ node. |
268 | data Node ip = Node | 269 | data Node raw dht u ip = Node |
269 | { -- | Session configuration; | 270 | { -- | Session configuration; |
270 | options :: !Options | 271 | options :: !Options |
271 | 272 | ||
272 | -- | Pseudo-unique self-assigned session identifier. This value is | 273 | -- | Pseudo-unique self-assigned session identifier. This value is |
273 | -- constant during DHT session and (optionally) between sessions. | 274 | -- constant during DHT session and (optionally) between sessions. |
274 | #ifdef VERSION_bencoding | 275 | , tentativeNodeId :: !(NodeId dht) |
275 | , tentativeNodeId :: !(NodeId KMessageOf) | ||
276 | #else | ||
277 | , tentativeNodeId :: !(NodeId Tox.Message) | ||
278 | #endif | ||
279 | 276 | ||
280 | , resources :: !InternalState | 277 | , resources :: !InternalState |
281 | #ifdef VERSION_bencoding | 278 | , manager :: !(Manager raw dht) -- ^ RPC manager; |
282 | , manager :: !(Manager IO BValue KMessageOf) -- ^ RPC manager; | 279 | , routingInfo :: !(TVar (Maybe (R.Info dht ip u))) -- ^ search table; |
283 | , routingInfo :: !(TVar (Maybe (R.Info KMessageOf ip ()))) -- ^ search table; | ||
284 | #else | ||
285 | , manager :: !(Manager (DHT ip) ByteString Tox.Message) -- ^ RPC manager; | ||
286 | , routingInfo :: !(TVar (Maybe (R.Info Tox.Message ip Bool))) -- ^ search table; | ||
287 | #endif | ||
288 | , contactInfo :: !(TVar (PeerStore ip )) -- ^ published by other nodes; | 280 | , contactInfo :: !(TVar (PeerStore ip )) -- ^ published by other nodes; |
289 | , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; | 281 | , announceInfo :: !(TVar AnnounceSet ) -- ^ to publish by this node; |
290 | , sessionTokens :: !(TVar SessionTokens ) -- ^ query session IDs. | 282 | , sessionTokens :: !(TVar SessionTokens ) -- ^ query session IDs. |
@@ -293,23 +285,23 @@ data Node ip = Node | |||
293 | 285 | ||
294 | -- | DHT keep track current session and proper resource allocation for | 286 | -- | DHT keep track current session and proper resource allocation for |
295 | -- safe multithreading. | 287 | -- safe multithreading. |
296 | newtype DHT ip a = DHT { unDHT :: ReaderT (Node ip) IO a } | 288 | newtype DHT raw dht u ip a = DHT { unDHT :: ReaderT (Node raw dht u ip) IO a } |
297 | deriving ( Functor, Applicative, Monad, MonadIO | 289 | deriving ( Functor, Applicative, Monad, MonadIO |
298 | , MonadBase IO, MonadReader (Node ip), MonadThrow | 290 | , MonadBase IO, MonadReader (Node raw dht u ip), MonadThrow |
299 | ) | 291 | ) |
300 | 292 | ||
301 | #if MIN_VERSION_monad_control(1,0,0) | 293 | #if MIN_VERSION_monad_control(1,0,0) |
302 | newtype DHTStM ip a = StM { | 294 | newtype DHTStM raw dht u ip a = StM { |
303 | unSt :: StM (ReaderT (Node ip) IO) a | 295 | unSt :: StM (ReaderT (Node raw dht u ip) IO) a |
304 | } | 296 | } |
305 | #endif | 297 | #endif |
306 | 298 | ||
307 | instance MonadBaseControl IO (DHT ip) where | 299 | instance MonadBaseControl IO (DHT raw dht u ip) where |
308 | #if MIN_VERSION_monad_control(1,0,0) | 300 | #if MIN_VERSION_monad_control(1,0,0) |
309 | type StM (DHT ip) a = DHTStM ip a | 301 | type StM (DHT raw dht u ip) a = DHTStM raw dht u ip a |
310 | #else | 302 | #else |
311 | newtype StM (DHT ip) a = StM { | 303 | newtype StM (DHT raw dht u ip) a = StM { |
312 | unSt :: StM (ReaderT (Node ip) IO) a | 304 | unSt :: StM (ReaderT (Node raw dht u ip) IO) a |
313 | } | 305 | } |
314 | #endif | 306 | #endif |
315 | liftBaseWith cc = DHT $ liftBaseWith $ \ cc' -> | 307 | liftBaseWith cc = DHT $ liftBaseWith $ \ cc' -> |
@@ -321,22 +313,22 @@ instance MonadBaseControl IO (DHT ip) where | |||
321 | 313 | ||
322 | -- | Check is it is possible to run 'queryNode' or handle pending | 314 | -- | Check is it is possible to run 'queryNode' or handle pending |
323 | -- query from remote node. | 315 | -- query from remote node. |
324 | instance MonadActive (DHT ip) where | 316 | instance MonadActive (DHT raw dht u ip) where |
325 | monadActive = getManager >>= liftIO . isActive | 317 | monadActive = getManager >>= liftIO . isActive |
326 | {-# INLINE monadActive #-} | 318 | {-# INLINE monadActive #-} |
327 | 319 | ||
328 | -- | All allocated resources will be closed at 'closeNode'. | 320 | -- | All allocated resources will be closed at 'closeNode'. |
329 | instance MonadResource (DHT ip) where | 321 | instance MonadResource (DHT raw dht u ip) where |
330 | liftResourceT m = do | 322 | liftResourceT m = do |
331 | s <- asks resources | 323 | s <- asks resources |
332 | liftIO $ runInternalState m s | 324 | liftIO $ runInternalState m s |
333 | 325 | ||
334 | -- instance MonadKRPC (DHT ip) (DHT ip) BValue KMessageOf where | 326 | -- instance MonadKRPC (DHT raw dht u ip) (DHT raw dht u ip) BValue KMessageOf where |
335 | 327 | ||
336 | getManager :: DHT ip (Manager IO BValue KMessageOf) | 328 | getManager :: DHT raw dht u ip (Manager raw dht) |
337 | getManager = asks manager | 329 | getManager = asks manager |
338 | 330 | ||
339 | instance MonadLogger (DHT ip) where | 331 | instance MonadLogger (DHT raw dht u ip) where |
340 | monadLoggerLog loc src lvl msg = do | 332 | monadLoggerLog loc src lvl msg = do |
341 | logger <- asks loggerFun | 333 | logger <- asks loggerFun |
342 | liftIO $ logger loc src lvl (toLogStr msg) | 334 | liftIO $ logger loc src lvl (toLogStr msg) |
@@ -344,7 +336,7 @@ instance MonadLogger (DHT ip) where | |||
344 | #ifdef VERSION_bencoding | 336 | #ifdef VERSION_bencoding |
345 | type NodeHandler = Handler IO KMessageOf BValue | 337 | type NodeHandler = Handler IO KMessageOf BValue |
346 | #else | 338 | #else |
347 | type NodeHandler ip = Handler (DHT ip) Tox.Message ByteString | 339 | type NodeHandler ip = Handler (DHT raw dht u ip) Tox.Message ByteString |
348 | #endif | 340 | #endif |
349 | 341 | ||
350 | logt :: HasCallStack => LogFun -> Char -> String -> Text -> IO () | 342 | logt :: HasCallStack => LogFun -> Char -> String -> Text -> IO () |
@@ -376,17 +368,16 @@ locFromCS cs = case getCallStack cs of | |||
376 | -- | Run DHT session. You /must/ properly close session using | 368 | -- | Run DHT session. You /must/ properly close session using |
377 | -- 'closeNode' function, otherwise socket or other scarce resources may | 369 | -- 'closeNode' function, otherwise socket or other scarce resources may |
378 | -- leak. | 370 | -- leak. |
379 | newNode :: Address ip | 371 | newNode :: ( Address ip |
372 | , FiniteBits (NodeId dht) | ||
373 | , Serialize (NodeId dht) | ||
374 | ) | ||
380 | => -- [NodeHandler] -- ^ handlers to run on accepted queries; | 375 | => -- [NodeHandler] -- ^ handlers to run on accepted queries; |
381 | Options -- ^ various dht options; | 376 | Options -- ^ various dht options; |
382 | -> NodeAddr ip -- ^ node address to bind; | 377 | -> NodeAddr ip -- ^ node address to bind; |
383 | -> LogFun -- ^ invoked on log messages; | 378 | -> LogFun -- ^ invoked on log messages; |
384 | #ifdef VERSION_bencoding | 379 | -> Maybe (NodeId dht) -- ^ use this NodeId, if not given a new one is generated. |
385 | -> Maybe (NodeId KMessageOf) -- ^ use this NodeId, if not given a new one is generated. | 380 | -> IO (Node raw dht u ip) -- ^ a new DHT node running at given address. |
386 | #else | ||
387 | -> Maybe (NodeId Tox.Message) -- ^ use this NodeId, if not given a new one is generated. | ||
388 | #endif | ||
389 | -> IO (Node ip) -- ^ a new DHT node running at given address. | ||
390 | newNode opts naddr logger mbid = do | 381 | newNode opts naddr logger mbid = do |
391 | s <- createInternalState | 382 | s <- createInternalState |
392 | runInternalState initNode s | 383 | runInternalState initNode s |
@@ -409,11 +400,11 @@ newNode opts naddr logger mbid = do | |||
409 | 400 | ||
410 | -- | Some resources like listener thread may live for | 401 | -- | Some resources like listener thread may live for |
411 | -- some short period of time right after this DHT session closed. | 402 | -- some short period of time right after this DHT session closed. |
412 | closeNode :: Node ip -> IO () | 403 | closeNode :: Node raw dht u ip -> IO () |
413 | closeNode Node {..} = closeInternalState resources | 404 | closeNode Node {..} = closeInternalState resources |
414 | 405 | ||
415 | -- | Run DHT operation on the given session. | 406 | -- | Run DHT operation on the given session. |
416 | runDHT :: Node ip -> DHT ip a -> IO a | 407 | runDHT :: Node raw dht u ip -> DHT raw dht u ip a -> IO a |
417 | runDHT node action = runReaderT (unDHT action) node | 408 | runDHT node action = runReaderT (unDHT action) node |
418 | {-# INLINE runDHT #-} | 409 | {-# INLINE runDHT #-} |
419 | 410 | ||
@@ -453,24 +444,20 @@ checkToken sessionTokens addr questionableToken = do | |||
453 | -----------------------------------------------------------------------} | 444 | -----------------------------------------------------------------------} |
454 | 445 | ||
455 | -- | This nodes externally routable address reported by remote peers. | 446 | -- | This nodes externally routable address reported by remote peers. |
456 | routableAddress :: DHT ip (Maybe SockAddr) | 447 | routableAddress :: DHT raw dht u ip (Maybe SockAddr) |
457 | routableAddress = do | 448 | routableAddress = do |
458 | info <- asks routingInfo >>= liftIO . atomically . readTVar | 449 | info <- asks routingInfo >>= liftIO . atomically . readTVar |
459 | return $ myAddress <$> info | 450 | return $ myAddress <$> info |
460 | 451 | ||
461 | -- | The current NodeId that the given remote node should know us by. | 452 | -- | The current NodeId that the given remote node should know us by. |
462 | #ifdef VERSION_bencoding | 453 | myNodeIdAccordingTo :: NodeAddr ip -> DHT raw dht u ip (NodeId dht) |
463 | myNodeIdAccordingTo :: NodeAddr ip -> DHT ip (NodeId KMessageOf) | ||
464 | #else | ||
465 | myNodeIdAccordingTo :: NodeAddr ip -> DHT ip (NodeId Tox.Message) | ||
466 | #endif | ||
467 | myNodeIdAccordingTo _ = do | 454 | myNodeIdAccordingTo _ = do |
468 | info <- asks routingInfo >>= liftIO . atomically . readTVar | 455 | info <- asks routingInfo >>= liftIO . atomically . readTVar |
469 | maybe (asks tentativeNodeId) | 456 | maybe (asks tentativeNodeId) |
470 | (return . myNodeId) | 457 | (return . myNodeId) |
471 | info | 458 | info |
472 | 459 | ||
473 | myNodeIdAccordingTo1 :: DHT ip ( NodeAddr ip -> IO (NodeId KMessageOf) ) | 460 | myNodeIdAccordingTo1 :: DHT raw dht u ip ( NodeAddr ip -> IO (NodeId dht) ) |
474 | myNodeIdAccordingTo1 = do | 461 | myNodeIdAccordingTo1 = do |
475 | var <- asks routingInfo | 462 | var <- asks routingInfo |
476 | tid <- asks tentativeNodeId | 463 | tid <- asks tentativeNodeId |
@@ -480,11 +467,7 @@ myNodeIdAccordingTo1 = do | |||
480 | 467 | ||
481 | -- | Get current routing table. Normally you don't need to use this | 468 | -- | Get current routing table. Normally you don't need to use this |
482 | -- function, but it can be usefull for debugging and profiling purposes. | 469 | -- function, but it can be usefull for debugging and profiling purposes. |
483 | #ifdef VERSION_bencoding | 470 | getTable :: Eq ip => DHT raw dht u ip (Table dht ip u) |
484 | getTable :: Eq ip => DHT ip (Table KMessageOf ip ()) | ||
485 | #else | ||
486 | getTable :: Eq ip => DHT ip (Table Tox.Message ip Bool) | ||
487 | #endif | ||
488 | getTable = do | 471 | getTable = do |
489 | Node { tentativeNodeId = myId | 472 | Node { tentativeNodeId = myId |
490 | , routingInfo = var | 473 | , routingInfo = var |
@@ -492,18 +475,18 @@ getTable = do | |||
492 | let nil = nullTable myId (optBucketCount opts) | 475 | let nil = nullTable myId (optBucketCount opts) |
493 | liftIO (maybe nil R.myBuckets <$> atomically (readTVar var)) | 476 | liftIO (maybe nil R.myBuckets <$> atomically (readTVar var)) |
494 | 477 | ||
495 | getSwarms :: Ord ip => DHT ip [ (InfoHash, Int, Maybe ByteString) ] | 478 | getSwarms :: Ord ip => DHT raw dht u ip [ (InfoHash, Int, Maybe ByteString) ] |
496 | getSwarms = do | 479 | getSwarms = do |
497 | store <- asks contactInfo >>= liftIO . atomically . readTVar | 480 | store <- asks contactInfo >>= liftIO . atomically . readTVar |
498 | return $ P.knownSwarms store | 481 | return $ P.knownSwarms store |
499 | 482 | ||
500 | savePeerStore :: (Ord ip, Address ip) => DHT ip ByteString | 483 | savePeerStore :: (Ord ip, Address ip) => DHT raw dht u ip ByteString |
501 | savePeerStore = do | 484 | savePeerStore = do |
502 | var <- asks contactInfo | 485 | var <- asks contactInfo |
503 | peers <- liftIO $ atomically $ readTVar var | 486 | peers <- liftIO $ atomically $ readTVar var |
504 | return $ S.encode peers | 487 | return $ S.encode peers |
505 | 488 | ||
506 | mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT ip () | 489 | mergeSavedPeers :: (Ord ip, Address ip) => ByteString -> DHT raw dht u ip () |
507 | mergeSavedPeers bs = do | 490 | mergeSavedPeers bs = do |
508 | var <- asks contactInfo | 491 | var <- asks contactInfo |
509 | case S.decode bs of | 492 | case S.decode bs of |
@@ -511,7 +494,7 @@ mergeSavedPeers bs = do | |||
511 | Left _ -> return () | 494 | Left _ -> return () |
512 | 495 | ||
513 | 496 | ||
514 | allPeers :: Ord ip => InfoHash -> DHT ip [ PeerAddr ip ] | 497 | allPeers :: Ord ip => InfoHash -> DHT raw dht u ip [ PeerAddr ip ] |
515 | allPeers ih = do | 498 | allPeers ih = do |
516 | store <- asks contactInfo >>= liftIO . atomically . readTVar | 499 | store <- asks contactInfo >>= liftIO . atomically . readTVar |
517 | return $ P.lookup ih store | 500 | return $ P.lookup ih store |
@@ -521,18 +504,20 @@ allPeers ih = do | |||
521 | -- | 504 | -- |
522 | -- This operation used for 'find_nodes' query. | 505 | -- This operation used for 'find_nodes' query. |
523 | -- | 506 | -- |
524 | #ifdef VERSION_bencoding | 507 | getClosest :: ( Eq ip |
525 | getClosest :: Eq ip => TableKey KMessageOf k => k -> DHT ip [NodeInfo KMessageOf ip ()] | 508 | , Ord (NodeId dht) |
526 | #else | 509 | , FiniteBits (NodeId dht) |
527 | getClosest :: Eq ip => TableKey Tox.Message k => k -> DHT ip [NodeInfo Tox.Message ip Bool] | 510 | , TableKey dht k ) => |
528 | #endif | 511 | k -> DHT raw dht u ip [NodeInfo dht ip u] |
529 | getClosest node = do | 512 | getClosest node = do |
530 | k <- asks (optK . options) | 513 | k <- asks (optK . options) |
531 | kclosest k node <$> getTable | 514 | kclosest k node <$> getTable |
532 | 515 | ||
533 | getClosest1 :: ( Eq ip | 516 | getClosest1 :: ( Eq ip |
534 | , TableKey KMessageOf k | 517 | , Ord (NodeId dht) |
535 | ) => DHT ip (k -> IO [NodeInfo KMessageOf ip ()]) | 518 | , FiniteBits (NodeId dht) |
519 | , TableKey dht k | ||
520 | ) => DHT raw dht u ip (k -> IO [NodeInfo dht ip u]) | ||
536 | getClosest1 = do | 521 | getClosest1 = do |
537 | k <- asks (optK . options) | 522 | k <- asks (optK . options) |
538 | nobkts <- asks (optBucketCount . options) | 523 | nobkts <- asks (optBucketCount . options) |
@@ -574,13 +559,12 @@ getTimestamp = do | |||
574 | -- $(logDebugS) "routing.make_timestamp" (Text.pack (render (pPrint utcTime))) | 559 | -- $(logDebugS) "routing.make_timestamp" (Text.pack (render (pPrint utcTime))) |
575 | return $ utcTimeToPOSIXSeconds utcTime | 560 | return $ utcTimeToPOSIXSeconds utcTime |
576 | 561 | ||
577 | |||
578 | #ifdef VERSION_bencoding | 562 | #ifdef VERSION_bencoding |
579 | -- | Prepare result for 'get_peers' query. | 563 | -- | Prepare result for 'get_peers' query. |
580 | -- | 564 | -- |
581 | -- This operation use 'getClosest' as failback so it may block. | 565 | -- This operation use 'getClosest' as failback so it may block. |
582 | -- | 566 | -- |
583 | getPeerList :: Ord ip => InfoHash -> DHT ip (PeerList ip) | 567 | getPeerList :: Ord ip => InfoHash -> DHT raw KMessageOf () ip (PeerList ip) |
584 | getPeerList ih = do | 568 | getPeerList ih = do |
585 | var <- asks contactInfo | 569 | var <- asks contactInfo |
586 | ps <- liftIO $ lookupPeers var ih | 570 | ps <- liftIO $ lookupPeers var ih |
@@ -588,7 +572,7 @@ getPeerList ih = do | |||
588 | then Left <$> getClosest ih | 572 | then Left <$> getClosest ih |
589 | else return (Right ps) | 573 | else return (Right ps) |
590 | 574 | ||
591 | getPeerList1 :: Ord ip => DHT ip (InfoHash -> IO (PeerList ip)) | 575 | getPeerList1 :: Ord ip => DHT raw KMessageOf () ip (InfoHash -> IO (PeerList ip)) |
592 | getPeerList1 = do | 576 | getPeerList1 = do |
593 | var <- asks contactInfo | 577 | var <- asks contactInfo |
594 | getclosest <- getClosest1 | 578 | getclosest <- getClosest1 |
@@ -599,12 +583,12 @@ getPeerList1 = do | |||
599 | else return (Right ps) | 583 | else return (Right ps) |
600 | 584 | ||
601 | 585 | ||
602 | insertTopic :: InfoHash -> PortNumber -> DHT ip () | 586 | insertTopic :: InfoHash -> PortNumber -> DHT raw dht u ip () |
603 | insertTopic ih p = do | 587 | insertTopic ih p = do |
604 | var <- asks announceInfo | 588 | var <- asks announceInfo |
605 | liftIO $ atomically $ modifyTVar' var (S.insert (ih, p)) | 589 | liftIO $ atomically $ modifyTVar' var (S.insert (ih, p)) |
606 | 590 | ||
607 | deleteTopic :: InfoHash -> PortNumber -> DHT ip () | 591 | deleteTopic :: InfoHash -> PortNumber -> DHT raw dht u ip () |
608 | deleteTopic ih p = do | 592 | deleteTopic ih p = do |
609 | var <- asks announceInfo | 593 | var <- asks announceInfo |
610 | liftIO $ atomically $ modifyTVar' var (S.delete (ih, p)) | 594 | liftIO $ atomically $ modifyTVar' var (S.delete (ih, p)) |
@@ -616,7 +600,7 @@ deleteTopic ih p = do | |||
616 | -----------------------------------------------------------------------} | 600 | -----------------------------------------------------------------------} |
617 | 601 | ||
618 | -- | Failed queries are ignored. | 602 | -- | Failed queries are ignored. |
619 | queryParallel :: [DHT ip a] -> DHT ip [a] | 603 | queryParallel :: [DHT raw dht u ip a] -> DHT raw dht u ip [a] |
620 | queryParallel queries = do | 604 | queryParallel queries = do |
621 | -- TODO: use alpha | 605 | -- TODO: use alpha |
622 | -- alpha <- asks (optAlpha . options) | 606 | -- alpha <- asks (optAlpha . options) |