test_lb_policies.cc 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550
  1. //
  2. // Copyright 2018 gRPC authors.
  3. //
  4. // Licensed under the Apache License, Version 2.0 (the "License");
  5. // you may not use this file except in compliance with the License.
  6. // You may obtain a copy of the License at
  7. //
  8. // http://www.apache.org/licenses/LICENSE-2.0
  9. //
  10. // Unless required by applicable law or agreed to in writing, software
  11. // distributed under the License is distributed on an "AS IS" BASIS,
  12. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. // See the License for the specific language governing permissions and
  14. // limitations under the License.
  15. //
  16. #include "test/core/util/test_lb_policies.h"
  17. #include <string>
  18. #include <grpc/support/log.h>
  19. #include "src/core/ext/filters/client_channel/lb_policy.h"
  20. #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
  21. #include "src/core/lib/address_utils/parse_address.h"
  22. #include "src/core/lib/channel/channel_args.h"
  23. #include "src/core/lib/channel/channelz.h"
  24. #include "src/core/lib/debug/trace.h"
  25. #include "src/core/lib/gprpp/memory.h"
  26. #include "src/core/lib/gprpp/orphanable.h"
  27. #include "src/core/lib/gprpp/ref_counted_ptr.h"
  28. #include "src/core/lib/iomgr/closure.h"
  29. #include "src/core/lib/iomgr/combiner.h"
  30. #include "src/core/lib/iomgr/error.h"
  31. #include "src/core/lib/iomgr/pollset_set.h"
  32. #include "src/core/lib/json/json.h"
  33. #include "src/core/lib/json/json_util.h"
  34. #include "src/core/lib/transport/connectivity_state.h"
  35. namespace grpc_core {
  36. namespace {
  37. //
  38. // ForwardingLoadBalancingPolicy
  39. //
  40. // A minimal forwarding class to avoid implementing a standalone test LB.
  41. class ForwardingLoadBalancingPolicy : public LoadBalancingPolicy {
  42. public:
  43. ForwardingLoadBalancingPolicy(
  44. std::unique_ptr<ChannelControlHelper> delegating_helper, Args args,
  45. const char* delegate_policy_name, intptr_t initial_refcount = 1)
  46. : LoadBalancingPolicy(std::move(args), initial_refcount) {
  47. Args delegate_args;
  48. delegate_args.work_serializer = work_serializer();
  49. delegate_args.channel_control_helper = std::move(delegating_helper);
  50. delegate_args.args = args.args;
  51. delegate_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
  52. delegate_policy_name, std::move(delegate_args));
  53. grpc_pollset_set_add_pollset_set(delegate_->interested_parties(),
  54. interested_parties());
  55. }
  56. ~ForwardingLoadBalancingPolicy() override = default;
  57. void UpdateLocked(UpdateArgs args) override {
  58. delegate_->UpdateLocked(std::move(args));
  59. }
  60. void ExitIdleLocked() override { delegate_->ExitIdleLocked(); }
  61. void ResetBackoffLocked() override { delegate_->ResetBackoffLocked(); }
  62. private:
  63. void ShutdownLocked() override { delegate_.reset(); }
  64. OrphanablePtr<LoadBalancingPolicy> delegate_;
  65. };
  66. //
  67. // TestPickArgsLb
  68. //
  69. constexpr char kTestPickArgsLbPolicyName[] = "test_pick_args_lb";
  70. class TestPickArgsLb : public ForwardingLoadBalancingPolicy {
  71. public:
  72. TestPickArgsLb(Args args, TestPickArgsCallback cb,
  73. const char* delegate_policy_name)
  74. : ForwardingLoadBalancingPolicy(
  75. absl::make_unique<Helper>(RefCountedPtr<TestPickArgsLb>(this), cb),
  76. std::move(args), delegate_policy_name,
  77. /*initial_refcount=*/2) {}
  78. ~TestPickArgsLb() override = default;
  79. const char* name() const override { return kTestPickArgsLbPolicyName; }
  80. private:
  81. class Picker : public SubchannelPicker {
  82. public:
  83. Picker(std::unique_ptr<SubchannelPicker> delegate_picker,
  84. TestPickArgsCallback cb)
  85. : delegate_picker_(std::move(delegate_picker)), cb_(std::move(cb)) {}
  86. PickResult Pick(PickArgs args) override {
  87. // Report args seen.
  88. PickArgsSeen args_seen;
  89. args_seen.path = std::string(args.path);
  90. args_seen.metadata = args.initial_metadata->TestOnlyCopyToVector();
  91. cb_(args_seen);
  92. // Do pick.
  93. return delegate_picker_->Pick(args);
  94. }
  95. private:
  96. std::unique_ptr<SubchannelPicker> delegate_picker_;
  97. TestPickArgsCallback cb_;
  98. };
  99. class Helper : public ChannelControlHelper {
  100. public:
  101. Helper(RefCountedPtr<TestPickArgsLb> parent, TestPickArgsCallback cb)
  102. : parent_(std::move(parent)), cb_(std::move(cb)) {}
  103. RefCountedPtr<SubchannelInterface> CreateSubchannel(
  104. ServerAddress address, const grpc_channel_args& args) override {
  105. return parent_->channel_control_helper()->CreateSubchannel(
  106. std::move(address), args);
  107. }
  108. void UpdateState(grpc_connectivity_state state, const absl::Status& status,
  109. std::unique_ptr<SubchannelPicker> picker) override {
  110. parent_->channel_control_helper()->UpdateState(
  111. state, status, absl::make_unique<Picker>(std::move(picker), cb_));
  112. }
  113. void RequestReresolution() override {
  114. parent_->channel_control_helper()->RequestReresolution();
  115. }
  116. absl::string_view GetAuthority() override {
  117. return parent_->channel_control_helper()->GetAuthority();
  118. }
  119. void AddTraceEvent(TraceSeverity severity,
  120. absl::string_view message) override {
  121. parent_->channel_control_helper()->AddTraceEvent(severity, message);
  122. }
  123. private:
  124. RefCountedPtr<TestPickArgsLb> parent_;
  125. TestPickArgsCallback cb_;
  126. };
  127. };
  128. class TestPickArgsLbConfig : public LoadBalancingPolicy::Config {
  129. public:
  130. const char* name() const override { return kTestPickArgsLbPolicyName; }
  131. };
  132. class TestPickArgsLbFactory : public LoadBalancingPolicyFactory {
  133. public:
  134. explicit TestPickArgsLbFactory(TestPickArgsCallback cb,
  135. const char* delegate_policy_name)
  136. : cb_(std::move(cb)), delegate_policy_name_(delegate_policy_name) {}
  137. OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
  138. LoadBalancingPolicy::Args args) const override {
  139. return MakeOrphanable<TestPickArgsLb>(std::move(args), cb_,
  140. delegate_policy_name_);
  141. }
  142. const char* name() const override { return kTestPickArgsLbPolicyName; }
  143. RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
  144. const Json& /*json*/, grpc_error_handle* /*error*/) const override {
  145. return MakeRefCounted<TestPickArgsLbConfig>();
  146. }
  147. private:
  148. TestPickArgsCallback cb_;
  149. const char* delegate_policy_name_;
  150. };
  151. //
  152. // InterceptRecvTrailingMetadataLoadBalancingPolicy
  153. //
  154. constexpr char kInterceptRecvTrailingMetadataLbPolicyName[] =
  155. "intercept_trailing_metadata_lb";
  156. class InterceptRecvTrailingMetadataLoadBalancingPolicy
  157. : public ForwardingLoadBalancingPolicy {
  158. public:
  159. InterceptRecvTrailingMetadataLoadBalancingPolicy(
  160. Args args, InterceptRecvTrailingMetadataCallback cb)
  161. : ForwardingLoadBalancingPolicy(
  162. absl::make_unique<Helper>(
  163. RefCountedPtr<InterceptRecvTrailingMetadataLoadBalancingPolicy>(
  164. this),
  165. std::move(cb)),
  166. std::move(args),
  167. /*delegate_policy_name=*/"pick_first",
  168. /*initial_refcount=*/2) {}
  169. ~InterceptRecvTrailingMetadataLoadBalancingPolicy() override = default;
  170. const char* name() const override {
  171. return kInterceptRecvTrailingMetadataLbPolicyName;
  172. }
  173. private:
  174. class Picker : public SubchannelPicker {
  175. public:
  176. Picker(std::unique_ptr<SubchannelPicker> delegate_picker,
  177. InterceptRecvTrailingMetadataCallback cb)
  178. : delegate_picker_(std::move(delegate_picker)), cb_(std::move(cb)) {}
  179. PickResult Pick(PickArgs args) override {
  180. // Do pick.
  181. PickResult result = delegate_picker_->Pick(args);
  182. // Intercept trailing metadata.
  183. auto* complete_pick = absl::get_if<PickResult::Complete>(&result.result);
  184. if (complete_pick != nullptr) {
  185. complete_pick->subchannel_call_tracker =
  186. absl::make_unique<SubchannelCallTracker>(cb_);
  187. }
  188. return result;
  189. }
  190. private:
  191. std::unique_ptr<SubchannelPicker> delegate_picker_;
  192. InterceptRecvTrailingMetadataCallback cb_;
  193. };
  194. class Helper : public ChannelControlHelper {
  195. public:
  196. Helper(
  197. RefCountedPtr<InterceptRecvTrailingMetadataLoadBalancingPolicy> parent,
  198. InterceptRecvTrailingMetadataCallback cb)
  199. : parent_(std::move(parent)), cb_(std::move(cb)) {}
  200. RefCountedPtr<SubchannelInterface> CreateSubchannel(
  201. ServerAddress address, const grpc_channel_args& args) override {
  202. return parent_->channel_control_helper()->CreateSubchannel(
  203. std::move(address), args);
  204. }
  205. void UpdateState(grpc_connectivity_state state, const absl::Status& status,
  206. std::unique_ptr<SubchannelPicker> picker) override {
  207. parent_->channel_control_helper()->UpdateState(
  208. state, status, absl::make_unique<Picker>(std::move(picker), cb_));
  209. }
  210. void RequestReresolution() override {
  211. parent_->channel_control_helper()->RequestReresolution();
  212. }
  213. absl::string_view GetAuthority() override {
  214. return parent_->channel_control_helper()->GetAuthority();
  215. }
  216. void AddTraceEvent(TraceSeverity severity,
  217. absl::string_view message) override {
  218. parent_->channel_control_helper()->AddTraceEvent(severity, message);
  219. }
  220. private:
  221. RefCountedPtr<InterceptRecvTrailingMetadataLoadBalancingPolicy> parent_;
  222. InterceptRecvTrailingMetadataCallback cb_;
  223. };
  224. class SubchannelCallTracker : public SubchannelCallTrackerInterface {
  225. public:
  226. explicit SubchannelCallTracker(InterceptRecvTrailingMetadataCallback cb)
  227. : cb_(std::move(cb)) {}
  228. void Start() override {}
  229. void Finish(FinishArgs args) override {
  230. TrailingMetadataArgsSeen args_seen;
  231. args_seen.status = args.status;
  232. args_seen.backend_metric_data =
  233. args.backend_metric_accessor->GetBackendMetricData();
  234. args_seen.metadata = args.trailing_metadata->TestOnlyCopyToVector();
  235. cb_(args_seen);
  236. }
  237. private:
  238. InterceptRecvTrailingMetadataCallback cb_;
  239. };
  240. };
  241. class InterceptTrailingConfig : public LoadBalancingPolicy::Config {
  242. public:
  243. const char* name() const override {
  244. return kInterceptRecvTrailingMetadataLbPolicyName;
  245. }
  246. };
  247. class InterceptTrailingFactory : public LoadBalancingPolicyFactory {
  248. public:
  249. explicit InterceptTrailingFactory(InterceptRecvTrailingMetadataCallback cb)
  250. : cb_(std::move(cb)) {}
  251. OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
  252. LoadBalancingPolicy::Args args) const override {
  253. return MakeOrphanable<InterceptRecvTrailingMetadataLoadBalancingPolicy>(
  254. std::move(args), cb_);
  255. }
  256. const char* name() const override {
  257. return kInterceptRecvTrailingMetadataLbPolicyName;
  258. }
  259. RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
  260. const Json& /*json*/, grpc_error_handle* /*error*/) const override {
  261. return MakeRefCounted<InterceptTrailingConfig>();
  262. }
  263. private:
  264. InterceptRecvTrailingMetadataCallback cb_;
  265. };
  266. //
  267. // AddressTestLoadBalancingPolicy
  268. //
  269. constexpr char kAddressTestLbPolicyName[] = "address_test_lb";
  270. class AddressTestLoadBalancingPolicy : public ForwardingLoadBalancingPolicy {
  271. public:
  272. AddressTestLoadBalancingPolicy(Args args, AddressTestCallback cb)
  273. : ForwardingLoadBalancingPolicy(
  274. absl::make_unique<Helper>(
  275. RefCountedPtr<AddressTestLoadBalancingPolicy>(this),
  276. std::move(cb)),
  277. std::move(args),
  278. /*delegate_policy_name=*/"pick_first",
  279. /*initial_refcount=*/2) {}
  280. ~AddressTestLoadBalancingPolicy() override = default;
  281. const char* name() const override { return kAddressTestLbPolicyName; }
  282. private:
  283. class Helper : public ChannelControlHelper {
  284. public:
  285. Helper(RefCountedPtr<AddressTestLoadBalancingPolicy> parent,
  286. AddressTestCallback cb)
  287. : parent_(std::move(parent)), cb_(std::move(cb)) {}
  288. RefCountedPtr<SubchannelInterface> CreateSubchannel(
  289. ServerAddress address, const grpc_channel_args& args) override {
  290. cb_(address);
  291. return parent_->channel_control_helper()->CreateSubchannel(
  292. std::move(address), args);
  293. }
  294. void UpdateState(grpc_connectivity_state state, const absl::Status& status,
  295. std::unique_ptr<SubchannelPicker> picker) override {
  296. parent_->channel_control_helper()->UpdateState(state, status,
  297. std::move(picker));
  298. }
  299. void RequestReresolution() override {
  300. parent_->channel_control_helper()->RequestReresolution();
  301. }
  302. absl::string_view GetAuthority() override {
  303. return parent_->channel_control_helper()->GetAuthority();
  304. }
  305. void AddTraceEvent(TraceSeverity severity,
  306. absl::string_view message) override {
  307. parent_->channel_control_helper()->AddTraceEvent(severity, message);
  308. }
  309. private:
  310. RefCountedPtr<AddressTestLoadBalancingPolicy> parent_;
  311. AddressTestCallback cb_;
  312. };
  313. };
  314. class AddressTestConfig : public LoadBalancingPolicy::Config {
  315. public:
  316. const char* name() const override { return kAddressTestLbPolicyName; }
  317. };
  318. class AddressTestFactory : public LoadBalancingPolicyFactory {
  319. public:
  320. explicit AddressTestFactory(AddressTestCallback cb) : cb_(std::move(cb)) {}
  321. OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
  322. LoadBalancingPolicy::Args args) const override {
  323. return MakeOrphanable<AddressTestLoadBalancingPolicy>(std::move(args), cb_);
  324. }
  325. const char* name() const override { return kAddressTestLbPolicyName; }
  326. RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
  327. const Json& /*json*/, grpc_error_handle* /*error*/) const override {
  328. return MakeRefCounted<AddressTestConfig>();
  329. }
  330. private:
  331. AddressTestCallback cb_;
  332. };
  333. //
  334. // FixedAddressLoadBalancingPolicy
  335. //
  336. constexpr char kFixedAddressLbPolicyName[] = "fixed_address_lb";
  337. class FixedAddressConfig : public LoadBalancingPolicy::Config {
  338. public:
  339. explicit FixedAddressConfig(std::string address)
  340. : address_(std::move(address)) {}
  341. const char* name() const override { return kFixedAddressLbPolicyName; }
  342. const std::string& address() const { return address_; }
  343. private:
  344. std::string address_;
  345. };
  346. class FixedAddressLoadBalancingPolicy : public ForwardingLoadBalancingPolicy {
  347. public:
  348. explicit FixedAddressLoadBalancingPolicy(Args args)
  349. : ForwardingLoadBalancingPolicy(
  350. absl::make_unique<Helper>(
  351. RefCountedPtr<FixedAddressLoadBalancingPolicy>(this)),
  352. std::move(args),
  353. /*delegate_policy_name=*/"pick_first",
  354. /*initial_refcount=*/2) {}
  355. ~FixedAddressLoadBalancingPolicy() override = default;
  356. const char* name() const override { return kFixedAddressLbPolicyName; }
  357. void UpdateLocked(UpdateArgs args) override {
  358. auto* config = static_cast<FixedAddressConfig*>(args.config.get());
  359. gpr_log(GPR_INFO, "%s: update URI: %s", kFixedAddressLbPolicyName,
  360. config->address().c_str());
  361. auto uri = URI::Parse(config->address());
  362. args.config.reset();
  363. args.addresses = ServerAddressList();
  364. if (uri.ok()) {
  365. grpc_resolved_address address;
  366. GPR_ASSERT(grpc_parse_uri(*uri, &address));
  367. args.addresses->emplace_back(address, /*args=*/nullptr);
  368. } else {
  369. gpr_log(GPR_ERROR,
  370. "%s: could not parse URI (%s), using empty address list",
  371. kFixedAddressLbPolicyName, uri.status().ToString().c_str());
  372. }
  373. ForwardingLoadBalancingPolicy::UpdateLocked(std::move(args));
  374. }
  375. private:
  376. class Helper : public ChannelControlHelper {
  377. public:
  378. explicit Helper(RefCountedPtr<FixedAddressLoadBalancingPolicy> parent)
  379. : parent_(std::move(parent)) {}
  380. RefCountedPtr<SubchannelInterface> CreateSubchannel(
  381. ServerAddress address, const grpc_channel_args& args) override {
  382. return parent_->channel_control_helper()->CreateSubchannel(
  383. std::move(address), args);
  384. }
  385. void UpdateState(grpc_connectivity_state state, const absl::Status& status,
  386. std::unique_ptr<SubchannelPicker> picker) override {
  387. parent_->channel_control_helper()->UpdateState(state, status,
  388. std::move(picker));
  389. }
  390. void RequestReresolution() override {
  391. parent_->channel_control_helper()->RequestReresolution();
  392. }
  393. absl::string_view GetAuthority() override {
  394. return parent_->channel_control_helper()->GetAuthority();
  395. }
  396. void AddTraceEvent(TraceSeverity severity,
  397. absl::string_view message) override {
  398. parent_->channel_control_helper()->AddTraceEvent(severity, message);
  399. }
  400. private:
  401. RefCountedPtr<FixedAddressLoadBalancingPolicy> parent_;
  402. };
  403. };
  404. class FixedAddressFactory : public LoadBalancingPolicyFactory {
  405. public:
  406. FixedAddressFactory() = default;
  407. OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
  408. LoadBalancingPolicy::Args args) const override {
  409. return MakeOrphanable<FixedAddressLoadBalancingPolicy>(std::move(args));
  410. }
  411. const char* name() const override { return kFixedAddressLbPolicyName; }
  412. RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
  413. const Json& json, grpc_error_handle* error) const override {
  414. std::vector<grpc_error_handle> error_list;
  415. std::string address;
  416. ParseJsonObjectField(json.object_value(), "address", &address, &error_list);
  417. if (!error_list.empty()) {
  418. *error = GRPC_ERROR_CREATE_FROM_VECTOR(
  419. "errors parsing fixed_address_lb config", &error_list);
  420. return nullptr;
  421. }
  422. return MakeRefCounted<FixedAddressConfig>(std::move(address));
  423. }
  424. };
  425. } // namespace
  426. void RegisterTestPickArgsLoadBalancingPolicy(TestPickArgsCallback cb,
  427. const char* delegate_policy_name) {
  428. LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory(
  429. absl::make_unique<TestPickArgsLbFactory>(std::move(cb),
  430. delegate_policy_name));
  431. }
  432. void RegisterInterceptRecvTrailingMetadataLoadBalancingPolicy(
  433. InterceptRecvTrailingMetadataCallback cb) {
  434. LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory(
  435. absl::make_unique<InterceptTrailingFactory>(std::move(cb)));
  436. }
  437. void RegisterAddressTestLoadBalancingPolicy(AddressTestCallback cb) {
  438. LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory(
  439. absl::make_unique<AddressTestFactory>(std::move(cb)));
  440. }
  441. void RegisterFixedAddressLoadBalancingPolicy() {
  442. LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory(
  443. absl::make_unique<FixedAddressFactory>());
  444. }
  445. } // namespace grpc_core