1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168 |
- /*
- *
- * Copyright 2016 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- #include <string.h>
- #include <grpc/grpc.h>
- #include <grpc/grpc_security.h>
- #include <grpc/support/alloc.h>
- #include <grpc/support/log.h>
- #include <grpc/support/string_util.h>
- #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
- #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
- #include "src/core/lib/channel/channel_args.h"
- #include "src/core/lib/gpr/env.h"
- #include "src/core/lib/iomgr/executor.h"
- #include "src/core/lib/iomgr/resolve_address.h"
- #include "src/core/lib/iomgr/tcp_client.h"
- #include "src/core/lib/iomgr/timer.h"
- #include "src/core/lib/iomgr/timer_manager.h"
- #include "src/core/lib/resolver/server_address.h"
- #include "src/core/lib/slice/slice_internal.h"
- #include "src/core/lib/surface/channel.h"
- #include "src/core/lib/surface/server.h"
- #include "src/libfuzzer/libfuzzer_macro.h"
- #include "test/core/end2end/data/ssl_test_data.h"
- #include "test/core/end2end/fuzzers/api_fuzzer.pb.h"
- #include "test/core/util/passthru_endpoint.h"
- static constexpr uint64_t kMaxAdvanceTimeMicros =
- 31536000000000; // 1 year (24 * 365 * 3600 * 1000000)
- // Applicable when simulating channel actions. Prevents overflows.
- static constexpr uint64_t kMaxWaitMs =
- 31536000000; // 1 year (24 * 365 * 3600 * 1000)
- // Applicable when simulating channel actions. Prevents overflows.
- static constexpr uint64_t kMaxAddNReadableBytes = (2 * 1024 * 1024); // 2GB
- // Applicable when simulating channel actions. Prevents overflows.
- static constexpr uint64_t kMaxAddNWritableBytes = (2 * 1024 * 1024); // 2GB
- ////////////////////////////////////////////////////////////////////////////////
- // logging
- bool squelch = true;
- bool leak_check = true;
- static void dont_log(gpr_log_func_args* /*args*/) {}
- ////////////////////////////////////////////////////////////////////////////////
- // global state
- static gpr_timespec g_now;
- static grpc_server* g_server;
- static grpc_channel* g_channel;
- static grpc_resource_quota* g_resource_quota;
- static std::vector<grpc_passthru_endpoint_channel_action> g_channel_actions;
- static std::atomic<bool> g_channel_force_delete{false};
- extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
- static gpr_timespec now_impl(gpr_clock_type clock_type) {
- GPR_ASSERT(clock_type != GPR_TIMESPAN);
- gpr_timespec ts = g_now;
- ts.clock_type = clock_type;
- return ts;
- }
- ////////////////////////////////////////////////////////////////////////////////
- // dns resolution
- typedef struct addr_req {
- grpc_timer timer;
- char* addr;
- grpc_closure* on_done;
- std::unique_ptr<grpc_core::ServerAddressList>* addresses;
- } addr_req;
- static void finish_resolve(void* arg, grpc_error_handle error) {
- addr_req* r = static_cast<addr_req*>(arg);
- if (error == GRPC_ERROR_NONE && 0 == strcmp(r->addr, "server")) {
- *r->addresses = absl::make_unique<grpc_core::ServerAddressList>();
- grpc_resolved_address fake_resolved_address;
- memset(&fake_resolved_address, 0, sizeof(fake_resolved_address));
- fake_resolved_address.len = 0;
- (*r->addresses)->emplace_back(fake_resolved_address, nullptr);
- grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_done, GRPC_ERROR_NONE);
- } else {
- grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_done,
- GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
- "Resolution failed", &error, 1));
- }
- gpr_free(r->addr);
- delete r;
- }
- namespace {
- class FuzzerDNSResolver : public grpc_core::DNSResolver {
- public:
- class FuzzerDNSRequest : public grpc_core::DNSResolver::Request {
- public:
- FuzzerDNSRequest(
- absl::string_view name,
- std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
- on_done)
- : name_(std::string(name)), on_done_(std::move(on_done)) {}
- void Start() override {
- Ref().release(); // ref held by timer callback
- grpc_timer_init(
- &timer_,
- grpc_core::Duration::Seconds(1) + grpc_core::ExecCtx::Get()->Now(),
- GRPC_CLOSURE_CREATE(FinishResolve, this, grpc_schedule_on_exec_ctx));
- }
- // cancellation not implemented
- void Orphan() override { Unref(); }
- private:
- static void FinishResolve(void* arg, grpc_error_handle error) {
- FuzzerDNSRequest* self = static_cast<FuzzerDNSRequest*>(arg);
- if (error == GRPC_ERROR_NONE && self->name_ == "server") {
- std::vector<grpc_resolved_address> addrs;
- grpc_resolved_address addr;
- addr.len = 0;
- addrs.push_back(addr);
- self->on_done_(std::move(addrs));
- } else {
- self->on_done_(absl::UnknownError("Resolution failed"));
- }
- self->Unref();
- }
- const std::string name_;
- const std::function<void(
- absl::StatusOr<std::vector<grpc_resolved_address>>)>
- on_done_;
- grpc_timer timer_;
- };
- // Gets the singleton instance, possibly creating it first
- static FuzzerDNSResolver* GetOrCreate() {
- static FuzzerDNSResolver* instance = new FuzzerDNSResolver();
- return instance;
- }
- grpc_core::OrphanablePtr<grpc_core::DNSResolver::Request> ResolveName(
- absl::string_view name, absl::string_view /* default_port */,
- grpc_pollset_set* /* interested_parties */,
- std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
- on_done) override {
- return grpc_core::MakeOrphanable<FuzzerDNSRequest>(name,
- std::move(on_done));
- }
- absl::StatusOr<std::vector<grpc_resolved_address>> ResolveNameBlocking(
- absl::string_view /* name */,
- absl::string_view /* default_port */) override {
- GPR_ASSERT(0);
- }
- };
- } // namespace
- grpc_ares_request* my_dns_lookup_ares(
- const char* /*dns_server*/, const char* addr, const char* /*default_port*/,
- grpc_pollset_set* /*interested_parties*/, grpc_closure* on_done,
- std::unique_ptr<grpc_core::ServerAddressList>* addresses,
- std::unique_ptr<grpc_core::ServerAddressList>* /*balancer_addresses*/,
- char** /*service_config_json*/, int /*query_timeout*/) {
- addr_req* r = new addr_req();
- r->addr = gpr_strdup(addr);
- r->on_done = on_done;
- r->addresses = addresses;
- grpc_timer_init(
- &r->timer,
- grpc_core::Duration::Seconds(1) + grpc_core::ExecCtx::Get()->Now(),
- GRPC_CLOSURE_CREATE(finish_resolve, r, grpc_schedule_on_exec_ctx));
- return nullptr;
- }
- static void my_cancel_ares_request(grpc_ares_request* request) {
- GPR_ASSERT(request == nullptr);
- }
- ////////////////////////////////////////////////////////////////////////////////
- // client connection
- static void sched_connect(grpc_closure* closure, grpc_endpoint** ep,
- gpr_timespec deadline);
- typedef struct {
- grpc_timer timer;
- grpc_closure* closure;
- grpc_endpoint** ep;
- gpr_timespec deadline;
- } future_connect;
- static void do_connect(void* arg, grpc_error_handle error) {
- future_connect* fc = static_cast<future_connect*>(arg);
- if (error != GRPC_ERROR_NONE) {
- *fc->ep = nullptr;
- grpc_core::ExecCtx::Run(DEBUG_LOCATION, fc->closure, GRPC_ERROR_REF(error));
- } else if (g_server != nullptr) {
- grpc_endpoint* client;
- grpc_endpoint* server;
- grpc_passthru_endpoint_create(&client, &server, nullptr, true);
- *fc->ep = client;
- start_scheduling_grpc_passthru_endpoint_channel_effects(client,
- g_channel_actions);
- grpc_core::Server* core_server = grpc_core::Server::FromC(g_server);
- grpc_transport* transport = grpc_create_chttp2_transport(
- core_server->channel_args(), server, false);
- GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "SetupTransport",
- core_server->SetupTransport(transport, nullptr,
- core_server->channel_args(), nullptr)));
- grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
- grpc_core::ExecCtx::Run(DEBUG_LOCATION, fc->closure, GRPC_ERROR_NONE);
- } else {
- sched_connect(fc->closure, fc->ep, fc->deadline);
- }
- gpr_free(fc);
- }
- static void sched_connect(grpc_closure* closure, grpc_endpoint** ep,
- gpr_timespec deadline) {
- if (gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) < 0) {
- *ep = nullptr;
- grpc_core::ExecCtx::Run(
- DEBUG_LOCATION, closure,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("Connect deadline exceeded"));
- return;
- }
- future_connect* fc = static_cast<future_connect*>(gpr_malloc(sizeof(*fc)));
- fc->closure = closure;
- fc->ep = ep;
- fc->deadline = deadline;
- grpc_timer_init(
- &fc->timer,
- grpc_core::Duration::Seconds(1) + grpc_core::ExecCtx::Get()->Now(),
- GRPC_CLOSURE_CREATE(do_connect, fc, grpc_schedule_on_exec_ctx));
- }
- static void my_tcp_client_connect(grpc_closure* closure, grpc_endpoint** ep,
- grpc_pollset_set* /*interested_parties*/,
- const grpc_channel_args* /*channel_args*/,
- const grpc_resolved_address* /*addr*/,
- grpc_core::Timestamp deadline) {
- sched_connect(closure, ep, deadline.as_timespec(GPR_CLOCK_MONOTONIC));
- }
- grpc_tcp_client_vtable fuzz_tcp_client_vtable = {my_tcp_client_connect};
- ////////////////////////////////////////////////////////////////////////////////
- // test driver
- class Validator {
- public:
- explicit Validator(std::function<void(bool)> impl) : impl_(std::move(impl)) {}
- virtual ~Validator() {}
- void Run(bool success) {
- impl_(success);
- delete this;
- }
- private:
- std::function<void(bool)> impl_;
- };
- Validator* MakeValidator(std::function<void(bool)> impl) {
- return new Validator(std::move(impl));
- }
- static Validator* AssertSuccessAndDecrement(int* counter) {
- return MakeValidator([counter](bool success) {
- GPR_ASSERT(success);
- --*counter;
- });
- }
- static Validator* Decrement(int* counter) {
- return MakeValidator([counter](bool) { --*counter; });
- }
- static Validator* ValidateConnectivityWatch(gpr_timespec deadline,
- int* counter) {
- return MakeValidator([deadline, counter](bool success) {
- if (!success) {
- auto now = gpr_now(deadline.clock_type);
- GPR_ASSERT(gpr_time_cmp(now, deadline) >= 0);
- }
- --*counter;
- });
- }
- static void free_non_null(void* p) {
- GPR_ASSERT(p != nullptr);
- gpr_free(p);
- }
- enum class CallType { CLIENT, SERVER, PENDING_SERVER, TOMBSTONED };
- class Call : public std::enable_shared_from_this<Call> {
- public:
- explicit Call(CallType type) : type_(type) {
- grpc_metadata_array_init(&recv_initial_metadata_);
- grpc_metadata_array_init(&recv_trailing_metadata_);
- grpc_call_details_init(&call_details_);
- }
- ~Call();
- CallType type() const { return type_; }
- bool done() const {
- if ((type_ == CallType::TOMBSTONED || call_closed_) && pending_ops_ == 0) {
- return true;
- }
- if (call_ == nullptr && type() != CallType::PENDING_SERVER) return true;
- return false;
- }
- void Shutdown() {
- if (call_ != nullptr) {
- grpc_call_cancel(call_, nullptr);
- type_ = CallType::TOMBSTONED;
- }
- }
- void SetCall(grpc_call* call) {
- GPR_ASSERT(call_ == nullptr);
- call_ = call;
- }
- grpc_call* call() const { return call_; }
- void RequestCall(grpc_server* server, grpc_completion_queue* cq) {
- auto* v = FinishedRequestCall();
- grpc_call_error error = grpc_server_request_call(
- server, &call_, &call_details_, &recv_initial_metadata_, cq, cq, v);
- if (error != GRPC_CALL_OK) {
- v->Run(false);
- }
- }
- void* Allocate(size_t size) {
- void* p = gpr_malloc(size);
- free_pointers_.push_back(p);
- return p;
- }
- template <typename T>
- T* AllocArray(size_t elems) {
- return static_cast<T*>(Allocate(sizeof(T) * elems));
- }
- template <typename T>
- T* NewCopy(T value) {
- T* p = AllocArray<T>(1);
- new (p) T(value);
- return p;
- }
- template <typename T>
- grpc_slice ReadSlice(const T& s) {
- grpc_slice slice = grpc_slice_from_cpp_string(s.value());
- unref_slices_.push_back(slice);
- return slice;
- }
- template <typename M>
- grpc_metadata_array ReadMetadata(const M& metadata) {
- grpc_metadata* m = AllocArray<grpc_metadata>(metadata.size());
- for (int i = 0; i < metadata.size(); ++i) {
- m[i].key = ReadSlice(metadata[i].key());
- m[i].value = ReadSlice(metadata[i].value());
- }
- return grpc_metadata_array{static_cast<size_t>(metadata.size()),
- static_cast<size_t>(metadata.size()), m};
- }
- absl::optional<grpc_op> ReadOp(
- const api_fuzzer::BatchOp& batch_op, bool* batch_is_ok,
- uint8_t* batch_ops, std::vector<std::function<void()>>* unwinders) {
- grpc_op op;
- memset(&op, 0, sizeof(op));
- switch (batch_op.op_case()) {
- case api_fuzzer::BatchOp::OP_NOT_SET:
- /* invalid value */
- return {};
- case api_fuzzer::BatchOp::kSendInitialMetadata:
- if (sent_initial_metadata_) {
- *batch_is_ok = false;
- } else {
- sent_initial_metadata_ = true;
- op.op = GRPC_OP_SEND_INITIAL_METADATA;
- *batch_ops |= 1 << GRPC_OP_SEND_INITIAL_METADATA;
- auto ary = ReadMetadata(batch_op.send_initial_metadata().metadata());
- op.data.send_initial_metadata.count = ary.count;
- op.data.send_initial_metadata.metadata = ary.metadata;
- }
- break;
- case api_fuzzer::BatchOp::kSendMessage:
- op.op = GRPC_OP_SEND_MESSAGE;
- if (send_message_ != nullptr) {
- *batch_is_ok = false;
- } else {
- *batch_ops |= 1 << GRPC_OP_SEND_MESSAGE;
- std::vector<grpc_slice> slices;
- for (const auto& m : batch_op.send_message().message()) {
- slices.push_back(ReadSlice(m));
- }
- send_message_ = op.data.send_message.send_message =
- grpc_raw_byte_buffer_create(slices.data(), slices.size());
- unwinders->push_back([this]() {
- grpc_byte_buffer_destroy(send_message_);
- send_message_ = nullptr;
- });
- }
- break;
- case api_fuzzer::BatchOp::kSendCloseFromClient:
- op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
- *batch_ops |= 1 << GRPC_OP_SEND_CLOSE_FROM_CLIENT;
- break;
- case api_fuzzer::BatchOp::kSendStatusFromServer: {
- op.op = GRPC_OP_SEND_STATUS_FROM_SERVER;
- *batch_ops |= 1 << GRPC_OP_SEND_STATUS_FROM_SERVER;
- auto ary = ReadMetadata(batch_op.send_status_from_server().metadata());
- op.data.send_status_from_server.trailing_metadata_count = ary.count;
- op.data.send_status_from_server.trailing_metadata = ary.metadata;
- op.data.send_status_from_server.status = static_cast<grpc_status_code>(
- batch_op.send_status_from_server().status_code());
- op.data.send_status_from_server.status_details =
- batch_op.send_status_from_server().has_status_details()
- ? NewCopy(ReadSlice(
- batch_op.send_status_from_server().status_details()))
- : nullptr;
- } break;
- case api_fuzzer::BatchOp::kReceiveInitialMetadata:
- if (enqueued_recv_initial_metadata_) {
- *batch_is_ok = false;
- } else {
- enqueued_recv_initial_metadata_ = true;
- op.op = GRPC_OP_RECV_INITIAL_METADATA;
- *batch_ops |= 1 << GRPC_OP_RECV_INITIAL_METADATA;
- op.data.recv_initial_metadata.recv_initial_metadata =
- &recv_initial_metadata_;
- }
- break;
- case api_fuzzer::BatchOp::kReceiveMessage:
- // Allow only one active pending_recv_message_op to exist. Otherwise if
- // the previous enqueued recv_message_op is not complete by the time
- // we get here, then under certain conditions, enqueing this op will
- // over-write the internal call->receiving_buffer maintained by grpc
- // leading to a memory leak.
- if (call_closed_ || pending_recv_message_op_) {
- *batch_is_ok = false;
- } else {
- op.op = GRPC_OP_RECV_MESSAGE;
- *batch_ops |= 1 << GRPC_OP_RECV_MESSAGE;
- pending_recv_message_op_ = true;
- op.data.recv_message.recv_message = &recv_message_;
- unwinders->push_back([this]() { pending_recv_message_op_ = false; });
- }
- break;
- case api_fuzzer::BatchOp::kReceiveStatusOnClient:
- op.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
- op.data.recv_status_on_client.status = &status_;
- op.data.recv_status_on_client.trailing_metadata =
- &recv_trailing_metadata_;
- op.data.recv_status_on_client.status_details = &recv_status_details_;
- *batch_ops |= 1 << GRPC_OP_RECV_STATUS_ON_CLIENT;
- break;
- case api_fuzzer::BatchOp::kReceiveCloseOnServer:
- op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
- *batch_ops |= 1 << GRPC_OP_RECV_CLOSE_ON_SERVER;
- op.data.recv_close_on_server.cancelled = &cancelled_;
- break;
- }
- op.reserved = nullptr;
- op.flags = batch_op.flags();
- return op;
- }
- Validator* FinishedBatchValidator(uint8_t has_ops) {
- ++pending_ops_;
- auto self = shared_from_this();
- return MakeValidator([self, has_ops](bool /*success*/) {
- --self->pending_ops_;
- if (has_ops & (1u << GRPC_OP_RECV_MESSAGE)) {
- self->pending_recv_message_op_ = false;
- if (self->recv_message_ != nullptr) {
- grpc_byte_buffer_destroy(self->recv_message_);
- self->recv_message_ = nullptr;
- }
- }
- if ((has_ops & (1u << GRPC_OP_SEND_MESSAGE))) {
- grpc_byte_buffer_destroy(self->send_message_);
- self->send_message_ = nullptr;
- }
- if ((has_ops & (1u << GRPC_OP_RECV_STATUS_ON_CLIENT)) ||
- (has_ops & (1u << GRPC_OP_RECV_CLOSE_ON_SERVER))) {
- self->call_closed_ = true;
- }
- });
- }
- Validator* FinishedRequestCall() {
- ++pending_ops_;
- auto self = shared_from_this();
- return MakeValidator([self](bool success) {
- GPR_ASSERT(self->pending_ops_ > 0);
- --self->pending_ops_;
- if (success) {
- GPR_ASSERT(self->call_ != nullptr);
- self->type_ = CallType::SERVER;
- } else {
- self->type_ = CallType::TOMBSTONED;
- }
- });
- }
- private:
- CallType type_;
- grpc_call* call_ = nullptr;
- grpc_byte_buffer* recv_message_ = nullptr;
- grpc_status_code status_;
- grpc_metadata_array recv_initial_metadata_{0, 0, nullptr};
- grpc_metadata_array recv_trailing_metadata_{0, 0, nullptr};
- grpc_slice recv_status_details_ = grpc_empty_slice();
- // set by receive close on server, unset here to trigger
- // msan if misused
- int cancelled_;
- int pending_ops_ = 0;
- bool sent_initial_metadata_ = false;
- bool enqueued_recv_initial_metadata_ = false;
- grpc_call_details call_details_{};
- grpc_byte_buffer* send_message_ = nullptr;
- bool call_closed_ = false;
- bool pending_recv_message_op_ = false;
- std::vector<void*> free_pointers_;
- std::vector<grpc_slice> unref_slices_;
- };
- static std::vector<std::shared_ptr<Call>> g_calls;
- static size_t g_active_call = 0;
- static Call* ActiveCall() {
- while (!g_calls.empty()) {
- if (g_active_call >= g_calls.size()) {
- g_active_call = 0;
- }
- if (g_calls[g_active_call] != nullptr && !g_calls[g_active_call]->done()) {
- return g_calls[g_active_call].get();
- }
- g_calls.erase(g_calls.begin() + g_active_call);
- }
- return nullptr;
- }
- Call::~Call() {
- if (call_ != nullptr) {
- grpc_call_unref(call_);
- }
- grpc_slice_unref(recv_status_details_);
- grpc_call_details_destroy(&call_details_);
- for (auto* p : free_pointers_) {
- gpr_free(p);
- }
- for (auto s : unref_slices_) {
- grpc_slice_unref(s);
- }
- if (recv_message_ != nullptr) {
- grpc_byte_buffer_destroy(recv_message_);
- recv_message_ = nullptr;
- }
- grpc_metadata_array_destroy(&recv_initial_metadata_);
- grpc_metadata_array_destroy(&recv_trailing_metadata_);
- }
- template <typename ChannelArgContainer>
- grpc_channel_args* ReadArgs(const ChannelArgContainer& args) {
- grpc_channel_args* res =
- static_cast<grpc_channel_args*>(gpr_malloc(sizeof(grpc_channel_args)));
- res->args =
- static_cast<grpc_arg*>(gpr_malloc(sizeof(grpc_arg) * args.size()));
- int j = 0;
- for (int i = 0; i < args.size(); i++) {
- switch (args[i].value_case()) {
- case api_fuzzer::ChannelArg::kStr:
- res->args[j].type = GRPC_ARG_STRING;
- res->args[j].value.string = gpr_strdup(args[i].str().c_str());
- break;
- case api_fuzzer::ChannelArg::kI:
- res->args[j].type = GRPC_ARG_INTEGER;
- res->args[j].value.integer = args[i].i();
- break;
- case api_fuzzer::ChannelArg::kResourceQuota:
- if (args[i].key() != GRPC_ARG_RESOURCE_QUOTA) continue;
- grpc_resource_quota_ref(g_resource_quota);
- res->args[j].type = GRPC_ARG_POINTER;
- res->args[j].value.pointer.p = g_resource_quota;
- res->args[j].value.pointer.vtable = grpc_resource_quota_arg_vtable();
- break;
- case api_fuzzer::ChannelArg::VALUE_NOT_SET:
- res->args[j].type = GRPC_ARG_INTEGER;
- res->args[j].value.integer = 0;
- break;
- }
- res->args[j].key = gpr_strdup(args[i].key().c_str());
- ++j;
- }
- res->num_args = j;
- return res;
- }
- static const char* ReadCredArtifact(
- const api_fuzzer::CredArtifact& artifact,
- std::initializer_list<const char*> builtins) {
- switch (artifact.type_case()) {
- case api_fuzzer::CredArtifact::kCustom:
- return artifact.custom().c_str();
- case api_fuzzer::CredArtifact::kBuiltin:
- if (artifact.builtin() < 0) return nullptr;
- if (artifact.builtin() < static_cast<int>(builtins.size())) {
- return *(builtins.begin() + artifact.builtin());
- }
- return nullptr;
- case api_fuzzer::CredArtifact::TYPE_NOT_SET:
- return nullptr;
- }
- }
- static grpc_channel_credentials* ReadSslChannelCreds(
- const api_fuzzer::SslChannelCreds& creds) {
- const char* root_certs =
- creds.has_root_certs()
- ? ReadCredArtifact(creds.root_certs(), {test_root_cert})
- : nullptr;
- const char* private_key =
- creds.has_private_key()
- ? ReadCredArtifact(creds.private_key(),
- {test_server1_key, test_self_signed_client_key,
- test_signed_client_key})
- : nullptr;
- const char* certs =
- creds.has_certs()
- ? ReadCredArtifact(creds.certs(),
- {test_server1_cert, test_self_signed_client_cert,
- test_signed_client_cert})
- : nullptr;
- grpc_ssl_pem_key_cert_pair key_cert_pair = {private_key, certs};
- return grpc_ssl_credentials_create(
- root_certs,
- private_key != nullptr && certs != nullptr ? &key_cert_pair : nullptr,
- nullptr, nullptr);
- }
- static grpc_call_credentials* ReadCallCreds(
- const api_fuzzer::CallCreds& creds) {
- switch (creds.type_case()) {
- case api_fuzzer::CallCreds::TYPE_NOT_SET:
- return nullptr;
- case api_fuzzer::CallCreds::kNull:
- return nullptr;
- case api_fuzzer::CallCreds::kCompositeCallCreds: {
- grpc_call_credentials* out = nullptr;
- for (const auto& child_creds :
- creds.composite_call_creds().call_creds()) {
- grpc_call_credentials* child = ReadCallCreds(child_creds);
- if (child != nullptr) {
- if (out == nullptr) {
- out = child;
- } else {
- auto* composed =
- grpc_composite_call_credentials_create(out, child, nullptr);
- grpc_call_credentials_release(child);
- grpc_call_credentials_release(out);
- out = composed;
- }
- }
- }
- return out;
- }
- case api_fuzzer::CallCreds::kAccessToken:
- return grpc_access_token_credentials_create(creds.access_token().c_str(),
- nullptr);
- case api_fuzzer::CallCreds::kIam:
- return grpc_google_iam_credentials_create(
- creds.iam().auth_token().c_str(), creds.iam().auth_selector().c_str(),
- nullptr);
- /* TODO(ctiller): more cred types here */
- }
- }
- static grpc_channel_credentials* ReadChannelCreds(
- const api_fuzzer::ChannelCreds& creds) {
- switch (creds.type_case()) {
- case api_fuzzer::ChannelCreds::TYPE_NOT_SET:
- return nullptr;
- case api_fuzzer::ChannelCreds::kSslChannelCreds:
- return ReadSslChannelCreds(creds.ssl_channel_creds());
- case api_fuzzer::ChannelCreds::kCompositeChannelCreds: {
- const auto& comp = creds.composite_channel_creds();
- grpc_channel_credentials* c1 =
- comp.has_channel_creds() ? ReadChannelCreds(comp.channel_creds())
- : nullptr;
- grpc_call_credentials* c2 =
- comp.has_call_creds() ? ReadCallCreds(comp.call_creds()) : nullptr;
- if (c1 != nullptr && c2 != nullptr) {
- grpc_channel_credentials* out =
- grpc_composite_channel_credentials_create(c1, c2, nullptr);
- grpc_channel_credentials_release(c1);
- grpc_call_credentials_release(c2);
- return out;
- } else if (c1 != nullptr) {
- return c1;
- } else if (c2 != nullptr) {
- grpc_call_credentials_release(c2);
- return nullptr;
- } else {
- return nullptr;
- }
- GPR_UNREACHABLE_CODE(return nullptr);
- }
- case api_fuzzer::ChannelCreds::kNull:
- return nullptr;
- }
- }
- DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) {
- grpc_test_only_set_slice_hash_seed(0);
- char* grpc_trace_fuzzer = gpr_getenv("GRPC_TRACE_FUZZER");
- if (squelch && grpc_trace_fuzzer == nullptr) gpr_set_log_function(dont_log);
- gpr_free(grpc_trace_fuzzer);
- grpc_set_tcp_client_impl(&fuzz_tcp_client_vtable);
- g_now = {1, 0, GPR_CLOCK_MONOTONIC};
- grpc_core::TestOnlySetProcessEpoch(g_now);
- gpr_now_impl = now_impl;
- grpc_init();
- grpc_timer_manager_set_threading(false);
- {
- grpc_core::ExecCtx exec_ctx;
- grpc_core::Executor::SetThreadingAll(false);
- }
- grpc_core::SetDNSResolver(FuzzerDNSResolver::GetOrCreate());
- grpc_dns_lookup_ares = my_dns_lookup_ares;
- grpc_cancel_ares_request = my_cancel_ares_request;
- GPR_ASSERT(g_channel == nullptr);
- GPR_ASSERT(g_server == nullptr);
- bool server_shutdown = false;
- int pending_server_shutdowns = 0;
- int pending_channel_watches = 0;
- int pending_pings = 0;
- g_resource_quota = grpc_resource_quota_create("api_fuzzer");
- grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
- int action_index = 0;
- auto no_more_actions = [&]() { action_index = msg.actions_size(); };
- auto poll_cq = [&]() -> bool {
- grpc_event ev = grpc_completion_queue_next(
- cq, gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
- switch (ev.type) {
- case GRPC_OP_COMPLETE: {
- static_cast<Validator*>(ev.tag)->Run(ev.success);
- break;
- }
- case GRPC_QUEUE_TIMEOUT:
- break;
- case GRPC_QUEUE_SHUTDOWN:
- return true;
- }
- return false;
- };
- while (action_index < msg.actions_size() || g_channel != nullptr ||
- g_server != nullptr || pending_channel_watches > 0 ||
- pending_pings > 0 || ActiveCall() != nullptr) {
- if (action_index == msg.actions_size()) {
- if (g_channel != nullptr) {
- grpc_channel_destroy(g_channel);
- g_channel = nullptr;
- }
- if (g_server != nullptr) {
- if (!server_shutdown) {
- grpc_server_shutdown_and_notify(
- g_server, cq,
- AssertSuccessAndDecrement(&pending_server_shutdowns));
- server_shutdown = true;
- pending_server_shutdowns++;
- } else if (pending_server_shutdowns == 0) {
- grpc_server_destroy(g_server);
- g_server = nullptr;
- }
- }
- for (auto& call : g_calls) {
- if (call == nullptr) continue;
- if (call->type() == CallType::PENDING_SERVER) continue;
- call->Shutdown();
- }
- g_now = gpr_time_add(
- g_now,
- gpr_time_from_seconds(
- std::max<int64_t>(1, static_cast<int64_t>(kMaxWaitMs / 1000)),
- GPR_TIMESPAN));
- grpc_timer_manager_tick();
- GPR_ASSERT(!poll_cq());
- continue;
- }
- grpc_timer_manager_tick();
- if (g_channel_force_delete.exchange(false) && g_channel) {
- grpc_channel_destroy(g_channel);
- g_channel = nullptr;
- g_channel_actions.clear();
- }
- const api_fuzzer::Action& action = msg.actions(action_index);
- action_index++;
- switch (action.type_case()) {
- case api_fuzzer::Action::TYPE_NOT_SET:
- no_more_actions();
- break;
- // tickle completion queue
- case api_fuzzer::Action::kPollCq: {
- GPR_ASSERT(!poll_cq());
- break;
- }
- // increment global time
- case api_fuzzer::Action::kAdvanceTime: {
- g_now = gpr_time_add(
- g_now, gpr_time_from_micros(
- std::min(static_cast<uint64_t>(action.advance_time()),
- kMaxAdvanceTimeMicros),
- GPR_TIMESPAN));
- break;
- }
- // create an insecure channel
- case api_fuzzer::Action::kCreateChannel: {
- if (!action.create_channel().channel_actions_size() ||
- g_channel != nullptr) {
- no_more_actions();
- } else {
- grpc_channel_args* args =
- ReadArgs(action.create_channel().channel_args());
- grpc_channel_credentials* creds =
- action.create_channel().has_channel_creds()
- ? ReadChannelCreds(action.create_channel().channel_creds())
- : grpc_insecure_credentials_create();
- g_channel = grpc_channel_create(
- action.create_channel().target().c_str(), creds, args);
- grpc_channel_credentials_release(creds);
- g_channel_actions.clear();
- for (int i = 0; i < action.create_channel().channel_actions_size();
- i++) {
- const api_fuzzer::ChannelAction& channel_action =
- action.create_channel().channel_actions(i);
- g_channel_actions.push_back({
- std::min(channel_action.wait_ms(), kMaxWaitMs),
- std::min(channel_action.add_n_bytes_writable(),
- kMaxAddNWritableBytes),
- std::min(channel_action.add_n_bytes_readable(),
- kMaxAddNReadableBytes),
- });
- }
- GPR_ASSERT(g_channel != nullptr);
- g_channel_force_delete = false;
- {
- grpc_core::ExecCtx exec_ctx;
- grpc_channel_args_destroy(args);
- }
- }
- break;
- }
- // destroy a channel
- case api_fuzzer::Action::kCloseChannel: {
- if (g_channel != nullptr) {
- grpc_channel_destroy(g_channel);
- g_channel = nullptr;
- } else {
- no_more_actions();
- }
- break;
- }
- // bring up a server
- case api_fuzzer::Action::kCreateServer: {
- if (g_server == nullptr) {
- grpc_channel_args* args =
- ReadArgs(action.create_server().channel_args());
- g_server = grpc_server_create(args, nullptr);
- GPR_ASSERT(g_server != nullptr);
- {
- grpc_core::ExecCtx exec_ctx;
- grpc_channel_args_destroy(args);
- }
- grpc_server_register_completion_queue(g_server, cq, nullptr);
- grpc_server_start(g_server);
- server_shutdown = false;
- GPR_ASSERT(pending_server_shutdowns == 0);
- } else {
- no_more_actions();
- }
- break;
- }
- // begin server shutdown
- case api_fuzzer::Action::kShutdownServer: {
- if (g_server != nullptr) {
- grpc_server_shutdown_and_notify(
- g_server, cq,
- AssertSuccessAndDecrement(&pending_server_shutdowns));
- pending_server_shutdowns++;
- server_shutdown = true;
- } else {
- no_more_actions();
- }
- break;
- }
- // cancel all calls if shutdown
- case api_fuzzer::Action::kCancelAllCallsIfShutdown: {
- if (g_server != nullptr && server_shutdown) {
- grpc_server_cancel_all_calls(g_server);
- } else {
- no_more_actions();
- }
- break;
- }
- // destroy server
- case api_fuzzer::Action::kDestroyServerIfReady: {
- if (g_server != nullptr && server_shutdown &&
- pending_server_shutdowns == 0) {
- grpc_server_destroy(g_server);
- g_server = nullptr;
- } else {
- no_more_actions();
- }
- break;
- }
- // check connectivity
- case api_fuzzer::Action::kCheckConnectivity: {
- if (g_channel != nullptr) {
- grpc_channel_check_connectivity_state(g_channel,
- action.check_connectivity());
- } else {
- no_more_actions();
- }
- break;
- }
- // watch connectivity
- case api_fuzzer::Action::kWatchConnectivity: {
- if (g_channel != nullptr) {
- grpc_connectivity_state st =
- grpc_channel_check_connectivity_state(g_channel, 0);
- if (st != GRPC_CHANNEL_SHUTDOWN) {
- gpr_timespec deadline =
- gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_micros(action.watch_connectivity(),
- GPR_TIMESPAN));
- grpc_channel_watch_connectivity_state(
- g_channel, st, deadline, cq,
- ValidateConnectivityWatch(deadline, &pending_channel_watches));
- pending_channel_watches++;
- }
- } else {
- no_more_actions();
- }
- break;
- }
- // create a call
- case api_fuzzer::Action::kCreateCall: {
- bool ok = true;
- if (g_channel == nullptr) ok = false;
- // If the active call is a server call, then use it as the parent call
- // to exercise the propagation logic.
- Call* parent_call = ActiveCall();
- if (parent_call != nullptr && parent_call->type() != CallType::SERVER) {
- parent_call = nullptr;
- }
- g_calls.emplace_back(new Call(CallType::CLIENT));
- grpc_slice method =
- g_calls.back()->ReadSlice(action.create_call().method());
- if (GRPC_SLICE_LENGTH(method) == 0) {
- ok = false;
- }
- grpc_slice host =
- g_calls.back()->ReadSlice(action.create_call().host());
- gpr_timespec deadline = gpr_time_add(
- gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_micros(action.create_call().timeout(), GPR_TIMESPAN));
- if (ok) {
- g_calls.back()->SetCall(grpc_channel_create_call(
- g_channel, parent_call == nullptr ? nullptr : parent_call->call(),
- action.create_call().propagation_mask(), cq, method, &host,
- deadline, nullptr));
- } else {
- g_calls.pop_back();
- no_more_actions();
- }
- break;
- }
- // switch the 'current' call
- case api_fuzzer::Action::kChangeActiveCall: {
- g_active_call++;
- ActiveCall();
- break;
- }
- // queue some ops on a call
- case api_fuzzer::Action::kQueueBatch: {
- auto* active_call = ActiveCall();
- if (active_call == nullptr ||
- active_call->type() == CallType::PENDING_SERVER ||
- active_call->call() == nullptr) {
- no_more_actions();
- break;
- }
- const auto& batch = action.queue_batch().operations();
- if (batch.size() > 6) {
- no_more_actions();
- break;
- }
- std::vector<grpc_op> ops;
- bool ok = true;
- uint8_t has_ops = 0;
- std::vector<std::function<void()>> unwinders;
- for (const auto& batch_op : batch) {
- auto op = active_call->ReadOp(batch_op, &ok, &has_ops, &unwinders);
- if (!op.has_value()) continue;
- ops.push_back(*op);
- }
- if (g_channel == nullptr) ok = false;
- if (ok) {
- auto* v = active_call->FinishedBatchValidator(has_ops);
- grpc_call_error error = grpc_call_start_batch(
- active_call->call(), ops.data(), ops.size(), v, nullptr);
- if (error != GRPC_CALL_OK) {
- v->Run(false);
- }
- } else {
- no_more_actions();
- for (auto& unwind : unwinders) {
- unwind();
- }
- }
- break;
- }
- // cancel current call
- case api_fuzzer::Action::kCancelCall: {
- auto* active_call = ActiveCall();
- if (active_call != nullptr && active_call->call() != nullptr) {
- grpc_call_cancel(active_call->call(), nullptr);
- } else {
- no_more_actions();
- }
- break;
- }
- // get a calls peer
- case api_fuzzer::Action::kGetPeer: {
- auto* active_call = ActiveCall();
- if (active_call != nullptr && active_call->call() != nullptr) {
- free_non_null(grpc_call_get_peer(active_call->call()));
- } else {
- no_more_actions();
- }
- break;
- }
- // get a channels target
- case api_fuzzer::Action::kGetTarget: {
- if (g_channel != nullptr) {
- free_non_null(grpc_channel_get_target(g_channel));
- } else {
- no_more_actions();
- }
- break;
- }
- // send a ping on a channel
- case api_fuzzer::Action::kPing: {
- if (g_channel != nullptr) {
- pending_pings++;
- grpc_channel_ping(g_channel, cq, Decrement(&pending_pings), nullptr);
- } else {
- no_more_actions();
- }
- break;
- }
- // enable a tracer
- case api_fuzzer::Action::kEnableTracer: {
- grpc_tracer_set_enabled(action.enable_tracer().c_str(), 1);
- break;
- }
- // disable a tracer
- case api_fuzzer::Action::kDisableTracer: {
- grpc_tracer_set_enabled(action.disable_tracer().c_str(), 0);
- break;
- }
- // request a server call
- case api_fuzzer::Action::kRequestCall: {
- if (g_server == nullptr) {
- no_more_actions();
- break;
- }
- g_calls.emplace_back(new Call(CallType::PENDING_SERVER));
- g_calls.back()->RequestCall(g_server, cq);
- break;
- }
- // destroy a call
- case api_fuzzer::Action::kDestroyCall: {
- auto* active_call = ActiveCall();
- if (active_call != nullptr &&
- active_call->type() != CallType::PENDING_SERVER &&
- active_call->call() != nullptr) {
- g_calls[g_active_call]->Shutdown();
- } else {
- no_more_actions();
- }
- break;
- }
- // resize the buffer pool
- case api_fuzzer::Action::kResizeResourceQuota: {
- grpc_resource_quota_resize(g_resource_quota,
- action.resize_resource_quota());
- break;
- }
- }
- }
- GPR_ASSERT(g_channel == nullptr);
- GPR_ASSERT(g_server == nullptr);
- GPR_ASSERT(ActiveCall() == nullptr);
- GPR_ASSERT(g_calls.empty());
- grpc_completion_queue_shutdown(cq);
- GPR_ASSERT(poll_cq());
- grpc_completion_queue_destroy(cq);
- grpc_resource_quota_unref(g_resource_quota);
- grpc_shutdown_blocking();
- }
|