client_async.cc 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961
  1. /*
  2. *
  3. * Copyright 2015 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. #include <forward_list>
  19. #include <functional>
  20. #include <list>
  21. #include <memory>
  22. #include <mutex>
  23. #include <sstream>
  24. #include <string>
  25. #include <thread>
  26. #include <utility>
  27. #include <vector>
  28. #include "absl/memory/memory.h"
  29. #include <grpc/grpc.h>
  30. #include <grpc/support/cpu.h>
  31. #include <grpc/support/log.h>
  32. #include <grpcpp/alarm.h>
  33. #include <grpcpp/channel.h>
  34. #include <grpcpp/client_context.h>
  35. #include <grpcpp/generic/generic_stub.h>
  36. #include "src/core/lib/surface/completion_queue.h"
  37. #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
  38. #include "test/cpp/qps/client.h"
  39. #include "test/cpp/qps/usage_timer.h"
  40. #include "test/cpp/util/create_test_channel.h"
  41. namespace grpc {
  42. namespace testing {
  43. class ClientRpcContext {
  44. public:
  45. ClientRpcContext() {}
  46. virtual ~ClientRpcContext() {}
  47. // next state, return false if done. Collect stats when appropriate
  48. virtual bool RunNextState(bool, HistogramEntry* entry) = 0;
  49. virtual void StartNewClone(CompletionQueue* cq) = 0;
  50. static void* tag(ClientRpcContext* c) { return static_cast<void*>(c); }
  51. static ClientRpcContext* detag(void* t) {
  52. return static_cast<ClientRpcContext*>(t);
  53. }
  54. virtual void Start(CompletionQueue* cq, const ClientConfig& config) = 0;
  55. virtual void TryCancel() = 0;
  56. };
  57. template <class RequestType, class ResponseType>
  58. class ClientRpcContextUnaryImpl : public ClientRpcContext {
  59. public:
  60. ClientRpcContextUnaryImpl(
  61. BenchmarkService::Stub* stub, const RequestType& req,
  62. std::function<gpr_timespec()> next_issue,
  63. std::function<
  64. std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
  65. BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
  66. CompletionQueue*)>
  67. prepare_req,
  68. std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> on_done)
  69. : context_(),
  70. stub_(stub),
  71. cq_(nullptr),
  72. req_(req),
  73. response_(),
  74. next_state_(State::READY),
  75. callback_(on_done),
  76. next_issue_(std::move(next_issue)),
  77. prepare_req_(prepare_req) {}
  78. ~ClientRpcContextUnaryImpl() override {}
  79. void Start(CompletionQueue* cq, const ClientConfig& config) override {
  80. GPR_ASSERT(!config.use_coalesce_api()); // not supported.
  81. StartInternal(cq);
  82. }
  83. bool RunNextState(bool /*ok*/, HistogramEntry* entry) override {
  84. switch (next_state_) {
  85. case State::READY:
  86. start_ = UsageTimer::Now();
  87. response_reader_ = prepare_req_(stub_, &context_, req_, cq_);
  88. response_reader_->StartCall();
  89. next_state_ = State::RESP_DONE;
  90. response_reader_->Finish(&response_, &status_,
  91. ClientRpcContext::tag(this));
  92. return true;
  93. case State::RESP_DONE:
  94. if (status_.ok()) {
  95. entry->set_value((UsageTimer::Now() - start_) * 1e9);
  96. }
  97. callback_(status_, &response_, entry);
  98. next_state_ = State::INVALID;
  99. return false;
  100. default:
  101. GPR_ASSERT(false);
  102. return false;
  103. }
  104. }
  105. void StartNewClone(CompletionQueue* cq) override {
  106. auto* clone = new ClientRpcContextUnaryImpl(stub_, req_, next_issue_,
  107. prepare_req_, callback_);
  108. clone->StartInternal(cq);
  109. }
  110. void TryCancel() override { context_.TryCancel(); }
  111. private:
  112. grpc::ClientContext context_;
  113. BenchmarkService::Stub* stub_;
  114. CompletionQueue* cq_;
  115. std::unique_ptr<Alarm> alarm_;
  116. const RequestType& req_;
  117. ResponseType response_;
  118. enum State { INVALID, READY, RESP_DONE };
  119. State next_state_;
  120. std::function<void(grpc::Status, ResponseType*, HistogramEntry*)> callback_;
  121. std::function<gpr_timespec()> next_issue_;
  122. std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
  123. BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
  124. CompletionQueue*)>
  125. prepare_req_;
  126. grpc::Status status_;
  127. double start_;
  128. std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
  129. response_reader_;
  130. void StartInternal(CompletionQueue* cq) {
  131. cq_ = cq;
  132. if (!next_issue_) { // ready to issue
  133. RunNextState(true, nullptr);
  134. } else { // wait for the issue time
  135. alarm_ = absl::make_unique<Alarm>();
  136. alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
  137. }
  138. }
  139. };
  140. template <class StubType, class RequestType>
  141. class AsyncClient : public ClientImpl<StubType, RequestType> {
  142. // Specify which protected members we are using since there is no
  143. // member name resolution until the template types are fully resolved
  144. public:
  145. using Client::closed_loop_;
  146. using Client::NextIssuer;
  147. using Client::SetupLoadTest;
  148. using ClientImpl<StubType, RequestType>::cores_;
  149. using ClientImpl<StubType, RequestType>::channels_;
  150. using ClientImpl<StubType, RequestType>::request_;
  151. AsyncClient(const ClientConfig& config,
  152. std::function<ClientRpcContext*(
  153. StubType*, std::function<gpr_timespec()> next_issue,
  154. const RequestType&)>
  155. setup_ctx,
  156. std::function<std::unique_ptr<StubType>(std::shared_ptr<Channel>)>
  157. create_stub)
  158. : ClientImpl<StubType, RequestType>(config, create_stub),
  159. num_async_threads_(NumThreads(config)) {
  160. SetupLoadTest(config, num_async_threads_);
  161. int tpc = std::max(1, config.threads_per_cq()); // 1 if unspecified
  162. int num_cqs = (num_async_threads_ + tpc - 1) / tpc; // ceiling operator
  163. for (int i = 0; i < num_cqs; i++) {
  164. cli_cqs_.emplace_back(new CompletionQueue);
  165. }
  166. for (int i = 0; i < num_async_threads_; i++) {
  167. cq_.emplace_back(i % cli_cqs_.size());
  168. next_issuers_.emplace_back(NextIssuer(i));
  169. shutdown_state_.emplace_back(new PerThreadShutdownState());
  170. }
  171. int t = 0;
  172. for (int ch = 0; ch < config.client_channels(); ch++) {
  173. for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
  174. auto* cq = cli_cqs_[t].get();
  175. auto ctx =
  176. setup_ctx(channels_[ch].get_stub(), next_issuers_[t], request_);
  177. ctx->Start(cq, config);
  178. }
  179. t = (t + 1) % cli_cqs_.size();
  180. }
  181. }
  182. ~AsyncClient() override {
  183. for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
  184. void* got_tag;
  185. bool ok;
  186. while ((*cq)->Next(&got_tag, &ok)) {
  187. delete ClientRpcContext::detag(got_tag);
  188. }
  189. }
  190. }
  191. int GetPollCount() override {
  192. int count = 0;
  193. for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
  194. count += grpc_get_cq_poll_num((*cq)->cq());
  195. }
  196. return count;
  197. }
  198. protected:
  199. const int num_async_threads_;
  200. private:
  201. struct PerThreadShutdownState {
  202. mutable std::mutex mutex;
  203. bool shutdown;
  204. PerThreadShutdownState() : shutdown(false) {}
  205. };
  206. int NumThreads(const ClientConfig& config) {
  207. int num_threads = config.async_client_threads();
  208. if (num_threads <= 0) { // Use dynamic sizing
  209. num_threads = cores_;
  210. gpr_log(GPR_INFO, "Sizing async client to %d threads", num_threads);
  211. }
  212. return num_threads;
  213. }
  214. void DestroyMultithreading() final {
  215. for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
  216. std::lock_guard<std::mutex> lock((*ss)->mutex);
  217. (*ss)->shutdown = true;
  218. }
  219. for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
  220. (*cq)->Shutdown();
  221. }
  222. this->EndThreads(); // this needed for resolution
  223. }
  224. ClientRpcContext* ProcessTag(size_t thread_idx, void* tag) {
  225. ClientRpcContext* ctx = ClientRpcContext::detag(tag);
  226. if (shutdown_state_[thread_idx]->shutdown) {
  227. ctx->TryCancel();
  228. delete ctx;
  229. bool ok;
  230. while (cli_cqs_[cq_[thread_idx]]->Next(&tag, &ok)) {
  231. ctx = ClientRpcContext::detag(tag);
  232. ctx->TryCancel();
  233. delete ctx;
  234. }
  235. return nullptr;
  236. }
  237. return ctx;
  238. }
  239. void ThreadFunc(size_t thread_idx, Client::Thread* t) final {
  240. void* got_tag;
  241. bool ok;
  242. HistogramEntry entry;
  243. HistogramEntry* entry_ptr = &entry;
  244. if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
  245. return;
  246. }
  247. std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex;
  248. shutdown_mu->lock();
  249. ClientRpcContext* ctx = ProcessTag(thread_idx, got_tag);
  250. if (ctx == nullptr) {
  251. shutdown_mu->unlock();
  252. return;
  253. }
  254. while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
  255. [&, ctx, ok, entry_ptr, shutdown_mu]() {
  256. if (!ctx->RunNextState(ok, entry_ptr)) {
  257. // The RPC and callback are done, so clone the ctx
  258. // and kickstart the new one
  259. ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
  260. delete ctx;
  261. }
  262. shutdown_mu->unlock();
  263. },
  264. &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))) {
  265. t->UpdateHistogram(entry_ptr);
  266. entry = HistogramEntry();
  267. shutdown_mu->lock();
  268. ctx = ProcessTag(thread_idx, got_tag);
  269. if (ctx == nullptr) {
  270. shutdown_mu->unlock();
  271. return;
  272. }
  273. }
  274. }
  275. std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
  276. std::vector<int> cq_;
  277. std::vector<std::function<gpr_timespec()>> next_issuers_;
  278. std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
  279. };
  280. static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
  281. const std::shared_ptr<Channel>& ch) {
  282. return BenchmarkService::NewStub(ch);
  283. }
  284. class AsyncUnaryClient final
  285. : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
  286. public:
  287. explicit AsyncUnaryClient(const ClientConfig& config)
  288. : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
  289. config, SetupCtx, BenchmarkStubCreator) {
  290. StartThreads(num_async_threads_);
  291. }
  292. ~AsyncUnaryClient() override {}
  293. private:
  294. static void CheckDone(const grpc::Status& s, SimpleResponse* /*response*/,
  295. HistogramEntry* entry) {
  296. entry->set_status(s.error_code());
  297. }
  298. static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>>
  299. PrepareReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
  300. const SimpleRequest& request, CompletionQueue* cq) {
  301. return stub->PrepareAsyncUnaryCall(ctx, request, cq);
  302. };
  303. static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
  304. std::function<gpr_timespec()> next_issue,
  305. const SimpleRequest& req) {
  306. return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
  307. stub, req, std::move(next_issue), AsyncUnaryClient::PrepareReq,
  308. AsyncUnaryClient::CheckDone);
  309. }
  310. };
  311. template <class RequestType, class ResponseType>
  312. class ClientRpcContextStreamingPingPongImpl : public ClientRpcContext {
  313. public:
  314. ClientRpcContextStreamingPingPongImpl(
  315. BenchmarkService::Stub* stub, const RequestType& req,
  316. std::function<gpr_timespec()> next_issue,
  317. std::function<std::unique_ptr<
  318. grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
  319. BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*)>
  320. prepare_req,
  321. std::function<void(grpc::Status, ResponseType*)> on_done)
  322. : context_(),
  323. stub_(stub),
  324. cq_(nullptr),
  325. req_(req),
  326. response_(),
  327. next_state_(State::INVALID),
  328. callback_(on_done),
  329. next_issue_(std::move(next_issue)),
  330. prepare_req_(prepare_req),
  331. coalesce_(false) {}
  332. ~ClientRpcContextStreamingPingPongImpl() override {}
  333. void Start(CompletionQueue* cq, const ClientConfig& config) override {
  334. StartInternal(cq, config.messages_per_stream(), config.use_coalesce_api());
  335. }
  336. bool RunNextState(bool ok, HistogramEntry* entry) override {
  337. while (true) {
  338. switch (next_state_) {
  339. case State::STREAM_IDLE:
  340. if (!next_issue_) { // ready to issue
  341. next_state_ = State::READY_TO_WRITE;
  342. } else {
  343. next_state_ = State::WAIT;
  344. }
  345. break; // loop around, don't return
  346. case State::WAIT:
  347. next_state_ = State::READY_TO_WRITE;
  348. alarm_ = absl::make_unique<Alarm>();
  349. alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
  350. return true;
  351. case State::READY_TO_WRITE:
  352. if (!ok) {
  353. return false;
  354. }
  355. start_ = UsageTimer::Now();
  356. next_state_ = State::WRITE_DONE;
  357. if (coalesce_ && messages_issued_ == messages_per_stream_ - 1) {
  358. stream_->WriteLast(req_, WriteOptions(),
  359. ClientRpcContext::tag(this));
  360. } else {
  361. stream_->Write(req_, ClientRpcContext::tag(this));
  362. }
  363. return true;
  364. case State::WRITE_DONE:
  365. if (!ok) {
  366. return false;
  367. }
  368. next_state_ = State::READ_DONE;
  369. stream_->Read(&response_, ClientRpcContext::tag(this));
  370. return true;
  371. break;
  372. case State::READ_DONE:
  373. entry->set_value((UsageTimer::Now() - start_) * 1e9);
  374. callback_(status_, &response_);
  375. if ((messages_per_stream_ != 0) &&
  376. (++messages_issued_ >= messages_per_stream_)) {
  377. next_state_ = State::WRITES_DONE_DONE;
  378. if (coalesce_) {
  379. // WritesDone should have been called on the last Write.
  380. // loop around to call Finish.
  381. break;
  382. }
  383. stream_->WritesDone(ClientRpcContext::tag(this));
  384. return true;
  385. }
  386. next_state_ = State::STREAM_IDLE;
  387. break; // loop around
  388. case State::WRITES_DONE_DONE:
  389. next_state_ = State::FINISH_DONE;
  390. stream_->Finish(&status_, ClientRpcContext::tag(this));
  391. return true;
  392. case State::FINISH_DONE:
  393. next_state_ = State::INVALID;
  394. return false;
  395. break;
  396. default:
  397. GPR_ASSERT(false);
  398. return false;
  399. }
  400. }
  401. }
  402. void StartNewClone(CompletionQueue* cq) override {
  403. auto* clone = new ClientRpcContextStreamingPingPongImpl(
  404. stub_, req_, next_issue_, prepare_req_, callback_);
  405. clone->StartInternal(cq, messages_per_stream_, coalesce_);
  406. }
  407. void TryCancel() override { context_.TryCancel(); }
  408. private:
  409. grpc::ClientContext context_;
  410. BenchmarkService::Stub* stub_;
  411. CompletionQueue* cq_;
  412. std::unique_ptr<Alarm> alarm_;
  413. const RequestType& req_;
  414. ResponseType response_;
  415. enum State {
  416. INVALID,
  417. STREAM_IDLE,
  418. WAIT,
  419. READY_TO_WRITE,
  420. WRITE_DONE,
  421. READ_DONE,
  422. WRITES_DONE_DONE,
  423. FINISH_DONE
  424. };
  425. State next_state_;
  426. std::function<void(grpc::Status, ResponseType*)> callback_;
  427. std::function<gpr_timespec()> next_issue_;
  428. std::function<
  429. std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
  430. BenchmarkService::Stub*, grpc::ClientContext*, CompletionQueue*)>
  431. prepare_req_;
  432. grpc::Status status_;
  433. double start_;
  434. std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>
  435. stream_;
  436. // Allow a limit on number of messages in a stream
  437. int messages_per_stream_;
  438. int messages_issued_;
  439. // Whether to use coalescing API.
  440. bool coalesce_;
  441. void StartInternal(CompletionQueue* cq, int messages_per_stream,
  442. bool coalesce) {
  443. cq_ = cq;
  444. messages_per_stream_ = messages_per_stream;
  445. messages_issued_ = 0;
  446. coalesce_ = coalesce;
  447. if (coalesce_) {
  448. GPR_ASSERT(messages_per_stream_ != 0);
  449. context_.set_initial_metadata_corked(true);
  450. }
  451. stream_ = prepare_req_(stub_, &context_, cq);
  452. next_state_ = State::STREAM_IDLE;
  453. stream_->StartCall(ClientRpcContext::tag(this));
  454. if (coalesce_) {
  455. // When the initial metadata is corked, the tag will not come back and we
  456. // need to manually drive the state machine.
  457. RunNextState(true, nullptr);
  458. }
  459. }
  460. };
  461. class AsyncStreamingPingPongClient final
  462. : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
  463. public:
  464. explicit AsyncStreamingPingPongClient(const ClientConfig& config)
  465. : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
  466. config, SetupCtx, BenchmarkStubCreator) {
  467. StartThreads(num_async_threads_);
  468. }
  469. ~AsyncStreamingPingPongClient() override {}
  470. private:
  471. static void CheckDone(const grpc::Status& /*s*/,
  472. SimpleResponse* /*response*/) {}
  473. static std::unique_ptr<
  474. grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>>
  475. PrepareReq(BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
  476. CompletionQueue* cq) {
  477. auto stream = stub->PrepareAsyncStreamingCall(ctx, cq);
  478. return stream;
  479. };
  480. static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
  481. std::function<gpr_timespec()> next_issue,
  482. const SimpleRequest& req) {
  483. return new ClientRpcContextStreamingPingPongImpl<SimpleRequest,
  484. SimpleResponse>(
  485. stub, req, std::move(next_issue),
  486. AsyncStreamingPingPongClient::PrepareReq,
  487. AsyncStreamingPingPongClient::CheckDone);
  488. }
  489. };
  490. template <class RequestType, class ResponseType>
  491. class ClientRpcContextStreamingFromClientImpl : public ClientRpcContext {
  492. public:
  493. ClientRpcContextStreamingFromClientImpl(
  494. BenchmarkService::Stub* stub, const RequestType& req,
  495. std::function<gpr_timespec()> next_issue,
  496. std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>(
  497. BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*,
  498. CompletionQueue*)>
  499. prepare_req,
  500. std::function<void(grpc::Status, ResponseType*)> on_done)
  501. : context_(),
  502. stub_(stub),
  503. cq_(nullptr),
  504. req_(req),
  505. response_(),
  506. next_state_(State::INVALID),
  507. callback_(on_done),
  508. next_issue_(std::move(next_issue)),
  509. prepare_req_(prepare_req) {}
  510. ~ClientRpcContextStreamingFromClientImpl() override {}
  511. void Start(CompletionQueue* cq, const ClientConfig& config) override {
  512. GPR_ASSERT(!config.use_coalesce_api()); // not supported yet.
  513. StartInternal(cq);
  514. }
  515. bool RunNextState(bool ok, HistogramEntry* entry) override {
  516. while (true) {
  517. switch (next_state_) {
  518. case State::STREAM_IDLE:
  519. if (!next_issue_) { // ready to issue
  520. next_state_ = State::READY_TO_WRITE;
  521. } else {
  522. next_state_ = State::WAIT;
  523. }
  524. break; // loop around, don't return
  525. case State::WAIT:
  526. alarm_ = absl::make_unique<Alarm>();
  527. alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
  528. next_state_ = State::READY_TO_WRITE;
  529. return true;
  530. case State::READY_TO_WRITE:
  531. if (!ok) {
  532. return false;
  533. }
  534. start_ = UsageTimer::Now();
  535. next_state_ = State::WRITE_DONE;
  536. stream_->Write(req_, ClientRpcContext::tag(this));
  537. return true;
  538. case State::WRITE_DONE:
  539. if (!ok) {
  540. return false;
  541. }
  542. entry->set_value((UsageTimer::Now() - start_) * 1e9);
  543. next_state_ = State::STREAM_IDLE;
  544. break; // loop around
  545. default:
  546. GPR_ASSERT(false);
  547. return false;
  548. }
  549. }
  550. }
  551. void StartNewClone(CompletionQueue* cq) override {
  552. auto* clone = new ClientRpcContextStreamingFromClientImpl(
  553. stub_, req_, next_issue_, prepare_req_, callback_);
  554. clone->StartInternal(cq);
  555. }
  556. void TryCancel() override { context_.TryCancel(); }
  557. private:
  558. grpc::ClientContext context_;
  559. BenchmarkService::Stub* stub_;
  560. CompletionQueue* cq_;
  561. std::unique_ptr<Alarm> alarm_;
  562. const RequestType& req_;
  563. ResponseType response_;
  564. enum State {
  565. INVALID,
  566. STREAM_IDLE,
  567. WAIT,
  568. READY_TO_WRITE,
  569. WRITE_DONE,
  570. };
  571. State next_state_;
  572. std::function<void(grpc::Status, ResponseType*)> callback_;
  573. std::function<gpr_timespec()> next_issue_;
  574. std::function<std::unique_ptr<grpc::ClientAsyncWriter<RequestType>>(
  575. BenchmarkService::Stub*, grpc::ClientContext*, ResponseType*,
  576. CompletionQueue*)>
  577. prepare_req_;
  578. grpc::Status status_;
  579. double start_;
  580. std::unique_ptr<grpc::ClientAsyncWriter<RequestType>> stream_;
  581. void StartInternal(CompletionQueue* cq) {
  582. cq_ = cq;
  583. stream_ = prepare_req_(stub_, &context_, &response_, cq);
  584. next_state_ = State::STREAM_IDLE;
  585. stream_->StartCall(ClientRpcContext::tag(this));
  586. }
  587. };
  588. class AsyncStreamingFromClientClient final
  589. : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
  590. public:
  591. explicit AsyncStreamingFromClientClient(const ClientConfig& config)
  592. : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
  593. config, SetupCtx, BenchmarkStubCreator) {
  594. StartThreads(num_async_threads_);
  595. }
  596. ~AsyncStreamingFromClientClient() override {}
  597. private:
  598. static void CheckDone(const grpc::Status& /*s*/,
  599. SimpleResponse* /*response*/) {}
  600. static std::unique_ptr<grpc::ClientAsyncWriter<SimpleRequest>> PrepareReq(
  601. BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
  602. SimpleResponse* resp, CompletionQueue* cq) {
  603. auto stream = stub->PrepareAsyncStreamingFromClient(ctx, resp, cq);
  604. return stream;
  605. };
  606. static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
  607. std::function<gpr_timespec()> next_issue,
  608. const SimpleRequest& req) {
  609. return new ClientRpcContextStreamingFromClientImpl<SimpleRequest,
  610. SimpleResponse>(
  611. stub, req, std::move(next_issue),
  612. AsyncStreamingFromClientClient::PrepareReq,
  613. AsyncStreamingFromClientClient::CheckDone);
  614. }
  615. };
  616. template <class RequestType, class ResponseType>
  617. class ClientRpcContextStreamingFromServerImpl : public ClientRpcContext {
  618. public:
  619. ClientRpcContextStreamingFromServerImpl(
  620. BenchmarkService::Stub* stub, const RequestType& req,
  621. std::function<gpr_timespec()> next_issue,
  622. std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>(
  623. BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
  624. CompletionQueue*)>
  625. prepare_req,
  626. std::function<void(grpc::Status, ResponseType*)> on_done)
  627. : context_(),
  628. stub_(stub),
  629. cq_(nullptr),
  630. req_(req),
  631. response_(),
  632. next_state_(State::INVALID),
  633. callback_(on_done),
  634. next_issue_(std::move(next_issue)),
  635. prepare_req_(prepare_req) {}
  636. ~ClientRpcContextStreamingFromServerImpl() override {}
  637. void Start(CompletionQueue* cq, const ClientConfig& config) override {
  638. GPR_ASSERT(!config.use_coalesce_api()); // not supported
  639. StartInternal(cq);
  640. }
  641. bool RunNextState(bool ok, HistogramEntry* entry) override {
  642. while (true) {
  643. switch (next_state_) {
  644. case State::STREAM_IDLE:
  645. if (!ok) {
  646. return false;
  647. }
  648. start_ = UsageTimer::Now();
  649. next_state_ = State::READ_DONE;
  650. stream_->Read(&response_, ClientRpcContext::tag(this));
  651. return true;
  652. case State::READ_DONE:
  653. if (!ok) {
  654. return false;
  655. }
  656. entry->set_value((UsageTimer::Now() - start_) * 1e9);
  657. callback_(status_, &response_);
  658. next_state_ = State::STREAM_IDLE;
  659. break; // loop around
  660. default:
  661. GPR_ASSERT(false);
  662. return false;
  663. }
  664. }
  665. }
  666. void StartNewClone(CompletionQueue* cq) override {
  667. auto* clone = new ClientRpcContextStreamingFromServerImpl(
  668. stub_, req_, next_issue_, prepare_req_, callback_);
  669. clone->StartInternal(cq);
  670. }
  671. void TryCancel() override { context_.TryCancel(); }
  672. private:
  673. grpc::ClientContext context_;
  674. BenchmarkService::Stub* stub_;
  675. CompletionQueue* cq_;
  676. std::unique_ptr<Alarm> alarm_;
  677. const RequestType& req_;
  678. ResponseType response_;
  679. enum State { INVALID, STREAM_IDLE, READ_DONE };
  680. State next_state_;
  681. std::function<void(grpc::Status, ResponseType*)> callback_;
  682. std::function<gpr_timespec()> next_issue_;
  683. std::function<std::unique_ptr<grpc::ClientAsyncReader<ResponseType>>(
  684. BenchmarkService::Stub*, grpc::ClientContext*, const RequestType&,
  685. CompletionQueue*)>
  686. prepare_req_;
  687. grpc::Status status_;
  688. double start_;
  689. std::unique_ptr<grpc::ClientAsyncReader<ResponseType>> stream_;
  690. void StartInternal(CompletionQueue* cq) {
  691. // TODO(vjpai): Add support to rate-pace this
  692. cq_ = cq;
  693. stream_ = prepare_req_(stub_, &context_, req_, cq);
  694. next_state_ = State::STREAM_IDLE;
  695. stream_->StartCall(ClientRpcContext::tag(this));
  696. }
  697. };
  698. class AsyncStreamingFromServerClient final
  699. : public AsyncClient<BenchmarkService::Stub, SimpleRequest> {
  700. public:
  701. explicit AsyncStreamingFromServerClient(const ClientConfig& config)
  702. : AsyncClient<BenchmarkService::Stub, SimpleRequest>(
  703. config, SetupCtx, BenchmarkStubCreator) {
  704. StartThreads(num_async_threads_);
  705. }
  706. ~AsyncStreamingFromServerClient() override {}
  707. private:
  708. static void CheckDone(const grpc::Status& /*s*/,
  709. SimpleResponse* /*response*/) {}
  710. static std::unique_ptr<grpc::ClientAsyncReader<SimpleResponse>> PrepareReq(
  711. BenchmarkService::Stub* stub, grpc::ClientContext* ctx,
  712. const SimpleRequest& req, CompletionQueue* cq) {
  713. auto stream = stub->PrepareAsyncStreamingFromServer(ctx, req, cq);
  714. return stream;
  715. };
  716. static ClientRpcContext* SetupCtx(BenchmarkService::Stub* stub,
  717. std::function<gpr_timespec()> next_issue,
  718. const SimpleRequest& req) {
  719. return new ClientRpcContextStreamingFromServerImpl<SimpleRequest,
  720. SimpleResponse>(
  721. stub, req, std::move(next_issue),
  722. AsyncStreamingFromServerClient::PrepareReq,
  723. AsyncStreamingFromServerClient::CheckDone);
  724. }
  725. };
  726. class ClientRpcContextGenericStreamingImpl : public ClientRpcContext {
  727. public:
  728. ClientRpcContextGenericStreamingImpl(
  729. grpc::GenericStub* stub, const ByteBuffer& req,
  730. std::function<gpr_timespec()> next_issue,
  731. std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
  732. grpc::GenericStub*, grpc::ClientContext*,
  733. const std::string& method_name, CompletionQueue*)>
  734. prepare_req,
  735. std::function<void(grpc::Status, ByteBuffer*)> on_done)
  736. : context_(),
  737. stub_(stub),
  738. cq_(nullptr),
  739. req_(req),
  740. response_(),
  741. next_state_(State::INVALID),
  742. callback_(std::move(on_done)),
  743. next_issue_(std::move(next_issue)),
  744. prepare_req_(std::move(prepare_req)) {}
  745. ~ClientRpcContextGenericStreamingImpl() override {}
  746. void Start(CompletionQueue* cq, const ClientConfig& config) override {
  747. GPR_ASSERT(!config.use_coalesce_api()); // not supported yet.
  748. StartInternal(cq, config.messages_per_stream());
  749. }
  750. bool RunNextState(bool ok, HistogramEntry* entry) override {
  751. while (true) {
  752. switch (next_state_) {
  753. case State::STREAM_IDLE:
  754. if (!next_issue_) { // ready to issue
  755. next_state_ = State::READY_TO_WRITE;
  756. } else {
  757. next_state_ = State::WAIT;
  758. }
  759. break; // loop around, don't return
  760. case State::WAIT:
  761. next_state_ = State::READY_TO_WRITE;
  762. alarm_ = absl::make_unique<Alarm>();
  763. alarm_->Set(cq_, next_issue_(), ClientRpcContext::tag(this));
  764. return true;
  765. case State::READY_TO_WRITE:
  766. if (!ok) {
  767. return false;
  768. }
  769. start_ = UsageTimer::Now();
  770. next_state_ = State::WRITE_DONE;
  771. stream_->Write(req_, ClientRpcContext::tag(this));
  772. return true;
  773. case State::WRITE_DONE:
  774. if (!ok) {
  775. return false;
  776. }
  777. next_state_ = State::READ_DONE;
  778. stream_->Read(&response_, ClientRpcContext::tag(this));
  779. return true;
  780. case State::READ_DONE:
  781. entry->set_value((UsageTimer::Now() - start_) * 1e9);
  782. callback_(status_, &response_);
  783. if ((messages_per_stream_ != 0) &&
  784. (++messages_issued_ >= messages_per_stream_)) {
  785. next_state_ = State::WRITES_DONE_DONE;
  786. stream_->WritesDone(ClientRpcContext::tag(this));
  787. return true;
  788. }
  789. next_state_ = State::STREAM_IDLE;
  790. break; // loop around
  791. case State::WRITES_DONE_DONE:
  792. next_state_ = State::FINISH_DONE;
  793. stream_->Finish(&status_, ClientRpcContext::tag(this));
  794. return true;
  795. case State::FINISH_DONE:
  796. next_state_ = State::INVALID;
  797. return false;
  798. default:
  799. GPR_ASSERT(false);
  800. return false;
  801. }
  802. }
  803. }
  804. void StartNewClone(CompletionQueue* cq) override {
  805. auto* clone = new ClientRpcContextGenericStreamingImpl(
  806. stub_, req_, next_issue_, prepare_req_, callback_);
  807. clone->StartInternal(cq, messages_per_stream_);
  808. }
  809. void TryCancel() override { context_.TryCancel(); }
  810. private:
  811. grpc::ClientContext context_;
  812. grpc::GenericStub* stub_;
  813. CompletionQueue* cq_;
  814. std::unique_ptr<Alarm> alarm_;
  815. ByteBuffer req_;
  816. ByteBuffer response_;
  817. enum State {
  818. INVALID,
  819. STREAM_IDLE,
  820. WAIT,
  821. READY_TO_WRITE,
  822. WRITE_DONE,
  823. READ_DONE,
  824. WRITES_DONE_DONE,
  825. FINISH_DONE
  826. };
  827. State next_state_;
  828. std::function<void(grpc::Status, ByteBuffer*)> callback_;
  829. std::function<gpr_timespec()> next_issue_;
  830. std::function<std::unique_ptr<grpc::GenericClientAsyncReaderWriter>(
  831. grpc::GenericStub*, grpc::ClientContext*, const std::string&,
  832. CompletionQueue*)>
  833. prepare_req_;
  834. grpc::Status status_;
  835. double start_;
  836. std::unique_ptr<grpc::GenericClientAsyncReaderWriter> stream_;
  837. // Allow a limit on number of messages in a stream
  838. int messages_per_stream_;
  839. int messages_issued_;
  840. void StartInternal(CompletionQueue* cq, int messages_per_stream) {
  841. cq_ = cq;
  842. const std::string kMethodName(
  843. "/grpc.testing.BenchmarkService/StreamingCall");
  844. messages_per_stream_ = messages_per_stream;
  845. messages_issued_ = 0;
  846. stream_ = prepare_req_(stub_, &context_, kMethodName, cq);
  847. next_state_ = State::STREAM_IDLE;
  848. stream_->StartCall(ClientRpcContext::tag(this));
  849. }
  850. };
  851. static std::unique_ptr<grpc::GenericStub> GenericStubCreator(
  852. const std::shared_ptr<Channel>& ch) {
  853. return absl::make_unique<grpc::GenericStub>(ch);
  854. }
  855. class GenericAsyncStreamingClient final
  856. : public AsyncClient<grpc::GenericStub, ByteBuffer> {
  857. public:
  858. explicit GenericAsyncStreamingClient(const ClientConfig& config)
  859. : AsyncClient<grpc::GenericStub, ByteBuffer>(config, SetupCtx,
  860. GenericStubCreator) {
  861. StartThreads(num_async_threads_);
  862. }
  863. ~GenericAsyncStreamingClient() override {}
  864. private:
  865. static void CheckDone(const grpc::Status& /*s*/, ByteBuffer* /*response*/) {}
  866. static std::unique_ptr<grpc::GenericClientAsyncReaderWriter> PrepareReq(
  867. grpc::GenericStub* stub, grpc::ClientContext* ctx,
  868. const std::string& method_name, CompletionQueue* cq) {
  869. auto stream = stub->PrepareCall(ctx, method_name, cq);
  870. return stream;
  871. };
  872. static ClientRpcContext* SetupCtx(grpc::GenericStub* stub,
  873. std::function<gpr_timespec()> next_issue,
  874. const ByteBuffer& req) {
  875. return new ClientRpcContextGenericStreamingImpl(
  876. stub, req, std::move(next_issue),
  877. GenericAsyncStreamingClient::PrepareReq,
  878. GenericAsyncStreamingClient::CheckDone);
  879. }
  880. };
  881. std::unique_ptr<Client> CreateAsyncClient(const ClientConfig& config) {
  882. switch (config.rpc_type()) {
  883. case UNARY:
  884. return std::unique_ptr<Client>(new AsyncUnaryClient(config));
  885. case STREAMING:
  886. return std::unique_ptr<Client>(new AsyncStreamingPingPongClient(config));
  887. case STREAMING_FROM_CLIENT:
  888. return std::unique_ptr<Client>(
  889. new AsyncStreamingFromClientClient(config));
  890. case STREAMING_FROM_SERVER:
  891. return std::unique_ptr<Client>(
  892. new AsyncStreamingFromServerClient(config));
  893. case STREAMING_BOTH_WAYS:
  894. // TODO(vjpai): Implement this
  895. assert(false);
  896. return nullptr;
  897. default:
  898. assert(false);
  899. return nullptr;
  900. }
  901. }
  902. std::unique_ptr<Client> CreateGenericAsyncStreamingClient(
  903. const ClientConfig& config) {
  904. return std::unique_ptr<Client>(new GenericAsyncStreamingClient(config));
  905. }
  906. } // namespace testing
  907. } // namespace grpc