concurrent_connectivity_test.cc 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. /*
  2. *
  3. * Copyright 2016 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
#include <memory.h>
#include <stdint.h>
#include <stdio.h>

#include <atomic>
#include <string>
#include <vector>

#include "absl/strings/str_cat.h"

#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>

#include "src/core/lib/address_utils/sockaddr_utils.h"
#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/gprpp/time.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/resource_quota/api.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
/* TODO(yashykt): When our macos testing infrastructure becomes good enough, we
 * wouldn't need to reduce the number of threads on MacOS */
// NUM_THREADS is the number of client threads started in each test wave.
#ifdef __APPLE__
#define NUM_THREADS 10
#else
#define NUM_THREADS 100
#endif /* __APPLE__ */

// Parameters for create_loop_destroy(): channels created per thread, watches
// issued per channel, the watch deadline (ms) and the completion-queue poll
// deadline (ms).
#define NUM_OUTER_LOOPS 10
#define NUM_INNER_LOOPS 10
#define DELAY_MILLIS 10
#define POLL_MILLIS 15000

// The same four parameters for watches_with_short_timeouts().
#define NUM_OUTER_LOOPS_SHORT_TIMEOUTS 10
#define NUM_INNER_LOOPS_SHORT_TIMEOUTS 100
#define DELAY_MILLIS_SHORT_TIMEOUTS 1
// In a successful test run, POLL_MILLIS_SHORT_TIMEOUTS should never be reached
// because every watch should end after the much shorter
// DELAY_MILLIS_SHORT_TIMEOUTS.
#define POLL_MILLIS_SHORT_TIMEOUTS 30000

// It should never take longer than this (in milliseconds) to shut down the
// server.
#define SERVER_SHUTDOWN_TIMEOUT 30000
  57. static void* tag(int n) { return reinterpret_cast<void*>(n); }
  58. void create_loop_destroy(void* addr) {
  59. for (int i = 0; i < NUM_OUTER_LOOPS; ++i) {
  60. grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
  61. grpc_channel_credentials* creds = grpc_insecure_credentials_create();
  62. grpc_channel* chan =
  63. grpc_channel_create(static_cast<char*>(addr), creds, nullptr);
  64. grpc_channel_credentials_release(creds);
  65. for (int j = 0; j < NUM_INNER_LOOPS; ++j) {
  66. gpr_timespec later_time =
  67. grpc_timeout_milliseconds_to_deadline(DELAY_MILLIS);
  68. grpc_connectivity_state state =
  69. grpc_channel_check_connectivity_state(chan, 1);
  70. grpc_channel_watch_connectivity_state(chan, state, later_time, cq,
  71. nullptr);
  72. gpr_timespec poll_time =
  73. grpc_timeout_milliseconds_to_deadline(POLL_MILLIS);
  74. GPR_ASSERT(grpc_completion_queue_next(cq, poll_time, nullptr).type ==
  75. GRPC_OP_COMPLETE);
  76. /* check that the watcher from "watch state" was free'd */
  77. GPR_ASSERT(grpc_channel_num_external_connectivity_watchers(chan) == 0);
  78. }
  79. grpc_channel_destroy(chan);
  80. grpc_completion_queue_destroy(cq);
  81. }
  82. }
  83. // Always stack-allocate or new ServerThreadArgs; never use gpr_malloc since
  84. // this contains C++ objects.
  85. struct ServerThreadArgs {
  86. std::string addr;
  87. grpc_server* server = nullptr;
  88. grpc_completion_queue* cq = nullptr;
  89. std::vector<grpc_pollset*> pollset;
  90. gpr_mu* mu = nullptr;
  91. gpr_event ready;
  92. std::atomic_bool stop{false};
  93. };
  94. void server_thread(void* vargs) {
  95. struct ServerThreadArgs* args = static_cast<struct ServerThreadArgs*>(vargs);
  96. grpc_event ev;
  97. gpr_timespec deadline =
  98. grpc_timeout_milliseconds_to_deadline(SERVER_SHUTDOWN_TIMEOUT);
  99. ev = grpc_completion_queue_next(args->cq, deadline, nullptr);
  100. GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
  101. GPR_ASSERT(ev.tag == tag(0xd1e));
  102. }
  103. static void on_connect(void* vargs, grpc_endpoint* tcp,
  104. grpc_pollset* /*accepting_pollset*/,
  105. grpc_tcp_server_acceptor* acceptor) {
  106. gpr_free(acceptor);
  107. struct ServerThreadArgs* args = static_cast<struct ServerThreadArgs*>(vargs);
  108. grpc_endpoint_shutdown(tcp,
  109. GRPC_ERROR_CREATE_FROM_STATIC_STRING("Connected"));
  110. grpc_endpoint_destroy(tcp);
  111. gpr_mu_lock(args->mu);
  112. GRPC_LOG_IF_ERROR("pollset_kick",
  113. grpc_pollset_kick(args->pollset[0], nullptr));
  114. gpr_mu_unlock(args->mu);
  115. }
  116. void bad_server_thread(void* vargs) {
  117. struct ServerThreadArgs* args = static_cast<struct ServerThreadArgs*>(vargs);
  118. grpc_core::ExecCtx exec_ctx;
  119. grpc_resolved_address resolved_addr;
  120. grpc_sockaddr* addr = reinterpret_cast<grpc_sockaddr*>(resolved_addr.addr);
  121. int port;
  122. grpc_tcp_server* s;
  123. const grpc_channel_args* channel_args = grpc_core::CoreConfiguration::Get()
  124. .channel_args_preconditioning()
  125. .PreconditionChannelArgs(nullptr);
  126. grpc_error_handle error = grpc_tcp_server_create(nullptr, channel_args, &s);
  127. grpc_channel_args_destroy(channel_args);
  128. GPR_ASSERT(error == GRPC_ERROR_NONE);
  129. memset(&resolved_addr, 0, sizeof(resolved_addr));
  130. addr->sa_family = GRPC_AF_INET;
  131. error = grpc_tcp_server_add_port(s, &resolved_addr, &port);
  132. GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_tcp_server_add_port", error));
  133. GPR_ASSERT(port > 0);
  134. args->addr = absl::StrCat("localhost:", port);
  135. grpc_tcp_server_start(s, &args->pollset, on_connect, args);
  136. gpr_event_set(&args->ready, reinterpret_cast<void*>(1));
  137. gpr_mu_lock(args->mu);
  138. while (!args->stop.load(std::memory_order_acquire)) {
  139. grpc_core::Timestamp deadline = grpc_core::ExecCtx::Get()->Now() +
  140. grpc_core::Duration::Milliseconds(100);
  141. grpc_pollset_worker* worker = nullptr;
  142. if (!GRPC_LOG_IF_ERROR(
  143. "pollset_work",
  144. grpc_pollset_work(args->pollset[0], &worker, deadline))) {
  145. args->stop.store(true, std::memory_order_release);
  146. }
  147. gpr_mu_unlock(args->mu);
  148. gpr_mu_lock(args->mu);
  149. }
  150. gpr_mu_unlock(args->mu);
  151. grpc_tcp_server_unref(s);
  152. }
  153. static void done_pollset_shutdown(void* pollset, grpc_error_handle /*error*/) {
  154. grpc_pollset_destroy(static_cast<grpc_pollset*>(pollset));
  155. gpr_free(pollset);
  156. }
  157. int run_concurrent_connectivity_test() {
  158. struct ServerThreadArgs args;
  159. grpc_init();
  160. /* First round, no server */
  161. {
  162. gpr_log(GPR_DEBUG, "Wave 1");
  163. grpc_core::Thread threads[NUM_THREADS];
  164. args.addr = "localhost:54321";
  165. for (auto& th : threads) {
  166. th = grpc_core::Thread("grpc_wave_1", create_loop_destroy,
  167. const_cast<char*>(args.addr.c_str()));
  168. th.Start();
  169. }
  170. for (auto& th : threads) {
  171. th.Join();
  172. }
  173. }
  174. {
  175. /* Second round, actual grpc server */
  176. gpr_log(GPR_DEBUG, "Wave 2");
  177. int port = grpc_pick_unused_port_or_die();
  178. args.addr = absl::StrCat("localhost:", port);
  179. args.server = grpc_server_create(nullptr, nullptr);
  180. grpc_server_credentials* server_creds =
  181. grpc_insecure_server_credentials_create();
  182. grpc_server_add_http2_port(args.server, args.addr.c_str(), server_creds);
  183. grpc_server_credentials_release(server_creds);
  184. args.cq = grpc_completion_queue_create_for_next(nullptr);
  185. grpc_server_register_completion_queue(args.server, args.cq, nullptr);
  186. grpc_server_start(args.server);
  187. grpc_core::Thread server2("grpc_wave_2_server", server_thread, &args);
  188. server2.Start();
  189. grpc_core::Thread threads[NUM_THREADS];
  190. for (auto& th : threads) {
  191. th = grpc_core::Thread("grpc_wave_2", create_loop_destroy,
  192. const_cast<char*>(args.addr.c_str()));
  193. th.Start();
  194. }
  195. for (auto& th : threads) {
  196. th.Join();
  197. }
  198. grpc_server_shutdown_and_notify(args.server, args.cq, tag(0xd1e));
  199. server2.Join();
  200. grpc_server_destroy(args.server);
  201. grpc_completion_queue_destroy(args.cq);
  202. }
  203. {
  204. /* Third round, bogus tcp server */
  205. gpr_log(GPR_DEBUG, "Wave 3");
  206. auto* pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
  207. grpc_pollset_init(pollset, &args.mu);
  208. args.pollset.push_back(pollset);
  209. gpr_event_init(&args.ready);
  210. grpc_core::Thread server3("grpc_wave_3_server", bad_server_thread, &args);
  211. server3.Start();
  212. gpr_event_wait(&args.ready, gpr_inf_future(GPR_CLOCK_MONOTONIC));
  213. grpc_core::Thread threads[NUM_THREADS];
  214. for (auto& th : threads) {
  215. th = grpc_core::Thread("grpc_wave_3", create_loop_destroy,
  216. const_cast<char*>(args.addr.c_str()));
  217. th.Start();
  218. }
  219. for (auto& th : threads) {
  220. th.Join();
  221. }
  222. args.stop.store(true, std::memory_order_release);
  223. server3.Join();
  224. {
  225. grpc_core::ExecCtx exec_ctx;
  226. grpc_pollset_shutdown(
  227. args.pollset[0],
  228. GRPC_CLOSURE_CREATE(done_pollset_shutdown, args.pollset[0],
  229. grpc_schedule_on_exec_ctx));
  230. }
  231. }
  232. grpc_shutdown();
  233. return 0;
  234. }
  235. void watches_with_short_timeouts(void* addr) {
  236. for (int i = 0; i < NUM_OUTER_LOOPS_SHORT_TIMEOUTS; ++i) {
  237. grpc_completion_queue* cq = grpc_completion_queue_create_for_next(nullptr);
  238. grpc_channel_credentials* creds = grpc_insecure_credentials_create();
  239. grpc_channel* chan =
  240. grpc_channel_create(static_cast<char*>(addr), creds, nullptr);
  241. grpc_channel_credentials_release(creds);
  242. for (int j = 0; j < NUM_INNER_LOOPS_SHORT_TIMEOUTS; ++j) {
  243. gpr_timespec later_time =
  244. grpc_timeout_milliseconds_to_deadline(DELAY_MILLIS_SHORT_TIMEOUTS);
  245. grpc_connectivity_state state =
  246. grpc_channel_check_connectivity_state(chan, 0);
  247. GPR_ASSERT(state == GRPC_CHANNEL_IDLE);
  248. grpc_channel_watch_connectivity_state(chan, state, later_time, cq,
  249. nullptr);
  250. gpr_timespec poll_time =
  251. grpc_timeout_milliseconds_to_deadline(POLL_MILLIS_SHORT_TIMEOUTS);
  252. grpc_event ev = grpc_completion_queue_next(cq, poll_time, nullptr);
  253. GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
  254. GPR_ASSERT(ev.success == false);
  255. /* check that the watcher from "watch state" was free'd */
  256. GPR_ASSERT(grpc_channel_num_external_connectivity_watchers(chan) == 0);
  257. }
  258. grpc_channel_destroy(chan);
  259. grpc_completion_queue_destroy(cq);
  260. }
  261. }
  262. // This test tries to catch deadlock situations.
  263. // With short timeouts on "watches" and long timeouts on cq next calls,
  264. // so that a QUEUE_TIMEOUT likely means that something is stuck.
  265. int run_concurrent_watches_with_short_timeouts_test() {
  266. grpc_init();
  267. grpc_core::Thread threads[NUM_THREADS];
  268. for (auto& th : threads) {
  269. th = grpc_core::Thread("grpc_short_watches", watches_with_short_timeouts,
  270. const_cast<char*>("localhost:54321"));
  271. th.Start();
  272. }
  273. for (auto& th : threads) {
  274. th.Join();
  275. }
  276. grpc_shutdown();
  277. return 0;
  278. }
  279. int main(int argc, char** argv) {
  280. grpc::testing::TestEnvironment env(argc, argv);
  281. run_concurrent_connectivity_test();
  282. run_concurrent_watches_with_short_timeouts_test();
  283. }