api_fuzzer.cc 39 KB


  1. /*
  2. *
  3. * Copyright 2016 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. #include <string.h>
  19. #include <grpc/grpc.h>
  20. #include <grpc/grpc_security.h>
  21. #include <grpc/support/alloc.h>
  22. #include <grpc/support/log.h>
  23. #include <grpc/support/string_util.h>
  24. #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
  25. #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
  26. #include "src/core/lib/channel/channel_args.h"
  27. #include "src/core/lib/gpr/env.h"
  28. #include "src/core/lib/iomgr/executor.h"
  29. #include "src/core/lib/iomgr/resolve_address.h"
  30. #include "src/core/lib/iomgr/tcp_client.h"
  31. #include "src/core/lib/iomgr/timer.h"
  32. #include "src/core/lib/iomgr/timer_manager.h"
  33. #include "src/core/lib/resolver/server_address.h"
  34. #include "src/core/lib/slice/slice_internal.h"
  35. #include "src/core/lib/surface/channel.h"
  36. #include "src/core/lib/surface/server.h"
  37. #include "src/libfuzzer/libfuzzer_macro.h"
  38. #include "test/core/end2end/data/ssl_test_data.h"
  39. #include "test/core/end2end/fuzzers/api_fuzzer.pb.h"
  40. #include "test/core/util/passthru_endpoint.h"
  41. static constexpr uint64_t kMaxAdvanceTimeMicros =
  42. 31536000000000; // 1 year (24 * 365 * 3600 * 1000000)
  43. // Applicable when simulating channel actions. Prevents overflows.
  44. static constexpr uint64_t kMaxWaitMs =
  45. 31536000000; // 1 year (24 * 365 * 3600 * 1000)
  46. // Applicable when simulating channel actions. Prevents overflows.
  47. static constexpr uint64_t kMaxAddNReadableBytes = (2 * 1024 * 1024); // 2GB
  48. // Applicable when simulating channel actions. Prevents overflows.
  49. static constexpr uint64_t kMaxAddNWritableBytes = (2 * 1024 * 1024); // 2GB
  50. ////////////////////////////////////////////////////////////////////////////////
  51. // logging
  52. bool squelch = true;
  53. bool leak_check = true;
  54. static void dont_log(gpr_log_func_args* /*args*/) {}
  55. ////////////////////////////////////////////////////////////////////////////////
  56. // global state
  57. static gpr_timespec g_now;
  58. static grpc_server* g_server;
  59. static grpc_channel* g_channel;
  60. static grpc_resource_quota* g_resource_quota;
  61. static std::vector<grpc_passthru_endpoint_channel_action> g_channel_actions;
  62. static std::atomic<bool> g_channel_force_delete{false};
  63. extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
  64. static gpr_timespec now_impl(gpr_clock_type clock_type) {
  65. GPR_ASSERT(clock_type != GPR_TIMESPAN);
  66. gpr_timespec ts = g_now;
  67. ts.clock_type = clock_type;
  68. return ts;
  69. }
  70. ////////////////////////////////////////////////////////////////////////////////
  71. // dns resolution
  72. typedef struct addr_req {
  73. grpc_timer timer;
  74. char* addr;
  75. grpc_closure* on_done;
  76. std::unique_ptr<grpc_core::ServerAddressList>* addresses;
  77. } addr_req;
  78. static void finish_resolve(void* arg, grpc_error_handle error) {
  79. addr_req* r = static_cast<addr_req*>(arg);
  80. if (error == GRPC_ERROR_NONE && 0 == strcmp(r->addr, "server")) {
  81. *r->addresses = absl::make_unique<grpc_core::ServerAddressList>();
  82. grpc_resolved_address fake_resolved_address;
  83. memset(&fake_resolved_address, 0, sizeof(fake_resolved_address));
  84. fake_resolved_address.len = 0;
  85. (*r->addresses)->emplace_back(fake_resolved_address, nullptr);
  86. grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_done, GRPC_ERROR_NONE);
  87. } else {
  88. grpc_core::ExecCtx::Run(DEBUG_LOCATION, r->on_done,
  89. GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
  90. "Resolution failed", &error, 1));
  91. }
  92. gpr_free(r->addr);
  93. delete r;
  94. }
  95. namespace {
  96. class FuzzerDNSResolver : public grpc_core::DNSResolver {
  97. public:
  98. class FuzzerDNSRequest : public grpc_core::DNSResolver::Request {
  99. public:
  100. FuzzerDNSRequest(
  101. absl::string_view name,
  102. std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
  103. on_done)
  104. : name_(std::string(name)), on_done_(std::move(on_done)) {}
  105. void Start() override {
  106. Ref().release(); // ref held by timer callback
  107. grpc_timer_init(
  108. &timer_,
  109. grpc_core::Duration::Seconds(1) + grpc_core::ExecCtx::Get()->Now(),
  110. GRPC_CLOSURE_CREATE(FinishResolve, this, grpc_schedule_on_exec_ctx));
  111. }
  112. // cancellation not implemented
  113. void Orphan() override { Unref(); }
  114. private:
  115. static void FinishResolve(void* arg, grpc_error_handle error) {
  116. FuzzerDNSRequest* self = static_cast<FuzzerDNSRequest*>(arg);
  117. if (error == GRPC_ERROR_NONE && self->name_ == "server") {
  118. std::vector<grpc_resolved_address> addrs;
  119. grpc_resolved_address addr;
  120. addr.len = 0;
  121. addrs.push_back(addr);
  122. self->on_done_(std::move(addrs));
  123. } else {
  124. self->on_done_(absl::UnknownError("Resolution failed"));
  125. }
  126. self->Unref();
  127. }
  128. const std::string name_;
  129. const std::function<void(
  130. absl::StatusOr<std::vector<grpc_resolved_address>>)>
  131. on_done_;
  132. grpc_timer timer_;
  133. };
  134. // Gets the singleton instance, possibly creating it first
  135. static FuzzerDNSResolver* GetOrCreate() {
  136. static FuzzerDNSResolver* instance = new FuzzerDNSResolver();
  137. return instance;
  138. }
  139. grpc_core::OrphanablePtr<grpc_core::DNSResolver::Request> ResolveName(
  140. absl::string_view name, absl::string_view /* default_port */,
  141. grpc_pollset_set* /* interested_parties */,
  142. std::function<void(absl::StatusOr<std::vector<grpc_resolved_address>>)>
  143. on_done) override {
  144. return grpc_core::MakeOrphanable<FuzzerDNSRequest>(name,
  145. std::move(on_done));
  146. }
  147. absl::StatusOr<std::vector<grpc_resolved_address>> ResolveNameBlocking(
  148. absl::string_view /* name */,
  149. absl::string_view /* default_port */) override {
  150. GPR_ASSERT(0);
  151. }
  152. };
  153. } // namespace
  154. grpc_ares_request* my_dns_lookup_ares(
  155. const char* /*dns_server*/, const char* addr, const char* /*default_port*/,
  156. grpc_pollset_set* /*interested_parties*/, grpc_closure* on_done,
  157. std::unique_ptr<grpc_core::ServerAddressList>* addresses,
  158. std::unique_ptr<grpc_core::ServerAddressList>* /*balancer_addresses*/,
  159. char** /*service_config_json*/, int /*query_timeout*/) {
  160. addr_req* r = new addr_req();
  161. r->addr = gpr_strdup(addr);
  162. r->on_done = on_done;
  163. r->addresses = addresses;
  164. grpc_timer_init(
  165. &r->timer,
  166. grpc_core::Duration::Seconds(1) + grpc_core::ExecCtx::Get()->Now(),
  167. GRPC_CLOSURE_CREATE(finish_resolve, r, grpc_schedule_on_exec_ctx));
  168. return nullptr;
  169. }
  170. static void my_cancel_ares_request(grpc_ares_request* request) {
  171. GPR_ASSERT(request == nullptr);
  172. }
  173. ////////////////////////////////////////////////////////////////////////////////
  174. // client connection
  175. static void sched_connect(grpc_closure* closure, grpc_endpoint** ep,
  176. gpr_timespec deadline);
  177. typedef struct {
  178. grpc_timer timer;
  179. grpc_closure* closure;
  180. grpc_endpoint** ep;
  181. gpr_timespec deadline;
  182. } future_connect;
  183. static void do_connect(void* arg, grpc_error_handle error) {
  184. future_connect* fc = static_cast<future_connect*>(arg);
  185. if (error != GRPC_ERROR_NONE) {
  186. *fc->ep = nullptr;
  187. grpc_core::ExecCtx::Run(DEBUG_LOCATION, fc->closure, GRPC_ERROR_REF(error));
  188. } else if (g_server != nullptr) {
  189. grpc_endpoint* client;
  190. grpc_endpoint* server;
  191. grpc_passthru_endpoint_create(&client, &server, nullptr, true);
  192. *fc->ep = client;
  193. start_scheduling_grpc_passthru_endpoint_channel_effects(client,
  194. g_channel_actions);
  195. grpc_core::Server* core_server = grpc_core::Server::FromC(g_server);
  196. grpc_transport* transport = grpc_create_chttp2_transport(
  197. core_server->channel_args(), server, false);
  198. GPR_ASSERT(GRPC_LOG_IF_ERROR(
  199. "SetupTransport",
  200. core_server->SetupTransport(transport, nullptr,
  201. core_server->channel_args(), nullptr)));
  202. grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr);
  203. grpc_core::ExecCtx::Run(DEBUG_LOCATION, fc->closure, GRPC_ERROR_NONE);
  204. } else {
  205. sched_connect(fc->closure, fc->ep, fc->deadline);
  206. }
  207. gpr_free(fc);
  208. }
  209. static void sched_connect(grpc_closure* closure, grpc_endpoint** ep,
  210. gpr_timespec deadline) {
  211. if (gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) < 0) {
  212. *ep = nullptr;
  213. grpc_core::ExecCtx::Run(
  214. DEBUG_LOCATION, closure,
  215. GRPC_ERROR_CREATE_FROM_STATIC_STRING("Connect deadline exceeded"));
  216. return;
  217. }
  218. future_connect* fc = static_cast<future_connect*>(gpr_malloc(sizeof(*fc)));
  219. fc->closure = closure;
  220. fc->ep = ep;
  221. fc->deadline = deadline;
  222. grpc_timer_init(
  223. &fc->timer,
  224. grpc_core::Duration::Seconds(1) + grpc_core::ExecCtx::Get()->Now(),
  225. GRPC_CLOSURE_CREATE(do_connect, fc, grpc_schedule_on_exec_ctx));
  226. }
  227. static void my_tcp_client_connect(grpc_closure* closure, grpc_endpoint** ep,
  228. grpc_pollset_set* /*interested_parties*/,
  229. const grpc_channel_args* /*channel_args*/,
  230. const grpc_resolved_address* /*addr*/,
  231. grpc_core::Timestamp deadline) {
  232. sched_connect(closure, ep, deadline.as_timespec(GPR_CLOCK_MONOTONIC));
  233. }
  234. grpc_tcp_client_vtable fuzz_tcp_client_vtable = {my_tcp_client_connect};
  235. ////////////////////////////////////////////////////////////////////////////////
  236. // test driver
  237. class Validator {
  238. public:
  239. explicit Validator(std::function<void(bool)> impl) : impl_(std::move(impl)) {}
  240. virtual ~Validator() {}
  241. void Run(bool success) {
  242. impl_(success);
  243. delete this;
  244. }
  245. private:
  246. std::function<void(bool)> impl_;
  247. };
  248. Validator* MakeValidator(std::function<void(bool)> impl) {
  249. return new Validator(std::move(impl));
  250. }
  251. static Validator* AssertSuccessAndDecrement(int* counter) {
  252. return MakeValidator([counter](bool success) {
  253. GPR_ASSERT(success);
  254. --*counter;
  255. });
  256. }
  257. static Validator* Decrement(int* counter) {
  258. return MakeValidator([counter](bool) { --*counter; });
  259. }
  260. static Validator* ValidateConnectivityWatch(gpr_timespec deadline,
  261. int* counter) {
  262. return MakeValidator([deadline, counter](bool success) {
  263. if (!success) {
  264. auto now = gpr_now(deadline.clock_type);
  265. GPR_ASSERT(gpr_time_cmp(now, deadline) >= 0);
  266. }
  267. --*counter;
  268. });
  269. }
  270. static void free_non_null(void* p) {
  271. GPR_ASSERT(p != nullptr);
  272. gpr_free(p);
  273. }
  274. enum class CallType { CLIENT, SERVER, PENDING_SERVER, TOMBSTONED };
// Fuzzer-side bookkeeping for one grpc_call.
//
// A Call owns the grpc_call handle (once set), the receive buffers whose
// addresses are handed to batch ops, and every scratch allocation / slice
// created while translating fuzzer input into grpc_op structures; all of
// these are released in ~Call(). Validators created by this class capture a
// shared_ptr to the Call, so it stays alive while operations are in flight.
class Call : public std::enable_shared_from_this<Call> {
 public:
  explicit Call(CallType type) : type_(type) {
    grpc_metadata_array_init(&recv_initial_metadata_);
    grpc_metadata_array_init(&recv_trailing_metadata_);
    grpc_call_details_init(&call_details_);
  }
  ~Call();

  CallType type() const { return type_; }

  // Returns true when this Call no longer needs servicing: either it is
  // tombstoned/closed with no operations pending, or it has no grpc_call and
  // is not waiting for an incoming server call.
  bool done() const {
    if ((type_ == CallType::TOMBSTONED || call_closed_) && pending_ops_ == 0) {
      return true;
    }
    if (call_ == nullptr && type() != CallType::PENDING_SERVER) return true;
    return false;
  }

  // Cancels the underlying call (if any) and marks this Call TOMBSTONED.
  // A Call without a grpc_call is left untouched.
  void Shutdown() {
    if (call_ != nullptr) {
      grpc_call_cancel(call_, nullptr);
      type_ = CallType::TOMBSTONED;
    }
  }

  // Attaches the grpc_call handle; may only be done once.
  void SetCall(grpc_call* call) {
    GPR_ASSERT(call_ == nullptr);
    call_ = call;
  }

  grpc_call* call() const { return call_; }

  // Asks `server` for an incoming call, using `cq` for both completion-queue
  // arguments. If the request is rejected synchronously the validator is run
  // immediately with success == false (which tombstones this Call).
  void RequestCall(grpc_server* server, grpc_completion_queue* cq) {
    auto* v = FinishedRequestCall();
    grpc_call_error error = grpc_server_request_call(
        server, &call_, &call_details_, &recv_initial_metadata_, cq, cq, v);
    if (error != GRPC_CALL_OK) {
      v->Run(false);
    }
  }

  // Allocates `size` bytes with gpr_malloc and records the pointer so that
  // ~Call() frees it.
  void* Allocate(size_t size) {
    void* p = gpr_malloc(size);
    free_pointers_.push_back(p);
    return p;
  }

  // Allocates tracked, uninitialized storage for `elems` objects of type T.
  template <typename T>
  T* AllocArray(size_t elems) {
    return static_cast<T*>(Allocate(sizeof(T) * elems));
  }

  // Copy-constructs `value` into tracked storage. The storage is later
  // released with gpr_free only; T's destructor is not run.
  template <typename T>
  T* NewCopy(T value) {
    T* p = AllocArray<T>(1);
    new (p) T(value);
    return p;
  }

  // Builds a slice from s.value() and records it so that ~Call() unrefs it.
  template <typename T>
  grpc_slice ReadSlice(const T& s) {
    grpc_slice slice = grpc_slice_from_cpp_string(s.value());
    unref_slices_.push_back(slice);
    return slice;
  }

  // Converts a repeated metadata field into a grpc_metadata_array backed by
  // tracked storage; both size fields are set to metadata.size().
  template <typename M>
  grpc_metadata_array ReadMetadata(const M& metadata) {
    grpc_metadata* m = AllocArray<grpc_metadata>(metadata.size());
    for (int i = 0; i < metadata.size(); ++i) {
      m[i].key = ReadSlice(metadata[i].key());
      m[i].value = ReadSlice(metadata[i].value());
    }
    return grpc_metadata_array{static_cast<size_t>(metadata.size()),
                               static_cast<size_t>(metadata.size()), m};
  }

  // Translates one fuzzer BatchOp into a grpc_op.
  //
  //  batch_op    - the operation to translate.
  //  batch_is_ok - set to false when the op must not be enqueued given this
  //                Call's current state (never set back to true here).
  //  batch_ops   - bitmask; bit (1 << GRPC_OP_xxx) is set for each op added.
  //  unwinders   - receives callbacks that undo the state changes made here.
  //                NOTE(review): presumably invoked by the caller when the
  //                batch is not started - the caller is outside this view.
  //
  // Returns an empty optional when the BatchOp has no operation set.
  absl::optional<grpc_op> ReadOp(
      const api_fuzzer::BatchOp& batch_op, bool* batch_is_ok,
      uint8_t* batch_ops, std::vector<std::function<void()>>* unwinders) {
    grpc_op op;
    memset(&op, 0, sizeof(op));
    switch (batch_op.op_case()) {
      case api_fuzzer::BatchOp::OP_NOT_SET:
        /* invalid value */
        return {};
      case api_fuzzer::BatchOp::kSendInitialMetadata:
        // Initial metadata may be sent only once per call.
        if (sent_initial_metadata_) {
          *batch_is_ok = false;
        } else {
          sent_initial_metadata_ = true;
          op.op = GRPC_OP_SEND_INITIAL_METADATA;
          *batch_ops |= 1 << GRPC_OP_SEND_INITIAL_METADATA;
          auto ary = ReadMetadata(batch_op.send_initial_metadata().metadata());
          op.data.send_initial_metadata.count = ary.count;
          op.data.send_initial_metadata.metadata = ary.metadata;
        }
        break;
      case api_fuzzer::BatchOp::kSendMessage:
        op.op = GRPC_OP_SEND_MESSAGE;
        // Only one outgoing message buffer may be outstanding at a time.
        if (send_message_ != nullptr) {
          *batch_is_ok = false;
        } else {
          *batch_ops |= 1 << GRPC_OP_SEND_MESSAGE;
          std::vector<grpc_slice> slices;
          for (const auto& m : batch_op.send_message().message()) {
            slices.push_back(ReadSlice(m));
          }
          send_message_ = op.data.send_message.send_message =
              grpc_raw_byte_buffer_create(slices.data(), slices.size());
          // Undo: release the buffer created above.
          unwinders->push_back([this]() {
            grpc_byte_buffer_destroy(send_message_);
            send_message_ = nullptr;
          });
        }
        break;
      case api_fuzzer::BatchOp::kSendCloseFromClient:
        op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
        *batch_ops |= 1 << GRPC_OP_SEND_CLOSE_FROM_CLIENT;
        break;
      case api_fuzzer::BatchOp::kSendStatusFromServer: {
        op.op = GRPC_OP_SEND_STATUS_FROM_SERVER;
        *batch_ops |= 1 << GRPC_OP_SEND_STATUS_FROM_SERVER;
        auto ary = ReadMetadata(batch_op.send_status_from_server().metadata());
        op.data.send_status_from_server.trailing_metadata_count = ary.count;
        op.data.send_status_from_server.trailing_metadata = ary.metadata;
        op.data.send_status_from_server.status = static_cast<grpc_status_code>(
            batch_op.send_status_from_server().status_code());
        // status_details is optional; when present the slice is copied into
        // tracked storage so the pointer stays valid for the Call's lifetime.
        op.data.send_status_from_server.status_details =
            batch_op.send_status_from_server().has_status_details()
                ? NewCopy(ReadSlice(
                      batch_op.send_status_from_server().status_details()))
                : nullptr;
      } break;
      case api_fuzzer::BatchOp::kReceiveInitialMetadata:
        // Initial metadata may be requested only once per call.
        if (enqueued_recv_initial_metadata_) {
          *batch_is_ok = false;
        } else {
          enqueued_recv_initial_metadata_ = true;
          op.op = GRPC_OP_RECV_INITIAL_METADATA;
          *batch_ops |= 1 << GRPC_OP_RECV_INITIAL_METADATA;
          op.data.recv_initial_metadata.recv_initial_metadata =
              &recv_initial_metadata_;
        }
        break;
      case api_fuzzer::BatchOp::kReceiveMessage:
        // Allow only one active pending_recv_message_op to exist. Otherwise if
        // the previous enqueued recv_message_op is not complete by the time
        // we get here, then under certain conditions, enqueuing this op will
        // overwrite the internal call->receiving_buffer maintained by grpc
        // leading to a memory leak.
        if (call_closed_ || pending_recv_message_op_) {
          *batch_is_ok = false;
        } else {
          op.op = GRPC_OP_RECV_MESSAGE;
          *batch_ops |= 1 << GRPC_OP_RECV_MESSAGE;
          pending_recv_message_op_ = true;
          op.data.recv_message.recv_message = &recv_message_;
          unwinders->push_back([this]() { pending_recv_message_op_ = false; });
        }
        break;
      case api_fuzzer::BatchOp::kReceiveStatusOnClient:
        op.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
        op.data.recv_status_on_client.status = &status_;
        op.data.recv_status_on_client.trailing_metadata =
            &recv_trailing_metadata_;
        op.data.recv_status_on_client.status_details = &recv_status_details_;
        *batch_ops |= 1 << GRPC_OP_RECV_STATUS_ON_CLIENT;
        break;
      case api_fuzzer::BatchOp::kReceiveCloseOnServer:
        op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
        *batch_ops |= 1 << GRPC_OP_RECV_CLOSE_ON_SERVER;
        op.data.recv_close_on_server.cancelled = &cancelled_;
        break;
    }
    op.reserved = nullptr;
    op.flags = batch_op.flags();
    return op;
  }

  // Creates the completion validator for a started batch. `has_ops` is the
  // bitmask built by ReadOp(). Counts the batch as pending until the
  // validator runs; the validator then releases per-batch message buffers and
  // marks the call closed if the batch received the final status/close.
  Validator* FinishedBatchValidator(uint8_t has_ops) {
    ++pending_ops_;
    auto self = shared_from_this();
    return MakeValidator([self, has_ops](bool /*success*/) {
      --self->pending_ops_;
      if (has_ops & (1u << GRPC_OP_RECV_MESSAGE)) {
        self->pending_recv_message_op_ = false;
        if (self->recv_message_ != nullptr) {
          grpc_byte_buffer_destroy(self->recv_message_);
          self->recv_message_ = nullptr;
        }
      }
      if ((has_ops & (1u << GRPC_OP_SEND_MESSAGE))) {
        grpc_byte_buffer_destroy(self->send_message_);
        self->send_message_ = nullptr;
      }
      if ((has_ops & (1u << GRPC_OP_RECV_STATUS_ON_CLIENT)) ||
          (has_ops & (1u << GRPC_OP_RECV_CLOSE_ON_SERVER))) {
        self->call_closed_ = true;
      }
    });
  }

  // Creates the completion validator for grpc_server_request_call. On
  // success the Call becomes a SERVER call (and must have a grpc_call); on
  // failure it is tombstoned.
  Validator* FinishedRequestCall() {
    ++pending_ops_;
    auto self = shared_from_this();
    return MakeValidator([self](bool success) {
      GPR_ASSERT(self->pending_ops_ > 0);
      --self->pending_ops_;
      if (success) {
        GPR_ASSERT(self->call_ != nullptr);
        self->type_ = CallType::SERVER;
      } else {
        self->type_ = CallType::TOMBSTONED;
      }
    });
  }

 private:
  CallType type_;
  grpc_call* call_ = nullptr;
  // Output slot for GRPC_OP_RECV_MESSAGE; destroyed when the batch completes
  // or in ~Call().
  grpc_byte_buffer* recv_message_ = nullptr;
  // Output slot for GRPC_OP_RECV_STATUS_ON_CLIENT (not initialized here).
  grpc_status_code status_;
  grpc_metadata_array recv_initial_metadata_{0, 0, nullptr};
  grpc_metadata_array recv_trailing_metadata_{0, 0, nullptr};
  grpc_slice recv_status_details_ = grpc_empty_slice();
  // set by receive close on server, unset here to trigger
  // msan if misused
  int cancelled_;
  // Number of validators created by this Call that have not run yet.
  int pending_ops_ = 0;
  bool sent_initial_metadata_ = false;
  bool enqueued_recv_initial_metadata_ = false;
  grpc_call_details call_details_{};
  // Outgoing message buffer of the batch currently in flight, if any.
  grpc_byte_buffer* send_message_ = nullptr;
  // Set once a batch containing recv-status/recv-close has completed.
  bool call_closed_ = false;
  bool pending_recv_message_op_ = false;
  // Allocations made through Allocate(); freed in ~Call().
  std::vector<void*> free_pointers_;
  // Slices created through ReadSlice(); unreffed in ~Call().
  std::vector<grpc_slice> unref_slices_;
};
  500. static std::vector<std::shared_ptr<Call>> g_calls;
  501. static size_t g_active_call = 0;
  502. static Call* ActiveCall() {
  503. while (!g_calls.empty()) {
  504. if (g_active_call >= g_calls.size()) {
  505. g_active_call = 0;
  506. }
  507. if (g_calls[g_active_call] != nullptr && !g_calls[g_active_call]->done()) {
  508. return g_calls[g_active_call].get();
  509. }
  510. g_calls.erase(g_calls.begin() + g_active_call);
  511. }
  512. return nullptr;
  513. }
  514. Call::~Call() {
  515. if (call_ != nullptr) {
  516. grpc_call_unref(call_);
  517. }
  518. grpc_slice_unref(recv_status_details_);
  519. grpc_call_details_destroy(&call_details_);
  520. for (auto* p : free_pointers_) {
  521. gpr_free(p);
  522. }
  523. for (auto s : unref_slices_) {
  524. grpc_slice_unref(s);
  525. }
  526. if (recv_message_ != nullptr) {
  527. grpc_byte_buffer_destroy(recv_message_);
  528. recv_message_ = nullptr;
  529. }
  530. grpc_metadata_array_destroy(&recv_initial_metadata_);
  531. grpc_metadata_array_destroy(&recv_trailing_metadata_);
  532. }
  533. template <typename ChannelArgContainer>
  534. grpc_channel_args* ReadArgs(const ChannelArgContainer& args) {
  535. grpc_channel_args* res =
  536. static_cast<grpc_channel_args*>(gpr_malloc(sizeof(grpc_channel_args)));
  537. res->args =
  538. static_cast<grpc_arg*>(gpr_malloc(sizeof(grpc_arg) * args.size()));
  539. int j = 0;
  540. for (int i = 0; i < args.size(); i++) {
  541. switch (args[i].value_case()) {
  542. case api_fuzzer::ChannelArg::kStr:
  543. res->args[j].type = GRPC_ARG_STRING;
  544. res->args[j].value.string = gpr_strdup(args[i].str().c_str());
  545. break;
  546. case api_fuzzer::ChannelArg::kI:
  547. res->args[j].type = GRPC_ARG_INTEGER;
  548. res->args[j].value.integer = args[i].i();
  549. break;
  550. case api_fuzzer::ChannelArg::kResourceQuota:
  551. if (args[i].key() != GRPC_ARG_RESOURCE_QUOTA) continue;
  552. grpc_resource_quota_ref(g_resource_quota);
  553. res->args[j].type = GRPC_ARG_POINTER;
  554. res->args[j].value.pointer.p = g_resource_quota;
  555. res->args[j].value.pointer.vtable = grpc_resource_quota_arg_vtable();
  556. break;
  557. case api_fuzzer::ChannelArg::VALUE_NOT_SET:
  558. res->args[j].type = GRPC_ARG_INTEGER;
  559. res->args[j].value.integer = 0;
  560. break;
  561. }
  562. res->args[j].key = gpr_strdup(args[i].key().c_str());
  563. ++j;
  564. }
  565. res->num_args = j;
  566. return res;
  567. }
  568. static const char* ReadCredArtifact(
  569. const api_fuzzer::CredArtifact& artifact,
  570. std::initializer_list<const char*> builtins) {
  571. switch (artifact.type_case()) {
  572. case api_fuzzer::CredArtifact::kCustom:
  573. return artifact.custom().c_str();
  574. case api_fuzzer::CredArtifact::kBuiltin:
  575. if (artifact.builtin() < 0) return nullptr;
  576. if (artifact.builtin() < static_cast<int>(builtins.size())) {
  577. return *(builtins.begin() + artifact.builtin());
  578. }
  579. return nullptr;
  580. case api_fuzzer::CredArtifact::TYPE_NOT_SET:
  581. return nullptr;
  582. }
  583. }
  584. static grpc_channel_credentials* ReadSslChannelCreds(
  585. const api_fuzzer::SslChannelCreds& creds) {
  586. const char* root_certs =
  587. creds.has_root_certs()
  588. ? ReadCredArtifact(creds.root_certs(), {test_root_cert})
  589. : nullptr;
  590. const char* private_key =
  591. creds.has_private_key()
  592. ? ReadCredArtifact(creds.private_key(),
  593. {test_server1_key, test_self_signed_client_key,
  594. test_signed_client_key})
  595. : nullptr;
  596. const char* certs =
  597. creds.has_certs()
  598. ? ReadCredArtifact(creds.certs(),
  599. {test_server1_cert, test_self_signed_client_cert,
  600. test_signed_client_cert})
  601. : nullptr;
  602. grpc_ssl_pem_key_cert_pair key_cert_pair = {private_key, certs};
  603. return grpc_ssl_credentials_create(
  604. root_certs,
  605. private_key != nullptr && certs != nullptr ? &key_cert_pair : nullptr,
  606. nullptr, nullptr);
  607. }
  608. static grpc_call_credentials* ReadCallCreds(
  609. const api_fuzzer::CallCreds& creds) {
  610. switch (creds.type_case()) {
  611. case api_fuzzer::CallCreds::TYPE_NOT_SET:
  612. return nullptr;
  613. case api_fuzzer::CallCreds::kNull:
  614. return nullptr;
  615. case api_fuzzer::CallCreds::kCompositeCallCreds: {
  616. grpc_call_credentials* out = nullptr;
  617. for (const auto& child_creds :
  618. creds.composite_call_creds().call_creds()) {
  619. grpc_call_credentials* child = ReadCallCreds(child_creds);
  620. if (child != nullptr) {
  621. if (out == nullptr) {
  622. out = child;
  623. } else {
  624. auto* composed =
  625. grpc_composite_call_credentials_create(out, child, nullptr);
  626. grpc_call_credentials_release(child);
  627. grpc_call_credentials_release(out);
  628. out = composed;
  629. }
  630. }
  631. }
  632. return out;
  633. }
  634. case api_fuzzer::CallCreds::kAccessToken:
  635. return grpc_access_token_credentials_create(creds.access_token().c_str(),
  636. nullptr);
  637. case api_fuzzer::CallCreds::kIam:
  638. return grpc_google_iam_credentials_create(
  639. creds.iam().auth_token().c_str(), creds.iam().auth_selector().c_str(),
  640. nullptr);
  641. /* TODO(ctiller): more cred types here */
  642. }
  643. }
  644. static grpc_channel_credentials* ReadChannelCreds(
  645. const api_fuzzer::ChannelCreds& creds) {
  646. switch (creds.type_case()) {
  647. case api_fuzzer::ChannelCreds::TYPE_NOT_SET:
  648. return nullptr;
  649. case api_fuzzer::ChannelCreds::kSslChannelCreds:
  650. return ReadSslChannelCreds(creds.ssl_channel_creds());
  651. case api_fuzzer::ChannelCreds::kCompositeChannelCreds: {
  652. const auto& comp = creds.composite_channel_creds();
  653. grpc_channel_credentials* c1 =
  654. comp.has_channel_creds() ? ReadChannelCreds(comp.channel_creds())
  655. : nullptr;
  656. grpc_call_credentials* c2 =
  657. comp.has_call_creds() ? ReadCallCreds(comp.call_creds()) : nullptr;
  658. if (c1 != nullptr && c2 != nullptr) {
  659. grpc_channel_credentials* out =
  660. grpc_composite_channel_credentials_create(c1, c2, nullptr);
  661. grpc_channel_credentials_release(c1);
  662. grpc_call_credentials_release(c2);
  663. return out;
  664. } else if (c1 != nullptr) {
  665. return c1;
  666. } else if (c2 != nullptr) {
  667. grpc_call_credentials_release(c2);
  668. return nullptr;
  669. } else {
  670. return nullptr;
  671. }
  672. GPR_UNREACHABLE_CODE(return nullptr);
  673. }
  674. case api_fuzzer::ChannelCreds::kNull:
  675. return nullptr;
  676. }
  677. }
  678. DEFINE_PROTO_FUZZER(const api_fuzzer::Msg& msg) {
  679. grpc_test_only_set_slice_hash_seed(0);
  680. char* grpc_trace_fuzzer = gpr_getenv("GRPC_TRACE_FUZZER");
  681. if (squelch && grpc_trace_fuzzer == nullptr) gpr_set_log_function(dont_log);
  682. gpr_free(grpc_trace_fuzzer);
  683. grpc_set_tcp_client_impl(&fuzz_tcp_client_vtable);
  684. g_now = {1, 0, GPR_CLOCK_MONOTONIC};
  685. grpc_core::TestOnlySetProcessEpoch(g_now);
  686. gpr_now_impl = now_impl;
  687. grpc_init();
  688. grpc_timer_manager_set_threading(false);
  689. {
  690. grpc_core::ExecCtx exec_ctx;
  691. grpc_core::Executor::SetThreadingAll(false);
  692. }
  693. grpc_core::SetDNSResolver(FuzzerDNSResolver::GetOrCreate());
  694. grpc_dns_lookup_ares = my_dns_lookup_ares;
  695. grpc_cancel_ares_request = my_cancel_ares_request;
  696. GPR_ASSERT(g_channel == nullptr);
  697. GPR_ASSERT(g_server == nullptr);
  698. bool server_shutdown = false;
  699. int pending_server_shutdowns = 0;
  700. int pending_channel_watches = 0;
  701. int pending_pings = 0;
  702. g_resource_quota = grpc_resource_quota_create("api_fuzzer");
  703. grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
  704. int action_index = 0;
  705. auto no_more_actions = [&]() { action_index = msg.actions_size(); };
  706. auto poll_cq = [&]() -> bool {
  707. grpc_event ev = grpc_completion_queue_next(
  708. cq, gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
  709. switch (ev.type) {
  710. case GRPC_OP_COMPLETE: {
  711. static_cast<Validator*>(ev.tag)->Run(ev.success);
  712. break;
  713. }
  714. case GRPC_QUEUE_TIMEOUT:
  715. break;
  716. case GRPC_QUEUE_SHUTDOWN:
  717. return true;
  718. }
  719. return false;
  720. };
  721. while (action_index < msg.actions_size() || g_channel != nullptr ||
  722. g_server != nullptr || pending_channel_watches > 0 ||
  723. pending_pings > 0 || ActiveCall() != nullptr) {
  724. if (action_index == msg.actions_size()) {
  725. if (g_channel != nullptr) {
  726. grpc_channel_destroy(g_channel);
  727. g_channel = nullptr;
  728. }
  729. if (g_server != nullptr) {
  730. if (!server_shutdown) {
  731. grpc_server_shutdown_and_notify(
  732. g_server, cq,
  733. AssertSuccessAndDecrement(&pending_server_shutdowns));
  734. server_shutdown = true;
  735. pending_server_shutdowns++;
  736. } else if (pending_server_shutdowns == 0) {
  737. grpc_server_destroy(g_server);
  738. g_server = nullptr;
  739. }
  740. }
  741. for (auto& call : g_calls) {
  742. if (call == nullptr) continue;
  743. if (call->type() == CallType::PENDING_SERVER) continue;
  744. call->Shutdown();
  745. }
  746. g_now = gpr_time_add(
  747. g_now,
  748. gpr_time_from_seconds(
  749. std::max<int64_t>(1, static_cast<int64_t>(kMaxWaitMs / 1000)),
  750. GPR_TIMESPAN));
  751. grpc_timer_manager_tick();
  752. GPR_ASSERT(!poll_cq());
  753. continue;
  754. }
  755. grpc_timer_manager_tick();
  756. if (g_channel_force_delete.exchange(false) && g_channel) {
  757. grpc_channel_destroy(g_channel);
  758. g_channel = nullptr;
  759. g_channel_actions.clear();
  760. }
  761. const api_fuzzer::Action& action = msg.actions(action_index);
  762. action_index++;
  763. switch (action.type_case()) {
  764. case api_fuzzer::Action::TYPE_NOT_SET:
  765. no_more_actions();
  766. break;
  767. // tickle completion queue
  768. case api_fuzzer::Action::kPollCq: {
  769. GPR_ASSERT(!poll_cq());
  770. break;
  771. }
  772. // increment global time
  773. case api_fuzzer::Action::kAdvanceTime: {
  774. g_now = gpr_time_add(
  775. g_now, gpr_time_from_micros(
  776. std::min(static_cast<uint64_t>(action.advance_time()),
  777. kMaxAdvanceTimeMicros),
  778. GPR_TIMESPAN));
  779. break;
  780. }
  781. // create an insecure channel
  782. case api_fuzzer::Action::kCreateChannel: {
  783. if (!action.create_channel().channel_actions_size() ||
  784. g_channel != nullptr) {
  785. no_more_actions();
  786. } else {
  787. grpc_channel_args* args =
  788. ReadArgs(action.create_channel().channel_args());
  789. grpc_channel_credentials* creds =
  790. action.create_channel().has_channel_creds()
  791. ? ReadChannelCreds(action.create_channel().channel_creds())
  792. : grpc_insecure_credentials_create();
  793. g_channel = grpc_channel_create(
  794. action.create_channel().target().c_str(), creds, args);
  795. grpc_channel_credentials_release(creds);
  796. g_channel_actions.clear();
  797. for (int i = 0; i < action.create_channel().channel_actions_size();
  798. i++) {
  799. const api_fuzzer::ChannelAction& channel_action =
  800. action.create_channel().channel_actions(i);
  801. g_channel_actions.push_back({
  802. std::min(channel_action.wait_ms(), kMaxWaitMs),
  803. std::min(channel_action.add_n_bytes_writable(),
  804. kMaxAddNWritableBytes),
  805. std::min(channel_action.add_n_bytes_readable(),
  806. kMaxAddNReadableBytes),
  807. });
  808. }
  809. GPR_ASSERT(g_channel != nullptr);
  810. g_channel_force_delete = false;
  811. {
  812. grpc_core::ExecCtx exec_ctx;
  813. grpc_channel_args_destroy(args);
  814. }
  815. }
  816. break;
  817. }
  818. // destroy a channel
  819. case api_fuzzer::Action::kCloseChannel: {
  820. if (g_channel != nullptr) {
  821. grpc_channel_destroy(g_channel);
  822. g_channel = nullptr;
  823. } else {
  824. no_more_actions();
  825. }
  826. break;
  827. }
  828. // bring up a server
  829. case api_fuzzer::Action::kCreateServer: {
  830. if (g_server == nullptr) {
  831. grpc_channel_args* args =
  832. ReadArgs(action.create_server().channel_args());
  833. g_server = grpc_server_create(args, nullptr);
  834. GPR_ASSERT(g_server != nullptr);
  835. {
  836. grpc_core::ExecCtx exec_ctx;
  837. grpc_channel_args_destroy(args);
  838. }
  839. grpc_server_register_completion_queue(g_server, cq, nullptr);
  840. grpc_server_start(g_server);
  841. server_shutdown = false;
  842. GPR_ASSERT(pending_server_shutdowns == 0);
  843. } else {
  844. no_more_actions();
  845. }
  846. break;
  847. }
  848. // begin server shutdown
  849. case api_fuzzer::Action::kShutdownServer: {
  850. if (g_server != nullptr) {
  851. grpc_server_shutdown_and_notify(
  852. g_server, cq,
  853. AssertSuccessAndDecrement(&pending_server_shutdowns));
  854. pending_server_shutdowns++;
  855. server_shutdown = true;
  856. } else {
  857. no_more_actions();
  858. }
  859. break;
  860. }
  861. // cancel all calls if shutdown
  862. case api_fuzzer::Action::kCancelAllCallsIfShutdown: {
  863. if (g_server != nullptr && server_shutdown) {
  864. grpc_server_cancel_all_calls(g_server);
  865. } else {
  866. no_more_actions();
  867. }
  868. break;
  869. }
  870. // destroy server
  871. case api_fuzzer::Action::kDestroyServerIfReady: {
  872. if (g_server != nullptr && server_shutdown &&
  873. pending_server_shutdowns == 0) {
  874. grpc_server_destroy(g_server);
  875. g_server = nullptr;
  876. } else {
  877. no_more_actions();
  878. }
  879. break;
  880. }
  881. // check connectivity
  882. case api_fuzzer::Action::kCheckConnectivity: {
  883. if (g_channel != nullptr) {
  884. grpc_channel_check_connectivity_state(g_channel,
  885. action.check_connectivity());
  886. } else {
  887. no_more_actions();
  888. }
  889. break;
  890. }
  891. // watch connectivity
  892. case api_fuzzer::Action::kWatchConnectivity: {
  893. if (g_channel != nullptr) {
  894. grpc_connectivity_state st =
  895. grpc_channel_check_connectivity_state(g_channel, 0);
  896. if (st != GRPC_CHANNEL_SHUTDOWN) {
  897. gpr_timespec deadline =
  898. gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
  899. gpr_time_from_micros(action.watch_connectivity(),
  900. GPR_TIMESPAN));
  901. grpc_channel_watch_connectivity_state(
  902. g_channel, st, deadline, cq,
  903. ValidateConnectivityWatch(deadline, &pending_channel_watches));
  904. pending_channel_watches++;
  905. }
  906. } else {
  907. no_more_actions();
  908. }
  909. break;
  910. }
  911. // create a call
  912. case api_fuzzer::Action::kCreateCall: {
  913. bool ok = true;
  914. if (g_channel == nullptr) ok = false;
  915. // If the active call is a server call, then use it as the parent call
  916. // to exercise the propagation logic.
  917. Call* parent_call = ActiveCall();
  918. if (parent_call != nullptr && parent_call->type() != CallType::SERVER) {
  919. parent_call = nullptr;
  920. }
  921. g_calls.emplace_back(new Call(CallType::CLIENT));
  922. grpc_slice method =
  923. g_calls.back()->ReadSlice(action.create_call().method());
  924. if (GRPC_SLICE_LENGTH(method) == 0) {
  925. ok = false;
  926. }
  927. grpc_slice host =
  928. g_calls.back()->ReadSlice(action.create_call().host());
  929. gpr_timespec deadline = gpr_time_add(
  930. gpr_now(GPR_CLOCK_REALTIME),
  931. gpr_time_from_micros(action.create_call().timeout(), GPR_TIMESPAN));
  932. if (ok) {
  933. g_calls.back()->SetCall(grpc_channel_create_call(
  934. g_channel, parent_call == nullptr ? nullptr : parent_call->call(),
  935. action.create_call().propagation_mask(), cq, method, &host,
  936. deadline, nullptr));
  937. } else {
  938. g_calls.pop_back();
  939. no_more_actions();
  940. }
  941. break;
  942. }
  943. // switch the 'current' call
  944. case api_fuzzer::Action::kChangeActiveCall: {
  945. g_active_call++;
  946. ActiveCall();
  947. break;
  948. }
  949. // queue some ops on a call
  950. case api_fuzzer::Action::kQueueBatch: {
  951. auto* active_call = ActiveCall();
  952. if (active_call == nullptr ||
  953. active_call->type() == CallType::PENDING_SERVER ||
  954. active_call->call() == nullptr) {
  955. no_more_actions();
  956. break;
  957. }
  958. const auto& batch = action.queue_batch().operations();
  959. if (batch.size() > 6) {
  960. no_more_actions();
  961. break;
  962. }
  963. std::vector<grpc_op> ops;
  964. bool ok = true;
  965. uint8_t has_ops = 0;
  966. std::vector<std::function<void()>> unwinders;
  967. for (const auto& batch_op : batch) {
  968. auto op = active_call->ReadOp(batch_op, &ok, &has_ops, &unwinders);
  969. if (!op.has_value()) continue;
  970. ops.push_back(*op);
  971. }
  972. if (g_channel == nullptr) ok = false;
  973. if (ok) {
  974. auto* v = active_call->FinishedBatchValidator(has_ops);
  975. grpc_call_error error = grpc_call_start_batch(
  976. active_call->call(), ops.data(), ops.size(), v, nullptr);
  977. if (error != GRPC_CALL_OK) {
  978. v->Run(false);
  979. }
  980. } else {
  981. no_more_actions();
  982. for (auto& unwind : unwinders) {
  983. unwind();
  984. }
  985. }
  986. break;
  987. }
  988. // cancel current call
  989. case api_fuzzer::Action::kCancelCall: {
  990. auto* active_call = ActiveCall();
  991. if (active_call != nullptr && active_call->call() != nullptr) {
  992. grpc_call_cancel(active_call->call(), nullptr);
  993. } else {
  994. no_more_actions();
  995. }
  996. break;
  997. }
  998. // get a call's peer
  999. case api_fuzzer::Action::kGetPeer: {
  1000. auto* active_call = ActiveCall();
  1001. if (active_call != nullptr && active_call->call() != nullptr) {
  1002. free_non_null(grpc_call_get_peer(active_call->call()));
  1003. } else {
  1004. no_more_actions();
  1005. }
  1006. break;
  1007. }
  1008. // get a channel's target
  1009. case api_fuzzer::Action::kGetTarget: {
  1010. if (g_channel != nullptr) {
  1011. free_non_null(grpc_channel_get_target(g_channel));
  1012. } else {
  1013. no_more_actions();
  1014. }
  1015. break;
  1016. }
  1017. // send a ping on a channel
  1018. case api_fuzzer::Action::kPing: {
  1019. if (g_channel != nullptr) {
  1020. pending_pings++;
  1021. grpc_channel_ping(g_channel, cq, Decrement(&pending_pings), nullptr);
  1022. } else {
  1023. no_more_actions();
  1024. }
  1025. break;
  1026. }
  1027. // enable a tracer
  1028. case api_fuzzer::Action::kEnableTracer: {
  1029. grpc_tracer_set_enabled(action.enable_tracer().c_str(), 1);
  1030. break;
  1031. }
  1032. // disable a tracer
  1033. case api_fuzzer::Action::kDisableTracer: {
  1034. grpc_tracer_set_enabled(action.disable_tracer().c_str(), 0);
  1035. break;
  1036. }
  1037. // request a server call
  1038. case api_fuzzer::Action::kRequestCall: {
  1039. if (g_server == nullptr) {
  1040. no_more_actions();
  1041. break;
  1042. }
  1043. g_calls.emplace_back(new Call(CallType::PENDING_SERVER));
  1044. g_calls.back()->RequestCall(g_server, cq);
  1045. break;
  1046. }
  1047. // destroy a call
  1048. case api_fuzzer::Action::kDestroyCall: {
  1049. auto* active_call = ActiveCall();
  1050. if (active_call != nullptr &&
  1051. active_call->type() != CallType::PENDING_SERVER &&
  1052. active_call->call() != nullptr) {
  1053. g_calls[g_active_call]->Shutdown();
  1054. } else {
  1055. no_more_actions();
  1056. }
  1057. break;
  1058. }
  1059. // resize the buffer pool
  1060. case api_fuzzer::Action::kResizeResourceQuota: {
  1061. grpc_resource_quota_resize(g_resource_quota,
  1062. action.resize_resource_quota());
  1063. break;
  1064. }
  1065. }
  1066. }
  1067. GPR_ASSERT(g_channel == nullptr);
  1068. GPR_ASSERT(g_server == nullptr);
  1069. GPR_ASSERT(ActiveCall() == nullptr);
  1070. GPR_ASSERT(g_calls.empty());
  1071. grpc_completion_queue_shutdown(cq);
  1072. GPR_ASSERT(poll_cq());
  1073. grpc_completion_queue_destroy(cq);
  1074. grpc_resource_quota_unref(g_resource_quota);
  1075. grpc_shutdown_blocking();
  1076. }