context_allocator_end2end_test.cc 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. /*
  2. *
  3. * Copyright 2020 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. #include <algorithm>
  19. #include <atomic>
  20. #include <condition_variable>
  21. #include <functional>
  22. #include <memory>
  23. #include <mutex>
  24. #include <sstream>
  25. #include <thread>
  26. #include <gtest/gtest.h>
  27. #include <grpc/impl/codegen/log.h>
  28. #include <grpcpp/channel.h>
  29. #include <grpcpp/client_context.h>
  30. #include <grpcpp/create_channel.h>
  31. #include <grpcpp/server.h>
  32. #include <grpcpp/server_builder.h>
  33. #include <grpcpp/server_context.h>
  34. #include <grpcpp/support/client_callback.h>
  35. #include <grpcpp/support/message_allocator.h>
  36. #include "src/core/lib/iomgr/iomgr.h"
  37. #include "src/proto/grpc/testing/echo.grpc.pb.h"
  38. #include "test/core/util/port.h"
  39. #include "test/core/util/test_config.h"
  40. #include "test/cpp/end2end/test_service_impl.h"
  41. #include "test/cpp/util/test_credentials_provider.h"
  42. namespace grpc {
  43. namespace testing {
  44. namespace {
  // Transport used between the test client and the test server.
  enum class Protocol {
    INPROC,  // in-process channel obtained directly from the server
    TCP,     // channel to a localhost listening port
  };
  46. class TestScenario {
  47. public:
  48. TestScenario(Protocol protocol, const std::string& creds_type)
  49. : protocol(protocol), credentials_type(creds_type) {}
  50. void Log() const;
  51. Protocol protocol;
  52. const std::string credentials_type;
  53. };
  54. std::ostream& operator<<(std::ostream& out, const TestScenario& scenario) {
  55. return out << "TestScenario{protocol="
  56. << (scenario.protocol == Protocol::INPROC ? "INPROC" : "TCP")
  57. << "," << scenario.credentials_type << "}";
  58. }
  59. void TestScenario::Log() const {
  60. std::ostringstream out;
  61. out << *this;
  62. gpr_log(GPR_INFO, "%s", out.str().c_str());
  63. }
  64. class ContextAllocatorEnd2endTestBase
  65. : public ::testing::TestWithParam<TestScenario> {
  66. protected:
  67. static void SetUpTestCase() { grpc_init(); }
  68. static void TearDownTestCase() { grpc_shutdown(); }
  69. ContextAllocatorEnd2endTestBase() {}
  70. ~ContextAllocatorEnd2endTestBase() override = default;
  71. void SetUp() override { GetParam().Log(); }
  72. void CreateServer(std::unique_ptr<grpc::ContextAllocator> context_allocator) {
  73. ServerBuilder builder;
  74. auto server_creds = GetCredentialsProvider()->GetServerCredentials(
  75. GetParam().credentials_type);
  76. if (GetParam().protocol == Protocol::TCP) {
  77. picked_port_ = grpc_pick_unused_port_or_die();
  78. server_address_ << "localhost:" << picked_port_;
  79. builder.AddListeningPort(server_address_.str(), server_creds);
  80. }
  81. builder.SetContextAllocator(std::move(context_allocator));
  82. builder.RegisterService(&callback_service_);
  83. server_ = builder.BuildAndStart();
  84. }
  85. void DestroyServer() {
  86. if (server_) {
  87. server_->Shutdown();
  88. server_.reset();
  89. }
  90. }
  91. void ResetStub() {
  92. ChannelArguments args;
  93. auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
  94. GetParam().credentials_type, &args);
  95. switch (GetParam().protocol) {
  96. case Protocol::TCP:
  97. channel_ = grpc::CreateCustomChannel(server_address_.str(),
  98. channel_creds, args);
  99. break;
  100. case Protocol::INPROC:
  101. channel_ = server_->InProcessChannel(args);
  102. break;
  103. default:
  104. assert(false);
  105. }
  106. stub_ = EchoTestService::NewStub(channel_);
  107. }
  108. void TearDown() override {
  109. DestroyServer();
  110. if (picked_port_ > 0) {
  111. grpc_recycle_unused_port(picked_port_);
  112. }
  113. }
  114. void SendRpcs(int num_rpcs) {
  115. std::string test_string("");
  116. for (int i = 0; i < num_rpcs; i++) {
  117. EchoRequest request;
  118. EchoResponse response;
  119. ClientContext cli_ctx;
  120. test_string += std::string(1024, 'x');
  121. request.set_message(test_string);
  122. std::string val;
  123. cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
  124. std::mutex mu;
  125. std::condition_variable cv;
  126. bool done = false;
  127. stub_->async()->Echo(
  128. &cli_ctx, &request, &response,
  129. [&request, &response, &done, &mu, &cv, val](Status s) {
  130. GPR_ASSERT(s.ok());
  131. EXPECT_EQ(request.message(), response.message());
  132. std::lock_guard<std::mutex> l(mu);
  133. done = true;
  134. cv.notify_one();
  135. });
  136. std::unique_lock<std::mutex> l(mu);
  137. while (!done) {
  138. cv.wait(l);
  139. }
  140. }
  141. }
  142. int picked_port_{0};
  143. std::shared_ptr<Channel> channel_;
  144. std::unique_ptr<EchoTestService::Stub> stub_;
  145. CallbackTestServiceImpl callback_service_;
  146. std::unique_ptr<Server> server_;
  147. std::ostringstream server_address_;
  148. };
  149. class DefaultContextAllocatorTest : public ContextAllocatorEnd2endTestBase {};
  150. TEST_P(DefaultContextAllocatorTest, SimpleRpc) {
  151. const int kRpcCount = 10;
  152. CreateServer(nullptr);
  153. ResetStub();
  154. SendRpcs(kRpcCount);
  155. }
  156. class NullContextAllocatorTest : public ContextAllocatorEnd2endTestBase {
  157. public:
  158. class NullAllocator : public grpc::ContextAllocator {
  159. public:
  160. NullAllocator(std::atomic<int>* allocation_count,
  161. std::atomic<int>* deallocation_count)
  162. : allocation_count_(allocation_count),
  163. deallocation_count_(deallocation_count) {}
  164. grpc::CallbackServerContext* NewCallbackServerContext() override {
  165. allocation_count_->fetch_add(1, std::memory_order_relaxed);
  166. return nullptr;
  167. }
  168. GenericCallbackServerContext* NewGenericCallbackServerContext() override {
  169. allocation_count_->fetch_add(1, std::memory_order_relaxed);
  170. return nullptr;
  171. }
  172. void Release(
  173. grpc::CallbackServerContext* /*callback_server_context*/) override {
  174. deallocation_count_->fetch_add(1, std::memory_order_relaxed);
  175. }
  176. void Release(
  177. GenericCallbackServerContext* /*generic_callback_server_context*/)
  178. override {
  179. deallocation_count_->fetch_add(1, std::memory_order_relaxed);
  180. }
  181. std::atomic<int>* allocation_count_;
  182. std::atomic<int>* deallocation_count_;
  183. };
  184. };
  185. TEST_P(NullContextAllocatorTest, UnaryRpc) {
  186. const int kRpcCount = 10;
  187. std::atomic<int> allocation_count{0};
  188. std::atomic<int> deallocation_count{0};
  189. std::unique_ptr<NullAllocator> allocator(
  190. new NullAllocator(&allocation_count, &deallocation_count));
  191. CreateServer(std::move(allocator));
  192. ResetStub();
  193. SendRpcs(kRpcCount);
  194. // messages_deallocaton_count is updated in Release after server side
  195. // OnDone.
  196. DestroyServer();
  197. EXPECT_EQ(kRpcCount, allocation_count);
  198. EXPECT_EQ(kRpcCount, deallocation_count);
  199. }
  200. class SimpleContextAllocatorTest : public ContextAllocatorEnd2endTestBase {
  201. public:
  202. class SimpleAllocator : public grpc::ContextAllocator {
  203. public:
  204. SimpleAllocator(std::atomic<int>* allocation_count,
  205. std::atomic<int>* deallocation_count)
  206. : allocation_count_(allocation_count),
  207. deallocation_count_(deallocation_count) {}
  208. grpc::CallbackServerContext* NewCallbackServerContext() override {
  209. allocation_count_->fetch_add(1, std::memory_order_relaxed);
  210. return new grpc::CallbackServerContext();
  211. }
  212. GenericCallbackServerContext* NewGenericCallbackServerContext() override {
  213. allocation_count_->fetch_add(1, std::memory_order_relaxed);
  214. return new GenericCallbackServerContext();
  215. }
  216. void Release(
  217. grpc::CallbackServerContext* callback_server_context) override {
  218. deallocation_count_->fetch_add(1, std::memory_order_relaxed);
  219. delete callback_server_context;
  220. }
  221. void Release(GenericCallbackServerContext* generic_callback_server_context)
  222. override {
  223. deallocation_count_->fetch_add(1, std::memory_order_relaxed);
  224. delete generic_callback_server_context;
  225. }
  226. std::atomic<int>* allocation_count_;
  227. std::atomic<int>* deallocation_count_;
  228. };
  229. };
  230. TEST_P(SimpleContextAllocatorTest, UnaryRpc) {
  231. const int kRpcCount = 10;
  232. std::atomic<int> allocation_count{0};
  233. std::atomic<int> deallocation_count{0};
  234. std::unique_ptr<SimpleAllocator> allocator(
  235. new SimpleAllocator(&allocation_count, &deallocation_count));
  236. CreateServer(std::move(allocator));
  237. ResetStub();
  238. SendRpcs(kRpcCount);
  239. // messages_deallocaton_count is updated in Release after server side
  240. // OnDone.
  241. DestroyServer();
  242. EXPECT_EQ(kRpcCount, allocation_count);
  243. EXPECT_EQ(kRpcCount, deallocation_count);
  244. }
  245. std::vector<TestScenario> CreateTestScenarios(bool test_insecure) {
  246. std::vector<TestScenario> scenarios;
  247. std::vector<std::string> credentials_types{
  248. GetCredentialsProvider()->GetSecureCredentialsTypeList()};
  249. auto insec_ok = [] {
  250. // Only allow insecure credentials type when it is registered with the
  251. // provider. User may create providers that do not have insecure.
  252. return GetCredentialsProvider()->GetChannelCredentials(
  253. kInsecureCredentialsType, nullptr) != nullptr;
  254. };
  255. if (test_insecure && insec_ok()) {
  256. credentials_types.push_back(kInsecureCredentialsType);
  257. }
  258. GPR_ASSERT(!credentials_types.empty());
  259. Protocol parr[]{Protocol::INPROC, Protocol::TCP};
  260. for (Protocol p : parr) {
  261. for (const auto& cred : credentials_types) {
  262. if (p == Protocol::INPROC &&
  263. (cred != kInsecureCredentialsType || !insec_ok())) {
  264. continue;
  265. }
  266. scenarios.emplace_back(p, cred);
  267. }
  268. }
  269. return scenarios;
  270. }
  271. // TODO(ddyihai): adding client streaming/server streaming/bidi streaming
  272. // test.
  273. INSTANTIATE_TEST_SUITE_P(DefaultContextAllocatorTest,
  274. DefaultContextAllocatorTest,
  275. ::testing::ValuesIn(CreateTestScenarios(true)));
  276. INSTANTIATE_TEST_SUITE_P(NullContextAllocatorTest, NullContextAllocatorTest,
  277. ::testing::ValuesIn(CreateTestScenarios(true)));
  278. INSTANTIATE_TEST_SUITE_P(SimpleContextAllocatorTest, SimpleContextAllocatorTest,
  279. ::testing::ValuesIn(CreateTestScenarios(true)));
  280. } // namespace
  281. } // namespace testing
  282. } // namespace grpc
  283. int main(int argc, char** argv) {
  284. grpc::testing::TestEnvironment env(argc, argv);
  285. ::testing::InitGoogleTest(&argc, argv);
  286. int ret = RUN_ALL_TESTS();
  287. return ret;
  288. }