interop_client.cc 45 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302
  1. /*
  2. *
  3. * Copyright 2015-2016 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. #include "test/cpp/interop/interop_client.h"
  19. #include <cinttypes>
  20. #include <fstream>
  21. #include <memory>
  22. #include <string>
  23. #include <type_traits>
  24. #include <utility>
  25. #include "absl/strings/match.h"
  26. #include "absl/strings/str_format.h"
  27. #include <grpc/grpc.h>
  28. #include <grpc/support/alloc.h>
  29. #include <grpc/support/log.h>
  30. #include <grpc/support/string_util.h>
  31. #include <grpc/support/time.h>
  32. #include <grpcpp/channel.h>
  33. #include <grpcpp/client_context.h>
  34. #include <grpcpp/security/credentials.h>
  35. #include "src/proto/grpc/testing/empty.pb.h"
  36. #include "src/proto/grpc/testing/messages.pb.h"
  37. #include "src/proto/grpc/testing/test.grpc.pb.h"
  38. #include "test/core/util/histogram.h"
  39. #include "test/cpp/interop/client_helper.h"
  40. namespace grpc {
  41. namespace testing {
  42. namespace {
  43. // The same value is defined by the Java client.
  44. const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904};
  45. const std::vector<int> response_stream_sizes = {31415, 9, 2653, 58979};
  46. const int kNumResponseMessages = 2000;
  47. const int kResponseMessageSize = 1030;
  48. const int kReceiveDelayMilliSeconds = 20;
  49. const int kLargeRequestSize = 271828;
  50. const int kLargeResponseSize = 314159;
  51. void NoopChecks(const InteropClientContextInspector& /*inspector*/,
  52. const SimpleRequest* /*request*/,
  53. const SimpleResponse* /*response*/) {}
  54. void UnaryCompressionChecks(const InteropClientContextInspector& inspector,
  55. const SimpleRequest* request,
  56. const SimpleResponse* /*response*/) {
  57. const grpc_compression_algorithm received_compression =
  58. inspector.GetCallCompressionAlgorithm();
  59. if (request->response_compressed().value()) {
  60. if (received_compression == GRPC_COMPRESS_NONE) {
  61. // Requested some compression, got NONE. This is an error.
  62. gpr_log(GPR_ERROR,
  63. "Failure: Requested compression but got uncompressed response "
  64. "from server.");
  65. abort();
  66. }
  67. GPR_ASSERT(inspector.WasCompressed());
  68. } else {
  69. // Didn't request compression -> make sure the response is uncompressed
  70. GPR_ASSERT(!(inspector.WasCompressed()));
  71. }
  72. }
  73. } // namespace
  74. InteropClient::ServiceStub::ServiceStub(
  75. ChannelCreationFunc channel_creation_func, bool new_stub_every_call)
  76. : channel_creation_func_(std::move(channel_creation_func)),
  77. channel_(channel_creation_func_()),
  78. new_stub_every_call_(new_stub_every_call) {
  79. // If new_stub_every_call is false, then this is our chance to initialize
  80. // stub_. (see Get())
  81. if (!new_stub_every_call) {
  82. stub_ = TestService::NewStub(channel_);
  83. }
  84. }
  85. TestService::Stub* InteropClient::ServiceStub::Get() {
  86. if (new_stub_every_call_) {
  87. stub_ = TestService::NewStub(channel_);
  88. }
  89. return stub_.get();
  90. }
  91. UnimplementedService::Stub*
  92. InteropClient::ServiceStub::GetUnimplementedServiceStub() {
  93. if (unimplemented_service_stub_ == nullptr) {
  94. unimplemented_service_stub_ = UnimplementedService::NewStub(channel_);
  95. }
  96. return unimplemented_service_stub_.get();
  97. }
  98. void InteropClient::ServiceStub::ResetChannel() {
  99. channel_ = channel_creation_func_();
  100. if (!new_stub_every_call_) {
  101. stub_ = TestService::NewStub(channel_);
  102. }
  103. }
  104. InteropClient::InteropClient(ChannelCreationFunc channel_creation_func,
  105. bool new_stub_every_test_case,
  106. bool do_not_abort_on_transient_failures)
  107. : serviceStub_(std::move(channel_creation_func), new_stub_every_test_case),
  108. do_not_abort_on_transient_failures_(do_not_abort_on_transient_failures) {}
  109. bool InteropClient::AssertStatusOk(const Status& s,
  110. const std::string& optional_debug_string) {
  111. if (s.ok()) {
  112. return true;
  113. }
  114. // Note: At this point, s.error_code is definitely not StatusCode::OK (we
  115. // already checked for s.ok() above). So, the following will call abort()
  116. // (unless s.error_code() corresponds to a transient failure and
  117. // 'do_not_abort_on_transient_failures' is true)
  118. return AssertStatusCode(s, StatusCode::OK, optional_debug_string);
  119. }
  120. bool InteropClient::AssertStatusCode(const Status& s, StatusCode expected_code,
  121. const std::string& optional_debug_string) {
  122. if (s.error_code() == expected_code) {
  123. return true;
  124. }
  125. gpr_log(GPR_ERROR,
  126. "Error status code: %d (expected: %d), message: %s,"
  127. " debug string: %s",
  128. s.error_code(), expected_code, s.error_message().c_str(),
  129. optional_debug_string.c_str());
  130. // In case of transient transient/retryable failures (like a broken
  131. // connection) we may or may not abort (see TransientFailureOrAbort())
  132. if (s.error_code() == grpc::StatusCode::UNAVAILABLE) {
  133. return TransientFailureOrAbort();
  134. }
  135. abort();
  136. }
  137. bool InteropClient::DoEmpty() {
  138. gpr_log(GPR_DEBUG, "Sending an empty rpc...");
  139. Empty request;
  140. Empty response;
  141. ClientContext context;
  142. Status s = serviceStub_.Get()->EmptyCall(&context, request, &response);
  143. if (!AssertStatusOk(s, context.debug_error_string())) {
  144. return false;
  145. }
  146. gpr_log(GPR_DEBUG, "Empty rpc done.");
  147. return true;
  148. }
  149. bool InteropClient::PerformLargeUnary(SimpleRequest* request,
  150. SimpleResponse* response) {
  151. return PerformLargeUnary(request, response, NoopChecks);
  152. }
  153. bool InteropClient::PerformLargeUnary(SimpleRequest* request,
  154. SimpleResponse* response,
  155. const CheckerFn& custom_checks_fn) {
  156. ClientContext context;
  157. InteropClientContextInspector inspector(context);
  158. request->set_response_size(kLargeResponseSize);
  159. std::string payload(kLargeRequestSize, '\0');
  160. request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
  161. if (request->has_expect_compressed()) {
  162. if (request->expect_compressed().value()) {
  163. context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
  164. } else {
  165. context.set_compression_algorithm(GRPC_COMPRESS_NONE);
  166. }
  167. }
  168. Status s = serviceStub_.Get()->UnaryCall(&context, *request, response);
  169. if (!AssertStatusOk(s, context.debug_error_string())) {
  170. return false;
  171. }
  172. custom_checks_fn(inspector, request, response);
  173. // Payload related checks.
  174. GPR_ASSERT(response->payload().body() ==
  175. std::string(kLargeResponseSize, '\0'));
  176. return true;
  177. }
  178. bool InteropClient::DoComputeEngineCreds(
  179. const std::string& default_service_account,
  180. const std::string& oauth_scope) {
  181. gpr_log(GPR_DEBUG,
  182. "Sending a large unary rpc with compute engine credentials ...");
  183. SimpleRequest request;
  184. SimpleResponse response;
  185. request.set_fill_username(true);
  186. request.set_fill_oauth_scope(true);
  187. if (!PerformLargeUnary(&request, &response)) {
  188. return false;
  189. }
  190. gpr_log(GPR_DEBUG, "Got username %s", response.username().c_str());
  191. gpr_log(GPR_DEBUG, "Got oauth_scope %s", response.oauth_scope().c_str());
  192. GPR_ASSERT(!response.username().empty());
  193. GPR_ASSERT(response.username().c_str() == default_service_account);
  194. GPR_ASSERT(!response.oauth_scope().empty());
  195. const char* oauth_scope_str = response.oauth_scope().c_str();
  196. GPR_ASSERT(absl::StrContains(oauth_scope, oauth_scope_str));
  197. gpr_log(GPR_DEBUG, "Large unary with compute engine creds done.");
  198. return true;
  199. }
  200. bool InteropClient::DoOauth2AuthToken(const std::string& username,
  201. const std::string& oauth_scope) {
  202. gpr_log(GPR_DEBUG,
  203. "Sending a unary rpc with raw oauth2 access token credentials ...");
  204. SimpleRequest request;
  205. SimpleResponse response;
  206. request.set_fill_username(true);
  207. request.set_fill_oauth_scope(true);
  208. ClientContext context;
  209. Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
  210. if (!AssertStatusOk(s, context.debug_error_string())) {
  211. return false;
  212. }
  213. GPR_ASSERT(!response.username().empty());
  214. GPR_ASSERT(!response.oauth_scope().empty());
  215. GPR_ASSERT(username == response.username());
  216. const char* oauth_scope_str = response.oauth_scope().c_str();
  217. GPR_ASSERT(absl::StrContains(oauth_scope, oauth_scope_str));
  218. gpr_log(GPR_DEBUG, "Unary with oauth2 access token credentials done.");
  219. return true;
  220. }
  221. bool InteropClient::DoPerRpcCreds(const std::string& json_key) {
  222. gpr_log(GPR_DEBUG, "Sending a unary rpc with per-rpc JWT access token ...");
  223. SimpleRequest request;
  224. SimpleResponse response;
  225. request.set_fill_username(true);
  226. ClientContext context;
  227. std::chrono::seconds token_lifetime = std::chrono::hours(1);
  228. std::shared_ptr<CallCredentials> creds =
  229. ServiceAccountJWTAccessCredentials(json_key, token_lifetime.count());
  230. context.set_credentials(creds);
  231. Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
  232. if (!AssertStatusOk(s, context.debug_error_string())) {
  233. return false;
  234. }
  235. GPR_ASSERT(!response.username().empty());
  236. GPR_ASSERT(json_key.find(response.username()) != std::string::npos);
  237. gpr_log(GPR_DEBUG, "Unary with per-rpc JWT access token done.");
  238. return true;
  239. }
  240. bool InteropClient::DoJwtTokenCreds(const std::string& username) {
  241. gpr_log(GPR_DEBUG,
  242. "Sending a large unary rpc with JWT token credentials ...");
  243. SimpleRequest request;
  244. SimpleResponse response;
  245. request.set_fill_username(true);
  246. if (!PerformLargeUnary(&request, &response)) {
  247. return false;
  248. }
  249. GPR_ASSERT(!response.username().empty());
  250. GPR_ASSERT(username.find(response.username()) != std::string::npos);
  251. gpr_log(GPR_DEBUG, "Large unary with JWT token creds done.");
  252. return true;
  253. }
  254. bool InteropClient::DoGoogleDefaultCredentials(
  255. const std::string& default_service_account) {
  256. gpr_log(GPR_DEBUG,
  257. "Sending a large unary rpc with GoogleDefaultCredentials...");
  258. SimpleRequest request;
  259. SimpleResponse response;
  260. request.set_fill_username(true);
  261. if (!PerformLargeUnary(&request, &response)) {
  262. return false;
  263. }
  264. gpr_log(GPR_DEBUG, "Got username %s", response.username().c_str());
  265. GPR_ASSERT(!response.username().empty());
  266. GPR_ASSERT(response.username().c_str() == default_service_account);
  267. gpr_log(GPR_DEBUG, "Large unary rpc with GoogleDefaultCredentials done.");
  268. return true;
  269. }
  270. bool InteropClient::DoLargeUnary() {
  271. gpr_log(GPR_DEBUG, "Sending a large unary rpc...");
  272. SimpleRequest request;
  273. SimpleResponse response;
  274. if (!PerformLargeUnary(&request, &response)) {
  275. return false;
  276. }
  277. gpr_log(GPR_DEBUG, "Large unary done.");
  278. return true;
  279. }
  280. bool InteropClient::DoClientCompressedUnary() {
  281. // Probing for compression-checks support.
  282. ClientContext probe_context;
  283. SimpleRequest probe_req;
  284. SimpleResponse probe_res;
  285. probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE);
  286. probe_req.mutable_expect_compressed()->set_value(true); // lies!
  287. probe_req.set_response_size(kLargeResponseSize);
  288. probe_req.mutable_payload()->set_body(std::string(kLargeRequestSize, '\0'));
  289. gpr_log(GPR_DEBUG, "Sending probe for compressed unary request.");
  290. const Status s =
  291. serviceStub_.Get()->UnaryCall(&probe_context, probe_req, &probe_res);
  292. if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) {
  293. // The server isn't able to evaluate incoming compression, making the rest
  294. // of this test moot.
  295. gpr_log(GPR_DEBUG, "Compressed unary request probe failed");
  296. return false;
  297. }
  298. gpr_log(GPR_DEBUG, "Compressed unary request probe succeeded. Proceeding.");
  299. const std::vector<bool> compressions = {true, false};
  300. for (size_t i = 0; i < compressions.size(); i++) {
  301. std::string log_suffix =
  302. absl::StrFormat("(compression=%s)", compressions[i] ? "true" : "false");
  303. gpr_log(GPR_DEBUG, "Sending compressed unary request %s.",
  304. log_suffix.c_str());
  305. SimpleRequest request;
  306. SimpleResponse response;
  307. request.mutable_expect_compressed()->set_value(compressions[i]);
  308. if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) {
  309. gpr_log(GPR_ERROR, "Compressed unary request failed %s",
  310. log_suffix.c_str());
  311. return false;
  312. }
  313. gpr_log(GPR_DEBUG, "Compressed unary request failed %s",
  314. log_suffix.c_str());
  315. }
  316. return true;
  317. }
  318. bool InteropClient::DoServerCompressedUnary() {
  319. const std::vector<bool> compressions = {true, false};
  320. for (size_t i = 0; i < compressions.size(); i++) {
  321. std::string log_suffix =
  322. absl::StrFormat("(compression=%s)", compressions[i] ? "true" : "false");
  323. gpr_log(GPR_DEBUG, "Sending unary request for compressed response %s.",
  324. log_suffix.c_str());
  325. SimpleRequest request;
  326. SimpleResponse response;
  327. request.mutable_response_compressed()->set_value(compressions[i]);
  328. if (!PerformLargeUnary(&request, &response, UnaryCompressionChecks)) {
  329. gpr_log(GPR_ERROR, "Request for compressed unary failed %s",
  330. log_suffix.c_str());
  331. return false;
  332. }
  333. gpr_log(GPR_DEBUG, "Request for compressed unary failed %s",
  334. log_suffix.c_str());
  335. }
  336. return true;
  337. }
  338. // Either abort() (unless do_not_abort_on_transient_failures_ is true) or return
  339. // false
  340. bool InteropClient::TransientFailureOrAbort() {
  341. if (do_not_abort_on_transient_failures_) {
  342. return false;
  343. }
  344. abort();
  345. }
  346. bool InteropClient::DoRequestStreaming() {
  347. gpr_log(GPR_DEBUG, "Sending request steaming rpc ...");
  348. ClientContext context;
  349. StreamingInputCallRequest request;
  350. StreamingInputCallResponse response;
  351. std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
  352. serviceStub_.Get()->StreamingInputCall(&context, &response));
  353. int aggregated_payload_size = 0;
  354. for (size_t i = 0; i < request_stream_sizes.size(); ++i) {
  355. Payload* payload = request.mutable_payload();
  356. payload->set_body(std::string(request_stream_sizes[i], '\0'));
  357. if (!stream->Write(request)) {
  358. gpr_log(GPR_ERROR, "DoRequestStreaming(): stream->Write() failed");
  359. return TransientFailureOrAbort();
  360. }
  361. aggregated_payload_size += request_stream_sizes[i];
  362. }
  363. GPR_ASSERT(stream->WritesDone());
  364. Status s = stream->Finish();
  365. if (!AssertStatusOk(s, context.debug_error_string())) {
  366. return false;
  367. }
  368. GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
  369. return true;
  370. }
  371. bool InteropClient::DoResponseStreaming() {
  372. gpr_log(GPR_DEBUG, "Receiving response streaming rpc ...");
  373. ClientContext context;
  374. StreamingOutputCallRequest request;
  375. for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
  376. ResponseParameters* response_parameter = request.add_response_parameters();
  377. response_parameter->set_size(response_stream_sizes[i]);
  378. }
  379. StreamingOutputCallResponse response;
  380. std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
  381. serviceStub_.Get()->StreamingOutputCall(&context, request));
  382. unsigned int i = 0;
  383. while (stream->Read(&response)) {
  384. GPR_ASSERT(response.payload().body() ==
  385. std::string(response_stream_sizes[i], '\0'));
  386. ++i;
  387. }
  388. if (i < response_stream_sizes.size()) {
  389. // stream->Read() failed before reading all the expected messages. This is
  390. // most likely due to connection failure.
  391. gpr_log(GPR_ERROR,
  392. "DoResponseStreaming(): Read fewer streams (%d) than "
  393. "response_stream_sizes.size() (%" PRIuPTR ")",
  394. i, response_stream_sizes.size());
  395. return TransientFailureOrAbort();
  396. }
  397. Status s = stream->Finish();
  398. if (!AssertStatusOk(s, context.debug_error_string())) {
  399. return false;
  400. }
  401. gpr_log(GPR_DEBUG, "Response streaming done.");
  402. return true;
  403. }
  404. bool InteropClient::DoClientCompressedStreaming() {
  405. // Probing for compression-checks support.
  406. ClientContext probe_context;
  407. StreamingInputCallRequest probe_req;
  408. StreamingInputCallResponse probe_res;
  409. probe_context.set_compression_algorithm(GRPC_COMPRESS_NONE);
  410. probe_req.mutable_expect_compressed()->set_value(true); // lies!
  411. probe_req.mutable_payload()->set_body(std::string(27182, '\0'));
  412. gpr_log(GPR_DEBUG, "Sending probe for compressed streaming request.");
  413. std::unique_ptr<ClientWriter<StreamingInputCallRequest>> probe_stream(
  414. serviceStub_.Get()->StreamingInputCall(&probe_context, &probe_res));
  415. if (!probe_stream->Write(probe_req)) {
  416. gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__);
  417. return TransientFailureOrAbort();
  418. }
  419. Status s = probe_stream->Finish();
  420. if (s.error_code() != grpc::StatusCode::INVALID_ARGUMENT) {
  421. // The server isn't able to evaluate incoming compression, making the rest
  422. // of this test moot.
  423. gpr_log(GPR_DEBUG, "Compressed streaming request probe failed");
  424. return false;
  425. }
  426. gpr_log(GPR_DEBUG,
  427. "Compressed streaming request probe succeeded. Proceeding.");
  428. ClientContext context;
  429. StreamingInputCallRequest request;
  430. StreamingInputCallResponse response;
  431. context.set_compression_algorithm(GRPC_COMPRESS_GZIP);
  432. std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
  433. serviceStub_.Get()->StreamingInputCall(&context, &response));
  434. request.mutable_payload()->set_body(std::string(27182, '\0'));
  435. request.mutable_expect_compressed()->set_value(true);
  436. gpr_log(GPR_DEBUG, "Sending streaming request with compression enabled");
  437. if (!stream->Write(request)) {
  438. gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__);
  439. return TransientFailureOrAbort();
  440. }
  441. WriteOptions wopts;
  442. wopts.set_no_compression();
  443. request.mutable_payload()->set_body(std::string(45904, '\0'));
  444. request.mutable_expect_compressed()->set_value(false);
  445. gpr_log(GPR_DEBUG, "Sending streaming request with compression disabled");
  446. if (!stream->Write(request, wopts)) {
  447. gpr_log(GPR_ERROR, "%s(): stream->Write() failed", __func__);
  448. return TransientFailureOrAbort();
  449. }
  450. GPR_ASSERT(stream->WritesDone());
  451. s = stream->Finish();
  452. return AssertStatusOk(s, context.debug_error_string());
  453. }
  454. bool InteropClient::DoServerCompressedStreaming() {
  455. const std::vector<bool> compressions = {true, false};
  456. const std::vector<int> sizes = {31415, 92653};
  457. ClientContext context;
  458. InteropClientContextInspector inspector(context);
  459. StreamingOutputCallRequest request;
  460. GPR_ASSERT(compressions.size() == sizes.size());
  461. for (size_t i = 0; i < sizes.size(); i++) {
  462. std::string log_suffix =
  463. absl::StrFormat("(compression=%s; size=%d)",
  464. compressions[i] ? "true" : "false", sizes[i]);
  465. gpr_log(GPR_DEBUG, "Sending request streaming rpc %s.", log_suffix.c_str());
  466. ResponseParameters* const response_parameter =
  467. request.add_response_parameters();
  468. response_parameter->mutable_compressed()->set_value(compressions[i]);
  469. response_parameter->set_size(sizes[i]);
  470. }
  471. std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
  472. serviceStub_.Get()->StreamingOutputCall(&context, request));
  473. size_t k = 0;
  474. StreamingOutputCallResponse response;
  475. while (stream->Read(&response)) {
  476. // Payload size checks.
  477. GPR_ASSERT(response.payload().body() ==
  478. std::string(request.response_parameters(k).size(), '\0'));
  479. // Compression checks.
  480. GPR_ASSERT(request.response_parameters(k).has_compressed());
  481. if (request.response_parameters(k).compressed().value()) {
  482. GPR_ASSERT(inspector.GetCallCompressionAlgorithm() > GRPC_COMPRESS_NONE);
  483. GPR_ASSERT(inspector.WasCompressed());
  484. } else {
  485. // requested *no* compression.
  486. GPR_ASSERT(!(inspector.WasCompressed()));
  487. }
  488. ++k;
  489. }
  490. if (k < sizes.size()) {
  491. // stream->Read() failed before reading all the expected messages. This
  492. // is most likely due to a connection failure.
  493. gpr_log(GPR_ERROR,
  494. "%s(): Responses read (k=%" PRIuPTR
  495. ") is less than the expected number of messages (%" PRIuPTR ").",
  496. __func__, k, sizes.size());
  497. return TransientFailureOrAbort();
  498. }
  499. Status s = stream->Finish();
  500. return AssertStatusOk(s, context.debug_error_string());
  501. }
  502. bool InteropClient::DoResponseStreamingWithSlowConsumer() {
  503. gpr_log(GPR_DEBUG, "Receiving response streaming rpc with slow consumer ...");
  504. ClientContext context;
  505. StreamingOutputCallRequest request;
  506. for (int i = 0; i < kNumResponseMessages; ++i) {
  507. ResponseParameters* response_parameter = request.add_response_parameters();
  508. response_parameter->set_size(kResponseMessageSize);
  509. }
  510. StreamingOutputCallResponse response;
  511. std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
  512. serviceStub_.Get()->StreamingOutputCall(&context, request));
  513. int i = 0;
  514. while (stream->Read(&response)) {
  515. GPR_ASSERT(response.payload().body() ==
  516. std::string(kResponseMessageSize, '\0'));
  517. gpr_log(GPR_DEBUG, "received message %d", i);
  518. gpr_sleep_until(gpr_time_add(
  519. gpr_now(GPR_CLOCK_REALTIME),
  520. gpr_time_from_millis(kReceiveDelayMilliSeconds, GPR_TIMESPAN)));
  521. ++i;
  522. }
  523. if (i < kNumResponseMessages) {
  524. gpr_log(GPR_ERROR,
  525. "DoResponseStreamingWithSlowConsumer(): Responses read (i=%d) is "
  526. "less than the expected messages (i.e kNumResponseMessages = %d)",
  527. i, kNumResponseMessages);
  528. return TransientFailureOrAbort();
  529. }
  530. Status s = stream->Finish();
  531. if (!AssertStatusOk(s, context.debug_error_string())) {
  532. return false;
  533. }
  534. gpr_log(GPR_DEBUG, "Response streaming done.");
  535. return true;
  536. }
  537. bool InteropClient::DoHalfDuplex() {
  538. gpr_log(GPR_DEBUG, "Sending half-duplex streaming rpc ...");
  539. ClientContext context;
  540. std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
  541. StreamingOutputCallResponse>>
  542. stream(serviceStub_.Get()->HalfDuplexCall(&context));
  543. StreamingOutputCallRequest request;
  544. ResponseParameters* response_parameter = request.add_response_parameters();
  545. for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
  546. response_parameter->set_size(response_stream_sizes[i]);
  547. if (!stream->Write(request)) {
  548. gpr_log(GPR_ERROR, "DoHalfDuplex(): stream->Write() failed. i=%d", i);
  549. return TransientFailureOrAbort();
  550. }
  551. }
  552. stream->WritesDone();
  553. unsigned int i = 0;
  554. StreamingOutputCallResponse response;
  555. while (stream->Read(&response)) {
  556. GPR_ASSERT(response.payload().body() ==
  557. std::string(response_stream_sizes[i], '\0'));
  558. ++i;
  559. }
  560. if (i < response_stream_sizes.size()) {
  561. // stream->Read() failed before reading all the expected messages. This is
  562. // most likely due to a connection failure
  563. gpr_log(GPR_ERROR,
  564. "DoHalfDuplex(): Responses read (i=%d) are less than the expected "
  565. "number of messages response_stream_sizes.size() (%" PRIuPTR ")",
  566. i, response_stream_sizes.size());
  567. return TransientFailureOrAbort();
  568. }
  569. Status s = stream->Finish();
  570. if (!AssertStatusOk(s, context.debug_error_string())) {
  571. return false;
  572. }
  573. gpr_log(GPR_DEBUG, "Half-duplex streaming rpc done.");
  574. return true;
  575. }
  576. bool InteropClient::DoPingPong() {
  577. gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ...");
  578. ClientContext context;
  579. std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
  580. StreamingOutputCallResponse>>
  581. stream(serviceStub_.Get()->FullDuplexCall(&context));
  582. StreamingOutputCallRequest request;
  583. ResponseParameters* response_parameter = request.add_response_parameters();
  584. Payload* payload = request.mutable_payload();
  585. StreamingOutputCallResponse response;
  586. for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
  587. response_parameter->set_size(response_stream_sizes[i]);
  588. payload->set_body(std::string(request_stream_sizes[i], '\0'));
  589. if (!stream->Write(request)) {
  590. gpr_log(GPR_ERROR, "DoPingPong(): stream->Write() failed. i: %d", i);
  591. return TransientFailureOrAbort();
  592. }
  593. if (!stream->Read(&response)) {
  594. gpr_log(GPR_ERROR, "DoPingPong(): stream->Read() failed. i:%d", i);
  595. return TransientFailureOrAbort();
  596. }
  597. GPR_ASSERT(response.payload().body() ==
  598. std::string(response_stream_sizes[i], '\0'));
  599. }
  600. stream->WritesDone();
  601. GPR_ASSERT(!stream->Read(&response));
  602. Status s = stream->Finish();
  603. if (!AssertStatusOk(s, context.debug_error_string())) {
  604. return false;
  605. }
  606. gpr_log(GPR_DEBUG, "Ping pong streaming done.");
  607. return true;
  608. }
  609. bool InteropClient::DoCancelAfterBegin() {
  610. gpr_log(GPR_DEBUG, "Sending request streaming rpc ...");
  611. ClientContext context;
  612. StreamingInputCallRequest request;
  613. StreamingInputCallResponse response;
  614. std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
  615. serviceStub_.Get()->StreamingInputCall(&context, &response));
  616. gpr_log(GPR_DEBUG, "Trying to cancel...");
  617. context.TryCancel();
  618. Status s = stream->Finish();
  619. if (!AssertStatusCode(s, StatusCode::CANCELLED,
  620. context.debug_error_string())) {
  621. return false;
  622. }
  623. gpr_log(GPR_DEBUG, "Canceling streaming done.");
  624. return true;
  625. }
  626. bool InteropClient::DoCancelAfterFirstResponse() {
  627. gpr_log(GPR_DEBUG, "Sending Ping Pong streaming rpc ...");
  628. ClientContext context;
  629. std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
  630. StreamingOutputCallResponse>>
  631. stream(serviceStub_.Get()->FullDuplexCall(&context));
  632. StreamingOutputCallRequest request;
  633. ResponseParameters* response_parameter = request.add_response_parameters();
  634. response_parameter->set_size(31415);
  635. request.mutable_payload()->set_body(std::string(27182, '\0'));
  636. StreamingOutputCallResponse response;
  637. if (!stream->Write(request)) {
  638. gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Write() failed");
  639. return TransientFailureOrAbort();
  640. }
  641. if (!stream->Read(&response)) {
  642. gpr_log(GPR_ERROR, "DoCancelAfterFirstResponse(): stream->Read failed");
  643. return TransientFailureOrAbort();
  644. }
  645. GPR_ASSERT(response.payload().body() == std::string(31415, '\0'));
  646. gpr_log(GPR_DEBUG, "Trying to cancel...");
  647. context.TryCancel();
  648. Status s = stream->Finish();
  649. gpr_log(GPR_DEBUG, "Canceling pingpong streaming done.");
  650. return true;
  651. }
  652. bool InteropClient::DoTimeoutOnSleepingServer() {
  653. gpr_log(GPR_DEBUG,
  654. "Sending Ping Pong streaming rpc with a short deadline...");
  655. ClientContext context;
  656. std::chrono::system_clock::time_point deadline =
  657. std::chrono::system_clock::now() + std::chrono::milliseconds(1);
  658. context.set_deadline(deadline);
  659. std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
  660. StreamingOutputCallResponse>>
  661. stream(serviceStub_.Get()->FullDuplexCall(&context));
  662. StreamingOutputCallRequest request;
  663. request.mutable_payload()->set_body(std::string(27182, '\0'));
  664. stream->Write(request);
  665. Status s = stream->Finish();
  666. if (!AssertStatusCode(s, StatusCode::DEADLINE_EXCEEDED,
  667. context.debug_error_string())) {
  668. return false;
  669. }
  670. gpr_log(GPR_DEBUG, "Pingpong streaming timeout done.");
  671. return true;
  672. }
  673. bool InteropClient::DoEmptyStream() {
  674. gpr_log(GPR_DEBUG, "Starting empty_stream.");
  675. ClientContext context;
  676. std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
  677. StreamingOutputCallResponse>>
  678. stream(serviceStub_.Get()->FullDuplexCall(&context));
  679. stream->WritesDone();
  680. StreamingOutputCallResponse response;
  681. GPR_ASSERT(stream->Read(&response) == false);
  682. Status s = stream->Finish();
  683. if (!AssertStatusOk(s, context.debug_error_string())) {
  684. return false;
  685. }
  686. gpr_log(GPR_DEBUG, "empty_stream done.");
  687. return true;
  688. }
  689. bool InteropClient::DoStatusWithMessage() {
  690. gpr_log(GPR_DEBUG,
  691. "Sending RPC with a request for status code 2 and message");
  692. const grpc::StatusCode test_code = grpc::StatusCode::UNKNOWN;
  693. const std::string test_msg = "This is a test message";
  694. // Test UnaryCall.
  695. ClientContext context;
  696. SimpleRequest request;
  697. SimpleResponse response;
  698. EchoStatus* requested_status = request.mutable_response_status();
  699. requested_status->set_code(test_code);
  700. requested_status->set_message(test_msg);
  701. Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
  702. if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN,
  703. context.debug_error_string())) {
  704. return false;
  705. }
  706. GPR_ASSERT(s.error_message() == test_msg);
  707. // Test FullDuplexCall.
  708. ClientContext stream_context;
  709. std::shared_ptr<ClientReaderWriter<StreamingOutputCallRequest,
  710. StreamingOutputCallResponse>>
  711. stream(serviceStub_.Get()->FullDuplexCall(&stream_context));
  712. StreamingOutputCallRequest streaming_request;
  713. requested_status = streaming_request.mutable_response_status();
  714. requested_status->set_code(test_code);
  715. requested_status->set_message(test_msg);
  716. stream->Write(streaming_request);
  717. stream->WritesDone();
  718. StreamingOutputCallResponse streaming_response;
  719. while (stream->Read(&streaming_response)) {
  720. }
  721. s = stream->Finish();
  722. if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN,
  723. context.debug_error_string())) {
  724. return false;
  725. }
  726. GPR_ASSERT(s.error_message() == test_msg);
  727. gpr_log(GPR_DEBUG, "Done testing Status and Message");
  728. return true;
  729. }
  730. bool InteropClient::DoSpecialStatusMessage() {
  731. gpr_log(
  732. GPR_DEBUG,
  733. "Sending RPC with a request for status code 2 and message - \\t\\ntest "
  734. "with whitespace\\r\\nand Unicode BMP ☺ and non-BMP 😈\\t\\n");
  735. const grpc::StatusCode test_code = grpc::StatusCode::UNKNOWN;
  736. const std::string test_msg =
  737. "\t\ntest with whitespace\r\nand Unicode BMP ☺ and non-BMP 😈\t\n";
  738. ClientContext context;
  739. SimpleRequest request;
  740. SimpleResponse response;
  741. EchoStatus* requested_status = request.mutable_response_status();
  742. requested_status->set_code(test_code);
  743. requested_status->set_message(test_msg);
  744. Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
  745. if (!AssertStatusCode(s, grpc::StatusCode::UNKNOWN,
  746. context.debug_error_string())) {
  747. return false;
  748. }
  749. GPR_ASSERT(s.error_message() == test_msg);
  750. gpr_log(GPR_DEBUG, "Done testing Special Status Message");
  751. return true;
  752. }
  753. bool InteropClient::DoCacheableUnary() {
  754. gpr_log(GPR_DEBUG, "Sending RPC with cacheable response");
  755. // Create request with current timestamp
  756. gpr_timespec ts = gpr_now(GPR_CLOCK_PRECISE);
  757. std::string timestamp =
  758. std::to_string(static_cast<long long unsigned>(ts.tv_nsec));
  759. SimpleRequest request;
  760. request.mutable_payload()->set_body(timestamp.c_str(), timestamp.size());
  761. // Request 1
  762. ClientContext context1;
  763. SimpleResponse response1;
  764. context1.set_cacheable(true);
  765. // Add fake user IP since some proxy's (GFE) won't cache requests from
  766. // localhost.
  767. context1.AddMetadata("x-user-ip", "1.2.3.4");
  768. Status s1 =
  769. serviceStub_.Get()->CacheableUnaryCall(&context1, request, &response1);
  770. if (!AssertStatusOk(s1, context1.debug_error_string())) {
  771. return false;
  772. }
  773. gpr_log(GPR_DEBUG, "response 1 payload: %s",
  774. response1.payload().body().c_str());
  775. // Request 2
  776. ClientContext context2;
  777. SimpleResponse response2;
  778. context2.set_cacheable(true);
  779. context2.AddMetadata("x-user-ip", "1.2.3.4");
  780. Status s2 =
  781. serviceStub_.Get()->CacheableUnaryCall(&context2, request, &response2);
  782. if (!AssertStatusOk(s2, context2.debug_error_string())) {
  783. return false;
  784. }
  785. gpr_log(GPR_DEBUG, "response 2 payload: %s",
  786. response2.payload().body().c_str());
  787. // Check that the body is same for both requests. It will be the same if the
  788. // second response is a cached copy of the first response
  789. GPR_ASSERT(response2.payload().body() == response1.payload().body());
  790. // Request 3
  791. // Modify the request body so it will not get a cache hit
  792. ts = gpr_now(GPR_CLOCK_PRECISE);
  793. timestamp = std::to_string(static_cast<long long unsigned>(ts.tv_nsec));
  794. SimpleRequest request1;
  795. request1.mutable_payload()->set_body(timestamp.c_str(), timestamp.size());
  796. ClientContext context3;
  797. SimpleResponse response3;
  798. context3.set_cacheable(true);
  799. context3.AddMetadata("x-user-ip", "1.2.3.4");
  800. Status s3 =
  801. serviceStub_.Get()->CacheableUnaryCall(&context3, request1, &response3);
  802. if (!AssertStatusOk(s3, context3.debug_error_string())) {
  803. return false;
  804. }
  805. gpr_log(GPR_DEBUG, "response 3 payload: %s",
  806. response3.payload().body().c_str());
  807. // Check that the response is different from the previous response.
  808. GPR_ASSERT(response3.payload().body() != response1.payload().body());
  809. return true;
  810. }
  811. bool InteropClient::DoPickFirstUnary() {
  812. const int rpcCount = 100;
  813. SimpleRequest request;
  814. SimpleResponse response;
  815. std::string server_id;
  816. request.set_fill_server_id(true);
  817. for (int i = 0; i < rpcCount; i++) {
  818. ClientContext context;
  819. Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
  820. if (!AssertStatusOk(s, context.debug_error_string())) {
  821. return false;
  822. }
  823. if (i == 0) {
  824. server_id = response.server_id();
  825. continue;
  826. }
  827. if (response.server_id() != server_id) {
  828. gpr_log(GPR_ERROR, "#%d rpc hits server_id %s, expect server_id %s", i,
  829. response.server_id().c_str(), server_id.c_str());
  830. return false;
  831. }
  832. }
  833. gpr_log(GPR_DEBUG, "pick first unary successfully finished");
  834. return true;
  835. }
  836. bool InteropClient::DoCustomMetadata() {
  837. const std::string kEchoInitialMetadataKey("x-grpc-test-echo-initial");
  838. const std::string kInitialMetadataValue("test_initial_metadata_value");
  839. const std::string kEchoTrailingBinMetadataKey(
  840. "x-grpc-test-echo-trailing-bin");
  841. const std::string kTrailingBinValue("\x0a\x0b\x0a\x0b\x0a\x0b");
  842. {
  843. gpr_log(GPR_DEBUG, "Sending RPC with custom metadata");
  844. ClientContext context;
  845. context.AddMetadata(kEchoInitialMetadataKey, kInitialMetadataValue);
  846. context.AddMetadata(kEchoTrailingBinMetadataKey, kTrailingBinValue);
  847. SimpleRequest request;
  848. SimpleResponse response;
  849. request.set_response_size(kLargeResponseSize);
  850. std::string payload(kLargeRequestSize, '\0');
  851. request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
  852. Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
  853. if (!AssertStatusOk(s, context.debug_error_string())) {
  854. return false;
  855. }
  856. const auto& server_initial_metadata = context.GetServerInitialMetadata();
  857. auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
  858. GPR_ASSERT(iter != server_initial_metadata.end());
  859. GPR_ASSERT(iter->second == kInitialMetadataValue);
  860. const auto& server_trailing_metadata = context.GetServerTrailingMetadata();
  861. iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey);
  862. GPR_ASSERT(iter != server_trailing_metadata.end());
  863. GPR_ASSERT(std::string(iter->second.begin(), iter->second.end()) ==
  864. kTrailingBinValue);
  865. gpr_log(GPR_DEBUG, "Done testing RPC with custom metadata");
  866. }
  867. {
  868. gpr_log(GPR_DEBUG, "Sending stream with custom metadata");
  869. ClientContext context;
  870. context.AddMetadata(kEchoInitialMetadataKey, kInitialMetadataValue);
  871. context.AddMetadata(kEchoTrailingBinMetadataKey, kTrailingBinValue);
  872. std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
  873. StreamingOutputCallResponse>>
  874. stream(serviceStub_.Get()->FullDuplexCall(&context));
  875. StreamingOutputCallRequest request;
  876. ResponseParameters* response_parameter = request.add_response_parameters();
  877. response_parameter->set_size(kLargeResponseSize);
  878. std::string payload(kLargeRequestSize, '\0');
  879. request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
  880. StreamingOutputCallResponse response;
  881. if (!stream->Write(request)) {
  882. gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Write() failed");
  883. return TransientFailureOrAbort();
  884. }
  885. stream->WritesDone();
  886. if (!stream->Read(&response)) {
  887. gpr_log(GPR_ERROR, "DoCustomMetadata(): stream->Read() failed");
  888. return TransientFailureOrAbort();
  889. }
  890. GPR_ASSERT(response.payload().body() ==
  891. std::string(kLargeResponseSize, '\0'));
  892. GPR_ASSERT(!stream->Read(&response));
  893. Status s = stream->Finish();
  894. if (!AssertStatusOk(s, context.debug_error_string())) {
  895. return false;
  896. }
  897. const auto& server_initial_metadata = context.GetServerInitialMetadata();
  898. auto iter = server_initial_metadata.find(kEchoInitialMetadataKey);
  899. GPR_ASSERT(iter != server_initial_metadata.end());
  900. GPR_ASSERT(iter->second == kInitialMetadataValue);
  901. const auto& server_trailing_metadata = context.GetServerTrailingMetadata();
  902. iter = server_trailing_metadata.find(kEchoTrailingBinMetadataKey);
  903. GPR_ASSERT(iter != server_trailing_metadata.end());
  904. GPR_ASSERT(std::string(iter->second.begin(), iter->second.end()) ==
  905. kTrailingBinValue);
  906. gpr_log(GPR_DEBUG, "Done testing stream with custom metadata");
  907. }
  908. return true;
  909. }
  910. std::tuple<bool, int32_t, std::string>
  911. InteropClient::PerformOneSoakTestIteration(
  912. const bool reset_channel,
  913. const int32_t max_acceptable_per_iteration_latency_ms) {
  914. gpr_timespec start = gpr_now(GPR_CLOCK_MONOTONIC);
  915. SimpleRequest request;
  916. SimpleResponse response;
  917. // Don't set the deadline on the RPC, and instead just
  918. // record how long the RPC took and compare. This makes
  919. // debugging easier when looking at failure results.
  920. ClientContext context;
  921. InteropClientContextInspector inspector(context);
  922. request.set_response_size(kLargeResponseSize);
  923. std::string payload(kLargeRequestSize, '\0');
  924. request.mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
  925. if (reset_channel) {
  926. serviceStub_.ResetChannel();
  927. }
  928. Status s = serviceStub_.Get()->UnaryCall(&context, request, &response);
  929. gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
  930. int32_t elapsed_ms = gpr_time_to_millis(gpr_time_sub(now, start));
  931. if (!s.ok()) {
  932. return std::make_tuple(false, elapsed_ms, context.debug_error_string());
  933. } else if (elapsed_ms > max_acceptable_per_iteration_latency_ms) {
  934. std::string debug_string = absl::StrFormat(
  935. "%d ms exceeds max acceptable latency: %d ms, peer: %s", elapsed_ms,
  936. max_acceptable_per_iteration_latency_ms, context.peer());
  937. return std::make_tuple(false, elapsed_ms, std::move(debug_string));
  938. } else {
  939. return std::make_tuple(true, elapsed_ms, "");
  940. }
  941. }
  942. void InteropClient::PerformSoakTest(
  943. const bool reset_channel_per_iteration, const int32_t soak_iterations,
  944. const int32_t max_failures,
  945. const int32_t max_acceptable_per_iteration_latency_ms,
  946. const int32_t overall_timeout_seconds) {
  947. std::vector<std::tuple<bool, int32_t, std::string>> results;
  948. grpc_histogram* latencies_ms_histogram = grpc_histogram_create(
  949. 1 /* resolution */,
  950. 500 * 1e3 /* largest bucket; 500 seconds is unlikely */);
  951. gpr_timespec overall_deadline = gpr_time_add(
  952. gpr_now(GPR_CLOCK_MONOTONIC),
  953. gpr_time_from_seconds(overall_timeout_seconds, GPR_TIMESPAN));
  954. int32_t iterations_ran = 0;
  955. for (int i = 0;
  956. i < soak_iterations &&
  957. gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), overall_deadline) < 0;
  958. ++i) {
  959. auto result = PerformOneSoakTestIteration(
  960. reset_channel_per_iteration, max_acceptable_per_iteration_latency_ms);
  961. results.push_back(result);
  962. grpc_histogram_add(latencies_ms_histogram, std::get<1>(result));
  963. iterations_ran++;
  964. }
  965. int total_failures = 0;
  966. for (size_t i = 0; i < results.size(); i++) {
  967. bool success = std::get<0>(results[i]);
  968. int32_t elapsed_ms = std::get<1>(results[i]);
  969. std::string debug_string = std::get<2>(results[i]);
  970. if (!success) {
  971. gpr_log(GPR_DEBUG, "soak iteration: %ld elapsed_ms: %d failed: %s", i,
  972. elapsed_ms, debug_string.c_str());
  973. total_failures++;
  974. } else {
  975. gpr_log(GPR_DEBUG, "soak iteration: %ld elapsed_ms: %d succeeded", i,
  976. elapsed_ms);
  977. }
  978. }
  979. double latency_ms_median =
  980. grpc_histogram_percentile(latencies_ms_histogram, 50);
  981. double latency_ms_90th =
  982. grpc_histogram_percentile(latencies_ms_histogram, 90);
  983. double latency_ms_worst = grpc_histogram_maximum(latencies_ms_histogram);
  984. grpc_histogram_destroy(latencies_ms_histogram);
  985. if (iterations_ran < soak_iterations) {
  986. gpr_log(
  987. GPR_ERROR,
  988. "soak test consumed all %d seconds of time and quit early, only "
  989. "having ran %d out of desired %d iterations. "
  990. "total_failures: %d. "
  991. "max_failures_threshold: %d. "
  992. "median_soak_iteration_latency: %lf ms. "
  993. "90th_soak_iteration_latency: %lf ms. "
  994. "worst_soak_iteration_latency: %lf ms. "
  995. "Some or all of the iterations that did run were unexpectedly slow. "
  996. "See breakdown above for which iterations succeeded, failed, and "
  997. "why for more info.",
  998. overall_timeout_seconds, iterations_ran, soak_iterations,
  999. total_failures, max_failures, latency_ms_median, latency_ms_90th,
  1000. latency_ms_worst);
  1001. GPR_ASSERT(0);
  1002. } else if (total_failures > max_failures) {
  1003. gpr_log(GPR_ERROR,
  1004. "soak test ran: %d iterations. total_failures: %d exceeds "
  1005. "max_failures_threshold: %d. "
  1006. "median_soak_iteration_latency: %lf ms. "
  1007. "90th_soak_iteration_latency: %lf ms. "
  1008. "worst_soak_iteration_latency: %lf ms. "
  1009. "See breakdown above for which iterations succeeded, failed, and "
  1010. "why for more info.",
  1011. soak_iterations, total_failures, max_failures, latency_ms_median,
  1012. latency_ms_90th, latency_ms_worst);
  1013. GPR_ASSERT(0);
  1014. } else {
  1015. gpr_log(GPR_INFO,
  1016. "soak test ran: %d iterations. total_failures: %d is within "
  1017. "max_failures_threshold: %d. "
  1018. "median_soak_iteration_latency: %lf ms. "
  1019. "90th_soak_iteration_latency: %lf ms. "
  1020. "worst_soak_iteration_latency: %lf ms. "
  1021. "See breakdown above for which iterations succeeded, failed, and "
  1022. "why for more info.",
  1023. soak_iterations, total_failures, max_failures, latency_ms_median,
  1024. latency_ms_90th, latency_ms_worst);
  1025. }
  1026. }
  1027. bool InteropClient::DoRpcSoakTest(
  1028. int32_t soak_iterations, int32_t max_failures,
  1029. int64_t max_acceptable_per_iteration_latency_ms,
  1030. int32_t overall_timeout_seconds) {
  1031. gpr_log(GPR_DEBUG, "Sending %d RPCs...", soak_iterations);
  1032. GPR_ASSERT(soak_iterations > 0);
  1033. PerformSoakTest(false /* reset channel per iteration */, soak_iterations,
  1034. max_failures, max_acceptable_per_iteration_latency_ms,
  1035. overall_timeout_seconds);
  1036. gpr_log(GPR_DEBUG, "rpc_soak test done.");
  1037. return true;
  1038. }
  1039. bool InteropClient::DoChannelSoakTest(
  1040. int32_t soak_iterations, int32_t max_failures,
  1041. int64_t max_acceptable_per_iteration_latency_ms,
  1042. int32_t overall_timeout_seconds) {
  1043. gpr_log(GPR_DEBUG, "Sending %d RPCs, tearing down the channel each time...",
  1044. soak_iterations);
  1045. GPR_ASSERT(soak_iterations > 0);
  1046. PerformSoakTest(true /* reset channel per iteration */, soak_iterations,
  1047. max_failures, max_acceptable_per_iteration_latency_ms,
  1048. overall_timeout_seconds);
  1049. gpr_log(GPR_DEBUG, "channel_soak test done.");
  1050. return true;
  1051. }
  1052. bool InteropClient::DoLongLivedChannelTest(int32_t soak_iterations,
  1053. int32_t iteration_interval) {
  1054. gpr_log(GPR_DEBUG, "Sending %d RPCs...", soak_iterations);
  1055. GPR_ASSERT(soak_iterations > 0);
  1056. GPR_ASSERT(iteration_interval > 0);
  1057. SimpleRequest request;
  1058. SimpleResponse response;
  1059. int num_failures = 0;
  1060. for (int i = 0; i < soak_iterations; ++i) {
  1061. gpr_log(GPR_DEBUG, "Sending RPC number %d...", i);
  1062. if (!PerformLargeUnary(&request, &response)) {
  1063. gpr_log(GPR_ERROR, "Iteration %d failed.", i);
  1064. num_failures++;
  1065. }
  1066. gpr_sleep_until(
  1067. gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
  1068. gpr_time_from_seconds(iteration_interval, GPR_TIMESPAN)));
  1069. }
  1070. if (num_failures == 0) {
  1071. gpr_log(GPR_DEBUG, "long_lived_channel test done.");
  1072. return true;
  1073. } else {
  1074. gpr_log(GPR_DEBUG, "long_lived_channel test failed with %d rpc failures.",
  1075. num_failures);
  1076. return false;
  1077. }
  1078. }
  1079. bool InteropClient::DoUnimplementedService() {
  1080. gpr_log(GPR_DEBUG, "Sending a request for an unimplemented service...");
  1081. Empty request;
  1082. Empty response;
  1083. ClientContext context;
  1084. UnimplementedService::Stub* stub = serviceStub_.GetUnimplementedServiceStub();
  1085. Status s = stub->UnimplementedCall(&context, request, &response);
  1086. if (!AssertStatusCode(s, StatusCode::UNIMPLEMENTED,
  1087. context.debug_error_string())) {
  1088. return false;
  1089. }
  1090. gpr_log(GPR_DEBUG, "unimplemented service done.");
  1091. return true;
  1092. }
  1093. bool InteropClient::DoUnimplementedMethod() {
  1094. gpr_log(GPR_DEBUG, "Sending a request for an unimplemented rpc...");
  1095. Empty request;
  1096. Empty response;
  1097. ClientContext context;
  1098. Status s =
  1099. serviceStub_.Get()->UnimplementedCall(&context, request, &response);
  1100. if (!AssertStatusCode(s, StatusCode::UNIMPLEMENTED,
  1101. context.debug_error_string())) {
  1102. return false;
  1103. }
  1104. gpr_log(GPR_DEBUG, "unimplemented rpc done.");
  1105. return true;
  1106. }
  1107. } // namespace testing
  1108. } // namespace grpc