pollset_windows_starvation_test.cc 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. /*
  2. *
  3. * Copyright 2019 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. #include <vector>
  19. #include <grpc/grpc.h>
  20. #include <grpc/support/time.h>
  21. #include "src/core/lib/gprpp/thd.h"
  22. #include "src/core/lib/iomgr/exec_ctx.h"
  23. #include "src/core/lib/iomgr/iocp_windows.h"
  24. #include "src/core/lib/iomgr/iomgr_internal.h"
  25. #include "src/core/lib/iomgr/pollset.h"
  26. #include "src/core/lib/iomgr/pollset_windows.h"
  27. #include "src/core/lib/surface/init.h"
  28. #include "test/core/util/test_config.h"
  29. #if defined(GRPC_WINSOCK_SOCKET)
  30. // At least three threads are required to reproduce #18848
  31. const size_t THREADS = 3;
  32. struct ThreadParams {
  33. gpr_cv cv;
  34. gpr_mu mu;
  35. int complete;
  36. int queuing;
  37. gpr_mu* pollset_mu[THREADS];
  38. };
  39. int main(int argc, char** argv) {
  40. grpc_init();
  41. // Create the threads that all start queueing for work.
  42. //
  43. // The first one becomes the active poller for work and the two other
  44. // threads go into the poller queue.
  45. //
  46. // When work arrives, the first one notifies the next queued poller,
  47. // this wakes the second thread - however all this does is return from
  48. // the grpc_pollset_work function. It's up to that thread to figure
  49. // out if it still wants to queue for more work or if it should kick
  50. // other pollers.
  51. //
  52. // Previously that kick only affected pollers in the same pollset, thus
  53. // leaving the other threads stuck in the poller queue. Now the pollset-
  54. // specific grpc_pollset_kick will also kick pollers from other pollsets
  55. // if there are no pollers in the current pollset. This frees up the
  56. // last threads and completes the test.
  57. ThreadParams params = {};
  58. gpr_cv_init(&params.cv);
  59. gpr_mu_init(&params.mu);
  60. std::vector<grpc_core::Thread> threads;
  61. for (int i = 0; i < THREADS; i++) {
  62. grpc_core::Thread thd(
  63. "Poller",
  64. [](void* params) {
  65. ThreadParams* tparams = static_cast<ThreadParams*>(params);
  66. grpc_core::ExecCtx exec_ctx;
  67. gpr_mu* mu;
  68. grpc_pollset pollset = {};
  69. grpc_pollset_init(&pollset, &mu);
  70. // Lock the pollset mutex before notifying the test runner thread that
  71. // one more thread is queuing. This allows the test runner thread to
  72. // wait for all threads to be queued before sending the first kick by
  73. // waiting for the mutexes to be released, which happens in
  74. // gpr_pollset_work when the poller is queued.
  75. gpr_mu_lock(mu);
  76. gpr_mu_lock(&tparams->mu);
  77. tparams->pollset_mu[tparams->queuing] = mu;
  78. tparams->queuing++;
  79. gpr_cv_signal(&tparams->cv);
  80. gpr_mu_unlock(&tparams->mu);
  81. // Queue for work and once we're done, make sure to kick the remaining
  82. // threads.
  83. grpc_error_handle error;
  84. error = grpc_pollset_work(&pollset, NULL,
  85. grpc_core::Timestamp::InfFuture());
  86. error = grpc_pollset_kick(&pollset, NULL);
  87. gpr_mu_unlock(mu);
  88. gpr_mu_lock(&tparams->mu);
  89. tparams->complete++;
  90. gpr_cv_signal(&tparams->cv);
  91. gpr_mu_unlock(&tparams->mu);
  92. },
  93. &params);
  94. thd.Start();
  95. threads.push_back(std::move(thd));
  96. }
  97. // Wait for all three threads to be queuing.
  98. gpr_mu_lock(&params.mu);
  99. while (
  100. params.queuing != THREADS &&
  101. !gpr_cv_wait(&params.cv, &params.mu, gpr_inf_future(GPR_CLOCK_REALTIME)))
  102. ;
  103. gpr_mu_unlock(&params.mu);
  104. // Wait for the mutexes to be released. This indicates that the threads have
  105. // entered the work wait.
  106. //
  107. // At least currently these are essentially all references to the same global
  108. // pollset mutex, but we are still waiting on them once for each thread in
  109. // the case this ever changes.
  110. for (int i = 0; i < THREADS; i++) {
  111. gpr_mu_lock(params.pollset_mu[i]);
  112. gpr_mu_unlock(params.pollset_mu[i]);
  113. }
  114. grpc_iocp_kick();
  115. // Wait for the threads to complete.
  116. gpr_mu_lock(&params.mu);
  117. while (
  118. params.complete != THREADS &&
  119. !gpr_cv_wait(&params.cv, &params.mu, gpr_inf_future(GPR_CLOCK_REALTIME)))
  120. ;
  121. gpr_mu_unlock(&params.mu);
  122. for (auto& t : threads) t.Join();
  123. return EXIT_SUCCESS;
  124. }
  125. #else /* defined(GRPC_WINSOCK_SOCKET) */
  // This test only applies to builds that define GRPC_WINSOCK_SOCKET; in any
  // other build there is nothing to run, so report success immediately.
  int main(int /*argc*/, char** /*argv*/) {
    return 0;
  }
  127. #endif