diff options
author | Joe Crayne <joe@jerkface.net> | 2020-01-03 21:27:50 -0500 |
---|---|---|
committer | Joe Crayne <joe@jerkface.net> | 2020-01-07 13:24:59 -0500 |
commit | c7fb8cfe16f821e4e148d1855a18cb81255743bc (patch) | |
tree | c035afc9ff870ea3bfc5b1dc7c4254ad0c0bf4b3 /dht/src/Network/BitTorrent/MainlineDHT.hs | |
parent | 5ea2de4e858cc89282561922bae257b6f9041d2e (diff) |
Async search.
Diffstat (limited to 'dht/src/Network/BitTorrent/MainlineDHT.hs')
-rw-r--r-- | dht/src/Network/BitTorrent/MainlineDHT.hs | 76 |
1 files changed, 64 insertions, 12 deletions
diff --git a/dht/src/Network/BitTorrent/MainlineDHT.hs b/dht/src/Network/BitTorrent/MainlineDHT.hs index 8532b492..d3904c40 100644 --- a/dht/src/Network/BitTorrent/MainlineDHT.hs +++ b/dht/src/Network/BitTorrent/MainlineDHT.hs | |||
@@ -512,8 +512,8 @@ data Routing = Routing | |||
512 | { tentativeId :: NodeInfo | 512 | { tentativeId :: NodeInfo |
513 | , committee4 :: TriadCommittee NodeId SockAddr | 513 | , committee4 :: TriadCommittee NodeId SockAddr |
514 | , committee6 :: TriadCommittee NodeId SockAddr | 514 | , committee6 :: TriadCommittee NodeId SockAddr |
515 | , refresher4 :: BucketRefresher NodeId NodeInfo | 515 | , refresher4 :: BucketRefresher NodeId NodeInfo TransactionId |
516 | , refresher6 :: BucketRefresher NodeId NodeInfo | 516 | , refresher6 :: BucketRefresher NodeId NodeInfo TransactionId |
517 | } | 517 | } |
518 | 518 | ||
519 | sched4 :: Routing -> TVar (Int.PSQ POSIXTime) | 519 | sched4 :: Routing -> TVar (Int.PSQ POSIXTime) |
@@ -569,7 +569,6 @@ newClient swarms addr = do | |||
569 | -- We defer initializing the refreshSearch and refreshPing until we | 569 | -- We defer initializing the refreshSearch and refreshPing until we |
570 | -- have a client to send queries with. | 570 | -- have a client to send queries with. |
571 | let nullPing = const $ return False | 571 | let nullPing = const $ return False |
572 | nullSearch = mainlineSearch $ \_ _ -> return Canceled | ||
573 | tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info R.defaultBucketCount | 572 | tbl4 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info R.defaultBucketCount |
574 | refresher4 <- newBucketRefresher tbl4 nullSearch nullPing | 573 | refresher4 <- newBucketRefresher tbl4 nullSearch nullPing |
575 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount | 574 | tbl6 <- newTVar $ R.nullTable (comparing nodeId) (\s -> hashWithSalt s . nodeId) tentative_info6 R.defaultBucketCount |
@@ -730,7 +729,7 @@ defaultHandler meth = MethodHandler decodePayload errorPayload returnError | |||
730 | 729 | ||
731 | mainlineKademlia :: MainlineClient | 730 | mainlineKademlia :: MainlineClient |
732 | -> TriadCommittee NodeId SockAddr | 731 | -> TriadCommittee NodeId SockAddr |
733 | -> BucketRefresher NodeId NodeInfo | 732 | -> BucketRefresher NodeId NodeInfo TransactionId |
734 | -> Kademlia NodeId NodeInfo | 733 | -> Kademlia NodeId NodeInfo |
735 | mainlineKademlia client committee refresher | 734 | mainlineKademlia client committee refresher |
736 | = Kademlia quietInsertions | 735 | = Kademlia quietInsertions |
@@ -1037,6 +1036,35 @@ announceH (SwarmsDatabase peers toks _) naddr announcement = do | |||
1037 | isReadonlyClient :: MainlineClient -> Bool | 1036 | isReadonlyClient :: MainlineClient -> Bool |
1038 | isReadonlyClient client = False -- TODO | 1037 | isReadonlyClient client = False -- TODO |
1039 | 1038 | ||
1039 | mainlineAsync :: ( BEncode xqry | ||
1040 | , BEncode xrsp | ||
1041 | ) => Method | ||
1042 | -> (xrsp -> rsp) | ||
1043 | -> (qry -> xqry) | ||
1044 | -> MainlineClient | ||
1045 | -> qry | ||
1046 | -> NodeInfo | ||
1047 | -> (TransactionId -> QR.Result rsp -> IO ()) | ||
1048 | -> IO TransactionId | ||
1049 | mainlineAsync meth unwrap msg client nid addr withResult = do | ||
1050 | asyncQuery client serializer (msg nid) addr $ \qid reply -> do | ||
1051 | withResult qid $ case reply of | ||
1052 | Success (Right x) -> Success x | ||
1053 | Success (Left e) -> Canceled -- TODO: Do something with parse errors. | ||
1054 | Canceled -> Canceled | ||
1055 | TimedOut -> TimedOut | ||
1056 | where | ||
1057 | serializer = MethodSerializer | ||
1058 | { methodTimeout = \ni -> return (ni, 5000000) | ||
1059 | , method = meth | ||
1060 | , wrapQuery = encodeQueryPayload meth (isReadonlyClient client) | ||
1061 | , unwrapResponse = (>>= either (Left . Error GenericError . C8.pack) | ||
1062 | (Right . unwrap) | ||
1063 | . BE.fromBEncode) | ||
1064 | . rspPayload | ||
1065 | } | ||
1066 | |||
1067 | |||
1040 | mainlineSend :: ( BEncode xqry | 1068 | mainlineSend :: ( BEncode xqry |
1041 | , BEncode xrsp | 1069 | , BEncode xrsp |
1042 | ) => Method | 1070 | ) => Method |
@@ -1073,30 +1101,54 @@ ping client addr = | |||
1073 | getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ())) | 1101 | getNodes :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[NodeInfo],Maybe ())) |
1074 | getNodes = mainlineSend (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both) | 1102 | getNodes = mainlineSend (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both) |
1075 | 1103 | ||
1104 | asyncGetNodes :: MainlineClient -> NodeId -> NodeInfo -> (TransactionId -> QR.Result ([NodeInfo],[NodeInfo],Maybe ()) -> IO ()) | ||
1105 | -> IO TransactionId | ||
1106 | asyncGetNodes = mainlineAsync (Method "find_node") unwrapNodes $ flip FindNode (Just Want_Both) | ||
1107 | |||
1076 | unwrapNodes :: NodeFound -> ([NodeInfo], [NodeInfo], Maybe ()) | 1108 | unwrapNodes :: NodeFound -> ([NodeInfo], [NodeInfo], Maybe ()) |
1077 | unwrapNodes (NodeFound ns4 ns6) = (ns4++ns6, ns4++ns6, Just ()) | 1109 | unwrapNodes (NodeFound ns4 ns6) = (ns4++ns6, ns4++ns6, Just ()) |
1078 | 1110 | ||
1079 | getPeers :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[PeerAddr],Maybe Token)) | 1111 | getPeers :: MainlineClient -> NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo],[PeerAddr],Maybe Token)) |
1080 | getPeers = mainlineSend (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce | 1112 | getPeers = mainlineSend (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce |
1081 | 1113 | ||
1114 | asyncGetPeers :: MainlineClient -> NodeId -> NodeInfo -> (TransactionId -> QR.Result ([NodeInfo],[PeerAddr],Maybe Token) -> IO ()) | ||
1115 | -> IO TransactionId | ||
1116 | asyncGetPeers = mainlineAsync (Method "get_peers") unwrapPeers $ flip GetPeers (Just Want_Both) . coerce | ||
1117 | |||
1082 | unwrapPeers :: GotPeers -> ([NodeInfo], [PeerAddr], Maybe Token) | 1118 | unwrapPeers :: GotPeers -> ([NodeInfo], [PeerAddr], Maybe Token) |
1083 | unwrapPeers (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4++ns6, ps, Just tok) | 1119 | unwrapPeers (GotPeers ps (NodeFound ns4 ns6) tok) = (ns4++ns6, ps, Just tok) |
1084 | 1120 | ||
1085 | mainlineSearch :: (NodeId -> NodeInfo -> IO (QR.Result ([NodeInfo], [r], Maybe tok))) | 1121 | nullTransactionId :: TransactionId |
1086 | -> Search NodeId (IP, PortNumber) tok NodeInfo r | 1122 | nullTransactionId = TransactionId B.empty |
1087 | mainlineSearch qry = Search | 1123 | |
1124 | nullSearch :: Search NodeId (IP, PortNumber) tok NodeInfo r TransactionId | ||
1125 | nullSearch = Search | ||
1126 | { searchSpace = mainlineSpace | ||
1127 | , searchNodeAddress = nodeIP &&& nodePort | ||
1128 | , searchQuery = \_ _ f -> f nullTransactionId Canceled >> return nullTransactionId | ||
1129 | , searchQueryCancel = \_ _ -> return () | ||
1130 | , searchAlpha = 8 | ||
1131 | , searchK = 16 | ||
1132 | } | ||
1133 | |||
1134 | mainlineSearch :: MainlineClient | ||
1135 | -> (MainlineClient -> NodeId -> NodeInfo | ||
1136 | -> (TransactionId -> QR.Result ([NodeInfo], [r], Maybe tok) -> IO ()) -> IO TransactionId) | ||
1137 | -> Search NodeId (IP, PortNumber) tok NodeInfo r TransactionId | ||
1138 | mainlineSearch client qry = Search | ||
1088 | { searchSpace = mainlineSpace | 1139 | { searchSpace = mainlineSpace |
1089 | , searchNodeAddress = nodeIP &&& nodePort | 1140 | , searchNodeAddress = nodeIP &&& nodePort |
1090 | , searchQuery = qry | 1141 | , searchQuery = qry client |
1142 | , searchQueryCancel = cancelQuery client | ||
1091 | , searchAlpha = 8 | 1143 | , searchAlpha = 8 |
1092 | , searchK = 16 | 1144 | , searchK = 16 |
1093 | } | 1145 | } |
1094 | 1146 | ||
1095 | nodeSearch :: MainlineClient -> Search NodeId (IP, PortNumber) () NodeInfo NodeInfo | 1147 | nodeSearch :: MainlineClient -> Search NodeId (IP, PortNumber) () NodeInfo NodeInfo TransactionId |
1096 | nodeSearch client = mainlineSearch (getNodes client) | 1148 | nodeSearch client = mainlineSearch client asyncGetNodes |
1097 | 1149 | ||
1098 | peerSearch :: MainlineClient -> Search NodeId (IP, PortNumber) Token NodeInfo PeerAddr | 1150 | peerSearch :: MainlineClient -> Search NodeId (IP, PortNumber) Token NodeInfo PeerAddr TransactionId |
1099 | peerSearch client = mainlineSearch (getPeers client) | 1151 | peerSearch client = mainlineSearch client asyncGetPeers |
1100 | 1152 | ||
1101 | -- | List of bootstrap nodes maintained by different bittorrent | 1153 | -- | List of bootstrap nodes maintained by different bittorrent |
1102 | -- software authors. | 1154 | -- software authors. |