server.cc 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. /*
  2. *
  3. * Copyright 2016 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. #include <signal.h>
  19. #include <stdio.h>
  20. #include <stdlib.h>
  21. #include <string.h>
  22. #include <time.h>
  23. #include <string>
  24. #include "absl/flags/flag.h"
  25. #include "absl/flags/parse.h"
  26. #include <grpc/grpc.h>
  27. #include <grpc/grpc_security.h>
  28. #include "test/core/memory_usage/memstats.h"
  29. #ifndef _WIN32
  30. /* This is for _exit() below, which is temporary. */
  31. #include <unistd.h>
  32. #endif
  33. #include <grpc/support/alloc.h>
  34. #include <grpc/support/log.h>
  35. #include <grpc/support/time.h>
  36. #include "src/core/lib/gprpp/host_port.h"
  37. #include "test/core/end2end/data/ssl_test_data.h"
  38. #include "test/core/memory_usage/memstats.h"
  39. #include "test/core/util/port.h"
  40. #include "test/core/util/test_config.h"
  41. static grpc_completion_queue* cq;
  42. static grpc_server* server;
  43. static grpc_op metadata_ops[2];
  44. static grpc_op snapshot_ops[5];
  45. static grpc_op status_op;
  46. static int got_sigint = 0;
  47. static grpc_byte_buffer* payload_buffer = nullptr;
  48. static grpc_byte_buffer* terminal_buffer = nullptr;
  49. static int was_cancelled = 2;
  // Converts an integer into an opaque completion-queue tag pointer.
  static void* tag(intptr_t t) {
    void* opaque = reinterpret_cast<void*>(t);
    return opaque;
  }
  // Per-call state. main() dispatches on this value each time a completion
  // event carrying the call's slot as its tag is dequeued.
  typedef enum {
    // Slot has an outstanding grpc_server_request_call; completion means a new
    // call arrived.
    FLING_SERVER_NEW_REQUEST = 1,
    // Initial metadata batch has been started for a reflectUnary call.
    FLING_SERVER_SEND_INIT_METADATA = 2,
    // Initial metadata was sent; the call is kept open until a DestroyCalls
    // request sweeps it.
    FLING_SERVER_WAIT_FOR_DESTROY = 3,
    // Final status batch has been started; completion releases the call.
    FLING_SERVER_SEND_STATUS_FLING_CALL = 4,
    // Snapshot batch has been started; completion releases the snapshot call
    // and its byte buffers.
    FLING_SERVER_SEND_STATUS_SNAPSHOT = 5,
    // Snapshot batch for a DestroyCalls request; completion first sends status
    // to every waiting call, then cleans up like a snapshot call.
    FLING_SERVER_BATCH_SEND_STATUS_FLING_CALL = 6
  } fling_server_tags;
  59. typedef struct {
  60. fling_server_tags state;
  61. grpc_call* call;
  62. grpc_call_details call_details;
  63. grpc_metadata_array request_metadata_recv;
  64. grpc_metadata_array initial_metadata_send;
  65. } fling_call;
  66. // hold up to 100000 calls and 6 snaphost calls
  67. static fling_call calls[1000006];
  68. static void request_call_unary(int call_idx) {
  69. if (call_idx == static_cast<int>(sizeof(calls) / sizeof(fling_call))) {
  70. gpr_log(GPR_INFO, "Used all call slots (10000) on server. Server exit.");
  71. _exit(0);
  72. }
  73. grpc_metadata_array_init(&calls[call_idx].request_metadata_recv);
  74. grpc_server_request_call(
  75. server, &calls[call_idx].call, &calls[call_idx].call_details,
  76. &calls[call_idx].request_metadata_recv, cq, cq, &calls[call_idx]);
  77. }
  78. static void send_initial_metadata_unary(void* tag) {
  79. grpc_metadata_array_init(
  80. &(*static_cast<fling_call*>(tag)).initial_metadata_send);
  81. metadata_ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
  82. metadata_ops[0].data.send_initial_metadata.count = 0;
  83. GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch((*(fling_call*)tag).call,
  84. metadata_ops, 1, tag,
  85. nullptr));
  86. }
  87. static void send_status(void* tag) {
  88. status_op.op = GRPC_OP_SEND_STATUS_FROM_SERVER;
  89. status_op.data.send_status_from_server.status = GRPC_STATUS_OK;
  90. status_op.data.send_status_from_server.trailing_metadata_count = 0;
  91. grpc_slice details = grpc_slice_from_static_string("");
  92. status_op.data.send_status_from_server.status_details = &details;
  93. GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch((*(fling_call*)tag).call,
  94. &status_op, 1, tag,
  95. nullptr));
  96. }
  97. static void send_snapshot(void* tag, MemStats* snapshot) {
  98. grpc_op* op;
  99. grpc_slice snapshot_slice =
  100. grpc_slice_new(snapshot, sizeof(*snapshot), gpr_free);
  101. payload_buffer = grpc_raw_byte_buffer_create(&snapshot_slice, 1);
  102. grpc_metadata_array_init(
  103. &(*static_cast<fling_call*>(tag)).initial_metadata_send);
  104. op = snapshot_ops;
  105. op->op = GRPC_OP_SEND_INITIAL_METADATA;
  106. op->data.send_initial_metadata.count = 0;
  107. op++;
  108. op->op = GRPC_OP_RECV_MESSAGE;
  109. op->data.recv_message.recv_message = &terminal_buffer;
  110. op++;
  111. op->op = GRPC_OP_SEND_MESSAGE;
  112. if (payload_buffer == nullptr) {
  113. gpr_log(GPR_INFO, "NULL payload buffer !!!");
  114. }
  115. op->data.send_message.send_message = payload_buffer;
  116. op++;
  117. op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
  118. op->data.send_status_from_server.status = GRPC_STATUS_OK;
  119. op->data.send_status_from_server.trailing_metadata_count = 0;
  120. grpc_slice details = grpc_slice_from_static_string("");
  121. op->data.send_status_from_server.status_details = &details;
  122. op++;
  123. op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
  124. op->data.recv_close_on_server.cancelled = &was_cancelled;
  125. op++;
  126. GPR_ASSERT(GRPC_CALL_OK ==
  127. grpc_call_start_batch((*(fling_call*)tag).call, snapshot_ops,
  128. (size_t)(op - snapshot_ops), tag, nullptr));
  129. }
  /* SIGINT handler: terminates the process immediately with status 0.
     A graceful shutdown currently runs into some sort of deadlock, so it is
     skipped for now. Once that is resolved, remove the #include <unistd.h>
     above along with this shortcut. */
  static void sigint_handler(int /*signum*/) {
    _exit(0);
  }
  133. ABSL_FLAG(std::string, bind, "", "Bind host:port");
  134. ABSL_FLAG(bool, secure, false, "Use security");
  135. int main(int argc, char** argv) {
  136. absl::ParseCommandLine(argc, argv);
  137. grpc_event ev;
  138. grpc_completion_queue* shutdown_cq;
  139. int shutdown_started = 0;
  140. int shutdown_finished = 0;
  141. char* fake_argv[1];
  142. GPR_ASSERT(argc >= 1);
  143. fake_argv[0] = argv[0];
  144. grpc::testing::TestEnvironment env(1, fake_argv);
  145. grpc_init();
  146. srand(static_cast<unsigned>(clock()));
  147. std::string addr = absl::GetFlag(FLAGS_bind);
  148. if (addr.empty()) {
  149. addr = grpc_core::JoinHostPort("::", grpc_pick_unused_port_or_die());
  150. }
  151. gpr_log(GPR_INFO, "creating server on: %s", addr.c_str());
  152. cq = grpc_completion_queue_create_for_next(nullptr);
  153. MemStats before_server_create = MemStats::Snapshot();
  154. if (absl::GetFlag(FLAGS_secure)) {
  155. grpc_ssl_pem_key_cert_pair pem_key_cert_pair = {test_server1_key,
  156. test_server1_cert};
  157. grpc_server_credentials* ssl_creds = grpc_ssl_server_credentials_create(
  158. nullptr, &pem_key_cert_pair, 1, 0, nullptr);
  159. server = grpc_server_create(nullptr, nullptr);
  160. GPR_ASSERT(grpc_server_add_http2_port(server, addr.c_str(), ssl_creds));
  161. grpc_server_credentials_release(ssl_creds);
  162. } else {
  163. server = grpc_server_create(nullptr, nullptr);
  164. GPR_ASSERT(grpc_server_add_http2_port(
  165. server, addr.c_str(), grpc_insecure_server_credentials_create()));
  166. }
  167. grpc_server_register_completion_queue(server, cq, nullptr);
  168. grpc_server_start(server);
  169. MemStats after_server_create = MemStats::Snapshot();
  170. // initialize call instances
  171. for (int i = 0; i < static_cast<int>(sizeof(calls) / sizeof(fling_call));
  172. i++) {
  173. grpc_call_details_init(&calls[i].call_details);
  174. calls[i].state = FLING_SERVER_NEW_REQUEST;
  175. }
  176. int next_call_idx = 0;
  177. MemStats current_snapshot;
  178. request_call_unary(next_call_idx);
  179. signal(SIGINT, sigint_handler);
  180. while (!shutdown_finished) {
  181. if (got_sigint && !shutdown_started) {
  182. gpr_log(GPR_INFO, "Shutting down due to SIGINT");
  183. shutdown_cq = grpc_completion_queue_create_for_pluck(nullptr);
  184. grpc_server_shutdown_and_notify(server, shutdown_cq, tag(1000));
  185. GPR_ASSERT(grpc_completion_queue_pluck(
  186. shutdown_cq, tag(1000),
  187. grpc_timeout_seconds_to_deadline(5), nullptr)
  188. .type == GRPC_OP_COMPLETE);
  189. grpc_completion_queue_destroy(shutdown_cq);
  190. grpc_completion_queue_shutdown(cq);
  191. shutdown_started = 1;
  192. }
  193. ev = grpc_completion_queue_next(
  194. cq,
  195. gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
  196. gpr_time_from_micros(1000000, GPR_TIMESPAN)),
  197. nullptr);
  198. fling_call* s = static_cast<fling_call*>(ev.tag);
  199. switch (ev.type) {
  200. case GRPC_OP_COMPLETE:
  201. switch (s->state) {
  202. case FLING_SERVER_NEW_REQUEST:
  203. request_call_unary(++next_call_idx);
  204. if (0 == grpc_slice_str_cmp(s->call_details.method,
  205. "/Reflector/reflectUnary")) {
  206. s->state = FLING_SERVER_SEND_INIT_METADATA;
  207. send_initial_metadata_unary(s);
  208. } else if (0 ==
  209. grpc_slice_str_cmp(s->call_details.method,
  210. "Reflector/GetBeforeSvrCreation")) {
  211. s->state = FLING_SERVER_SEND_STATUS_SNAPSHOT;
  212. send_snapshot(s, &before_server_create);
  213. } else if (0 ==
  214. grpc_slice_str_cmp(s->call_details.method,
  215. "Reflector/GetAfterSvrCreation")) {
  216. s->state = FLING_SERVER_SEND_STATUS_SNAPSHOT;
  217. send_snapshot(s, &after_server_create);
  218. } else if (0 == grpc_slice_str_cmp(s->call_details.method,
  219. "Reflector/SimpleSnapshot")) {
  220. s->state = FLING_SERVER_SEND_STATUS_SNAPSHOT;
  221. current_snapshot = MemStats::Snapshot();
  222. send_snapshot(s, &current_snapshot);
  223. } else if (0 == grpc_slice_str_cmp(s->call_details.method,
  224. "Reflector/DestroyCalls")) {
  225. s->state = FLING_SERVER_BATCH_SEND_STATUS_FLING_CALL;
  226. current_snapshot = MemStats::Snapshot();
  227. send_snapshot(s, &current_snapshot);
  228. } else {
  229. gpr_log(GPR_ERROR, "Wrong call method");
  230. }
  231. break;
  232. case FLING_SERVER_SEND_INIT_METADATA:
  233. s->state = FLING_SERVER_WAIT_FOR_DESTROY;
  234. break;
  235. case FLING_SERVER_WAIT_FOR_DESTROY:
  236. break;
  237. case FLING_SERVER_SEND_STATUS_FLING_CALL:
  238. grpc_call_unref(s->call);
  239. grpc_call_details_destroy(&s->call_details);
  240. grpc_metadata_array_destroy(&s->initial_metadata_send);
  241. grpc_metadata_array_destroy(&s->request_metadata_recv);
  242. break;
  243. case FLING_SERVER_BATCH_SEND_STATUS_FLING_CALL:
  244. for (int k = 0;
  245. k < static_cast<int>(sizeof(calls) / sizeof(fling_call));
  246. ++k) {
  247. if (calls[k].state == FLING_SERVER_WAIT_FOR_DESTROY) {
  248. calls[k].state = FLING_SERVER_SEND_STATUS_FLING_CALL;
  249. send_status(&calls[k]);
  250. }
  251. }
  252. ABSL_FALLTHROUGH_INTENDED;
  253. // no break here since we want to continue to case
  254. // FLING_SERVER_SEND_STATUS_SNAPSHOT to destroy the snapshot call
  255. case FLING_SERVER_SEND_STATUS_SNAPSHOT:
  256. grpc_byte_buffer_destroy(payload_buffer);
  257. grpc_byte_buffer_destroy(terminal_buffer);
  258. grpc_call_unref(s->call);
  259. grpc_call_details_destroy(&s->call_details);
  260. grpc_metadata_array_destroy(&s->initial_metadata_send);
  261. grpc_metadata_array_destroy(&s->request_metadata_recv);
  262. terminal_buffer = nullptr;
  263. payload_buffer = nullptr;
  264. break;
  265. }
  266. break;
  267. case GRPC_QUEUE_SHUTDOWN:
  268. GPR_ASSERT(shutdown_started);
  269. shutdown_finished = 1;
  270. break;
  271. case GRPC_QUEUE_TIMEOUT:
  272. break;
  273. }
  274. }
  275. grpc_server_destroy(server);
  276. grpc_completion_queue_destroy(cq);
  277. grpc_shutdown_blocking();
  278. return 0;
  279. }