123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- // Copyright 2020 The Abseil Authors.
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // https://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- #include "absl/flags/internal/sequence_lock.h"
- #include <algorithm>
- #include <atomic>
- #include <thread> // NOLINT(build/c++11)
- #include <tuple>
- #include <vector>
- #include "gtest/gtest.h"
- #include "absl/base/internal/sysinfo.h"
- #include "absl/container/fixed_array.h"
- #include "absl/time/clock.h"
- namespace {
- namespace flags = absl::flags_internal;
- class ConcurrentSequenceLockTest
- : public testing::TestWithParam<std::tuple<int, int>> {
- public:
- ConcurrentSequenceLockTest()
- : buf_bytes_(std::get<0>(GetParam())),
- num_threads_(std::get<1>(GetParam())) {}
- protected:
- const int buf_bytes_;
- const int num_threads_;
- };
- TEST_P(ConcurrentSequenceLockTest, ReadAndWrite) {
- const int buf_words =
- flags::AlignUp(buf_bytes_, sizeof(uint64_t)) / sizeof(uint64_t);
- // The buffer that will be protected by the SequenceLock.
- absl::FixedArray<std::atomic<uint64_t>> protected_buf(buf_words);
- for (auto& v : protected_buf) v = -1;
- flags::SequenceLock seq_lock;
- std::atomic<bool> stop{false};
- std::atomic<int64_t> bad_reads{0};
- std::atomic<int64_t> good_reads{0};
- std::atomic<int64_t> unsuccessful_reads{0};
- // Start a bunch of threads which read 'protected_buf' under the sequence
- // lock. The main thread will concurrently update 'protected_buf'. The updates
- // always consist of an array of identical integers. The reader ensures that
- // any data it reads matches that pattern (i.e. the reads are not "torn").
- std::vector<std::thread> threads;
- for (int i = 0; i < num_threads_; i++) {
- threads.emplace_back([&]() {
- absl::FixedArray<char> local_buf(buf_bytes_);
- while (!stop.load(std::memory_order_relaxed)) {
- if (seq_lock.TryRead(local_buf.data(), protected_buf.data(),
- buf_bytes_)) {
- bool good = true;
- for (const auto& v : local_buf) {
- if (v != local_buf[0]) good = false;
- }
- if (good) {
- good_reads.fetch_add(1, std::memory_order_relaxed);
- } else {
- bad_reads.fetch_add(1, std::memory_order_relaxed);
- }
- } else {
- unsuccessful_reads.fetch_add(1, std::memory_order_relaxed);
- }
- }
- });
- }
- while (unsuccessful_reads.load(std::memory_order_relaxed) < num_threads_) {
- absl::SleepFor(absl::Milliseconds(1));
- }
- seq_lock.MarkInitialized();
- // Run a maximum of 5 seconds. On Windows, the scheduler behavior seems
- // somewhat unfair and without an explicit timeout for this loop, the tests
- // can run a long time.
- absl::Time deadline = absl::Now() + absl::Seconds(5);
- for (int i = 0; i < 100 && absl::Now() < deadline; i++) {
- absl::FixedArray<char> writer_buf(buf_bytes_);
- for (auto& v : writer_buf) v = i;
- seq_lock.Write(protected_buf.data(), writer_buf.data(), buf_bytes_);
- absl::SleepFor(absl::Microseconds(10));
- }
- stop.store(true, std::memory_order_relaxed);
- for (auto& t : threads) t.join();
- ASSERT_GE(good_reads, 0);
- ASSERT_EQ(bad_reads, 0);
- }
- // Simple helper for generating a range of thread counts.
- // Generates [low, low*scale, low*scale^2, ...high)
- // (even if high is between low*scale^k and low*scale^(k+1)).
- std::vector<int> MultiplicativeRange(int low, int high, int scale) {
- std::vector<int> result;
- for (int current = low; current < high; current *= scale) {
- result.push_back(current);
- }
- result.push_back(high);
- return result;
- }
- #ifndef ABSL_HAVE_THREAD_SANITIZER
- const int kMaxThreads = absl::base_internal::NumCPUs();
- #else
- // With TSAN, a lot of threads contending for atomic access on the sequence
- // lock make this test run too slowly.
- const int kMaxThreads = std::min(absl::base_internal::NumCPUs(), 4);
- #endif
- // Return all of the interesting buffer sizes worth testing:
- // powers of two and adjacent values.
- std::vector<int> InterestingBufferSizes() {
- std::vector<int> ret;
- for (int v : MultiplicativeRange(1, 128, 2)) {
- ret.push_back(v);
- if (v > 1) {
- ret.push_back(v - 1);
- }
- ret.push_back(v + 1);
- }
- return ret;
- }
- INSTANTIATE_TEST_SUITE_P(
- TestManyByteSizes, ConcurrentSequenceLockTest,
- testing::Combine(
- // Buffer size (bytes).
- testing::ValuesIn(InterestingBufferSizes()),
- // Number of reader threads.
- testing::ValuesIn(MultiplicativeRange(1, kMaxThreads, 2))));
- // Simple single-threaded test, parameterized by the size of the buffer to be
- // protected.
- class SequenceLockTest : public testing::TestWithParam<int> {};
- TEST_P(SequenceLockTest, SingleThreaded) {
- const int size = GetParam();
- absl::FixedArray<std::atomic<uint64_t>> protected_buf(
- flags::AlignUp(size, sizeof(uint64_t)) / sizeof(uint64_t));
- flags::SequenceLock seq_lock;
- seq_lock.MarkInitialized();
- std::vector<char> src_buf(size, 'x');
- seq_lock.Write(protected_buf.data(), src_buf.data(), size);
- std::vector<char> dst_buf(size, '0');
- ASSERT_TRUE(seq_lock.TryRead(dst_buf.data(), protected_buf.data(), size));
- ASSERT_EQ(src_buf, dst_buf);
- }
- INSTANTIATE_TEST_SUITE_P(TestManyByteSizes, SequenceLockTest,
- // Buffer size (bytes).
- testing::Range(1, 128));
- } // namespace
|