// bm_threadpool.cc

/*
 *
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <condition_variable>
#include <mutex>

#include <benchmark/benchmark.h>
#include <grpc/grpc.h>

#include "src/core/lib/iomgr/executor/threadpool.h"
#include "test/core/util/test_config.h"
#include "test/cpp/microbenchmarks/helpers.h"
#include "test/cpp/util/test_config.h"

namespace grpc {
namespace testing {

// This helper class allows a thread to block until a pre-specified number of
// actions have completed. BlockingCounter is initialized with a non-negative
// count. Each call to DecrementCount() decreases the count by 1; a call to
// Wait() blocks until the count reaches 0.
class BlockingCounter {
 public:
  explicit BlockingCounter(int count) : count_(count) {}

  void DecrementCount() {
    std::lock_guard<std::mutex> l(mu_);
    count_--;
    if (count_ == 0) cv_.notify_all();
  }

  void Wait() {
    std::unique_lock<std::mutex> l(mu_);
    while (count_ > 0) {
      cv_.wait(l);
    }
  }

 private:
  int count_;
  std::mutex mu_;
  std::condition_variable cv_;
};
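
// Illustrative usage (mirrors the pattern in the benchmarks below):
//   BlockingCounter counter(kNumClosures);
//   for (int i = 0; i < kNumClosures; ++i) {
//     pool.Add(/* a functor that calls counter.DecrementCount() when done */);
//   }
//   counter.Wait();  // Blocks until every closure has run.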

// A functor/closure class for the threadpool microbenchmarks. If the number
// passed in (num_add) is greater than 0, the functor adds another functor to
// the pool; otherwise it decrements the counter to indicate that the task is
// finished. The functor deletes itself at the end of Run, so the caller does
// not need to clean it up.
class AddAnotherFunctor : public grpc_completion_queue_functor {
 public:
  AddAnotherFunctor(grpc_core::ThreadPool* pool, BlockingCounter* counter,
                    int num_add)
      : pool_(pool), counter_(counter), num_add_(num_add) {
    functor_run = &AddAnotherFunctor::Run;
    inlineable = false;
    internal_next = this;
    internal_success = 0;
  }

  // When the functor runs in the thread pool, it receives itself as the first
  // argument and internal_success as the second.
  static void Run(grpc_completion_queue_functor* cb, int /*ok*/) {
    auto* callback = static_cast<AddAnotherFunctor*>(cb);
    if (--callback->num_add_ > 0) {
      callback->pool_->Add(new AddAnotherFunctor(
          callback->pool_, callback->counter_, callback->num_add_));
    } else {
      callback->counter_->DecrementCount();
    }
    // Deletes itself.
    delete callback;
  }

 private:
  grpc_core::ThreadPool* pool_;
  BlockingCounter* counter_;
  int num_add_;
};

template <int kConcurrentFunctor>
static void ThreadPoolAddAnother(benchmark::State& state) {
  const int num_iterations = state.range(0);
  const int num_threads = state.range(1);
  // Number of adds done by each closure.
  const int num_add = num_iterations / kConcurrentFunctor;
  grpc_core::ThreadPool pool(num_threads);
  while (state.KeepRunningBatch(num_iterations)) {
    BlockingCounter counter(kConcurrentFunctor);
    for (int i = 0; i < kConcurrentFunctor; ++i) {
      pool.Add(new AddAnotherFunctor(&pool, &counter, num_add));
    }
    counter.Wait();
  }
  state.SetItemsProcessed(state.iterations());
}
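
// Each batch launches kConcurrentFunctor independent chains, and each chain
// performs num_add = num_iterations / kConcurrentFunctor adds, so roughly
// num_iterations closures are added and executed per batch; the items-processed
// count therefore corresponds to the number of Add() calls.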

// The first pair of arguments is the range for the number of iterations
// (num_iterations); the second pair is the range for the thread pool size
// (num_threads). The iteration range is a single value here, so each variant
// fixes num_iterations at 2^19 = 524288 and sweeps the pool size from 1 up to
// 1024 threads.
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 1)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 4)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 8)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 16)
    ->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 32)
    ->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 64)
    ->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 128)
    ->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 512)
    ->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddAnother, 2048)
    ->RangePair(524288, 524288, 1, 1024);

// A functor that deletes itself when it finishes running.
class SuicideFunctorForAdd : public grpc_completion_queue_functor {
 public:
  explicit SuicideFunctorForAdd(BlockingCounter* counter) : counter_(counter) {
    functor_run = &SuicideFunctorForAdd::Run;
    inlineable = false;
    internal_next = this;
    internal_success = 0;
  }

  static void Run(grpc_completion_queue_functor* cb, int /*ok*/) {
    // When run, the first argument is the functor itself.
    auto* callback = static_cast<SuicideFunctorForAdd*>(cb);
    callback->counter_->DecrementCount();
    delete callback;
  }

 private:
  BlockingCounter* counter_;
};

// Benchmarks the scenario where external thread(s) add closures to the pool.
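// Note: the ->ThreadRange(1, 256) registration below makes Google Benchmark
// run this function concurrently on multiple threads; the thread_index() == 0
// checks follow the usual Google Benchmark pattern of letting one thread own
// the one-time setup and teardown of the shared pool.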
static void BM_ThreadPoolExternalAdd(benchmark::State& state) {
  static grpc_core::ThreadPool* external_add_pool = nullptr;
  int thread_idx = state.thread_index();
  // Setup for each run of the test.
  if (thread_idx == 0) {
    const int num_threads = state.range(1);
    external_add_pool = new grpc_core::ThreadPool(num_threads);
  }
  const int num_iterations = state.range(0) / state.threads();
  while (state.KeepRunningBatch(num_iterations)) {
    BlockingCounter counter(num_iterations);
    for (int i = 0; i < num_iterations; ++i) {
      external_add_pool->Add(new SuicideFunctorForAdd(&counter));
    }
    counter.Wait();
  }
  // Teardown at the end of each test run.
  if (thread_idx == 0) {
    state.SetItemsProcessed(state.range(0));
    delete external_add_pool;
  }
}

BENCHMARK(BM_ThreadPoolExternalAdd)
    // First pair is the range for the number of iterations (num_iterations).
    // Second pair is the range for the thread pool size (num_threads).
    ->RangePair(524288, 524288, 1, 1024)
    ->ThreadRange(1, 256);  // Concurrent external thread(s), up to 256.

// A functor (closure) that repeatedly re-adds itself to the pool. Because no
// new functor is allocated per add, the overhead stays low and the cost of
// Add() can be measured more accurately.
class AddSelfFunctor : public grpc_completion_queue_functor {
 public:
  AddSelfFunctor(grpc_core::ThreadPool* pool, BlockingCounter* counter,
                 int num_add)
      : pool_(pool), counter_(counter), num_add_(num_add) {
    functor_run = &AddSelfFunctor::Run;
    inlineable = false;
    internal_next = this;
    internal_success = 0;
  }

  // When the functor runs in the thread pool, it receives itself as the first
  // argument and internal_success as the second.
  static void Run(grpc_completion_queue_functor* cb, int /*ok*/) {
    auto* callback = static_cast<AddSelfFunctor*>(cb);
    if (--callback->num_add_ > 0) {
      callback->pool_->Add(cb);
    } else {
      callback->counter_->DecrementCount();
      // Deletes itself.
      delete callback;
    }
  }

 private:
  grpc_core::ThreadPool* pool_;
  BlockingCounter* counter_;
  int num_add_;
};

template <int kConcurrentFunctor>
static void ThreadPoolAddSelf(benchmark::State& state) {
  const int num_iterations = state.range(0);
  const int num_threads = state.range(1);
  // Number of adds done by each closure.
  const int num_add = num_iterations / kConcurrentFunctor;
  grpc_core::ThreadPool pool(num_threads);
  while (state.KeepRunningBatch(num_iterations)) {
    BlockingCounter counter(kConcurrentFunctor);
    for (int i = 0; i < kConcurrentFunctor; ++i) {
      pool.Add(new AddSelfFunctor(&pool, &counter, num_add));
    }
    counter.Wait();
  }
  state.SetItemsProcessed(state.iterations());
}
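
// Structurally identical to ThreadPoolAddAnother above, except that each chain
// reuses a single functor instead of allocating a new one per add, removing
// the per-add allocation/deallocation cost from the measurement.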

// The first pair of arguments is the range for the number of iterations
// (num_iterations); the second pair is the range for the thread pool size
// (num_threads).
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 1)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 4)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 8)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 16)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 32)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 64)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 128)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 512)->RangePair(524288, 524288, 1, 1024);
BENCHMARK_TEMPLATE(ThreadPoolAddSelf, 2048)->RangePair(524288, 524288, 1, 1024);
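
// The block below picks a cache-line size for the current architecture so that
// ShortWorkFunctorForAdd can pad itself to roughly one cache line; this is
// intended to limit false sharing between the contiguous functors stored in
// work_vector in BM_SpikyLoad.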
#if defined(__GNUC__) && !defined(SWIG)
#if defined(__i386__) || defined(__x86_64__)
#define CACHELINE_SIZE 64
#elif defined(__powerpc64__)
#define CACHELINE_SIZE 128
#elif defined(__aarch64__)
#define CACHELINE_SIZE 64
#elif defined(__arm__)
#if defined(__ARM_ARCH_5T__)
#define CACHELINE_SIZE 32
#elif defined(__ARM_ARCH_7A__)
#define CACHELINE_SIZE 64
#endif
#endif
#endif
// Fall back to a common cache-line size when the compiler or architecture is
// not matched above.
#ifndef CACHELINE_SIZE
#define CACHELINE_SIZE 64
#endif

// A functor (closure) that simulates a closure with a small but non-trivial
// amount of work.
class ShortWorkFunctorForAdd : public grpc_completion_queue_functor {
 public:
  BlockingCounter* counter_;

  ShortWorkFunctorForAdd() {
    functor_run = &ShortWorkFunctorForAdd::Run;
    inlineable = false;
    internal_next = this;
    internal_success = 0;
    val_ = 0;
  }

  static void Run(grpc_completion_queue_functor* cb, int /*ok*/) {
    auto* callback = static_cast<ShortWorkFunctorForAdd*>(cb);
    // Touch pad so the compiler does not complain about an unused member.
    callback->pad[0] = 0;
    for (int i = 0; i < 1000; ++i) {
      callback->val_++;
    }
    callback->counter_->DecrementCount();
  }

 private:
  char pad[CACHELINE_SIZE];
  volatile int val_;
};
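
// Unlike the other functors in this file, ShortWorkFunctorForAdd does not
// delete itself in Run, so the same work_vector elements can be re-added to
// the pool on every spike.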

// Simulates workloads where many short-running callbacks are added to the
// threadpool. The callbacks are not enough to keep all the workers busy
// continuously, so the number of running workers changes over time.
//
// In effect this tests how well the threadpool avoids spurious wakeups.
static void BM_SpikyLoad(benchmark::State& state) {
  const int num_threads = state.range(0);
  const int kNumSpikes = 1000;
  const int batch_size = 3 * num_threads;
  std::vector<ShortWorkFunctorForAdd> work_vector(batch_size);
  grpc_core::ThreadPool pool(num_threads);
  while (state.KeepRunningBatch(kNumSpikes * batch_size)) {
    for (int i = 0; i != kNumSpikes; ++i) {
      BlockingCounter counter(batch_size);
      for (auto& w : work_vector) {
        w.counter_ = &counter;
        pool.Add(&w);
      }
      counter.Wait();
    }
  }
  state.SetItemsProcessed(state.iterations() * batch_size);
}
BENCHMARK(BM_SpikyLoad)->Arg(1)->Arg(2)->Arg(4)->Arg(8)->Arg(16);

}  // namespace testing
}  // namespace grpc

// Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
// and others do not. This allows us to support both modes.
namespace benchmark {
void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
}  // namespace benchmark

int main(int argc, char* argv[]) {
  grpc::testing::TestEnvironment env(argc, argv);
  LibraryInitializer libInit;
  ::benchmark::Initialize(&argc, argv);
  grpc::testing::InitTest(&argc, &argv, false);
  benchmark::RunTheBenchmarksNamespaced();
  return 0;
}