xds_server.cc 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. //
  2. // Copyright 2017 gRPC authors.
  3. //
  4. // Licensed under the Apache License, Version 2.0 (the "License");
  5. // you may not use this file except in compliance with the License.
  6. // You may obtain a copy of the License at
  7. //
  8. // http://www.apache.org/licenses/LICENSE-2.0
  9. //
  10. // Unless required by applicable law or agreed to in writing, software
  11. // distributed under the License is distributed on an "AS IS" BASIS,
  12. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. // See the License for the specific language governing permissions and
  14. // limitations under the License.
  15. //
  16. #include "test/cpp/end2end/xds/xds_server.h"
  17. #include <deque>
  18. #include <set>
  19. #include <string>
  20. #include <thread>
  21. #include <vector>
  22. #include <gmock/gmock.h>
  23. #include <gtest/gtest.h>
  24. #include "absl/types/optional.h"
  25. #include <grpc/support/log.h>
  26. #include "src/core/lib/address_utils/parse_address.h"
  27. #include "src/core/lib/gprpp/sync.h"
  28. #include "src/proto/grpc/testing/xds/ads_for_test.grpc.pb.h"
  29. #include "src/proto/grpc/testing/xds/lrs_for_test.grpc.pb.h"
  30. #include "src/proto/grpc/testing/xds/v3/ads.grpc.pb.h"
  31. #include "src/proto/grpc/testing/xds/v3/discovery.grpc.pb.h"
  32. #include "src/proto/grpc/testing/xds/v3/lrs.grpc.pb.h"
  33. namespace grpc {
  34. namespace testing {
  35. //
  36. // AdsServiceImpl
  37. //
  38. void AdsServiceImpl::SetResource(google::protobuf::Any resource,
  39. const std::string& type_url,
  40. const std::string& name) {
  41. grpc_core::MutexLock lock(&ads_mu_);
  42. ResourceTypeState& resource_type_state = resource_map_[type_url];
  43. ++resource_type_state.resource_type_version;
  44. ResourceState& resource_state = resource_type_state.resource_name_map[name];
  45. resource_state.resource_type_version =
  46. resource_type_state.resource_type_version;
  47. resource_state.resource = std::move(resource);
  48. gpr_log(GPR_INFO,
  49. "ADS[%p]: Updating %s resource %s; resource_type_version now %u",
  50. this, type_url.c_str(), name.c_str(),
  51. resource_type_state.resource_type_version);
  52. for (SubscriptionState* subscription : resource_state.subscriptions) {
  53. subscription->update_queue->emplace_back(type_url, name);
  54. }
  55. }
  56. void AdsServiceImpl::UnsetResource(const std::string& type_url,
  57. const std::string& name) {
  58. grpc_core::MutexLock lock(&ads_mu_);
  59. ResourceTypeState& resource_type_state = resource_map_[type_url];
  60. ++resource_type_state.resource_type_version;
  61. ResourceState& resource_state = resource_type_state.resource_name_map[name];
  62. resource_state.resource_type_version =
  63. resource_type_state.resource_type_version;
  64. resource_state.resource.reset();
  65. gpr_log(GPR_INFO,
  66. "ADS[%p]: Unsetting %s resource %s; resource_type_version now %u",
  67. this, type_url.c_str(), name.c_str(),
  68. resource_type_state.resource_type_version);
  69. for (SubscriptionState* subscription : resource_state.subscriptions) {
  70. subscription->update_queue->emplace_back(type_url, name);
  71. }
  72. }
  73. // Checks whether the client needs to receive a newer version of
  74. // the resource.
  75. bool AdsServiceImpl::ClientNeedsResourceUpdate(
  76. const ResourceTypeState& resource_type_state,
  77. const ResourceState& resource_state, int client_resource_type_version) {
  78. return client_resource_type_version <
  79. resource_type_state.resource_type_version &&
  80. resource_state.resource_type_version <=
  81. resource_type_state.resource_type_version;
  82. }
  83. // Subscribes to a resource if not already subscribed:
  84. // 1. Sets the update_queue field in subscription_state.
  85. // 2. Adds subscription_state to resource_state->subscriptions.
  86. bool AdsServiceImpl::MaybeSubscribe(const std::string& resource_type,
  87. const std::string& resource_name,
  88. SubscriptionState* subscription_state,
  89. ResourceState* resource_state,
  90. UpdateQueue* update_queue) {
  91. // The update_queue will be null if we were not previously subscribed.
  92. if (subscription_state->update_queue != nullptr) return false;
  93. subscription_state->update_queue = update_queue;
  94. resource_state->subscriptions.emplace(subscription_state);
  95. gpr_log(GPR_INFO, "ADS[%p]: subscribe to resource type %s name %s state %p",
  96. this, resource_type.c_str(), resource_name.c_str(),
  97. &subscription_state);
  98. return true;
  99. }
  100. // Removes subscriptions for resources no longer present in the
  101. // current request.
  102. void AdsServiceImpl::ProcessUnsubscriptions(
  103. const std::string& resource_type,
  104. const std::set<std::string>& resources_in_current_request,
  105. SubscriptionNameMap* subscription_name_map,
  106. ResourceNameMap* resource_name_map) {
  107. for (auto it = subscription_name_map->begin();
  108. it != subscription_name_map->end();) {
  109. const std::string& resource_name = it->first;
  110. SubscriptionState& subscription_state = it->second;
  111. if (resources_in_current_request.find(resource_name) !=
  112. resources_in_current_request.end()) {
  113. ++it;
  114. continue;
  115. }
  116. gpr_log(GPR_INFO, "ADS[%p]: Unsubscribe to type=%s name=%s state=%p", this,
  117. resource_type.c_str(), resource_name.c_str(), &subscription_state);
  118. auto resource_it = resource_name_map->find(resource_name);
  119. GPR_ASSERT(resource_it != resource_name_map->end());
  120. auto& resource_state = resource_it->second;
  121. resource_state.subscriptions.erase(&subscription_state);
  122. if (resource_state.subscriptions.empty() &&
  123. !resource_state.resource.has_value()) {
  124. resource_name_map->erase(resource_it);
  125. }
  126. it = subscription_name_map->erase(it);
  127. }
  128. }
  129. void AdsServiceImpl::Start() {
  130. grpc_core::MutexLock lock(&ads_mu_);
  131. ads_done_ = false;
  132. }
  133. void AdsServiceImpl::Shutdown() {
  134. {
  135. grpc_core::MutexLock lock(&ads_mu_);
  136. if (!ads_done_) {
  137. ads_done_ = true;
  138. ads_cond_.SignalAll();
  139. }
  140. resource_type_response_state_.clear();
  141. }
  142. gpr_log(GPR_INFO, "ADS[%p]: shut down", this);
  143. }
  144. //
  145. // LrsServiceImpl::ClientStats
  146. //
  147. uint64_t LrsServiceImpl::ClientStats::total_successful_requests() const {
  148. uint64_t sum = 0;
  149. for (auto& p : locality_stats_) {
  150. sum += p.second.total_successful_requests;
  151. }
  152. return sum;
  153. }
  154. uint64_t LrsServiceImpl::ClientStats::total_requests_in_progress() const {
  155. uint64_t sum = 0;
  156. for (auto& p : locality_stats_) {
  157. sum += p.second.total_requests_in_progress;
  158. }
  159. return sum;
  160. }
  161. uint64_t LrsServiceImpl::ClientStats::total_error_requests() const {
  162. uint64_t sum = 0;
  163. for (auto& p : locality_stats_) {
  164. sum += p.second.total_error_requests;
  165. }
  166. return sum;
  167. }
  168. uint64_t LrsServiceImpl::ClientStats::total_issued_requests() const {
  169. uint64_t sum = 0;
  170. for (auto& p : locality_stats_) {
  171. sum += p.second.total_issued_requests;
  172. }
  173. return sum;
  174. }
  175. uint64_t LrsServiceImpl::ClientStats::dropped_requests(
  176. const std::string& category) const {
  177. auto iter = dropped_requests_.find(category);
  178. GPR_ASSERT(iter != dropped_requests_.end());
  179. return iter->second;
  180. }
  181. LrsServiceImpl::ClientStats& LrsServiceImpl::ClientStats::operator+=(
  182. const ClientStats& other) {
  183. for (const auto& p : other.locality_stats_) {
  184. locality_stats_[p.first] += p.second;
  185. }
  186. total_dropped_requests_ += other.total_dropped_requests_;
  187. for (const auto& p : other.dropped_requests_) {
  188. dropped_requests_[p.first] += p.second;
  189. }
  190. return *this;
  191. }
  192. //
  193. // LrsServiceImpl
  194. //
  195. void LrsServiceImpl::Start() {
  196. {
  197. grpc_core::MutexLock lock(&lrs_mu_);
  198. lrs_done_ = false;
  199. }
  200. {
  201. grpc_core::MutexLock lock(&load_report_mu_);
  202. result_queue_.clear();
  203. }
  204. }
  205. void LrsServiceImpl::Shutdown() {
  206. {
  207. grpc_core::MutexLock lock(&lrs_mu_);
  208. if (!lrs_done_) {
  209. lrs_done_ = true;
  210. lrs_cv_.SignalAll();
  211. }
  212. }
  213. gpr_log(GPR_INFO, "LRS[%p]: shut down", this);
  214. }
  215. std::vector<LrsServiceImpl::ClientStats> LrsServiceImpl::WaitForLoadReport() {
  216. grpc_core::MutexLock lock(&load_report_mu_);
  217. grpc_core::CondVar cv;
  218. if (result_queue_.empty()) {
  219. load_report_cond_ = &cv;
  220. while (result_queue_.empty()) {
  221. cv.Wait(&load_report_mu_);
  222. }
  223. load_report_cond_ = nullptr;
  224. }
  225. std::vector<ClientStats> result = std::move(result_queue_.front());
  226. result_queue_.pop_front();
  227. return result;
  228. }
  229. } // namespace testing
  230. } // namespace grpc