activity_test.cc 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. // Copyright 2021 gRPC authors.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. #include "src/core/lib/promise/activity.h"
  15. #include <gmock/gmock.h>
  16. #include <gtest/gtest.h>
  17. #include "src/core/lib/promise/join.h"
  18. #include "src/core/lib/promise/promise.h"
  19. #include "src/core/lib/promise/seq.h"
  20. #include "src/core/lib/promise/wait_set.h"
  21. #include "test/core/promise/test_wakeup_schedulers.h"
  22. using testing::_;
  23. using testing::Mock;
  24. using testing::MockFunction;
  25. using testing::SaveArg;
  26. using testing::StrictMock;
  27. namespace grpc_core {
  28. // A simple Barrier type: stalls progress until it is 'cleared'.
  29. class Barrier {
  30. public:
  31. struct Result {};
  32. Promise<Result> Wait() {
  33. return [this]() -> Poll<Result> {
  34. absl::MutexLock lock(&mu_);
  35. if (cleared_) {
  36. return Result{};
  37. } else {
  38. return wait_set_.AddPending(Activity::current()->MakeOwningWaker());
  39. }
  40. };
  41. }
  42. void Clear() {
  43. mu_.Lock();
  44. cleared_ = true;
  45. auto wakeup = wait_set_.TakeWakeupSet();
  46. mu_.Unlock();
  47. wakeup.Wakeup();
  48. }
  49. private:
  50. absl::Mutex mu_;
  51. WaitSet wait_set_ ABSL_GUARDED_BY(mu_);
  52. bool cleared_ ABSL_GUARDED_BY(mu_) = false;
  53. };
  54. // A simple Barrier type: stalls progress until it is 'cleared'.
  55. // This variant supports only a single waiter.
  56. class SingleBarrier {
  57. public:
  58. struct Result {};
  59. Promise<Result> Wait() {
  60. return [this]() -> Poll<Result> {
  61. absl::MutexLock lock(&mu_);
  62. if (cleared_) {
  63. return Result{};
  64. } else {
  65. waker_ = Activity::current()->MakeOwningWaker();
  66. return Pending();
  67. }
  68. };
  69. }
  70. void Clear() {
  71. mu_.Lock();
  72. cleared_ = true;
  73. auto waker = std::move(waker_);
  74. mu_.Unlock();
  75. waker.Wakeup();
  76. }
  77. private:
  78. absl::Mutex mu_;
  79. Waker waker_ ABSL_GUARDED_BY(mu_);
  80. bool cleared_ ABSL_GUARDED_BY(mu_) = false;
  81. };
  82. TEST(ActivityTest, ImmediatelyCompleteWithSuccess) {
  83. StrictMock<MockFunction<void(absl::Status)>> on_done;
  84. EXPECT_CALL(on_done, Call(absl::OkStatus()));
  85. MakeActivity(
  86. [] { return [] { return absl::OkStatus(); }; }, NoWakeupScheduler(),
  87. [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
  88. }
  89. TEST(ActivityTest, ImmediatelyCompleteWithFailure) {
  90. StrictMock<MockFunction<void(absl::Status)>> on_done;
  91. EXPECT_CALL(on_done, Call(absl::CancelledError()));
  92. MakeActivity(
  93. [] { return [] { return absl::CancelledError(); }; }, NoWakeupScheduler(),
  94. [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
  95. }
  96. TEST(ActivityTest, DropImmediately) {
  97. StrictMock<MockFunction<void(absl::Status)>> on_done;
  98. EXPECT_CALL(on_done, Call(absl::CancelledError()));
  99. MakeActivity(
  100. [] { return []() -> Poll<absl::Status> { return Pending(); }; },
  101. NoWakeupScheduler(),
  102. [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
  103. }
  104. template <typename B>
  105. class BarrierTest : public testing::Test {
  106. public:
  107. using Type = B;
  108. };
  109. using BarrierTestTypes = testing::Types<Barrier, SingleBarrier>;
  110. TYPED_TEST_SUITE(BarrierTest, BarrierTestTypes);
  111. TYPED_TEST(BarrierTest, Barrier) {
  112. typename TestFixture::Type b;
  113. StrictMock<MockFunction<void(absl::Status)>> on_done;
  114. auto activity = MakeActivity(
  115. [&b] {
  116. return Seq(b.Wait(), [](typename TestFixture::Type::Result) {
  117. return absl::OkStatus();
  118. });
  119. },
  120. InlineWakeupScheduler(),
  121. [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
  122. // Clearing the barrier should let the activity proceed to return a result.
  123. EXPECT_CALL(on_done, Call(absl::OkStatus()));
  124. b.Clear();
  125. }
  126. TYPED_TEST(BarrierTest, BarrierPing) {
  127. typename TestFixture::Type b1;
  128. typename TestFixture::Type b2;
  129. StrictMock<MockFunction<void(absl::Status)>> on_done1;
  130. StrictMock<MockFunction<void(absl::Status)>> on_done2;
  131. MockCallbackScheduler scheduler1;
  132. MockCallbackScheduler scheduler2;
  133. auto activity1 = MakeActivity(
  134. [&b1, &b2] {
  135. return Seq(b1.Wait(), [&b2](typename TestFixture::Type::Result) {
  136. // Clear the barrier whilst executing an activity
  137. b2.Clear();
  138. return absl::OkStatus();
  139. });
  140. },
  141. UseMockCallbackScheduler{&scheduler1},
  142. [&on_done1](absl::Status status) { on_done1.Call(std::move(status)); });
  143. auto activity2 = MakeActivity(
  144. [&b2] {
  145. return Seq(b2.Wait(), [](typename TestFixture::Type::Result) {
  146. return absl::OkStatus();
  147. });
  148. },
  149. UseMockCallbackScheduler{&scheduler2},
  150. [&on_done2](absl::Status status) { on_done2.Call(std::move(status)); });
  151. // Since barrier triggers inside activity1 promise, activity2 wakeup will be
  152. // scheduled from a callback.
  153. std::function<void()> cb1;
  154. std::function<void()> cb2;
  155. EXPECT_CALL(scheduler1, Schedule(_)).WillOnce(SaveArg<0>(&cb1));
  156. b1.Clear();
  157. Mock::VerifyAndClearExpectations(&scheduler1);
  158. EXPECT_CALL(on_done1, Call(absl::OkStatus()));
  159. EXPECT_CALL(scheduler2, Schedule(_)).WillOnce(SaveArg<0>(&cb2));
  160. cb1();
  161. Mock::VerifyAndClearExpectations(&on_done1);
  162. EXPECT_CALL(on_done2, Call(absl::OkStatus()));
  163. cb2();
  164. }
  165. TYPED_TEST(BarrierTest, WakeSelf) {
  166. typename TestFixture::Type b;
  167. StrictMock<MockFunction<void(absl::Status)>> on_done;
  168. EXPECT_CALL(on_done, Call(absl::OkStatus()));
  169. MakeActivity(
  170. [&b] {
  171. return Seq(Join(b.Wait(),
  172. [&b] {
  173. b.Clear();
  174. return 1;
  175. }),
  176. [](std::tuple<typename TestFixture::Type::Result, int>) {
  177. return absl::OkStatus();
  178. });
  179. },
  180. NoWakeupScheduler(),
  181. [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
  182. }
  183. TYPED_TEST(BarrierTest, WakeAfterDestruction) {
  184. typename TestFixture::Type b;
  185. {
  186. StrictMock<MockFunction<void(absl::Status)>> on_done;
  187. EXPECT_CALL(on_done, Call(absl::CancelledError()));
  188. MakeActivity(
  189. [&b] {
  190. return Seq(b.Wait(), [](typename TestFixture::Type::Result) {
  191. return absl::OkStatus();
  192. });
  193. },
  194. InlineWakeupScheduler(),
  195. [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
  196. }
  197. b.Clear();
  198. }
  199. TEST(ActivityTest, ForceWakeup) {
  200. StrictMock<MockFunction<void(absl::Status)>> on_done;
  201. int run_count = 0;
  202. auto activity = MakeActivity(
  203. [&run_count]() -> Poll<absl::Status> {
  204. ++run_count;
  205. switch (run_count) {
  206. case 1:
  207. return Pending{};
  208. case 2:
  209. return absl::OkStatus();
  210. default:
  211. abort();
  212. }
  213. },
  214. InlineWakeupScheduler(),
  215. [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
  216. EXPECT_CALL(on_done, Call(absl::OkStatus()));
  217. activity->ForceWakeup();
  218. }
  219. struct TestContext {
  220. bool* done;
  221. };
  222. template <>
  223. struct ContextType<TestContext> {};
  224. TEST(ActivityTest, WithContext) {
  225. bool done = false;
  226. StrictMock<MockFunction<void(absl::Status)>> on_done;
  227. EXPECT_CALL(on_done, Call(absl::OkStatus()));
  228. MakeActivity(
  229. [] {
  230. *GetContext<TestContext>()->done = true;
  231. return Immediate(absl::OkStatus());
  232. },
  233. NoWakeupScheduler(),
  234. [&on_done](absl::Status status) { on_done.Call(std::move(status)); },
  235. TestContext{&done});
  236. EXPECT_TRUE(done);
  237. }
  238. TEST(ActivityTest, CanCancelDuringExecution) {
  239. ActivityPtr activity;
  240. StrictMock<MockFunction<void(absl::Status)>> on_done;
  241. int run_count = 0;
  242. activity = MakeActivity(
  243. [&activity, &run_count]() -> Poll<absl::Status> {
  244. ++run_count;
  245. switch (run_count) {
  246. case 1:
  247. return Pending{};
  248. case 2:
  249. activity.reset();
  250. return Pending{};
  251. default:
  252. abort();
  253. }
  254. },
  255. InlineWakeupScheduler(),
  256. [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
  257. EXPECT_CALL(on_done, Call(absl::CancelledError()));
  258. activity->ForceWakeup();
  259. }
  260. TEST(ActivityTest, CanCancelDuringSuccessfulExecution) {
  261. ActivityPtr activity;
  262. StrictMock<MockFunction<void(absl::Status)>> on_done;
  263. int run_count = 0;
  264. activity = MakeActivity(
  265. [&activity, &run_count]() -> Poll<absl::Status> {
  266. ++run_count;
  267. switch (run_count) {
  268. case 1:
  269. return Pending{};
  270. case 2:
  271. activity.reset();
  272. return absl::OkStatus();
  273. default:
  274. abort();
  275. }
  276. },
  277. InlineWakeupScheduler(),
  278. [&on_done](absl::Status status) { on_done.Call(std::move(status)); });
  279. EXPECT_CALL(on_done, Call(absl::OkStatus()));
  280. activity->ForceWakeup();
  281. }
  282. TEST(WakerTest, CanWakeupEmptyWaker) {
  283. // Empty wakers should not do anything upon wakeup.
  284. Waker().Wakeup();
  285. }
  286. } // namespace grpc_core
  287. int main(int argc, char** argv) {
  288. ::testing::InitGoogleTest(&argc, argv);
  289. return RUN_ALL_TESTS();
  290. }