server_async.cc 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598
  1. /*
  2. *
  3. * Copyright 2015 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. #include <algorithm>
  19. #include <forward_list>
  20. #include <functional>
  21. #include <memory>
  22. #include <mutex>
  23. #include <thread>
  24. #include <grpc/grpc.h>
  25. #include <grpc/support/alloc.h>
  26. #include <grpc/support/log.h>
  27. #include <grpcpp/generic/async_generic_service.h>
  28. #include <grpcpp/resource_quota.h>
  29. #include <grpcpp/security/server_credentials.h>
  30. #include <grpcpp/server.h>
  31. #include <grpcpp/server_builder.h>
  32. #include <grpcpp/server_context.h>
  33. #include <grpcpp/support/config.h>
  34. #include "src/core/lib/gprpp/host_port.h"
  35. #include "src/core/lib/surface/completion_queue.h"
  36. #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
  37. #include "test/core/util/test_config.h"
  38. #include "test/cpp/qps/qps_server_builder.h"
  39. #include "test/cpp/qps/server.h"
  40. namespace grpc {
  41. namespace testing {
  42. template <class RequestType, class ResponseType, class ServiceType,
  43. class ServerContextType>
  44. class AsyncQpsServerTest final : public grpc::testing::Server {
  45. public:
  46. AsyncQpsServerTest(
  47. const ServerConfig& config,
  48. std::function<void(ServerBuilder*, ServiceType*)> register_service,
  49. std::function<void(ServiceType*, ServerContextType*, RequestType*,
  50. ServerAsyncResponseWriter<ResponseType>*,
  51. CompletionQueue*, ServerCompletionQueue*, void*)>
  52. request_unary_function,
  53. std::function<void(ServiceType*, ServerContextType*,
  54. ServerAsyncReaderWriter<ResponseType, RequestType>*,
  55. CompletionQueue*, ServerCompletionQueue*, void*)>
  56. request_streaming_function,
  57. std::function<void(ServiceType*, ServerContextType*,
  58. ServerAsyncReader<ResponseType, RequestType>*,
  59. CompletionQueue*, ServerCompletionQueue*, void*)>
  60. request_streaming_from_client_function,
  61. std::function<void(ServiceType*, ServerContextType*, RequestType*,
  62. ServerAsyncWriter<ResponseType>*, CompletionQueue*,
  63. ServerCompletionQueue*, void*)>
  64. request_streaming_from_server_function,
  65. std::function<void(ServiceType*, ServerContextType*,
  66. ServerAsyncReaderWriter<ResponseType, RequestType>*,
  67. CompletionQueue*, ServerCompletionQueue*, void*)>
  68. request_streaming_both_ways_function,
  69. std::function<grpc::Status(const PayloadConfig&, RequestType*,
  70. ResponseType*)>
  71. process_rpc)
  72. : Server(config) {
  73. std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
  74. auto port_num = port();
  75. // Negative port number means inproc server, so no listen port needed
  76. if (port_num >= 0) {
  77. std::string server_address = grpc_core::JoinHostPort("::", port_num);
  78. builder->AddListeningPort(server_address.c_str(),
  79. Server::CreateServerCredentials(config),
  80. &port_num);
  81. }
  82. register_service(builder.get(), &async_service_);
  83. int num_threads = config.async_server_threads();
  84. if (num_threads <= 0) { // dynamic sizing
  85. num_threads = std::min(64, cores());
  86. gpr_log(GPR_INFO,
  87. "Sizing async server to %d threads. Defaults to number of cores "
  88. "in machine or 64 threads if machine has more than 64 cores to "
  89. "avoid OOMs.",
  90. num_threads);
  91. }
  92. int tpc = std::max(1, config.threads_per_cq()); // 1 if unspecified
  93. int num_cqs = (num_threads + tpc - 1) / tpc; // ceiling operator
  94. for (int i = 0; i < num_cqs; i++) {
  95. srv_cqs_.emplace_back(builder->AddCompletionQueue());
  96. }
  97. for (int i = 0; i < num_threads; i++) {
  98. cq_.emplace_back(i % srv_cqs_.size());
  99. }
  100. ApplyConfigToBuilder(config, builder.get());
  101. server_ = builder->BuildAndStart();
  102. if (server_ == nullptr) {
  103. gpr_log(GPR_ERROR, "Server: Fail to BuildAndStart(port=%d)", port_num);
  104. } else {
  105. gpr_log(GPR_INFO, "Server: BuildAndStart(port=%d)", port_num);
  106. }
  107. auto process_rpc_bound =
  108. std::bind(process_rpc, config.payload_config(), std::placeholders::_1,
  109. std::placeholders::_2);
  110. for (int i = 0; i < 5000; i++) {
  111. for (int j = 0; j < num_cqs; j++) {
  112. if (request_unary_function) {
  113. auto request_unary = std::bind(
  114. request_unary_function, &async_service_, std::placeholders::_1,
  115. std::placeholders::_2, std::placeholders::_3, srv_cqs_[j].get(),
  116. srv_cqs_[j].get(), std::placeholders::_4);
  117. contexts_.emplace_back(
  118. new ServerRpcContextUnaryImpl(request_unary, process_rpc_bound));
  119. }
  120. if (request_streaming_function) {
  121. auto request_streaming = std::bind(
  122. request_streaming_function, &async_service_,
  123. std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(),
  124. srv_cqs_[j].get(), std::placeholders::_3);
  125. contexts_.emplace_back(new ServerRpcContextStreamingImpl(
  126. request_streaming, process_rpc_bound));
  127. }
  128. if (request_streaming_from_client_function) {
  129. auto request_streaming_from_client = std::bind(
  130. request_streaming_from_client_function, &async_service_,
  131. std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(),
  132. srv_cqs_[j].get(), std::placeholders::_3);
  133. contexts_.emplace_back(new ServerRpcContextStreamingFromClientImpl(
  134. request_streaming_from_client, process_rpc_bound));
  135. }
  136. if (request_streaming_from_server_function) {
  137. auto request_streaming_from_server =
  138. std::bind(request_streaming_from_server_function, &async_service_,
  139. std::placeholders::_1, std::placeholders::_2,
  140. std::placeholders::_3, srv_cqs_[j].get(),
  141. srv_cqs_[j].get(), std::placeholders::_4);
  142. contexts_.emplace_back(new ServerRpcContextStreamingFromServerImpl(
  143. request_streaming_from_server, process_rpc_bound));
  144. }
  145. if (request_streaming_both_ways_function) {
  146. // TODO(vjpai): Add this code
  147. }
  148. }
  149. }
  150. for (int i = 0; i < num_threads; i++) {
  151. shutdown_state_.emplace_back(new PerThreadShutdownState());
  152. threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
  153. }
  154. }
  155. ~AsyncQpsServerTest() override {
  156. for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
  157. std::lock_guard<std::mutex> lock((*ss)->mutex);
  158. (*ss)->shutdown = true;
  159. }
  160. // TODO(vjpai): Remove the following deadline and allow full proper
  161. // shutdown.
  162. server_->Shutdown(std::chrono::system_clock::now() +
  163. std::chrono::seconds(3));
  164. for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
  165. (*cq)->Shutdown();
  166. }
  167. for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
  168. thr->join();
  169. }
  170. for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); ++cq) {
  171. bool ok;
  172. void* got_tag;
  173. while ((*cq)->Next(&got_tag, &ok)) {
  174. }
  175. }
  176. }
  177. int GetPollCount() override {
  178. int count = 0;
  179. for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); cq++) {
  180. count += grpc_get_cq_poll_num((*cq)->cq());
  181. }
  182. return count;
  183. }
  184. std::shared_ptr<Channel> InProcessChannel(
  185. const ChannelArguments& args) override {
  186. return server_->InProcessChannel(args);
  187. }
  188. private:
  189. void ThreadFunc(int thread_idx) {
  190. // Wait until work is available or we are shutting down
  191. bool ok;
  192. void* got_tag;
  193. if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
  194. return;
  195. }
  196. ServerRpcContext* ctx;
  197. std::mutex* mu_ptr = &shutdown_state_[thread_idx]->mutex;
  198. do {
  199. ctx = detag(got_tag);
  200. // The tag is a pointer to an RPC context to invoke
  201. // Proceed while holding a lock to make sure that
  202. // this thread isn't supposed to shut down
  203. mu_ptr->lock();
  204. if (shutdown_state_[thread_idx]->shutdown) {
  205. mu_ptr->unlock();
  206. return;
  207. }
  208. } while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
  209. [&, ctx, ok, mu_ptr]() {
  210. ctx->lock();
  211. if (!ctx->RunNextState(ok)) {
  212. ctx->Reset();
  213. }
  214. ctx->unlock();
  215. mu_ptr->unlock();
  216. },
  217. &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
  218. }
  219. class ServerRpcContext {
  220. public:
  221. ServerRpcContext() {}
  222. void lock() { mu_.lock(); }
  223. void unlock() { mu_.unlock(); }
  224. virtual ~ServerRpcContext(){};
  225. virtual bool RunNextState(bool) = 0; // next state, return false if done
  226. virtual void Reset() = 0; // start this back at a clean state
  227. private:
  228. std::mutex mu_;
  229. };
  230. static void* tag(ServerRpcContext* func) { return static_cast<void*>(func); }
  231. static ServerRpcContext* detag(void* tag) {
  232. return static_cast<ServerRpcContext*>(tag);
  233. }
  234. class ServerRpcContextUnaryImpl final : public ServerRpcContext {
  235. public:
  236. ServerRpcContextUnaryImpl(
  237. std::function<void(ServerContextType*, RequestType*,
  238. grpc::ServerAsyncResponseWriter<ResponseType>*,
  239. void*)>
  240. request_method,
  241. std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
  242. : srv_ctx_(new ServerContextType),
  243. next_state_(&ServerRpcContextUnaryImpl::invoker),
  244. request_method_(request_method),
  245. invoke_method_(invoke_method),
  246. response_writer_(srv_ctx_.get()) {
  247. request_method_(srv_ctx_.get(), &req_, &response_writer_,
  248. AsyncQpsServerTest::tag(this));
  249. }
  250. ~ServerRpcContextUnaryImpl() override {}
  251. bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
  252. void Reset() override {
  253. srv_ctx_.reset(new ServerContextType);
  254. req_ = RequestType();
  255. response_writer_ =
  256. grpc::ServerAsyncResponseWriter<ResponseType>(srv_ctx_.get());
  257. // Then request the method
  258. next_state_ = &ServerRpcContextUnaryImpl::invoker;
  259. request_method_(srv_ctx_.get(), &req_, &response_writer_,
  260. AsyncQpsServerTest::tag(this));
  261. }
  262. private:
  263. bool finisher(bool) { return false; }
  264. bool invoker(bool ok) {
  265. if (!ok) {
  266. return false;
  267. }
  268. // Call the RPC processing function
  269. grpc::Status status = invoke_method_(&req_, &response_);
  270. // Have the response writer work and invoke on_finish when done
  271. next_state_ = &ServerRpcContextUnaryImpl::finisher;
  272. response_writer_.Finish(response_, status, AsyncQpsServerTest::tag(this));
  273. return true;
  274. }
  275. std::unique_ptr<ServerContextType> srv_ctx_;
  276. RequestType req_;
  277. ResponseType response_;
  278. bool (ServerRpcContextUnaryImpl::*next_state_)(bool);
  279. std::function<void(ServerContextType*, RequestType*,
  280. grpc::ServerAsyncResponseWriter<ResponseType>*, void*)>
  281. request_method_;
  282. std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
  283. grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
  284. };
  285. class ServerRpcContextStreamingImpl final : public ServerRpcContext {
  286. public:
  287. ServerRpcContextStreamingImpl(
  288. std::function<void(
  289. ServerContextType*,
  290. grpc::ServerAsyncReaderWriter<ResponseType, RequestType>*, void*)>
  291. request_method,
  292. std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
  293. : srv_ctx_(new ServerContextType),
  294. next_state_(&ServerRpcContextStreamingImpl::request_done),
  295. request_method_(request_method),
  296. invoke_method_(invoke_method),
  297. stream_(srv_ctx_.get()) {
  298. request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
  299. }
  300. ~ServerRpcContextStreamingImpl() override {}
  301. bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
  302. void Reset() override {
  303. srv_ctx_.reset(new ServerContextType);
  304. req_ = RequestType();
  305. stream_ = grpc::ServerAsyncReaderWriter<ResponseType, RequestType>(
  306. srv_ctx_.get());
  307. // Then request the method
  308. next_state_ = &ServerRpcContextStreamingImpl::request_done;
  309. request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
  310. }
  311. private:
  312. bool request_done(bool ok) {
  313. if (!ok) {
  314. return false;
  315. }
  316. next_state_ = &ServerRpcContextStreamingImpl::read_done;
  317. stream_.Read(&req_, AsyncQpsServerTest::tag(this));
  318. return true;
  319. }
  320. bool read_done(bool ok) {
  321. if (ok) {
  322. // invoke the method
  323. // Call the RPC processing function
  324. grpc::Status status = invoke_method_(&req_, &response_);
  325. // initiate the write
  326. next_state_ = &ServerRpcContextStreamingImpl::write_done;
  327. stream_.Write(response_, AsyncQpsServerTest::tag(this));
  328. } else { // client has sent writes done
  329. // finish the stream
  330. next_state_ = &ServerRpcContextStreamingImpl::finish_done;
  331. stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
  332. }
  333. return true;
  334. }
  335. bool write_done(bool ok) {
  336. // now go back and get another streaming read!
  337. if (ok) {
  338. next_state_ = &ServerRpcContextStreamingImpl::read_done;
  339. stream_.Read(&req_, AsyncQpsServerTest::tag(this));
  340. } else {
  341. next_state_ = &ServerRpcContextStreamingImpl::finish_done;
  342. stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
  343. }
  344. return true;
  345. }
  346. bool finish_done(bool /*ok*/) { return false; /*reset the context*/ }
  347. std::unique_ptr<ServerContextType> srv_ctx_;
  348. RequestType req_;
  349. ResponseType response_;
  350. bool (ServerRpcContextStreamingImpl::*next_state_)(bool);
  351. std::function<void(
  352. ServerContextType*,
  353. grpc::ServerAsyncReaderWriter<ResponseType, RequestType>*, void*)>
  354. request_method_;
  355. std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
  356. grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_;
  357. };
  358. class ServerRpcContextStreamingFromClientImpl final
  359. : public ServerRpcContext {
  360. public:
  361. ServerRpcContextStreamingFromClientImpl(
  362. std::function<void(ServerContextType*,
  363. grpc::ServerAsyncReader<ResponseType, RequestType>*,
  364. void*)>
  365. request_method,
  366. std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
  367. : srv_ctx_(new ServerContextType),
  368. next_state_(&ServerRpcContextStreamingFromClientImpl::request_done),
  369. request_method_(request_method),
  370. invoke_method_(invoke_method),
  371. stream_(srv_ctx_.get()) {
  372. request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
  373. }
  374. ~ServerRpcContextStreamingFromClientImpl() override {}
  375. bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
  376. void Reset() override {
  377. srv_ctx_.reset(new ServerContextType);
  378. req_ = RequestType();
  379. stream_ =
  380. grpc::ServerAsyncReader<ResponseType, RequestType>(srv_ctx_.get());
  381. // Then request the method
  382. next_state_ = &ServerRpcContextStreamingFromClientImpl::request_done;
  383. request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
  384. }
  385. private:
  386. bool request_done(bool ok) {
  387. if (!ok) {
  388. return false;
  389. }
  390. next_state_ = &ServerRpcContextStreamingFromClientImpl::read_done;
  391. stream_.Read(&req_, AsyncQpsServerTest::tag(this));
  392. return true;
  393. }
  394. bool read_done(bool ok) {
  395. if (ok) {
  396. // In this case, just do another read
  397. // next_state_ is unchanged
  398. stream_.Read(&req_, AsyncQpsServerTest::tag(this));
  399. return true;
  400. } else { // client has sent writes done
  401. // invoke the method
  402. // Call the RPC processing function
  403. grpc::Status status = invoke_method_(&req_, &response_);
  404. // finish the stream
  405. next_state_ = &ServerRpcContextStreamingFromClientImpl::finish_done;
  406. stream_.Finish(response_, Status::OK, AsyncQpsServerTest::tag(this));
  407. }
  408. return true;
  409. }
  410. bool finish_done(bool /*ok*/) { return false; /*reset the context*/ }
  411. std::unique_ptr<ServerContextType> srv_ctx_;
  412. RequestType req_;
  413. ResponseType response_;
  414. bool (ServerRpcContextStreamingFromClientImpl::*next_state_)(bool);
  415. std::function<void(ServerContextType*,
  416. grpc::ServerAsyncReader<ResponseType, RequestType>*,
  417. void*)>
  418. request_method_;
  419. std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
  420. grpc::ServerAsyncReader<ResponseType, RequestType> stream_;
  421. };
  422. class ServerRpcContextStreamingFromServerImpl final
  423. : public ServerRpcContext {
  424. public:
  425. ServerRpcContextStreamingFromServerImpl(
  426. std::function<void(ServerContextType*, RequestType*,
  427. grpc::ServerAsyncWriter<ResponseType>*, void*)>
  428. request_method,
  429. std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
  430. : srv_ctx_(new ServerContextType),
  431. next_state_(&ServerRpcContextStreamingFromServerImpl::request_done),
  432. request_method_(request_method),
  433. invoke_method_(invoke_method),
  434. stream_(srv_ctx_.get()) {
  435. request_method_(srv_ctx_.get(), &req_, &stream_,
  436. AsyncQpsServerTest::tag(this));
  437. }
  438. ~ServerRpcContextStreamingFromServerImpl() override {}
  439. bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
  440. void Reset() override {
  441. srv_ctx_.reset(new ServerContextType);
  442. req_ = RequestType();
  443. stream_ = grpc::ServerAsyncWriter<ResponseType>(srv_ctx_.get());
  444. // Then request the method
  445. next_state_ = &ServerRpcContextStreamingFromServerImpl::request_done;
  446. request_method_(srv_ctx_.get(), &req_, &stream_,
  447. AsyncQpsServerTest::tag(this));
  448. }
  449. private:
  450. bool request_done(bool ok) {
  451. if (!ok) {
  452. return false;
  453. }
  454. // invoke the method
  455. // Call the RPC processing function
  456. grpc::Status status = invoke_method_(&req_, &response_);
  457. next_state_ = &ServerRpcContextStreamingFromServerImpl::write_done;
  458. stream_.Write(response_, AsyncQpsServerTest::tag(this));
  459. return true;
  460. }
  461. bool write_done(bool ok) {
  462. if (ok) {
  463. // Do another write!
  464. // next_state_ is unchanged
  465. stream_.Write(response_, AsyncQpsServerTest::tag(this));
  466. } else { // must be done so let's finish
  467. next_state_ = &ServerRpcContextStreamingFromServerImpl::finish_done;
  468. stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
  469. }
  470. return true;
  471. }
  472. bool finish_done(bool /*ok*/) { return false; /*reset the context*/ }
  473. std::unique_ptr<ServerContextType> srv_ctx_;
  474. RequestType req_;
  475. ResponseType response_;
  476. bool (ServerRpcContextStreamingFromServerImpl::*next_state_)(bool);
  477. std::function<void(ServerContextType*, RequestType*,
  478. grpc::ServerAsyncWriter<ResponseType>*, void*)>
  479. request_method_;
  480. std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
  481. grpc::ServerAsyncWriter<ResponseType> stream_;
  482. };
  483. std::vector<std::thread> threads_;
  484. std::unique_ptr<grpc::Server> server_;
  485. std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
  486. std::vector<int> cq_;
  487. ServiceType async_service_;
  488. std::vector<std::unique_ptr<ServerRpcContext>> contexts_;
  489. struct PerThreadShutdownState {
  490. mutable std::mutex mutex;
  491. bool shutdown;
  492. PerThreadShutdownState() : shutdown(false) {}
  493. };
  494. std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
  495. };
  496. static void RegisterBenchmarkService(ServerBuilder* builder,
  497. BenchmarkService::AsyncService* service) {
  498. builder->RegisterService(service);
  499. }
  500. static void RegisterGenericService(ServerBuilder* builder,
  501. grpc::AsyncGenericService* service) {
  502. builder->RegisterAsyncGenericService(service);
  503. }
  504. static Status ProcessSimpleRPC(const PayloadConfig&, SimpleRequest* request,
  505. SimpleResponse* response) {
  506. if (request->response_size() > 0) {
  507. if (!Server::SetPayload(request->response_type(), request->response_size(),
  508. response->mutable_payload())) {
  509. return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
  510. }
  511. }
  512. // We are done using the request. Clear it to reduce working memory.
  513. // This proves to reduce cache misses in large message size cases.
  514. request->Clear();
  515. return Status::OK;
  516. }
  517. static Status ProcessGenericRPC(const PayloadConfig& payload_config,
  518. ByteBuffer* request, ByteBuffer* response) {
  519. // We are done using the request. Clear it to reduce working memory.
  520. // This proves to reduce cache misses in large message size cases.
  521. request->Clear();
  522. int resp_size = payload_config.bytebuf_params().resp_size();
  523. std::unique_ptr<char[]> buf(new char[resp_size]);
  524. memset(buf.get(), 0, static_cast<size_t>(resp_size));
  525. Slice slice(buf.get(), resp_size);
  526. *response = ByteBuffer(&slice, 1);
  527. return Status::OK;
  528. }
  529. std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config) {
  530. return std::unique_ptr<Server>(
  531. new AsyncQpsServerTest<SimpleRequest, SimpleResponse,
  532. BenchmarkService::AsyncService,
  533. grpc::ServerContext>(
  534. config, RegisterBenchmarkService,
  535. &BenchmarkService::AsyncService::RequestUnaryCall,
  536. &BenchmarkService::AsyncService::RequestStreamingCall,
  537. &BenchmarkService::AsyncService::RequestStreamingFromClient,
  538. &BenchmarkService::AsyncService::RequestStreamingFromServer,
  539. &BenchmarkService::AsyncService::RequestStreamingBothWays,
  540. ProcessSimpleRPC));
  541. }
  542. std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig& config) {
  543. return std::unique_ptr<Server>(
  544. new AsyncQpsServerTest<ByteBuffer, ByteBuffer, grpc::AsyncGenericService,
  545. grpc::GenericServerContext>(
  546. config, RegisterGenericService, nullptr,
  547. &grpc::AsyncGenericService::RequestCall, nullptr, nullptr, nullptr,
  548. ProcessGenericRPC));
  549. }
  550. } // namespace testing
  551. } // namespace grpc