sequence_lock_test.cc 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. // Copyright 2020 The Abseil Authors.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // https://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. #include "absl/flags/internal/sequence_lock.h"
  15. #include <algorithm>
  16. #include <atomic>
  17. #include <thread> // NOLINT(build/c++11)
  18. #include <tuple>
  19. #include <vector>
  20. #include "gtest/gtest.h"
  21. #include "absl/base/internal/sysinfo.h"
  22. #include "absl/container/fixed_array.h"
  23. #include "absl/time/clock.h"
  24. namespace {
  25. namespace flags = absl::flags_internal;
  26. class ConcurrentSequenceLockTest
  27. : public testing::TestWithParam<std::tuple<int, int>> {
  28. public:
  29. ConcurrentSequenceLockTest()
  30. : buf_bytes_(std::get<0>(GetParam())),
  31. num_threads_(std::get<1>(GetParam())) {}
  32. protected:
  33. const int buf_bytes_;
  34. const int num_threads_;
  35. };
  36. TEST_P(ConcurrentSequenceLockTest, ReadAndWrite) {
  37. const int buf_words =
  38. flags::AlignUp(buf_bytes_, sizeof(uint64_t)) / sizeof(uint64_t);
  39. // The buffer that will be protected by the SequenceLock.
  40. absl::FixedArray<std::atomic<uint64_t>> protected_buf(buf_words);
  41. for (auto& v : protected_buf) v = -1;
  42. flags::SequenceLock seq_lock;
  43. std::atomic<bool> stop{false};
  44. std::atomic<int64_t> bad_reads{0};
  45. std::atomic<int64_t> good_reads{0};
  46. std::atomic<int64_t> unsuccessful_reads{0};
  47. // Start a bunch of threads which read 'protected_buf' under the sequence
  48. // lock. The main thread will concurrently update 'protected_buf'. The updates
  49. // always consist of an array of identical integers. The reader ensures that
  50. // any data it reads matches that pattern (i.e. the reads are not "torn").
  51. std::vector<std::thread> threads;
  52. for (int i = 0; i < num_threads_; i++) {
  53. threads.emplace_back([&]() {
  54. absl::FixedArray<char> local_buf(buf_bytes_);
  55. while (!stop.load(std::memory_order_relaxed)) {
  56. if (seq_lock.TryRead(local_buf.data(), protected_buf.data(),
  57. buf_bytes_)) {
  58. bool good = true;
  59. for (const auto& v : local_buf) {
  60. if (v != local_buf[0]) good = false;
  61. }
  62. if (good) {
  63. good_reads.fetch_add(1, std::memory_order_relaxed);
  64. } else {
  65. bad_reads.fetch_add(1, std::memory_order_relaxed);
  66. }
  67. } else {
  68. unsuccessful_reads.fetch_add(1, std::memory_order_relaxed);
  69. }
  70. }
  71. });
  72. }
  73. while (unsuccessful_reads.load(std::memory_order_relaxed) < num_threads_) {
  74. absl::SleepFor(absl::Milliseconds(1));
  75. }
  76. seq_lock.MarkInitialized();
  77. // Run a maximum of 5 seconds. On Windows, the scheduler behavior seems
  78. // somewhat unfair and without an explicit timeout for this loop, the tests
  79. // can run a long time.
  80. absl::Time deadline = absl::Now() + absl::Seconds(5);
  81. for (int i = 0; i < 100 && absl::Now() < deadline; i++) {
  82. absl::FixedArray<char> writer_buf(buf_bytes_);
  83. for (auto& v : writer_buf) v = i;
  84. seq_lock.Write(protected_buf.data(), writer_buf.data(), buf_bytes_);
  85. absl::SleepFor(absl::Microseconds(10));
  86. }
  87. stop.store(true, std::memory_order_relaxed);
  88. for (auto& t : threads) t.join();
  89. ASSERT_GE(good_reads, 0);
  90. ASSERT_EQ(bad_reads, 0);
  91. }
  92. // Simple helper for generating a range of thread counts.
  93. // Generates [low, low*scale, low*scale^2, ...high)
  94. // (even if high is between low*scale^k and low*scale^(k+1)).
  95. std::vector<int> MultiplicativeRange(int low, int high, int scale) {
  96. std::vector<int> result;
  97. for (int current = low; current < high; current *= scale) {
  98. result.push_back(current);
  99. }
  100. result.push_back(high);
  101. return result;
  102. }
  103. #ifndef ABSL_HAVE_THREAD_SANITIZER
  104. const int kMaxThreads = absl::base_internal::NumCPUs();
  105. #else
  106. // With TSAN, a lot of threads contending for atomic access on the sequence
  107. // lock make this test run too slowly.
  108. const int kMaxThreads = std::min(absl::base_internal::NumCPUs(), 4);
  109. #endif
  110. // Return all of the interesting buffer sizes worth testing:
  111. // powers of two and adjacent values.
  112. std::vector<int> InterestingBufferSizes() {
  113. std::vector<int> ret;
  114. for (int v : MultiplicativeRange(1, 128, 2)) {
  115. ret.push_back(v);
  116. if (v > 1) {
  117. ret.push_back(v - 1);
  118. }
  119. ret.push_back(v + 1);
  120. }
  121. return ret;
  122. }
  123. INSTANTIATE_TEST_SUITE_P(
  124. TestManyByteSizes, ConcurrentSequenceLockTest,
  125. testing::Combine(
  126. // Buffer size (bytes).
  127. testing::ValuesIn(InterestingBufferSizes()),
  128. // Number of reader threads.
  129. testing::ValuesIn(MultiplicativeRange(1, kMaxThreads, 2))));
  130. // Simple single-threaded test, parameterized by the size of the buffer to be
  131. // protected.
  132. class SequenceLockTest : public testing::TestWithParam<int> {};
  133. TEST_P(SequenceLockTest, SingleThreaded) {
  134. const int size = GetParam();
  135. absl::FixedArray<std::atomic<uint64_t>> protected_buf(
  136. flags::AlignUp(size, sizeof(uint64_t)) / sizeof(uint64_t));
  137. flags::SequenceLock seq_lock;
  138. seq_lock.MarkInitialized();
  139. std::vector<char> src_buf(size, 'x');
  140. seq_lock.Write(protected_buf.data(), src_buf.data(), size);
  141. std::vector<char> dst_buf(size, '0');
  142. ASSERT_TRUE(seq_lock.TryRead(dst_buf.data(), protected_buf.data(), size));
  143. ASSERT_EQ(src_buf, dst_buf);
  144. }
  145. INSTANTIATE_TEST_SUITE_P(TestManyByteSizes, SequenceLockTest,
  146. // Buffer size (bytes).
  147. testing::Range(1, 128));
  148. } // namespace