server_load_reporting_end2end_test.cc 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. #include <grpc/support/port_platform.h>
  19. #include <thread>
  20. #include <gmock/gmock.h>
  21. #include <gtest/gtest.h>
  22. #include <grpc++/grpc++.h>
  23. #include <grpc/grpc.h>
  24. #include <grpc/support/log.h>
  25. #include <grpc/support/string_util.h>
  26. #include <grpcpp/ext/server_load_reporting.h>
  27. #include <grpcpp/server_builder.h>
  28. #include "src/proto/grpc/lb/v1/load_reporter.grpc.pb.h"
  29. #include "src/proto/grpc/testing/echo.grpc.pb.h"
  30. #include "test/core/util/port.h"
  31. #include "test/core/util/test_config.h"
  32. namespace grpc {
  33. namespace testing {
  34. namespace {
  35. constexpr double kMetricValue = 3.1415;
  36. constexpr char kMetricName[] = "METRIC_PI";
  37. // Different messages result in different response statuses. For simplicity in
  38. // computing request bytes, the message sizes should be the same.
  39. const char kOkMessage[] = "hello";
  40. const char kServerErrorMessage[] = "sverr";
  41. const char kClientErrorMessage[] = "clerr";
  42. class EchoTestServiceImpl : public EchoTestService::Service {
  43. public:
  44. ~EchoTestServiceImpl() override {}
  45. Status Echo(ServerContext* context, const EchoRequest* request,
  46. EchoResponse* response) override {
  47. if (request->message() == kServerErrorMessage) {
  48. return Status(StatusCode::UNKNOWN, "Server error requested");
  49. }
  50. if (request->message() == kClientErrorMessage) {
  51. return Status(StatusCode::FAILED_PRECONDITION, "Client error requested");
  52. }
  53. response->set_message(request->message());
  54. grpc::load_reporter::experimental::AddLoadReportingCost(
  55. context, kMetricName, kMetricValue);
  56. return Status::OK;
  57. }
  58. };
  59. class ServerLoadReportingEnd2endTest : public ::testing::Test {
  60. protected:
  61. void SetUp() override {
  62. server_address_ =
  63. "localhost:" + std::to_string(grpc_pick_unused_port_or_die());
  64. server_ =
  65. ServerBuilder()
  66. .AddListeningPort(server_address_, InsecureServerCredentials())
  67. .RegisterService(&echo_service_)
  68. .SetOption(std::unique_ptr<grpc::ServerBuilderOption>(
  69. new grpc::load_reporter::experimental::
  70. LoadReportingServiceServerBuilderOption()))
  71. .BuildAndStart();
  72. server_thread_ =
  73. std::thread(&ServerLoadReportingEnd2endTest::RunServerLoop, this);
  74. }
  75. void RunServerLoop() { server_->Wait(); }
  76. void TearDown() override {
  77. server_->Shutdown();
  78. server_thread_.join();
  79. }
  80. void ClientMakeEchoCalls(const std::string& lb_id, const std::string& lb_tag,
  81. const std::string& message, size_t num_requests) {
  82. auto stub = EchoTestService::NewStub(
  83. grpc::CreateChannel(server_address_, InsecureChannelCredentials()));
  84. std::string lb_token = lb_id + lb_tag;
  85. for (size_t i = 0; i < num_requests; ++i) {
  86. ClientContext ctx;
  87. if (!lb_token.empty()) ctx.AddMetadata(GRPC_LB_TOKEN_MD_KEY, lb_token);
  88. EchoRequest request;
  89. EchoResponse response;
  90. request.set_message(message);
  91. Status status = stub->Echo(&ctx, request, &response);
  92. if (message == kOkMessage) {
  93. ASSERT_EQ(status.error_code(), StatusCode::OK);
  94. ASSERT_EQ(request.message(), response.message());
  95. } else if (message == kServerErrorMessage) {
  96. ASSERT_EQ(status.error_code(), StatusCode::UNKNOWN);
  97. } else if (message == kClientErrorMessage) {
  98. ASSERT_EQ(status.error_code(), StatusCode::FAILED_PRECONDITION);
  99. }
  100. }
  101. }
  102. std::string server_address_;
  103. std::unique_ptr<Server> server_;
  104. std::thread server_thread_;
  105. EchoTestServiceImpl echo_service_;
  106. };
  107. TEST_F(ServerLoadReportingEnd2endTest, NoCall) {}
  108. TEST_F(ServerLoadReportingEnd2endTest, BasicReport) {
  109. auto channel =
  110. grpc::CreateChannel(server_address_, InsecureChannelCredentials());
  111. auto stub = grpc::lb::v1::LoadReporter::NewStub(channel);
  112. ClientContext ctx;
  113. auto stream = stub->ReportLoad(&ctx);
  114. grpc::lb::v1::LoadReportRequest request;
  115. request.mutable_initial_request()->set_load_balanced_hostname(
  116. server_address_);
  117. request.mutable_initial_request()->set_load_key("LOAD_KEY");
  118. request.mutable_initial_request()
  119. ->mutable_load_report_interval()
  120. ->set_seconds(5);
  121. stream->Write(request);
  122. gpr_log(GPR_INFO, "Initial request sent.");
  123. grpc::lb::v1::LoadReportResponse response;
  124. stream->Read(&response);
  125. const std::string& lb_id = response.initial_response().load_balancer_id();
  126. gpr_log(GPR_INFO, "Initial response received (lb_id: %s).", lb_id.c_str());
  127. ClientMakeEchoCalls(lb_id, "LB_TAG", kOkMessage, 1);
  128. while (true) {
  129. stream->Read(&response);
  130. if (!response.load().empty()) {
  131. ASSERT_EQ(response.load().size(), 3);
  132. for (const auto& load : response.load()) {
  133. if (load.in_progress_report_case()) {
  134. // The special load record that reports the number of in-progress
  135. // calls.
  136. ASSERT_EQ(load.num_calls_in_progress(), 1);
  137. } else if (load.orphaned_load_case()) {
  138. // The call from the balancer doesn't have any valid LB token.
  139. ASSERT_EQ(load.orphaned_load_case(), load.kLoadKeyUnknown);
  140. ASSERT_EQ(load.num_calls_started(), 1);
  141. ASSERT_EQ(load.num_calls_finished_without_error(), 0);
  142. ASSERT_EQ(load.num_calls_finished_with_error(), 0);
  143. } else {
  144. // This corresponds to the calls from the client.
  145. ASSERT_EQ(load.num_calls_started(), 1);
  146. ASSERT_EQ(load.num_calls_finished_without_error(), 1);
  147. ASSERT_EQ(load.num_calls_finished_with_error(), 0);
  148. ASSERT_GE(load.total_bytes_received(), sizeof(kOkMessage));
  149. ASSERT_GE(load.total_bytes_sent(), sizeof(kOkMessage));
  150. ASSERT_EQ(load.metric_data().size(), 1);
  151. ASSERT_EQ(load.metric_data().Get(0).metric_name(), kMetricName);
  152. ASSERT_EQ(load.metric_data().Get(0).num_calls_finished_with_metric(),
  153. 1);
  154. ASSERT_EQ(load.metric_data().Get(0).total_metric_value(),
  155. kMetricValue);
  156. }
  157. }
  158. break;
  159. }
  160. }
  161. stream->WritesDone();
  162. ASSERT_EQ(stream->Finish().error_code(), StatusCode::CANCELLED);
  163. }
  164. // TODO(juanlishen): Add more tests.
  165. } // namespace
  166. } // namespace testing
  167. } // namespace grpc
  168. int main(int argc, char** argv) {
  169. grpc::testing::TestEnvironment env(argc, argv);
  170. ::testing::InitGoogleTest(&argc, argv);
  171. return RUN_ALL_TESTS();
  172. }