observable_test.cc 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. // Copyright 2021 gRPC authors.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. #include "src/core/lib/promise/observable.h"
  15. #include <gmock/gmock.h>
  16. #include <gtest/gtest.h>
  17. #include "src/core/lib/promise/promise.h"
  18. #include "src/core/lib/promise/seq.h"
  19. #include "test/core/promise/test_wakeup_schedulers.h"
  20. using testing::MockFunction;
  21. using testing::StrictMock;
  22. namespace grpc_core {
  23. // A simple Barrier type: stalls progress until it is 'cleared'.
  24. class Barrier {
  25. public:
  26. struct Result {};
  27. Promise<Result> Wait() {
  28. return [this]() -> Poll<Result> {
  29. absl::MutexLock lock(&mu_);
  30. if (cleared_) {
  31. return Result{};
  32. } else {
  33. return wait_set_.AddPending(Activity::current()->MakeOwningWaker());
  34. }
  35. };
  36. }
  37. void Clear() {
  38. mu_.Lock();
  39. cleared_ = true;
  40. auto wakeup = wait_set_.TakeWakeupSet();
  41. mu_.Unlock();
  42. wakeup.Wakeup();
  43. }
  44. private:
  45. absl::Mutex mu_;
  46. WaitSet wait_set_ ABSL_GUARDED_BY(mu_);
  47. bool cleared_ ABSL_GUARDED_BY(mu_) = false;
  48. };
  49. TEST(ObservableTest, CanPushAndGet) {
  50. StrictMock<MockFunction<void(absl::Status)>> on_done;
  51. Observable<int> observable;
  52. auto observer = observable.MakeObserver();
  53. auto activity = MakeActivity(
  54. [&observer]() {
  55. return Seq(observer.Get(), [](absl::optional<int> i) {
  56. return i == 42 ? absl::OkStatus() : absl::UnknownError("expected 42");
  57. });
  58. },
  59. InlineWakeupScheduler(),
  60. [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
  61. EXPECT_CALL(on_done, Call(absl::OkStatus()));
  62. observable.Push(42);
  63. }
  64. TEST(ObservableTest, CanNext) {
  65. StrictMock<MockFunction<void(absl::Status)>> on_done;
  66. Observable<int> observable;
  67. auto observer = observable.MakeObserver();
  68. auto activity = MakeActivity(
  69. [&observer]() {
  70. return Seq(
  71. observer.Get(),
  72. [&observer](absl::optional<int> i) {
  73. EXPECT_EQ(i, 42);
  74. return observer.Next();
  75. },
  76. [](absl::optional<int> i) {
  77. return i == 1 ? absl::OkStatus()
  78. : absl::UnknownError("expected 1");
  79. });
  80. },
  81. InlineWakeupScheduler(),
  82. [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
  83. observable.Push(42);
  84. EXPECT_CALL(on_done, Call(absl::OkStatus()));
  85. observable.Push(1);
  86. }
  87. TEST(ObservableTest, CanWatch) {
  88. StrictMock<MockFunction<void(absl::Status)>> on_done;
  89. Observable<int> observable;
  90. Barrier barrier;
  91. auto activity = MakeActivity(
  92. [&observable, &barrier]() {
  93. return observable.Watch(
  94. [&barrier](int x,
  95. WatchCommitter* committer) -> Promise<absl::Status> {
  96. if (x == 3) {
  97. committer->Commit();
  98. return Seq(barrier.Wait(), Immediate(absl::OkStatus()));
  99. } else {
  100. return Never<absl::Status>();
  101. }
  102. });
  103. },
  104. InlineWakeupScheduler(),
  105. [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
  106. observable.Push(1);
  107. observable.Push(2);
  108. observable.Push(3);
  109. observable.Push(4);
  110. EXPECT_CALL(on_done, Call(absl::OkStatus()));
  111. barrier.Clear();
  112. }
  113. } // namespace grpc_core
  114. int main(int argc, char** argv) {
  115. ::testing::InitGoogleTest(&argc, argv);
  116. return RUN_ALL_TESTS();
  117. }