retry_transparent_goaway.cc 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. //
  2. // Copyright 2017 gRPC authors.
  3. //
  4. // Licensed under the Apache License, Version 2.0 (the "License");
  5. // you may not use this file except in compliance with the License.
  6. // You may obtain a copy of the License at
  7. //
  8. // http://www.apache.org/licenses/LICENSE-2.0
  9. //
  10. // Unless required by applicable law or agreed to in writing, software
  11. // distributed under the License is distributed on an "AS IS" BASIS,
  12. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. // See the License for the specific language governing permissions and
  14. // limitations under the License.
  15. //
  16. #include <stdio.h>
  17. #include <string.h>
  18. #include <grpc/byte_buffer.h>
  19. #include <grpc/grpc.h>
  20. #include <grpc/support/alloc.h>
  21. #include <grpc/support/log.h>
  22. #include <grpc/support/string_util.h>
  23. #include <grpc/support/time.h>
  24. #include "src/core/lib/channel/channel_args.h"
  25. #include "src/core/lib/channel/channel_stack.h"
  26. #include "src/core/lib/channel/channel_stack_builder.h"
  27. #include "src/core/lib/config/core_configuration.h"
  28. #include "src/core/lib/gpr/string.h"
  29. #include "src/core/lib/gpr/useful.h"
  30. #include "src/core/lib/iomgr/exec_ctx.h"
  31. #include "src/core/lib/surface/channel_init.h"
  32. #include "src/core/lib/transport/error_utils.h"
  33. #include "test/core/end2end/cq_verifier.h"
  34. #include "test/core/end2end/end2end_tests.h"
  35. #include "test/core/end2end/tests/cancel_test_helpers.h"
  36. static void* tag(intptr_t t) { return reinterpret_cast<void*>(t); }
  37. static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
  38. const char* test_name,
  39. grpc_channel_args* client_args,
  40. grpc_channel_args* server_args) {
  41. grpc_end2end_test_fixture f;
  42. gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
  43. f = config.create_fixture(client_args, server_args);
  44. config.init_server(&f, server_args);
  45. config.init_client(&f, client_args);
  46. return f;
  47. }
  48. static gpr_timespec n_seconds_from_now(int n) {
  49. return grpc_timeout_seconds_to_deadline(n);
  50. }
  51. static gpr_timespec five_seconds_from_now(void) {
  52. return n_seconds_from_now(5);
  53. }
  54. static void drain_cq(grpc_completion_queue* cq) {
  55. grpc_event ev;
  56. do {
  57. ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
  58. } while (ev.type != GRPC_QUEUE_SHUTDOWN);
  59. }
  60. static void shutdown_server(grpc_end2end_test_fixture* f) {
  61. if (!f->server) return;
  62. grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
  63. GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
  64. grpc_timeout_seconds_to_deadline(5),
  65. nullptr)
  66. .type == GRPC_OP_COMPLETE);
  67. grpc_server_destroy(f->server);
  68. f->server = nullptr;
  69. }
  70. static void shutdown_client(grpc_end2end_test_fixture* f) {
  71. if (!f->client) return;
  72. grpc_channel_destroy(f->client);
  73. f->client = nullptr;
  74. }
  75. static void end_test(grpc_end2end_test_fixture* f) {
  76. shutdown_server(f);
  77. shutdown_client(f);
  78. grpc_completion_queue_shutdown(f->cq);
  79. drain_cq(f->cq);
  80. grpc_completion_queue_destroy(f->cq);
  81. grpc_completion_queue_destroy(f->shutdown_cq);
  82. }
  83. // Tests transparent retries when the call was never sent out on the wire.
  84. static void test_retry_transparent_goaway(grpc_end2end_test_config config) {
  85. grpc_call* c;
  86. grpc_call* s;
  87. grpc_op ops[6];
  88. grpc_op* op;
  89. grpc_metadata_array initial_metadata_recv;
  90. grpc_metadata_array trailing_metadata_recv;
  91. grpc_metadata_array request_metadata_recv;
  92. grpc_call_details call_details;
  93. grpc_slice request_payload_slice = grpc_slice_from_static_string("foo");
  94. grpc_slice response_payload_slice = grpc_slice_from_static_string("bar");
  95. grpc_byte_buffer* request_payload =
  96. grpc_raw_byte_buffer_create(&request_payload_slice, 1);
  97. grpc_byte_buffer* response_payload =
  98. grpc_raw_byte_buffer_create(&response_payload_slice, 1);
  99. grpc_byte_buffer* request_payload_recv = nullptr;
  100. grpc_byte_buffer* response_payload_recv = nullptr;
  101. grpc_status_code status;
  102. grpc_call_error error;
  103. grpc_slice details;
  104. int was_cancelled = 2;
  105. char* peer;
  106. grpc_end2end_test_fixture f =
  107. begin_test(config, "retry_transparent_goaway", nullptr, nullptr);
  108. cq_verifier* cqv = cq_verifier_create(f.cq);
  109. gpr_timespec deadline = five_seconds_from_now();
  110. c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
  111. grpc_slice_from_static_string("/service/method"),
  112. nullptr, deadline, nullptr);
  113. GPR_ASSERT(c);
  114. peer = grpc_call_get_peer(c);
  115. GPR_ASSERT(peer != nullptr);
  116. gpr_log(GPR_DEBUG, "client_peer_before_call=%s", peer);
  117. gpr_free(peer);
  118. grpc_metadata_array_init(&initial_metadata_recv);
  119. grpc_metadata_array_init(&trailing_metadata_recv);
  120. grpc_metadata_array_init(&request_metadata_recv);
  121. grpc_call_details_init(&call_details);
  122. grpc_slice status_details = grpc_slice_from_static_string("xyz");
  123. // Start a batch containing send ops.
  124. memset(ops, 0, sizeof(ops));
  125. op = ops;
  126. op->op = GRPC_OP_SEND_INITIAL_METADATA;
  127. op->data.send_initial_metadata.count = 0;
  128. op++;
  129. op->op = GRPC_OP_SEND_MESSAGE;
  130. op->data.send_message.send_message = request_payload;
  131. op++;
  132. op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
  133. op++;
  134. error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
  135. nullptr);
  136. GPR_ASSERT(GRPC_CALL_OK == error);
  137. // Start a batch containing recv ops.
  138. memset(ops, 0, sizeof(ops));
  139. op = ops;
  140. op->op = GRPC_OP_RECV_MESSAGE;
  141. op->data.recv_message.recv_message = &response_payload_recv;
  142. op++;
  143. op->op = GRPC_OP_RECV_INITIAL_METADATA;
  144. op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
  145. op++;
  146. op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  147. op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
  148. op->data.recv_status_on_client.status = &status;
  149. op->data.recv_status_on_client.status_details = &details;
  150. op++;
  151. error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(2),
  152. nullptr);
  153. GPR_ASSERT(GRPC_CALL_OK == error);
  154. // Client send ops should now complete.
  155. CQ_EXPECT_COMPLETION(cqv, tag(1), true);
  156. cq_verify(cqv);
  157. // Server should get a call.
  158. error =
  159. grpc_server_request_call(f.server, &s, &call_details,
  160. &request_metadata_recv, f.cq, f.cq, tag(101));
  161. GPR_ASSERT(GRPC_CALL_OK == error);
  162. CQ_EXPECT_COMPLETION(cqv, tag(101), true);
  163. cq_verify(cqv);
  164. // Server receives the request.
  165. memset(ops, 0, sizeof(ops));
  166. op = ops;
  167. op->op = GRPC_OP_RECV_MESSAGE;
  168. op->data.recv_message.recv_message = &request_payload_recv;
  169. op++;
  170. error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(102),
  171. nullptr);
  172. GPR_ASSERT(GRPC_CALL_OK == error);
  173. CQ_EXPECT_COMPLETION(cqv, tag(102), true);
  174. cq_verify(cqv);
  175. // Server sends a response with status OK.
  176. memset(ops, 0, sizeof(ops));
  177. op = ops;
  178. op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
  179. op->data.recv_close_on_server.cancelled = &was_cancelled;
  180. op++;
  181. op->op = GRPC_OP_SEND_INITIAL_METADATA;
  182. op->data.send_initial_metadata.count = 0;
  183. op++;
  184. op->op = GRPC_OP_SEND_MESSAGE;
  185. op->data.send_message.send_message = response_payload;
  186. op++;
  187. op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
  188. op->data.send_status_from_server.trailing_metadata_count = 0;
  189. op->data.send_status_from_server.status = GRPC_STATUS_OK;
  190. op->data.send_status_from_server.status_details = &status_details;
  191. op++;
  192. error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(103),
  193. nullptr);
  194. GPR_ASSERT(GRPC_CALL_OK == error);
  195. // In principle, the server batch should complete before the client
  196. // recv ops batch, but in the proxy fixtures, there are multiple threads
  197. // involved, so the completion order tends to be a little racy.
  198. CQ_EXPECT_COMPLETION(cqv, tag(103), true);
  199. CQ_EXPECT_COMPLETION(cqv, tag(2), true);
  200. cq_verify(cqv);
  201. GPR_ASSERT(status == GRPC_STATUS_OK);
  202. GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
  203. GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
  204. GPR_ASSERT(0 == call_details.flags);
  205. GPR_ASSERT(was_cancelled == 0);
  206. GPR_ASSERT(byte_buffer_eq_slice(request_payload_recv, request_payload_slice));
  207. GPR_ASSERT(
  208. byte_buffer_eq_slice(response_payload_recv, response_payload_slice));
  209. // Make sure the "grpc-previous-rpc-attempts" header was NOT sent, since
  210. // we don't do that for transparent retries.
  211. for (size_t i = 0; i < request_metadata_recv.count; ++i) {
  212. GPR_ASSERT(!grpc_slice_eq(
  213. request_metadata_recv.metadata[i].key,
  214. grpc_slice_from_static_string("grpc-previous-rpc-attempts")));
  215. }
  216. grpc_slice_unref(details);
  217. grpc_metadata_array_destroy(&initial_metadata_recv);
  218. grpc_metadata_array_destroy(&trailing_metadata_recv);
  219. grpc_metadata_array_destroy(&request_metadata_recv);
  220. grpc_call_details_destroy(&call_details);
  221. grpc_byte_buffer_destroy(request_payload);
  222. grpc_byte_buffer_destroy(response_payload);
  223. grpc_byte_buffer_destroy(request_payload_recv);
  224. grpc_byte_buffer_destroy(response_payload_recv);
  225. grpc_call_unref(c);
  226. grpc_call_unref(s);
  227. cq_verifier_destroy(cqv);
  228. end_test(&f);
  229. config.tear_down_data(&f);
  230. }
  231. namespace {
  232. // A filter that, for the first call it sees, will fail all batches except
  233. // for cancellations, so that the call fails with an error whose
  234. // StreamNetworkState is kNotSeenByServer.
  235. // All subsequent calls are allowed through without failures.
  236. class FailFirstCallFilter {
  237. public:
  238. static grpc_channel_filter kFilterVtable;
  239. private:
  240. class CallData {
  241. public:
  242. static grpc_error_handle Init(grpc_call_element* elem,
  243. const grpc_call_element_args* args) {
  244. new (elem->call_data) CallData(args);
  245. return GRPC_ERROR_NONE;
  246. }
  247. static void Destroy(grpc_call_element* elem,
  248. const grpc_call_final_info* /*final_info*/,
  249. grpc_closure* /*ignored*/) {
  250. auto* calld = static_cast<CallData*>(elem->call_data);
  251. calld->~CallData();
  252. }
  253. static void StartTransportStreamOpBatch(
  254. grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
  255. auto* chand = static_cast<FailFirstCallFilter*>(elem->channel_data);
  256. auto* calld = static_cast<CallData*>(elem->call_data);
  257. if (!chand->seen_call_) {
  258. calld->fail_ = true;
  259. chand->seen_call_ = true;
  260. }
  261. if (calld->fail_) {
  262. if (batch->recv_trailing_metadata) {
  263. batch->payload->recv_trailing_metadata.recv_trailing_metadata->Set(
  264. grpc_core::GrpcStreamNetworkState(),
  265. grpc_core::GrpcStreamNetworkState::kNotSeenByServer);
  266. }
  267. if (!batch->cancel_stream) {
  268. grpc_transport_stream_op_batch_finish_with_failure(
  269. batch,
  270. grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
  271. "FailFirstCallFilter failing batch"),
  272. GRPC_ERROR_INT_GRPC_STATUS,
  273. GRPC_STATUS_UNAVAILABLE),
  274. calld->call_combiner_);
  275. return;
  276. }
  277. }
  278. grpc_call_next_op(elem, batch);
  279. }
  280. private:
  281. explicit CallData(const grpc_call_element_args* args)
  282. : call_combiner_(args->call_combiner) {}
  283. grpc_core::CallCombiner* call_combiner_;
  284. bool fail_ = false;
  285. };
  286. static grpc_error_handle Init(grpc_channel_element* elem,
  287. grpc_channel_element_args* /*args*/) {
  288. new (elem->channel_data) FailFirstCallFilter();
  289. return GRPC_ERROR_NONE;
  290. }
  291. static void Destroy(grpc_channel_element* elem) {
  292. auto* chand = static_cast<FailFirstCallFilter*>(elem->channel_data);
  293. chand->~FailFirstCallFilter();
  294. }
  295. bool seen_call_ = false;
  296. };
  297. grpc_channel_filter FailFirstCallFilter::kFilterVtable = {
  298. CallData::StartTransportStreamOpBatch,
  299. nullptr,
  300. grpc_channel_next_op,
  301. sizeof(CallData),
  302. CallData::Init,
  303. grpc_call_stack_ignore_set_pollset_or_pollset_set,
  304. CallData::Destroy,
  305. sizeof(FailFirstCallFilter),
  306. Init,
  307. Destroy,
  308. grpc_channel_next_get_info,
  309. "FailFirstCallFilter",
  310. };
  311. } // namespace
  312. void retry_transparent_goaway(grpc_end2end_test_config config) {
  313. GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL);
  314. grpc_core::CoreConfiguration::RunWithSpecialConfiguration(
  315. [](grpc_core::CoreConfiguration::Builder* builder) {
  316. grpc_core::BuildCoreConfiguration(builder);
  317. builder->channel_init()->RegisterStage(
  318. GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY + 1,
  319. [](grpc_core::ChannelStackBuilder* builder) {
  320. // Skip on proxy (which explicitly disables retries).
  321. const grpc_channel_args* args = builder->channel_args();
  322. if (!grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES,
  323. true)) {
  324. return true;
  325. }
  326. // Install filter.
  327. builder->PrependFilter(&FailFirstCallFilter::kFilterVtable,
  328. nullptr);
  329. return true;
  330. });
  331. },
  332. [config] { test_retry_transparent_goaway(config); });
  333. }
  334. void retry_transparent_goaway_pre_init(void) {}