123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379 |
- //
- // Copyright 2017 gRPC authors.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- //
- #include <stdio.h>
- #include <string.h>
- #include <grpc/byte_buffer.h>
- #include <grpc/grpc.h>
- #include <grpc/support/alloc.h>
- #include <grpc/support/log.h>
- #include <grpc/support/string_util.h>
- #include <grpc/support/time.h>
- #include "src/core/lib/channel/channel_args.h"
- #include "src/core/lib/channel/channel_stack.h"
- #include "src/core/lib/channel/channel_stack_builder.h"
- #include "src/core/lib/config/core_configuration.h"
- #include "src/core/lib/gpr/string.h"
- #include "src/core/lib/gpr/useful.h"
- #include "src/core/lib/iomgr/exec_ctx.h"
- #include "src/core/lib/surface/channel_init.h"
- #include "src/core/lib/transport/error_utils.h"
- #include "test/core/end2end/cq_verifier.h"
- #include "test/core/end2end/end2end_tests.h"
- #include "test/core/end2end/tests/cancel_test_helpers.h"
- static void* tag(intptr_t t) { return reinterpret_cast<void*>(t); }
- static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
- const char* test_name,
- grpc_channel_args* client_args,
- grpc_channel_args* server_args) {
- grpc_end2end_test_fixture f;
- gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
- f = config.create_fixture(client_args, server_args);
- config.init_server(&f, server_args);
- config.init_client(&f, client_args);
- return f;
- }
- static gpr_timespec n_seconds_from_now(int n) {
- return grpc_timeout_seconds_to_deadline(n);
- }
- static gpr_timespec five_seconds_from_now(void) {
- return n_seconds_from_now(5);
- }
- static void drain_cq(grpc_completion_queue* cq) {
- grpc_event ev;
- do {
- ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
- } while (ev.type != GRPC_QUEUE_SHUTDOWN);
- }
- static void shutdown_server(grpc_end2end_test_fixture* f) {
- if (!f->server) return;
- grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
- GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
- grpc_timeout_seconds_to_deadline(5),
- nullptr)
- .type == GRPC_OP_COMPLETE);
- grpc_server_destroy(f->server);
- f->server = nullptr;
- }
- static void shutdown_client(grpc_end2end_test_fixture* f) {
- if (!f->client) return;
- grpc_channel_destroy(f->client);
- f->client = nullptr;
- }
- static void end_test(grpc_end2end_test_fixture* f) {
- shutdown_server(f);
- shutdown_client(f);
- grpc_completion_queue_shutdown(f->cq);
- drain_cq(f->cq);
- grpc_completion_queue_destroy(f->cq);
- grpc_completion_queue_destroy(f->shutdown_cq);
- }
- // Tests transparent retries when the call was never sent out on the wire.
- static void test_retry_transparent_goaway(grpc_end2end_test_config config) {
- grpc_call* c;
- grpc_call* s;
- grpc_op ops[6];
- grpc_op* op;
- grpc_metadata_array initial_metadata_recv;
- grpc_metadata_array trailing_metadata_recv;
- grpc_metadata_array request_metadata_recv;
- grpc_call_details call_details;
- grpc_slice request_payload_slice = grpc_slice_from_static_string("foo");
- grpc_slice response_payload_slice = grpc_slice_from_static_string("bar");
- grpc_byte_buffer* request_payload =
- grpc_raw_byte_buffer_create(&request_payload_slice, 1);
- grpc_byte_buffer* response_payload =
- grpc_raw_byte_buffer_create(&response_payload_slice, 1);
- grpc_byte_buffer* request_payload_recv = nullptr;
- grpc_byte_buffer* response_payload_recv = nullptr;
- grpc_status_code status;
- grpc_call_error error;
- grpc_slice details;
- int was_cancelled = 2;
- char* peer;
- grpc_end2end_test_fixture f =
- begin_test(config, "retry_transparent_goaway", nullptr, nullptr);
- cq_verifier* cqv = cq_verifier_create(f.cq);
- gpr_timespec deadline = five_seconds_from_now();
- c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
- grpc_slice_from_static_string("/service/method"),
- nullptr, deadline, nullptr);
- GPR_ASSERT(c);
- peer = grpc_call_get_peer(c);
- GPR_ASSERT(peer != nullptr);
- gpr_log(GPR_DEBUG, "client_peer_before_call=%s", peer);
- gpr_free(peer);
- grpc_metadata_array_init(&initial_metadata_recv);
- grpc_metadata_array_init(&trailing_metadata_recv);
- grpc_metadata_array_init(&request_metadata_recv);
- grpc_call_details_init(&call_details);
- grpc_slice status_details = grpc_slice_from_static_string("xyz");
- // Start a batch containing send ops.
- memset(ops, 0, sizeof(ops));
- op = ops;
- op->op = GRPC_OP_SEND_INITIAL_METADATA;
- op->data.send_initial_metadata.count = 0;
- op++;
- op->op = GRPC_OP_SEND_MESSAGE;
- op->data.send_message.send_message = request_payload;
- op++;
- op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
- op++;
- error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
- nullptr);
- GPR_ASSERT(GRPC_CALL_OK == error);
- // Start a batch containing recv ops.
- memset(ops, 0, sizeof(ops));
- op = ops;
- op->op = GRPC_OP_RECV_MESSAGE;
- op->data.recv_message.recv_message = &response_payload_recv;
- op++;
- op->op = GRPC_OP_RECV_INITIAL_METADATA;
- op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
- op++;
- op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
- op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
- op->data.recv_status_on_client.status = &status;
- op->data.recv_status_on_client.status_details = &details;
- op++;
- error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(2),
- nullptr);
- GPR_ASSERT(GRPC_CALL_OK == error);
- // Client send ops should now complete.
- CQ_EXPECT_COMPLETION(cqv, tag(1), true);
- cq_verify(cqv);
- // Server should get a call.
- error =
- grpc_server_request_call(f.server, &s, &call_details,
- &request_metadata_recv, f.cq, f.cq, tag(101));
- GPR_ASSERT(GRPC_CALL_OK == error);
- CQ_EXPECT_COMPLETION(cqv, tag(101), true);
- cq_verify(cqv);
- // Server receives the request.
- memset(ops, 0, sizeof(ops));
- op = ops;
- op->op = GRPC_OP_RECV_MESSAGE;
- op->data.recv_message.recv_message = &request_payload_recv;
- op++;
- error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(102),
- nullptr);
- GPR_ASSERT(GRPC_CALL_OK == error);
- CQ_EXPECT_COMPLETION(cqv, tag(102), true);
- cq_verify(cqv);
- // Server sends a response with status OK.
- memset(ops, 0, sizeof(ops));
- op = ops;
- op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
- op->data.recv_close_on_server.cancelled = &was_cancelled;
- op++;
- op->op = GRPC_OP_SEND_INITIAL_METADATA;
- op->data.send_initial_metadata.count = 0;
- op++;
- op->op = GRPC_OP_SEND_MESSAGE;
- op->data.send_message.send_message = response_payload;
- op++;
- op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
- op->data.send_status_from_server.trailing_metadata_count = 0;
- op->data.send_status_from_server.status = GRPC_STATUS_OK;
- op->data.send_status_from_server.status_details = &status_details;
- op++;
- error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(103),
- nullptr);
- GPR_ASSERT(GRPC_CALL_OK == error);
- // In principle, the server batch should complete before the client
- // recv ops batch, but in the proxy fixtures, there are multiple threads
- // involved, so the completion order tends to be a little racy.
- CQ_EXPECT_COMPLETION(cqv, tag(103), true);
- CQ_EXPECT_COMPLETION(cqv, tag(2), true);
- cq_verify(cqv);
- GPR_ASSERT(status == GRPC_STATUS_OK);
- GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
- GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/service/method"));
- GPR_ASSERT(0 == call_details.flags);
- GPR_ASSERT(was_cancelled == 0);
- GPR_ASSERT(byte_buffer_eq_slice(request_payload_recv, request_payload_slice));
- GPR_ASSERT(
- byte_buffer_eq_slice(response_payload_recv, response_payload_slice));
- // Make sure the "grpc-previous-rpc-attempts" header was NOT sent, since
- // we don't do that for transparent retries.
- for (size_t i = 0; i < request_metadata_recv.count; ++i) {
- GPR_ASSERT(!grpc_slice_eq(
- request_metadata_recv.metadata[i].key,
- grpc_slice_from_static_string("grpc-previous-rpc-attempts")));
- }
- grpc_slice_unref(details);
- grpc_metadata_array_destroy(&initial_metadata_recv);
- grpc_metadata_array_destroy(&trailing_metadata_recv);
- grpc_metadata_array_destroy(&request_metadata_recv);
- grpc_call_details_destroy(&call_details);
- grpc_byte_buffer_destroy(request_payload);
- grpc_byte_buffer_destroy(response_payload);
- grpc_byte_buffer_destroy(request_payload_recv);
- grpc_byte_buffer_destroy(response_payload_recv);
- grpc_call_unref(c);
- grpc_call_unref(s);
- cq_verifier_destroy(cqv);
- end_test(&f);
- config.tear_down_data(&f);
- }
- namespace {
- // A filter that, for the first call it sees, will fail all batches except
- // for cancellations, so that the call fails with an error whose
- // StreamNetworkState is kNotSeenByServer.
- // All subsequent calls are allowed through without failures.
- class FailFirstCallFilter {
- public:
- static grpc_channel_filter kFilterVtable;
- private:
- class CallData {
- public:
- static grpc_error_handle Init(grpc_call_element* elem,
- const grpc_call_element_args* args) {
- new (elem->call_data) CallData(args);
- return GRPC_ERROR_NONE;
- }
- static void Destroy(grpc_call_element* elem,
- const grpc_call_final_info* /*final_info*/,
- grpc_closure* /*ignored*/) {
- auto* calld = static_cast<CallData*>(elem->call_data);
- calld->~CallData();
- }
- static void StartTransportStreamOpBatch(
- grpc_call_element* elem, grpc_transport_stream_op_batch* batch) {
- auto* chand = static_cast<FailFirstCallFilter*>(elem->channel_data);
- auto* calld = static_cast<CallData*>(elem->call_data);
- if (!chand->seen_call_) {
- calld->fail_ = true;
- chand->seen_call_ = true;
- }
- if (calld->fail_) {
- if (batch->recv_trailing_metadata) {
- batch->payload->recv_trailing_metadata.recv_trailing_metadata->Set(
- grpc_core::GrpcStreamNetworkState(),
- grpc_core::GrpcStreamNetworkState::kNotSeenByServer);
- }
- if (!batch->cancel_stream) {
- grpc_transport_stream_op_batch_finish_with_failure(
- batch,
- grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "FailFirstCallFilter failing batch"),
- GRPC_ERROR_INT_GRPC_STATUS,
- GRPC_STATUS_UNAVAILABLE),
- calld->call_combiner_);
- return;
- }
- }
- grpc_call_next_op(elem, batch);
- }
- private:
- explicit CallData(const grpc_call_element_args* args)
- : call_combiner_(args->call_combiner) {}
- grpc_core::CallCombiner* call_combiner_;
- bool fail_ = false;
- };
- static grpc_error_handle Init(grpc_channel_element* elem,
- grpc_channel_element_args* /*args*/) {
- new (elem->channel_data) FailFirstCallFilter();
- return GRPC_ERROR_NONE;
- }
- static void Destroy(grpc_channel_element* elem) {
- auto* chand = static_cast<FailFirstCallFilter*>(elem->channel_data);
- chand->~FailFirstCallFilter();
- }
- bool seen_call_ = false;
- };
- grpc_channel_filter FailFirstCallFilter::kFilterVtable = {
- CallData::StartTransportStreamOpBatch,
- nullptr,
- grpc_channel_next_op,
- sizeof(CallData),
- CallData::Init,
- grpc_call_stack_ignore_set_pollset_or_pollset_set,
- CallData::Destroy,
- sizeof(FailFirstCallFilter),
- Init,
- Destroy,
- grpc_channel_next_get_info,
- "FailFirstCallFilter",
- };
- } // namespace
- void retry_transparent_goaway(grpc_end2end_test_config config) {
- GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL);
- grpc_core::CoreConfiguration::RunWithSpecialConfiguration(
- [](grpc_core::CoreConfiguration::Builder* builder) {
- grpc_core::BuildCoreConfiguration(builder);
- builder->channel_init()->RegisterStage(
- GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY + 1,
- [](grpc_core::ChannelStackBuilder* builder) {
- // Skip on proxy (which explicitly disables retries).
- const grpc_channel_args* args = builder->channel_args();
- if (!grpc_channel_args_find_bool(args, GRPC_ARG_ENABLE_RETRIES,
- true)) {
- return true;
- }
- // Install filter.
- builder->PrependFilter(&FailFirstCallFilter::kFilterVtable,
- nullptr);
- return true;
- });
- },
- [config] { test_retry_transparent_goaway(config); });
- }
- void retry_transparent_goaway_pre_init(void) {}
|