diff options
author | joe <joe@jerkface.net> | 2017-11-06 05:18:04 -0500 |
---|---|---|
committer | joe <joe@jerkface.net> | 2017-11-08 02:30:43 -0500 |
commit | 70a96073db817b19e98d058702b1a8aa3d4b8445 (patch) | |
tree | 83414727033ad1fb66ea6289a20495b275a4e13c /src/Network/BitTorrent/MainlineDHT.hs | |
parent | 6749c25eb6bf544ebef51817049c922030e8369d (diff) |
Bootstrapping rework in progress.
Diffstat (limited to 'src/Network/BitTorrent/MainlineDHT.hs')
-rw-r--r-- | src/Network/BitTorrent/MainlineDHT.hs | 70 |
1 files changed, 46 insertions, 24 deletions
diff --git a/src/Network/BitTorrent/MainlineDHT.hs b/src/Network/BitTorrent/MainlineDHT.hs index c7ef560c..268cacfb 100644 --- a/src/Network/BitTorrent/MainlineDHT.hs +++ b/src/Network/BitTorrent/MainlineDHT.hs | |||
@@ -6,6 +6,7 @@ | |||
6 | {-# LANGUAGE FlexibleInstances #-} | 6 | {-# LANGUAGE FlexibleInstances #-} |
7 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} | 7 | {-# LANGUAGE GeneralizedNewtypeDeriving #-} |
8 | {-# LANGUAGE LambdaCase #-} | 8 | {-# LANGUAGE LambdaCase #-} |
9 | {-# LANGUAGE NamedFieldPuns #-} | ||
9 | {-# LANGUAGE PatternSynonyms #-} | 10 | {-# LANGUAGE PatternSynonyms #-} |
10 | {-# LANGUAGE StandaloneDeriving #-} | 11 | {-# LANGUAGE StandaloneDeriving #-} |
11 | {-# LANGUAGE TupleSections #-} | 12 | {-# LANGUAGE TupleSections #-} |
@@ -502,14 +503,24 @@ newSwarmsDatabase = do | |||
502 | 503 | ||
503 | data Routing = Routing | 504 | data Routing = Routing |
504 | { tentativeId :: NodeInfo | 505 | { tentativeId :: NodeInfo |
505 | , sched4 :: !( TVar (Int.PSQ POSIXTime) ) | ||
506 | , routing4 :: !( TVar (R.BucketList NodeInfo) ) | ||
507 | , committee4 :: TriadCommittee NodeId SockAddr | 506 | , committee4 :: TriadCommittee NodeId SockAddr |
508 | , sched6 :: !( TVar (Int.PSQ POSIXTime) ) | ||
509 | , routing6 :: !( TVar (R.BucketList NodeInfo) ) | ||
510 | , committee6 :: TriadCommittee NodeId SockAddr | 507 | , committee6 :: TriadCommittee NodeId SockAddr |
508 | , refresher4 :: BucketRefresher NodeId NodeInfo | ||
509 | , refresher6 :: BucketRefresher NodeId NodeInfo | ||
511 | } | 510 | } |
512 | 511 | ||
512 | sched4 :: Routing -> TVar (Int.PSQ POSIXTime) | ||
513 | sched4 Routing { refresher4 = BucketRefresher { refreshQueue } } = refreshQueue | ||
514 | |||
515 | sched6 :: Routing -> TVar (Int.PSQ POSIXTime) | ||
516 | sched6 Routing { refresher6 = BucketRefresher { refreshQueue } } = refreshQueue | ||
517 | |||
518 | routing4 :: Routing -> TVar (R.BucketList NodeInfo) | ||
519 | routing4 Routing { refresher4 = BucketRefresher { refreshBuckets } } = refreshBuckets | ||
520 | |||
521 | routing6 :: Routing -> TVar (R.BucketList NodeInfo) | ||
522 | routing6 Routing { refresher6 = BucketRefresher { refreshBuckets } } = refreshBuckets | ||
523 | |||
513 | traced :: Show tid => TableMethods t tid -> TableMethods t tid | 524 | traced :: Show tid => TableMethods t tid -> TableMethods t tid |
514 | traced (TableMethods ins del lkup) | 525 | traced (TableMethods ins del lkup) |
515 | = TableMethods (\tid mvar t -> trace ("insert "++show tid) $ ins tid mvar t) | 526 | = TableMethods (\tid mvar t -> trace ("insert "++show tid) $ ins tid mvar t) |
@@ -537,7 +548,7 @@ newClient swarms addr = do | |||
537 | <$> global6 | 548 | <$> global6 |
538 | addr4 <- atomically $ newTChan | 549 | addr4 <- atomically $ newTChan |
539 | addr6 <- atomically $ newTChan | 550 | addr6 <- atomically $ newTChan |
540 | routing <- atomically $ do | 551 | mkrouting <- atomically $ do |
541 | let nobkts = R.defaultBucketCount :: Int | 552 | let nobkts = R.defaultBucketCount :: Int |
542 | tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info nobkts | 553 | tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info nobkts |
543 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 nobkts | 554 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 nobkts |
@@ -558,9 +569,22 @@ newClient swarms addr = do | |||
558 | committee6 <- newTriadCommittee $ updateIPVote tbl6 addr6 | 569 | committee6 <- newTriadCommittee $ updateIPVote tbl6 addr6 |
559 | sched4 <- newTVar Int.empty | 570 | sched4 <- newTVar Int.empty |
560 | sched6 <- newTVar Int.empty | 571 | sched6 <- newTVar Int.empty |
561 | return $ Routing tentative_info sched4 tbl4 committee4 sched6 tbl6 committee6 | 572 | return $ \client -> |
573 | let refresher sched bkts = BucketRefresher | ||
574 | { refreshInterval = 15 * 60 | ||
575 | , refreshQueue = sched | ||
576 | , refreshSearch = nodeSearch client | ||
577 | , refreshBuckets = bkts | ||
578 | , refreshPing = ping client | ||
579 | } | ||
580 | refresher4 = refresher sched4 tbl4 | ||
581 | refresher6 = refresher sched6 tbl6 | ||
582 | in Routing tentative_info committee4 committee6 refresher4 refresher6 | ||
562 | map_var <- atomically $ newTVar (0, mempty) | 583 | map_var <- atomically $ newTVar (0, mempty) |
563 | let net = onInbound (updateRouting outgoingClient routing) | 584 | |
585 | let routing = mkrouting outgoingClient | ||
586 | |||
587 | net = onInbound (updateRouting outgoingClient routing) | ||
564 | $ layerTransport parsePacket encodePacket | 588 | $ layerTransport parsePacket encodePacket |
565 | $ udp | 589 | $ udp |
566 | 590 | ||
@@ -625,14 +649,9 @@ newClient swarms addr = do | |||
625 | -- TODO: trigger bootstrap ipv6 | 649 | -- TODO: trigger bootstrap ipv6 |
626 | again | 650 | again |
627 | 651 | ||
628 | refresh_thread4 <- forkPollForRefresh | 652 | |
629 | (15*60) | 653 | refresh_thread4 <- forkPollForRefresh $ refresher4 routing |
630 | (sched4 routing) | 654 | refresh_thread6 <- forkPollForRefresh $ refresher6 routing |
631 | (refreshBucket (nodeSearch client) (routing4 routing)) | ||
632 | refresh_thread6 <- forkPollForRefresh | ||
633 | (15*60) | ||
634 | (sched6 routing) | ||
635 | (refreshBucket (nodeSearch client) (routing6 routing)) | ||
636 | 655 | ||
637 | return (client, routing) | 656 | return (client, routing) |
638 | 657 | ||
@@ -676,14 +695,17 @@ defaultHandler meth = MethodHandler decodePayload errorPayload returnError | |||
676 | returnError :: NodeInfo -> BValue -> IO Error | 695 | returnError :: NodeInfo -> BValue -> IO Error |
677 | returnError _ _ = return $ Error MethodUnknown ("Unknown method " <> meth) | 696 | returnError _ _ = return $ Error MethodUnknown ("Unknown method " <> meth) |
678 | 697 | ||
679 | mainlineKademlia :: MainlineClient -> TriadCommittee NodeId SockAddr -> TVar (R.BucketList NodeInfo) -> TVar (Int.PSQ POSIXTime) -> Kademlia NodeId NodeInfo | 698 | mainlineKademlia :: MainlineClient |
680 | mainlineKademlia client committee var sched | 699 | -> TriadCommittee NodeId SockAddr |
700 | -> BucketRefresher NodeId NodeInfo | ||
701 | -> Kademlia NodeId NodeInfo | ||
702 | mainlineKademlia client committee refresher | ||
681 | = Kademlia quietInsertions | 703 | = Kademlia quietInsertions |
682 | mainlineSpace | 704 | mainlineSpace |
683 | (vanillaIO var $ ping client) | 705 | (vanillaIO (refreshBuckets refresher) $ ping client) |
684 | { tblTransition = \tr -> do | 706 | { tblTransition = \tr -> do |
685 | io1 <- transitionCommittee committee tr | 707 | io1 <- transitionCommittee committee tr |
686 | io2 <- touchBucket mainlineSpace (15*60) var sched tr | 708 | io2 <- touchBucket refresher tr |
687 | return $ do | 709 | return $ do |
688 | io1 >> io2 | 710 | io1 >> io2 |
689 | {- noisy (timestamp updates are currently reported as transitions to Accepted) | 711 | {- noisy (timestamp updates are currently reported as transitions to Accepted) |
@@ -712,11 +734,11 @@ transitionCommittee committee _ = return $ return () | |||
712 | updateRouting :: MainlineClient -> Routing -> NodeInfo -> Message BValue -> IO () | 734 | updateRouting :: MainlineClient -> Routing -> NodeInfo -> Message BValue -> IO () |
713 | updateRouting client routing naddr msg = do | 735 | updateRouting client routing naddr msg = do |
714 | case prefer4or6 naddr Nothing of | 736 | case prefer4or6 naddr Nothing of |
715 | Want_IP4 -> go (routing4 routing) (committee4 routing) (sched4 routing) | 737 | Want_IP4 -> go (committee4 routing) (refresher4 routing) |
716 | Want_IP6 -> go (routing6 routing) (committee6 routing) (sched6 routing) | 738 | Want_IP6 -> go (committee6 routing) (refresher6 routing) |
717 | where | 739 | where |
718 | go tbl committee sched = do | 740 | go committee refresher = do |
719 | self <- atomically $ R.thisNode <$> readTVar tbl | 741 | self <- atomically $ R.thisNode <$> readTVar (refreshBuckets refresher) |
720 | when (nodeIP self /= nodeIP naddr) $ do | 742 | when (nodeIP self /= nodeIP naddr) $ do |
721 | case msg of | 743 | case msg of |
722 | R { rspReflectedIP = Just sockaddr } | 744 | R { rspReflectedIP = Just sockaddr } |
@@ -724,7 +746,7 @@ updateRouting client routing naddr msg = do | |||
724 | -- hPutStrLn stderr $ "External: "++show (nodeId naddr,sockaddr) | 746 | -- hPutStrLn stderr $ "External: "++show (nodeId naddr,sockaddr) |
725 | atomically $ addVote committee (nodeId naddr) sockaddr | 747 | atomically $ addVote committee (nodeId naddr) sockaddr |
726 | _ -> return () | 748 | _ -> return () |
727 | insertNode (mainlineKademlia client committee tbl sched) naddr | 749 | insertNode (mainlineKademlia client committee refresher) naddr |
728 | 750 | ||
729 | data Ping = Ping deriving Show | 751 | data Ping = Ping deriving Show |
730 | 752 | ||