retry_lb_fail.cc 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. //
  2. // Copyright 2017 gRPC authors.
  3. //
  4. // Licensed under the Apache License, Version 2.0 (the "License");
  5. // you may not use this file except in compliance with the License.
  6. // You may obtain a copy of the License at
  7. //
  8. // http://www.apache.org/licenses/LICENSE-2.0
  9. //
  10. // Unless required by applicable law or agreed to in writing, software
  11. // distributed under the License is distributed on an "AS IS" BASIS,
  12. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. // See the License for the specific language governing permissions and
  14. // limitations under the License.
  15. //
#include <stdio.h>
#include <string.h>

#include <atomic>
#include <utility>

#include <grpc/byte_buffer.h>
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/time.h>

#include "src/core/ext/filters/client_channel/lb_policy_registry.h"
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/transport/error_utils.h"
#include "test/core/end2end/cq_verifier.h"
#include "test/core/end2end/end2end_tests.h"
#include "test/core/end2end/tests/cancel_test_helpers.h"
  33. namespace grpc_core {
  34. namespace {
  35. const char* kFailPolicyName = "fail_lb";
  36. std::atomic<int> g_num_lb_picks;
  37. class FailPolicy : public LoadBalancingPolicy {
  38. public:
  39. explicit FailPolicy(Args args) : LoadBalancingPolicy(std::move(args)) {}
  40. const char* name() const override { return kFailPolicyName; }
  41. void UpdateLocked(UpdateArgs) override {
  42. absl::Status status = absl::AbortedError("LB pick failed");
  43. channel_control_helper()->UpdateState(
  44. GRPC_CHANNEL_TRANSIENT_FAILURE, status,
  45. absl::make_unique<FailPicker>(status));
  46. }
  47. void ResetBackoffLocked() override {}
  48. void ShutdownLocked() override {}
  49. private:
  50. class FailPicker : public SubchannelPicker {
  51. public:
  52. explicit FailPicker(absl::Status status) : status_(status) {}
  53. PickResult Pick(PickArgs /*args*/) override {
  54. g_num_lb_picks.fetch_add(1);
  55. return PickResult::Fail(status_);
  56. }
  57. private:
  58. absl::Status status_;
  59. };
  60. };
  61. class FailLbConfig : public LoadBalancingPolicy::Config {
  62. public:
  63. const char* name() const override { return kFailPolicyName; }
  64. };
  65. class FailPolicyFactory : public LoadBalancingPolicyFactory {
  66. public:
  67. OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
  68. LoadBalancingPolicy::Args args) const override {
  69. return MakeOrphanable<FailPolicy>(std::move(args));
  70. }
  71. const char* name() const override { return kFailPolicyName; }
  72. RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
  73. const Json& /*json*/, grpc_error_handle* /*error*/) const override {
  74. return MakeRefCounted<FailLbConfig>();
  75. }
  76. };
  77. void RegisterFailPolicy() {
  78. LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory(
  79. absl::make_unique<FailPolicyFactory>());
  80. }
  81. } // namespace
  82. } // namespace grpc_core
  83. static void* tag(intptr_t t) { return reinterpret_cast<void*>(t); }
  84. static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
  85. const char* test_name,
  86. grpc_channel_args* client_args,
  87. grpc_channel_args* server_args) {
  88. grpc_end2end_test_fixture f;
  89. gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
  90. f = config.create_fixture(client_args, server_args);
  91. config.init_server(&f, server_args);
  92. config.init_client(&f, client_args);
  93. return f;
  94. }
  95. static gpr_timespec n_seconds_from_now(int n) {
  96. return grpc_timeout_seconds_to_deadline(n);
  97. }
  98. static gpr_timespec five_seconds_from_now(void) {
  99. return n_seconds_from_now(5);
  100. }
  101. static void drain_cq(grpc_completion_queue* cq) {
  102. grpc_event ev;
  103. do {
  104. ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
  105. } while (ev.type != GRPC_QUEUE_SHUTDOWN);
  106. }
  107. static void shutdown_server(grpc_end2end_test_fixture* f) {
  108. if (!f->server) return;
  109. grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
  110. GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
  111. grpc_timeout_seconds_to_deadline(5),
  112. nullptr)
  113. .type == GRPC_OP_COMPLETE);
  114. grpc_server_destroy(f->server);
  115. f->server = nullptr;
  116. }
  117. static void shutdown_client(grpc_end2end_test_fixture* f) {
  118. if (!f->client) return;
  119. grpc_channel_destroy(f->client);
  120. f->client = nullptr;
  121. }
  122. static void end_test(grpc_end2end_test_fixture* f) {
  123. shutdown_server(f);
  124. shutdown_client(f);
  125. grpc_completion_queue_shutdown(f->cq);
  126. drain_cq(f->cq);
  127. grpc_completion_queue_destroy(f->cq);
  128. grpc_completion_queue_destroy(f->shutdown_cq);
  129. }
  130. // Tests that we retry properly when the LB policy fails the call before
  131. // it ever gets to the transport, even if recv_trailing_metadata isn't
  132. // started by the application until after the LB pick fails.
  133. // - 1 retry allowed for ABORTED status
  134. // - on first attempt, LB policy fails with ABORTED before application
  135. // starts recv_trailing_metadata op
  136. static void test_retry_lb_fail(grpc_end2end_test_config config) {
  137. grpc_call* c;
  138. grpc_op ops[6];
  139. grpc_op* op;
  140. grpc_metadata_array initial_metadata_recv;
  141. grpc_metadata_array trailing_metadata_recv;
  142. grpc_slice request_payload_slice = grpc_slice_from_static_string("foo");
  143. grpc_byte_buffer* request_payload =
  144. grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  145. grpc_byte_buffer* response_payload_recv = nullptr;
  146. grpc_status_code status;
  147. grpc_call_error error;
  148. grpc_slice details;
  149. grpc_core::g_num_lb_picks.store(0, std::memory_order_relaxed);
  150. grpc_arg args[] = {
  151. grpc_channel_arg_integer_create(
  152. const_cast<char*>(GRPC_ARG_ENABLE_RETRIES), 1),
  153. grpc_channel_arg_string_create(
  154. const_cast<char*>(GRPC_ARG_SERVICE_CONFIG),
  155. const_cast<char*>(
  156. "{\n"
  157. " \"loadBalancingConfig\": [ {\n"
  158. " \"fail_lb\": {}\n"
  159. " } ],\n"
  160. " \"methodConfig\": [ {\n"
  161. " \"name\": [\n"
  162. " { \"service\": \"service\", \"method\": \"method\" }\n"
  163. " ],\n"
  164. " \"retryPolicy\": {\n"
  165. " \"maxAttempts\": 2,\n"
  166. " \"initialBackoff\": \"1s\",\n"
  167. " \"maxBackoff\": \"120s\",\n"
  168. " \"backoffMultiplier\": 1.6,\n"
  169. " \"retryableStatusCodes\": [ \"ABORTED\" ]\n"
  170. " }\n"
  171. " } ]\n"
  172. "}")),
  173. };
  174. grpc_channel_args client_args = {GPR_ARRAY_SIZE(args), args};
  175. grpc_end2end_test_fixture f =
  176. begin_test(config, "retry_lb_fail", &client_args, nullptr);
  177. cq_verifier* cqv = cq_verifier_create(f.cq);
  178. gpr_timespec deadline = five_seconds_from_now();
  179. c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
  180. grpc_slice_from_static_string("/service/method"),
  181. nullptr, deadline, nullptr);
  182. GPR_ASSERT(c);
  183. grpc_metadata_array_init(&initial_metadata_recv);
  184. grpc_metadata_array_init(&trailing_metadata_recv);
  185. memset(ops, 0, sizeof(ops));
  186. op = ops;
  187. op->op = GRPC_OP_SEND_INITIAL_METADATA;
  188. op->data.send_initial_metadata.count = 0;
  189. op++;
  190. error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
  191. nullptr);
  192. GPR_ASSERT(GRPC_CALL_OK == error);
  193. CQ_EXPECT_COMPLETION(cqv, tag(1), false);
  194. cq_verify(cqv);
  195. memset(ops, 0, sizeof(ops));
  196. op = ops;
  197. op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  198. op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
  199. op->data.recv_status_on_client.status = &status;
  200. op->data.recv_status_on_client.status_details = &details;
  201. op++;
  202. error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(2),
  203. nullptr);
  204. GPR_ASSERT(GRPC_CALL_OK == error);
  205. CQ_EXPECT_COMPLETION(cqv, tag(2), true);
  206. cq_verify(cqv);
  207. GPR_ASSERT(status == GRPC_STATUS_ABORTED);
  208. GPR_ASSERT(0 == grpc_slice_str_cmp(details, "LB pick failed"));
  209. grpc_slice_unref(details);
  210. grpc_metadata_array_destroy(&initial_metadata_recv);
  211. grpc_metadata_array_destroy(&trailing_metadata_recv);
  212. grpc_byte_buffer_destroy(request_payload);
  213. grpc_byte_buffer_destroy(response_payload_recv);
  214. grpc_call_unref(c);
  215. cq_verifier_destroy(cqv);
  216. int num_picks = grpc_core::g_num_lb_picks.load(std::memory_order_relaxed);
  217. gpr_log(GPR_INFO, "NUM LB PICKS: %d", num_picks);
  218. GPR_ASSERT(num_picks == 2);
  219. end_test(&f);
  220. config.tear_down_data(&f);
  221. }
  222. void retry_lb_fail(grpc_end2end_test_config config) {
  223. GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL);
  224. test_retry_lb_fail(config);
  225. }
  226. void retry_lb_fail_pre_init(void) { grpc_core::RegisterFailPolicy(); }