mpscq_test.cc 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. /*
  2. *
  3. * Copyright 2016 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. #include "src/core/lib/gprpp/mpscq.h"
  19. #include <inttypes.h>
  20. #include <stdlib.h>
  21. #include <grpc/support/alloc.h>
  22. #include <grpc/support/log.h>
  23. #include <grpc/support/sync.h>
  24. #include "src/core/lib/gpr/useful.h"
  25. #include "src/core/lib/gprpp/thd.h"
  26. #include "test/core/util/test_config.h"
  27. using grpc_core::MultiProducerSingleConsumerQueue;
  28. typedef struct test_node {
  29. MultiProducerSingleConsumerQueue::Node node;
  30. size_t i;
  31. size_t* ctr;
  32. } test_node;
  33. static test_node* new_node(size_t i, size_t* ctr) {
  34. test_node* n = new test_node();
  35. n->i = i;
  36. n->ctr = ctr;
  37. return n;
  38. }
  39. static void test_serial(void) {
  40. gpr_log(GPR_DEBUG, "test_serial");
  41. MultiProducerSingleConsumerQueue q;
  42. for (size_t i = 0; i < 10000000; i++) {
  43. q.Push(&new_node(i, nullptr)->node);
  44. }
  45. for (size_t i = 0; i < 10000000; i++) {
  46. test_node* n = reinterpret_cast<test_node*>(q.Pop());
  47. GPR_ASSERT(n);
  48. GPR_ASSERT(n->i == i);
  49. delete n;
  50. }
  51. }
  52. typedef struct {
  53. size_t ctr;
  54. MultiProducerSingleConsumerQueue* q;
  55. gpr_event* start;
  56. } thd_args;
  57. #define THREAD_ITERATIONS 10000
  58. static void test_thread(void* args) {
  59. thd_args* a = static_cast<thd_args*>(args);
  60. gpr_event_wait(a->start, gpr_inf_future(GPR_CLOCK_REALTIME));
  61. for (size_t i = 1; i <= THREAD_ITERATIONS; i++) {
  62. a->q->Push(&new_node(i, &a->ctr)->node);
  63. }
  64. }
  65. static void test_mt(void) {
  66. gpr_log(GPR_DEBUG, "test_mt");
  67. gpr_event start;
  68. gpr_event_init(&start);
  69. grpc_core::Thread thds[100];
  70. thd_args ta[GPR_ARRAY_SIZE(thds)];
  71. MultiProducerSingleConsumerQueue q;
  72. for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
  73. ta[i].ctr = 0;
  74. ta[i].q = &q;
  75. ta[i].start = &start;
  76. thds[i] = grpc_core::Thread("grpc_mt_test", test_thread, &ta[i]);
  77. thds[i].Start();
  78. }
  79. size_t num_done = 0;
  80. size_t spins = 0;
  81. gpr_event_set(&start, reinterpret_cast<void*>(1));
  82. while (num_done != GPR_ARRAY_SIZE(thds)) {
  83. MultiProducerSingleConsumerQueue::Node* n;
  84. while ((n = q.Pop()) == nullptr) {
  85. spins++;
  86. }
  87. test_node* tn = reinterpret_cast<test_node*>(n);
  88. GPR_ASSERT(*tn->ctr == tn->i - 1);
  89. *tn->ctr = tn->i;
  90. if (tn->i == THREAD_ITERATIONS) num_done++;
  91. delete tn;
  92. }
  93. gpr_log(GPR_DEBUG, "spins: %" PRIdPTR, spins);
  94. for (auto& th : thds) {
  95. th.Join();
  96. }
  97. }
  98. typedef struct {
  99. thd_args* ta;
  100. size_t num_thds;
  101. gpr_mu mu;
  102. size_t num_done;
  103. size_t spins;
  104. MultiProducerSingleConsumerQueue* q;
  105. gpr_event* start;
  106. } pull_args;
  107. static void pull_thread(void* arg) {
  108. pull_args* pa = static_cast<pull_args*>(arg);
  109. gpr_event_wait(pa->start, gpr_inf_future(GPR_CLOCK_REALTIME));
  110. for (;;) {
  111. gpr_mu_lock(&pa->mu);
  112. if (pa->num_done == pa->num_thds) {
  113. gpr_mu_unlock(&pa->mu);
  114. return;
  115. }
  116. MultiProducerSingleConsumerQueue::Node* n;
  117. while ((n = pa->q->Pop()) == nullptr) {
  118. pa->spins++;
  119. }
  120. test_node* tn = reinterpret_cast<test_node*>(n);
  121. GPR_ASSERT(*tn->ctr == tn->i - 1);
  122. *tn->ctr = tn->i;
  123. if (tn->i == THREAD_ITERATIONS) pa->num_done++;
  124. delete tn;
  125. gpr_mu_unlock(&pa->mu);
  126. }
  127. }
  128. static void test_mt_multipop(void) {
  129. gpr_log(GPR_DEBUG, "test_mt_multipop");
  130. gpr_event start;
  131. gpr_event_init(&start);
  132. grpc_core::Thread thds[50];
  133. grpc_core::Thread pull_thds[50];
  134. thd_args ta[GPR_ARRAY_SIZE(thds)];
  135. MultiProducerSingleConsumerQueue q;
  136. for (size_t i = 0; i < GPR_ARRAY_SIZE(thds); i++) {
  137. ta[i].ctr = 0;
  138. ta[i].q = &q;
  139. ta[i].start = &start;
  140. thds[i] = grpc_core::Thread("grpc_multipop_test", test_thread, &ta[i]);
  141. thds[i].Start();
  142. }
  143. pull_args pa;
  144. pa.ta = ta;
  145. pa.num_thds = GPR_ARRAY_SIZE(thds);
  146. pa.spins = 0;
  147. pa.num_done = 0;
  148. pa.q = &q;
  149. pa.start = &start;
  150. gpr_mu_init(&pa.mu);
  151. for (size_t i = 0; i < GPR_ARRAY_SIZE(pull_thds); i++) {
  152. pull_thds[i] = grpc_core::Thread("grpc_multipop_pull", pull_thread, &pa);
  153. pull_thds[i].Start();
  154. }
  155. gpr_event_set(&start, reinterpret_cast<void*>(1));
  156. for (auto& pth : pull_thds) {
  157. pth.Join();
  158. }
  159. gpr_log(GPR_DEBUG, "spins: %" PRIdPTR, pa.spins);
  160. for (auto& th : thds) {
  161. th.Join();
  162. }
  163. gpr_mu_destroy(&pa.mu);
  164. }
  165. int main(int argc, char** argv) {
  166. grpc::testing::TestEnvironment env(argc, argv);
  167. test_serial();
  168. test_mt();
  169. test_mt_multipop();
  170. return 0;
  171. }