123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521 |
- /*
- *
- * Copyright 2015 gRPC authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
- #include "src/core/lib/http/httpcli.h"
- #include <string.h>
- #include <gmock/gmock.h>
- #include <gtest/gtest.h>
- #include <grpc/grpc.h>
- #include <grpc/support/alloc.h>
- #include <grpc/support/log.h>
- #include <grpc/support/string_util.h>
- #include <grpc/support/sync.h>
- #include "src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h"
- #include "src/core/lib/gprpp/time.h"
- #include "src/core/lib/iomgr/iomgr.h"
- #include "test/core/http/httpcli_test_util.h"
- #include "test/core/util/fake_udp_and_tcp_server.h"
- #include "test/core/util/port.h"
- #include "test/core/util/subprocess.h"
- #include "test/core/util/test_config.h"
- namespace {
- grpc_core::Timestamp NSecondsTime(int seconds) {
- return grpc_core::Timestamp::FromTimespecRoundUp(
- grpc_timeout_seconds_to_deadline(seconds));
- }
- absl::Time AbslDeadlineSeconds(int s) {
- return grpc_core::ToAbslTime(grpc_timeout_seconds_to_deadline(s));
- }
- int g_argc;
- char** g_argv;
- int g_server_port;
- gpr_subprocess* g_server;
- class HttpRequestTest : public ::testing::Test {
- public:
- HttpRequestTest() {
- grpc_init();
- grpc_core::ExecCtx exec_ctx;
- grpc_pollset* pollset =
- static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
- grpc_pollset_init(pollset, &mu_);
- pops_ = grpc_polling_entity_create_from_pollset(pollset);
- }
- ~HttpRequestTest() override {
- {
- grpc_core::ExecCtx exec_ctx;
- grpc_pollset_shutdown(
- grpc_polling_entity_pollset(&pops_),
- GRPC_CLOSURE_CREATE(DestroyPops, &pops_, grpc_schedule_on_exec_ctx));
- }
- grpc_shutdown();
- }
- void RunAndKick(const std::function<void()>& f) {
- grpc_core::MutexLockForGprMu lock(mu_);
- f();
- GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_kick",
- grpc_pollset_kick(grpc_polling_entity_pollset(&pops_), nullptr)));
- }
- void PollUntil(const std::function<bool()>& predicate, absl::Time deadline) {
- gpr_mu_lock(mu_);
- while (!predicate()) {
- GPR_ASSERT(absl::Now() < deadline);
- grpc_pollset_worker* worker = nullptr;
- GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work", grpc_pollset_work(grpc_polling_entity_pollset(&pops_),
- &worker, NSecondsTime(1))));
- gpr_mu_unlock(mu_);
- gpr_mu_lock(mu_);
- }
- gpr_mu_unlock(mu_);
- }
- grpc_polling_entity* pops() { return &pops_; }
- protected:
- static void SetUpTestSuite() {
- auto test_server = grpc_core::testing::StartHttpRequestTestServer(
- g_argc, g_argv, false /* use_ssl */);
- g_server = test_server.server;
- g_server_port = test_server.port;
- }
- static void TearDownTestSuite() { gpr_subprocess_destroy(g_server); }
- private:
- static void DestroyPops(void* p, grpc_error_handle /*error*/) {
- grpc_polling_entity* pops = static_cast<grpc_polling_entity*>(p);
- grpc_pollset_destroy(grpc_polling_entity_pollset(pops));
- gpr_free(grpc_polling_entity_pollset(pops));
- }
- gpr_mu* mu_;
- grpc_polling_entity pops_;
- };
- struct RequestState {
- explicit RequestState(HttpRequestTest* test) : test(test) {}
- ~RequestState() {
- grpc_core::ExecCtx exec_ctx;
- grpc_http_response_destroy(&response);
- }
- HttpRequestTest* test;
- bool done = false;
- grpc_http_response response = {};
- grpc_pollset_set* pollset_set_to_destroy_eagerly = nullptr;
- };
- void OnFinish(void* arg, grpc_error_handle error) {
- RequestState* request_state = static_cast<RequestState*>(arg);
- if (request_state->pollset_set_to_destroy_eagerly != nullptr) {
- // Destroy the request's polling entity param. The goal is to try to catch a
- // bug where we might still be referencing the polling entity by
- // a pending TCP connect.
- grpc_pollset_set_destroy(request_state->pollset_set_to_destroy_eagerly);
- }
- const char* expect =
- "<html><head><title>Hello world!</title></head>"
- "<body><p>This is a test</p></body></html>";
- GPR_ASSERT(error == GRPC_ERROR_NONE);
- grpc_http_response response = request_state->response;
- gpr_log(GPR_INFO, "response status=%d error=%s", response.status,
- grpc_error_std_string(error).c_str());
- GPR_ASSERT(response.status == 200);
- GPR_ASSERT(response.body_length == strlen(expect));
- GPR_ASSERT(0 == memcmp(expect, response.body, response.body_length));
- request_state->test->RunAndKick(
- [request_state]() { request_state->done = true; });
- }
- void OnFinishExpectFailure(void* arg, grpc_error_handle error) {
- RequestState* request_state = static_cast<RequestState*>(arg);
- if (request_state->pollset_set_to_destroy_eagerly != nullptr) {
- // Destroy the request's polling entity param. The goal is to try to catch a
- // bug where we might still be referencing the polling entity by
- // a pending TCP connect.
- grpc_pollset_set_destroy(request_state->pollset_set_to_destroy_eagerly);
- }
- grpc_http_response response = request_state->response;
- gpr_log(GPR_INFO, "response status=%d error=%s", response.status,
- grpc_error_std_string(error).c_str());
- GPR_ASSERT(error != GRPC_ERROR_NONE);
- request_state->test->RunAndKick(
- [request_state]() { request_state->done = true; });
- }
- TEST_F(HttpRequestTest, Get) {
- RequestState request_state(this);
- grpc_http_request req;
- grpc_core::ExecCtx exec_ctx;
- std::string host = absl::StrFormat("localhost:%d", g_server_port);
- gpr_log(GPR_INFO, "requesting from %s", host.c_str());
- memset(&req, 0, sizeof(req));
- auto uri = grpc_core::URI::Create("http", host, "/get", {} /* query params */,
- "" /* fragment */);
- GPR_ASSERT(uri.ok());
- grpc_core::OrphanablePtr<grpc_core::HttpRequest> http_request =
- grpc_core::HttpRequest::Get(
- std::move(*uri), nullptr /* channel args */, pops(), &req,
- NSecondsTime(15),
- GRPC_CLOSURE_CREATE(OnFinish, &request_state,
- grpc_schedule_on_exec_ctx),
- &request_state.response,
- grpc_core::RefCountedPtr<grpc_channel_credentials>(
- grpc_insecure_credentials_create()));
- http_request->Start();
- PollUntil([&request_state]() { return request_state.done; },
- AbslDeadlineSeconds(60));
- }
- TEST_F(HttpRequestTest, Post) {
- RequestState request_state(this);
- grpc_http_request req;
- grpc_core::ExecCtx exec_ctx;
- std::string host = absl::StrFormat("localhost:%d", g_server_port);
- gpr_log(GPR_INFO, "posting to %s", host.c_str());
- memset(&req, 0, sizeof(req));
- req.body = const_cast<char*>("hello");
- req.body_length = 5;
- auto uri = grpc_core::URI::Create("http", host, "/post",
- {} /* query params */, "" /* fragment */);
- GPR_ASSERT(uri.ok());
- grpc_core::OrphanablePtr<grpc_core::HttpRequest> http_request =
- grpc_core::HttpRequest::Post(
- std::move(*uri), nullptr /* channel args */, pops(), &req,
- NSecondsTime(15),
- GRPC_CLOSURE_CREATE(OnFinish, &request_state,
- grpc_schedule_on_exec_ctx),
- &request_state.response,
- grpc_core::RefCountedPtr<grpc_channel_credentials>(
- grpc_insecure_credentials_create()));
- http_request->Start();
- PollUntil([&request_state]() { return request_state.done; },
- AbslDeadlineSeconds(60));
- }
- int g_fake_non_responsive_dns_server_port;
- void InjectNonResponsiveDNSServer(ares_channel channel) {
- gpr_log(GPR_DEBUG,
- "Injecting broken nameserver list. Bad server address:|[::1]:%d|.",
- g_fake_non_responsive_dns_server_port);
- // Configure a non-responsive DNS server at the front of c-ares's nameserver
- // list.
- struct ares_addr_port_node dns_server_addrs[1];
- dns_server_addrs[0].family = AF_INET6;
- (reinterpret_cast<char*>(&dns_server_addrs[0].addr.addr6))[15] = 0x1;
- dns_server_addrs[0].tcp_port = g_fake_non_responsive_dns_server_port;
- dns_server_addrs[0].udp_port = g_fake_non_responsive_dns_server_port;
- dns_server_addrs[0].next = nullptr;
- GPR_ASSERT(ares_set_servers_ports(channel, dns_server_addrs) == ARES_SUCCESS);
- }
- TEST_F(HttpRequestTest, CancelGetDuringDNSResolution) {
- // Inject an unresponsive DNS server into the resolver's DNS server config
- grpc_core::testing::FakeUdpAndTcpServer fake_dns_server(
- grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::
- kWaitForClientToSendFirstBytes,
- grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer);
- g_fake_non_responsive_dns_server_port = fake_dns_server.port();
- void (*prev_test_only_inject_config)(ares_channel channel) =
- grpc_ares_test_only_inject_config;
- grpc_ares_test_only_inject_config = InjectNonResponsiveDNSServer;
- // Run the same test on several threads in parallel to try to trigger races
- // etc.
- int kNumThreads = 100;
- std::vector<std::thread> threads;
- threads.reserve(kNumThreads);
- for (int i = 0; i < kNumThreads; i++) {
- threads.push_back(std::thread([this]() {
- RequestState request_state(this);
- grpc_http_request req;
- grpc_core::ExecCtx exec_ctx;
- memset(&req, 0, sizeof(grpc_http_request));
- auto uri = grpc_core::URI::Create(
- "http", "dont-care-since-wont-be-resolved.test.com:443", "/get",
- {} /* query params */, "" /* fragment */);
- GPR_ASSERT(uri.ok());
- grpc_core::OrphanablePtr<grpc_core::HttpRequest> http_request =
- grpc_core::HttpRequest::Get(
- std::move(*uri), nullptr /* channel args */, pops(), &req,
- NSecondsTime(120),
- GRPC_CLOSURE_CREATE(OnFinishExpectFailure, &request_state,
- grpc_schedule_on_exec_ctx),
- &request_state.response,
- grpc_core::RefCountedPtr<grpc_channel_credentials>(
- grpc_insecure_credentials_create()));
- http_request->Start();
- std::thread cancel_thread([&http_request]() {
- gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
- grpc_core::ExecCtx exec_ctx;
- http_request.reset();
- });
- // Poll with a deadline explicitly lower than the request timeout, so
- // that we know that the request timeout isn't just kicking in.
- PollUntil([&request_state]() { return request_state.done; },
- AbslDeadlineSeconds(60));
- cancel_thread.join();
- }));
- }
- for (auto& t : threads) {
- t.join();
- }
- grpc_ares_test_only_inject_config = prev_test_only_inject_config;
- }
- TEST_F(HttpRequestTest, CancelGetWhileReadingResponse) {
- // Start up a fake HTTP server which just accepts connections
- // and then hangs, i.e. does not send back any bytes to the client.
- // The goal here is to get the client to connect to this fake server
- // and send a request, and then sit waiting for a response. Then, a
- // separate thread will cancel the HTTP request, and that should let it
- // complete.
- grpc_core::testing::FakeUdpAndTcpServer fake_http_server(
- grpc_core::testing::FakeUdpAndTcpServer::AcceptMode::
- kWaitForClientToSendFirstBytes,
- grpc_core::testing::FakeUdpAndTcpServer::CloseSocketUponCloseFromPeer);
- // Run the same test on several threads in parallel to try to trigger races
- // etc.
- int kNumThreads = 100;
- std::vector<std::thread> threads;
- threads.reserve(kNumThreads);
- for (int i = 0; i < kNumThreads; i++) {
- grpc_core::testing::FakeUdpAndTcpServer* fake_http_server_ptr =
- &fake_http_server;
- threads.push_back(std::thread([this, fake_http_server_ptr]() {
- RequestState request_state(this);
- grpc_http_request req;
- grpc_core::ExecCtx exec_ctx;
- memset(&req, 0, sizeof(req));
- auto uri = grpc_core::URI::Create("http", fake_http_server_ptr->address(),
- "/get", {} /* query params */,
- "" /* fragment */);
- GPR_ASSERT(uri.ok());
- grpc_core::OrphanablePtr<grpc_core::HttpRequest> http_request =
- grpc_core::HttpRequest::Get(
- std::move(*uri), nullptr /* channel args */, pops(), &req,
- NSecondsTime(120),
- GRPC_CLOSURE_CREATE(OnFinishExpectFailure, &request_state,
- grpc_schedule_on_exec_ctx),
- &request_state.response,
- grpc_core::RefCountedPtr<grpc_channel_credentials>(
- grpc_insecure_credentials_create()));
- http_request->Start();
- exec_ctx.Flush();
- std::thread cancel_thread([&http_request]() {
- gpr_sleep_until(grpc_timeout_seconds_to_deadline(1));
- grpc_core::ExecCtx exec_ctx;
- http_request.reset();
- });
- // Poll with a deadline explicitly lower than the request timeout, so
- // that we know that the request timeout isn't just kicking in.
- PollUntil([&request_state]() { return request_state.done; },
- AbslDeadlineSeconds(60));
- cancel_thread.join();
- }));
- }
- for (auto& t : threads) {
- t.join();
- }
- }
- // The main point of this test is just to exercise the machinery around
- // cancellation during TCP connection establishment, to make sure there are no
- // crashes/races etc. This test doesn't actually verify that cancellation during
- // TCP setup is happening, though. For that, we would need to induce packet loss
- // in the test.
- TEST_F(HttpRequestTest, CancelGetRacesWithConnectionFailure) {
- // Grab an unoccupied port but don't listen on it. The goal
- // here is just to have a server address that will reject
- // TCP connection setups.
- // Note that because the server is rejecting TCP connections, we
- // don't really need to cancel the HTTP requests in this test case
- // in order for them proceeed i.e. in order for them to pass. The test
- // is still beneficial though because it can exercise the same code paths
- // that would get taken if the HTTP request was cancelled while the TCP
- // connect attempt was actually hanging.
- int fake_server_port = grpc_pick_unused_port_or_die();
- std::string fake_server_address =
- absl::StrCat("[::1]:", std::to_string(fake_server_port));
- // Run the same test on several threads in parallel to try to trigger races
- // etc.
- int kNumThreads = 100;
- std::vector<std::thread> threads;
- threads.reserve(kNumThreads);
- for (int i = 0; i < kNumThreads; i++) {
- threads.push_back(std::thread([this, fake_server_address]() {
- RequestState request_state(this);
- grpc_http_request req;
- grpc_core::ExecCtx exec_ctx;
- memset(&req, 0, sizeof(req));
- auto uri =
- grpc_core::URI::Create("http", fake_server_address, "/get",
- {} /* query params */, "" /* fragment */);
- GPR_ASSERT(uri.ok());
- grpc_core::OrphanablePtr<grpc_core::HttpRequest> http_request =
- grpc_core::HttpRequest::Get(
- std::move(*uri), nullptr /* channel args */, pops(), &req,
- NSecondsTime(120),
- GRPC_CLOSURE_CREATE(OnFinishExpectFailure, &request_state,
- grpc_schedule_on_exec_ctx),
- &request_state.response,
- grpc_core::RefCountedPtr<grpc_channel_credentials>(
- grpc_insecure_credentials_create()));
- // Start the HTTP request. We will ~immediately begin a TCP connect
- // attempt because there's no name to resolve.
- http_request->Start();
- exec_ctx.Flush();
- // Spawn a separate thread which ~immediately cancels the HTTP request.
- // Note that even though the server is rejecting TCP connections, it can
- // still take some time for the client to receive that rejection. So
- // cancelling the request now can trigger the code paths that would get
- // taken if the TCP connection was truly hanging e.g. from packet loss.
- // The goal is just to make sure there are no crashes, races, etc.
- std::thread cancel_thread([&http_request]() {
- grpc_core::ExecCtx exec_ctx;
- http_request.reset();
- });
- // Poll with a deadline explicitly lower than the request timeout, so
- // that we know that the request timeout isn't just kicking in.
- PollUntil([&request_state]() { return request_state.done; },
- AbslDeadlineSeconds(60));
- cancel_thread.join();
- }));
- }
- for (auto& t : threads) {
- t.join();
- }
- }
- // The pollent parameter passed to HttpRequest::Get or Post is owned by
- // the caller and must not be referenced by the HttpRequest after the
- // requests's on_done callback is invoked. This test verifies that this
- // isn't happening by destroying the request's pollset set within the
- // on_done callback.
- TEST_F(HttpRequestTest, CallerPollentsAreNotReferencedAfterCallbackIsRan) {
- // Grab an unoccupied port but don't listen on it. The goal
- // here is just to have a server address that will reject
- // TCP connection setups.
- // Note that we could have used a different server for this test case, e.g.
- // one which accepts TCP connections. All we need here is something for the
- // client to connect to, since it will be cancelled roughly during the
- // connection attempt anyways.
- int fake_server_port = grpc_pick_unused_port_or_die();
- std::string fake_server_address =
- absl::StrCat("[::1]:", std::to_string(fake_server_port));
- RequestState request_state(this);
- grpc_http_request req;
- grpc_core::ExecCtx exec_ctx;
- memset(&req, 0, sizeof(req));
- req.path = const_cast<char*>("/get");
- request_state.pollset_set_to_destroy_eagerly = grpc_pollset_set_create();
- grpc_polling_entity_add_to_pollset_set(
- pops(), request_state.pollset_set_to_destroy_eagerly);
- grpc_polling_entity wrapped_pollset_set_to_destroy_eagerly =
- grpc_polling_entity_create_from_pollset_set(
- request_state.pollset_set_to_destroy_eagerly);
- auto uri = grpc_core::URI::Create("http", fake_server_address, "/get",
- {} /* query params */, "" /* fragment */);
- GPR_ASSERT(uri.ok());
- grpc_core::OrphanablePtr<grpc_core::HttpRequest> http_request =
- grpc_core::HttpRequest::Get(
- std::move(*uri), nullptr /* channel args */,
- &wrapped_pollset_set_to_destroy_eagerly, &req, NSecondsTime(15),
- GRPC_CLOSURE_CREATE(OnFinishExpectFailure, &request_state,
- grpc_schedule_on_exec_ctx),
- &request_state.response,
- grpc_core::RefCountedPtr<grpc_channel_credentials>(
- grpc_insecure_credentials_create()));
- // Start the HTTP request. We'll start the TCP connect attempt right away.
- http_request->Start();
- exec_ctx.Flush();
- http_request.reset(); // cancel the request
- // Since the request was cancelled, the on_done callback should be flushed
- // out on the ExecCtx flush below. When the on_done callback is ran, it will
- // eagerly destroy 'request_state.pollset_set_to_destroy_eagerly'. Thus, we
- // can't poll on that pollset here.
- exec_ctx.Flush();
- }
- void CancelRequest(grpc_core::HttpRequest* req) {
- gpr_log(
- GPR_INFO,
- "test only HttpRequest::OnHandshakeDone intercept orphaning request: %p",
- req);
- req->Orphan();
- }
- // This exercises the code paths that happen when we cancel an HTTP request
- // before the security handshake callback runs, but after that callback has
- // already been scheduled with a success result. This case is interesting
- // because the current security handshake API transfers ownership of output
- // arguments to the caller only if the handshake is successful, rendering
- // this code path as something that only occurs with just the right timing.
- TEST_F(HttpRequestTest,
- CancelDuringSecurityHandshakeButHandshakeStillSucceeds) {
- RequestState request_state(this);
- grpc_http_request req;
- grpc_core::ExecCtx exec_ctx;
- std::string host = absl::StrFormat("localhost:%d", g_server_port);
- gpr_log(GPR_INFO, "requesting from %s", host.c_str());
- memset(&req, 0, sizeof(req));
- auto uri = grpc_core::URI::Create("http", host, "/get", {} /* query params */,
- "" /* fragment */);
- GPR_ASSERT(uri.ok());
- grpc_core::OrphanablePtr<grpc_core::HttpRequest> http_request =
- grpc_core::HttpRequest::Get(
- std::move(*uri), nullptr /* channel args */, pops(), &req,
- NSecondsTime(15),
- GRPC_CLOSURE_CREATE(OnFinishExpectFailure, &request_state,
- grpc_schedule_on_exec_ctx),
- &request_state.response,
- grpc_core::RefCountedPtr<grpc_channel_credentials>(
- grpc_insecure_credentials_create()));
- grpc_core::HttpRequest::TestOnlySetOnHandshakeDoneIntercept(CancelRequest);
- http_request->Start();
- (void)http_request.release(); // request will be orphaned by CancelRequest
- exec_ctx.Flush();
- PollUntil([&request_state]() { return request_state.done; },
- AbslDeadlineSeconds(60));
- grpc_core::HttpRequest::TestOnlySetOnHandshakeDoneIntercept(nullptr);
- }
- } // namespace
- int main(int argc, char** argv) {
- ::testing::InitGoogleTest(&argc, argv);
- grpc::testing::TestEnvironment env(argc, argv);
- // launch the test server later, so that --gtest_list_tests works
- g_argc = argc;
- g_argv = argv;
- // run tests
- return RUN_ALL_TESTS();
- }
|