xds_server.h 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899
  1. //
  2. // Copyright 2017 gRPC authors.
  3. //
  4. // Licensed under the Apache License, Version 2.0 (the "License");
  5. // you may not use this file except in compliance with the License.
  6. // You may obtain a copy of the License at
  7. //
  8. // http://www.apache.org/licenses/LICENSE-2.0
  9. //
  10. // Unless required by applicable law or agreed to in writing, software
  11. // distributed under the License is distributed on an "AS IS" BASIS,
  12. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. // See the License for the specific language governing permissions and
  14. // limitations under the License.
  15. //
  16. #ifndef GRPC_TEST_CPP_END2END_XDS_XDS_SERVER_H
  17. #define GRPC_TEST_CPP_END2END_XDS_XDS_SERVER_H
  18. #include <deque>
  19. #include <set>
  20. #include <string>
  21. #include <thread>
  22. #include <vector>
  23. #include <gmock/gmock.h>
  24. #include <gtest/gtest.h>
  25. #include "absl/types/optional.h"
  26. #include <grpc/support/log.h>
  27. #include "src/core/lib/address_utils/parse_address.h"
  28. #include "src/core/lib/gprpp/sync.h"
  29. #include "src/proto/grpc/testing/xds/ads_for_test.grpc.pb.h"
  30. #include "src/proto/grpc/testing/xds/lrs_for_test.grpc.pb.h"
  31. #include "src/proto/grpc/testing/xds/v3/ads.grpc.pb.h"
  32. #include "src/proto/grpc/testing/xds/v3/cluster.grpc.pb.h"
  33. #include "src/proto/grpc/testing/xds/v3/discovery.grpc.pb.h"
  34. #include "src/proto/grpc/testing/xds/v3/endpoint.grpc.pb.h"
  35. #include "src/proto/grpc/testing/xds/v3/listener.grpc.pb.h"
  36. #include "src/proto/grpc/testing/xds/v3/lrs.grpc.pb.h"
  37. #include "src/proto/grpc/testing/xds/v3/route.grpc.pb.h"
  38. #include "test/core/util/test_config.h"
  39. #include "test/cpp/end2end/counted_service.h"
  40. namespace grpc {
  41. namespace testing {
  42. constexpr char kLdsTypeUrl[] =
  43. "type.googleapis.com/envoy.config.listener.v3.Listener";
  44. constexpr char kRdsTypeUrl[] =
  45. "type.googleapis.com/envoy.config.route.v3.RouteConfiguration";
  46. constexpr char kCdsTypeUrl[] =
  47. "type.googleapis.com/envoy.config.cluster.v3.Cluster";
  48. constexpr char kEdsTypeUrl[] =
  49. "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";
  50. constexpr char kLdsV2TypeUrl[] = "type.googleapis.com/envoy.api.v2.Listener";
  51. constexpr char kRdsV2TypeUrl[] =
  52. "type.googleapis.com/envoy.api.v2.RouteConfiguration";
  53. constexpr char kCdsV2TypeUrl[] = "type.googleapis.com/envoy.api.v2.Cluster";
  54. constexpr char kEdsV2TypeUrl[] =
  55. "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
  56. // An ADS service implementation.
  57. class AdsServiceImpl : public std::enable_shared_from_this<AdsServiceImpl> {
  58. public:
  59. // State for a given xDS resource type.
  60. struct ResponseState {
  61. enum State {
  62. ACKED, // ACK received.
  63. NACKED, // NACK received; error_message will contain the error.
  64. };
  65. State state = ACKED;
  66. std::string error_message;
  67. };
  68. AdsServiceImpl()
  69. : v2_rpc_service_(this, /*is_v2=*/true),
  70. v3_rpc_service_(this, /*is_v2=*/false) {}
  71. bool seen_v2_client() const { return seen_v2_client_; }
  72. bool seen_v3_client() const { return seen_v3_client_; }
  73. ::envoy::service::discovery::v2::AggregatedDiscoveryService::Service*
  74. v2_rpc_service() {
  75. return &v2_rpc_service_;
  76. }
  77. ::envoy::service::discovery::v3::AggregatedDiscoveryService::Service*
  78. v3_rpc_service() {
  79. return &v3_rpc_service_;
  80. }
  81. // Sets a resource to a particular value, overwriting any previous value.
  82. void SetResource(google::protobuf::Any resource, const std::string& type_url,
  83. const std::string& name);
  84. // Removes a resource from the server's state.
  85. void UnsetResource(const std::string& type_url, const std::string& name);
  86. void SetLdsResource(const ::envoy::config::listener::v3::Listener& listener) {
  87. google::protobuf::Any resource;
  88. resource.PackFrom(listener);
  89. SetResource(std::move(resource), kLdsTypeUrl, listener.name());
  90. }
  91. void SetRdsResource(
  92. const ::envoy::config::route::v3::RouteConfiguration& route) {
  93. google::protobuf::Any resource;
  94. resource.PackFrom(route);
  95. SetResource(std::move(resource), kRdsTypeUrl, route.name());
  96. }
  97. void SetCdsResource(const ::envoy::config::cluster::v3::Cluster& cluster) {
  98. google::protobuf::Any resource;
  99. resource.PackFrom(cluster);
  100. SetResource(std::move(resource), kCdsTypeUrl, cluster.name());
  101. }
  102. void SetEdsResource(
  103. const ::envoy::config::endpoint::v3::ClusterLoadAssignment& assignment) {
  104. google::protobuf::Any resource;
  105. resource.PackFrom(assignment);
  106. SetResource(std::move(resource), kEdsTypeUrl, assignment.cluster_name());
  107. }
  108. // Tells the server to ignore requests from the client for a given
  109. // resource type.
  110. void IgnoreResourceType(const std::string& type_url) {
  111. grpc_core::MutexLock lock(&ads_mu_);
  112. resource_types_to_ignore_.emplace(type_url);
  113. }
  114. // Sets the minimum version that the server will accept for a given
  115. // resource type. Will cause a gmock expectation failure if we see a
  116. // lower version.
  117. void SetResourceMinVersion(const std::string& type_url, int version) {
  118. grpc_core::MutexLock lock(&ads_mu_);
  119. resource_type_min_versions_[type_url] = version;
  120. }
  121. // Get the list of response state for each resource type.
  122. absl::optional<ResponseState> GetResponseState(const std::string& type_url) {
  123. grpc_core::MutexLock lock(&ads_mu_);
  124. if (resource_type_response_state_[type_url].empty()) {
  125. return absl::nullopt;
  126. }
  127. auto response = resource_type_response_state_[type_url].front();
  128. resource_type_response_state_[type_url].pop_front();
  129. return response;
  130. }
  131. absl::optional<ResponseState> lds_response_state() {
  132. return GetResponseState(kLdsTypeUrl);
  133. }
  134. absl::optional<ResponseState> rds_response_state() {
  135. return GetResponseState(kRdsTypeUrl);
  136. }
  137. absl::optional<ResponseState> cds_response_state() {
  138. return GetResponseState(kCdsTypeUrl);
  139. }
  140. absl::optional<ResponseState> eds_response_state() {
  141. return GetResponseState(kEdsTypeUrl);
  142. }
  143. // Starts the service.
  144. void Start();
  145. // Shuts down the service.
  146. void Shutdown();
  147. // Returns the peer names of clients currently connected to the service.
  148. std::set<std::string> clients() {
  149. grpc_core::MutexLock lock(&clients_mu_);
  150. return clients_;
  151. }
  152. void ForceADSFailure(Status status) {
  153. grpc_core::MutexLock lock(&ads_mu_);
  154. forced_ads_failure_ = std::move(status);
  155. }
  156. private:
  157. // A queue of resource type/name pairs that have changed since the client
  158. // subscribed to them.
  159. using UpdateQueue = std::deque<
  160. std::pair<std::string /* type url */, std::string /* resource name */>>;
  161. // A struct representing a client's subscription to a particular resource.
  162. struct SubscriptionState {
  163. // The queue upon which to place updates when the resource is updated.
  164. UpdateQueue* update_queue;
  165. };
  166. // A struct representing the a client's subscription to all the resources.
  167. using SubscriptionNameMap =
  168. std::map<std::string /* resource_name */, SubscriptionState>;
  169. using SubscriptionMap =
  170. std::map<std::string /* type_url */, SubscriptionNameMap>;
  171. // Sent state for a given resource type.
  172. struct SentState {
  173. int nonce = 0;
  174. int resource_type_version = 0;
  175. };
  176. // A struct representing the current state for an individual resource.
  177. struct ResourceState {
  178. // The resource itself, if present.
  179. absl::optional<google::protobuf::Any> resource;
  180. // The resource type version that this resource was last updated in.
  181. int resource_type_version = 0;
  182. // A list of subscriptions to this resource.
  183. std::set<SubscriptionState*> subscriptions;
  184. };
  185. // The current state for all individual resources of a given type.
  186. using ResourceNameMap =
  187. std::map<std::string /* resource_name */, ResourceState>;
  188. struct ResourceTypeState {
  189. int resource_type_version = 0;
  190. ResourceNameMap resource_name_map;
  191. };
  192. using ResourceMap = std::map<std::string /* type_url */, ResourceTypeState>;
  193. // Templated RPC service implementation, works for both v2 and v3.
  194. template <class RpcApi, class DiscoveryRequest, class DiscoveryResponse>
  195. class RpcService : public RpcApi::Service {
  196. public:
  197. using Stream = ServerReaderWriter<DiscoveryResponse, DiscoveryRequest>;
  198. RpcService(AdsServiceImpl* parent, bool is_v2)
  199. : parent_(parent), is_v2_(is_v2) {}
  200. Status StreamAggregatedResources(ServerContext* context,
  201. Stream* stream) override {
  202. gpr_log(GPR_INFO, "ADS[%p]: StreamAggregatedResources starts", this);
  203. {
  204. grpc_core::MutexLock lock(&parent_->ads_mu_);
  205. if (parent_->forced_ads_failure_.has_value()) {
  206. gpr_log(GPR_INFO,
  207. "ADS[%p]: StreamAggregatedResources forcing early failure "
  208. "with status code: %d, message: %s",
  209. this, parent_->forced_ads_failure_.value().error_code(),
  210. parent_->forced_ads_failure_.value().error_message().c_str());
  211. return parent_->forced_ads_failure_.value();
  212. }
  213. }
  214. parent_->AddClient(context->peer());
  215. if (is_v2_) {
  216. parent_->seen_v2_client_ = true;
  217. } else {
  218. parent_->seen_v3_client_ = true;
  219. }
  220. // Take a reference of the AdsServiceImpl object, which will go
  221. // out of scope when this request handler returns. This ensures
  222. // that the parent won't be destroyed until this stream is complete.
  223. std::shared_ptr<AdsServiceImpl> ads_service_impl =
  224. parent_->shared_from_this();
  225. // Resources (type/name pairs) that have changed since the client
  226. // subscribed to them.
  227. UpdateQueue update_queue;
  228. // Resources that the client will be subscribed to keyed by resource type
  229. // url.
  230. SubscriptionMap subscription_map;
  231. // Sent state for each resource type.
  232. std::map<std::string /*type_url*/, SentState> sent_state_map;
  233. // Spawn a thread to read requests from the stream.
  234. // Requests will be delivered to this thread in a queue.
  235. std::deque<DiscoveryRequest> requests;
  236. bool stream_closed = false;
  237. std::thread reader(std::bind(&RpcService::BlockingRead, this, stream,
  238. &requests, &stream_closed));
  239. // Main loop to process requests and updates.
  240. while (true) {
  241. // Boolean to keep track if the loop received any work to do: a
  242. // request or an update; regardless whether a response was actually
  243. // sent out.
  244. bool did_work = false;
  245. // Look for new requests and and decide what to handle.
  246. absl::optional<DiscoveryResponse> response;
  247. {
  248. grpc_core::MutexLock lock(&parent_->ads_mu_);
  249. // If the stream has been closed or our parent is being shut
  250. // down, stop immediately.
  251. if (stream_closed || parent_->ads_done_) break;
  252. // Otherwise, see if there's a request to read from the queue.
  253. if (!requests.empty()) {
  254. DiscoveryRequest request = std::move(requests.front());
  255. requests.pop_front();
  256. did_work = true;
  257. gpr_log(GPR_INFO,
  258. "ADS[%p]: Received request for type %s with content %s",
  259. this, request.type_url().c_str(),
  260. request.DebugString().c_str());
  261. const std::string v3_resource_type =
  262. TypeUrlToV3(request.type_url());
  263. SentState& sent_state = sent_state_map[v3_resource_type];
  264. // Process request.
  265. ProcessRequest(request, v3_resource_type, &update_queue,
  266. &subscription_map, &sent_state, &response);
  267. }
  268. }
  269. if (response.has_value()) {
  270. gpr_log(GPR_INFO, "ADS[%p]: Sending response: %s", this,
  271. response->DebugString().c_str());
  272. stream->Write(response.value());
  273. }
  274. response.reset();
  275. // Look for updates and decide what to handle.
  276. {
  277. grpc_core::MutexLock lock(&parent_->ads_mu_);
  278. if (!update_queue.empty()) {
  279. const std::string resource_type =
  280. std::move(update_queue.front().first);
  281. const std::string resource_name =
  282. std::move(update_queue.front().second);
  283. update_queue.pop_front();
  284. did_work = true;
  285. SentState& sent_state = sent_state_map[resource_type];
  286. ProcessUpdate(resource_type, resource_name, &subscription_map,
  287. &sent_state, &response);
  288. }
  289. }
  290. if (response.has_value()) {
  291. gpr_log(GPR_INFO, "ADS[%p]: Sending update response: %s", this,
  292. response->DebugString().c_str());
  293. stream->Write(response.value());
  294. }
  295. {
  296. grpc_core::MutexLock lock(&parent_->ads_mu_);
  297. if (parent_->ads_done_) {
  298. break;
  299. }
  300. }
  301. // If we didn't find anything to do, delay before the next loop
  302. // iteration; otherwise, check whether we should exit and then
  303. // immediately continue.
  304. gpr_sleep_until(
  305. grpc_timeout_milliseconds_to_deadline(did_work ? 0 : 10));
  306. }
  307. // Done with main loop. Clean up before returning.
  308. // Join reader thread.
  309. reader.join();
  310. // Clean up any subscriptions that were still active when the call
  311. // finished.
  312. {
  313. grpc_core::MutexLock lock(&parent_->ads_mu_);
  314. for (auto& p : subscription_map) {
  315. const std::string& type_url = p.first;
  316. SubscriptionNameMap& subscription_name_map = p.second;
  317. for (auto& q : subscription_name_map) {
  318. const std::string& resource_name = q.first;
  319. SubscriptionState& subscription_state = q.second;
  320. ResourceNameMap& resource_name_map =
  321. parent_->resource_map_[type_url].resource_name_map;
  322. ResourceState& resource_state = resource_name_map[resource_name];
  323. resource_state.subscriptions.erase(&subscription_state);
  324. }
  325. }
  326. }
  327. gpr_log(GPR_INFO, "ADS[%p]: StreamAggregatedResources done", this);
  328. parent_->RemoveClient(context->peer());
  329. return Status::OK;
  330. }
  331. private:
  332. // NB: clang's annotalysis is confused by the use of inner template
  333. // classes here and *ignores* the exclusive lock annotation on some
  334. // functions. See https://bugs.llvm.org/show_bug.cgi?id=51368.
  335. //
  336. // This class is used for a dual purpose:
  337. // - it convinces clang that the lock is held in a given scope
  338. // - when used in a function that is annotated to require the inner lock it
  339. // will cause compilation to fail if the upstream bug is fixed!
  340. //
  341. // If you arrive here because of a compilation failure, that might mean the
  342. // clang bug is fixed! Please report that on the ticket.
  343. //
  344. // Since the buggy compiler will still need to be supported, consider
  345. // wrapping this class in a compiler version #if and replace its usage
  346. // with a macro whose expansion is conditional on the compiler version. In
  347. // time (years? decades?) this code can be deleted altogether.
  348. class ABSL_SCOPED_LOCKABLE NoopMutexLock {
  349. public:
  350. explicit NoopMutexLock(grpc_core::Mutex& mu)
  351. ABSL_EXCLUSIVE_LOCK_FUNCTION(mu) {}
  352. ~NoopMutexLock() ABSL_UNLOCK_FUNCTION() {}
  353. };
  354. // Processes a response read from the client.
  355. // Populates response if needed.
  356. void ProcessRequest(const DiscoveryRequest& request,
  357. const std::string& v3_resource_type,
  358. UpdateQueue* update_queue,
  359. SubscriptionMap* subscription_map,
  360. SentState* sent_state,
  361. absl::optional<DiscoveryResponse>* response)
  362. ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_->ads_mu_) {
  363. NoopMutexLock mu(parent_->ads_mu_);
  364. // Check the nonce sent by the client, if any.
  365. // (This will be absent on the first request on a stream.)
  366. if (request.response_nonce().empty()) {
  367. int client_resource_type_version = 0;
  368. if (!request.version_info().empty()) {
  369. GPR_ASSERT(absl::SimpleAtoi(request.version_info(),
  370. &client_resource_type_version));
  371. }
  372. EXPECT_GE(client_resource_type_version,
  373. parent_->resource_type_min_versions_[v3_resource_type])
  374. << "resource_type: " << v3_resource_type;
  375. } else {
  376. int client_nonce;
  377. GPR_ASSERT(absl::SimpleAtoi(request.response_nonce(), &client_nonce));
  378. // Check for ACK or NACK.
  379. ResponseState response_state;
  380. if (!request.has_error_detail()) {
  381. response_state.state = ResponseState::ACKED;
  382. gpr_log(GPR_INFO, "ADS[%p]: client ACKed resource_type=%s version=%s",
  383. this, request.type_url().c_str(),
  384. request.version_info().c_str());
  385. } else {
  386. response_state.state = ResponseState::NACKED;
  387. EXPECT_EQ(request.error_detail().code(),
  388. GRPC_STATUS_INVALID_ARGUMENT);
  389. response_state.error_message = request.error_detail().message();
  390. gpr_log(GPR_INFO,
  391. "ADS[%p]: client NACKed resource_type=%s version=%s: %s",
  392. this, request.type_url().c_str(),
  393. request.version_info().c_str(),
  394. response_state.error_message.c_str());
  395. }
  396. parent_->resource_type_response_state_[v3_resource_type].emplace_back(
  397. std::move(response_state));
  398. // Ignore requests with stale nonces.
  399. if (client_nonce < sent_state->nonce) return;
  400. }
  401. // Ignore resource types as requested by tests.
  402. if (parent_->resource_types_to_ignore_.find(v3_resource_type) !=
  403. parent_->resource_types_to_ignore_.end()) {
  404. return;
  405. }
  406. // Look at all the resource names in the request.
  407. auto& subscription_name_map = (*subscription_map)[v3_resource_type];
  408. auto& resource_type_state = parent_->resource_map_[v3_resource_type];
  409. auto& resource_name_map = resource_type_state.resource_name_map;
  410. std::set<std::string> resources_in_current_request;
  411. std::set<std::string> resources_added_to_response;
  412. for (const std::string& resource_name : request.resource_names()) {
  413. resources_in_current_request.emplace(resource_name);
  414. auto& subscription_state = subscription_name_map[resource_name];
  415. auto& resource_state = resource_name_map[resource_name];
  416. // Subscribe if needed.
  417. // Send the resource in the response if either (a) this is
  418. // a new subscription or (b) there is an updated version of
  419. // this resource to send.
  420. if (parent_->MaybeSubscribe(v3_resource_type, resource_name,
  421. &subscription_state, &resource_state,
  422. update_queue) ||
  423. ClientNeedsResourceUpdate(resource_type_state, resource_state,
  424. sent_state->resource_type_version)) {
  425. gpr_log(GPR_INFO, "ADS[%p]: Sending update for type=%s name=%s", this,
  426. request.type_url().c_str(), resource_name.c_str());
  427. resources_added_to_response.emplace(resource_name);
  428. if (!response->has_value()) response->emplace();
  429. if (resource_state.resource.has_value()) {
  430. auto* resource = (*response)->add_resources();
  431. resource->CopyFrom(resource_state.resource.value());
  432. if (is_v2_) {
  433. resource->set_type_url(request.type_url());
  434. }
  435. }
  436. } else {
  437. gpr_log(GPR_INFO,
  438. "ADS[%p]: client does not need update for type=%s name=%s",
  439. this, request.type_url().c_str(), resource_name.c_str());
  440. }
  441. }
  442. // Process unsubscriptions for any resource no longer
  443. // present in the request's resource list.
  444. parent_->ProcessUnsubscriptions(
  445. v3_resource_type, resources_in_current_request,
  446. &subscription_name_map, &resource_name_map);
  447. // Construct response if needed.
  448. if (!resources_added_to_response.empty()) {
  449. CompleteBuildingDiscoveryResponse(
  450. v3_resource_type, request.type_url(),
  451. resource_type_state.resource_type_version, subscription_name_map,
  452. resources_added_to_response, sent_state, &response->value());
  453. }
  454. }
  455. // Processes a resource update from the test.
  456. // Populates response if needed.
  457. void ProcessUpdate(const std::string& resource_type,
  458. const std::string& resource_name,
  459. SubscriptionMap* subscription_map, SentState* sent_state,
  460. absl::optional<DiscoveryResponse>* response)
  461. ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_->ads_mu_) {
  462. NoopMutexLock mu(parent_->ads_mu_);
  463. const std::string v2_resource_type = TypeUrlToV2(resource_type);
  464. gpr_log(GPR_INFO, "ADS[%p]: Received update for type=%s name=%s", this,
  465. resource_type.c_str(), resource_name.c_str());
  466. auto& subscription_name_map = (*subscription_map)[resource_type];
  467. auto& resource_type_state = parent_->resource_map_[resource_type];
  468. auto& resource_name_map = resource_type_state.resource_name_map;
  469. auto it = subscription_name_map.find(resource_name);
  470. if (it != subscription_name_map.end()) {
  471. ResourceState& resource_state = resource_name_map[resource_name];
  472. if (ClientNeedsResourceUpdate(resource_type_state, resource_state,
  473. sent_state->resource_type_version)) {
  474. gpr_log(GPR_INFO, "ADS[%p]: Sending update for type=%s name=%s", this,
  475. resource_type.c_str(), resource_name.c_str());
  476. response->emplace();
  477. if (resource_state.resource.has_value()) {
  478. auto* resource = (*response)->add_resources();
  479. resource->CopyFrom(resource_state.resource.value());
  480. if (is_v2_) {
  481. resource->set_type_url(v2_resource_type);
  482. }
  483. }
  484. CompleteBuildingDiscoveryResponse(
  485. resource_type, v2_resource_type,
  486. resource_type_state.resource_type_version, subscription_name_map,
  487. {resource_name}, sent_state, &response->value());
  488. }
  489. }
  490. }
  491. // Starting a thread to do blocking read on the stream until cancel.
  492. void BlockingRead(Stream* stream, std::deque<DiscoveryRequest>* requests,
  493. bool* stream_closed) {
  494. DiscoveryRequest request;
  495. bool seen_first_request = false;
  496. while (stream->Read(&request)) {
  497. if (!seen_first_request) {
  498. EXPECT_TRUE(request.has_node());
  499. ASSERT_FALSE(request.node().client_features().empty());
  500. EXPECT_EQ(request.node().client_features(0),
  501. "envoy.lb.does_not_support_overprovisioning");
  502. CheckBuildVersion(request);
  503. seen_first_request = true;
  504. }
  505. {
  506. grpc_core::MutexLock lock(&parent_->ads_mu_);
  507. requests->emplace_back(std::move(request));
  508. }
  509. }
  510. gpr_log(GPR_INFO, "ADS[%p]: Null read, stream closed", this);
  511. grpc_core::MutexLock lock(&parent_->ads_mu_);
  512. *stream_closed = true;
  513. }
  514. // Completing the building a DiscoveryResponse by adding common information
  515. // for all resources and by adding all subscribed resources for LDS and CDS.
  516. void CompleteBuildingDiscoveryResponse(
  517. const std::string& resource_type, const std::string& v2_resource_type,
  518. const int version, const SubscriptionNameMap& subscription_name_map,
  519. const std::set<std::string>& resources_added_to_response,
  520. SentState* sent_state, DiscoveryResponse* response)
  521. ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_->ads_mu_) {
  522. NoopMutexLock mu(parent_->ads_mu_);
  523. response->set_type_url(is_v2_ ? v2_resource_type : resource_type);
  524. response->set_version_info(std::to_string(version));
  525. response->set_nonce(std::to_string(++sent_state->nonce));
  526. if (resource_type == kLdsTypeUrl || resource_type == kCdsTypeUrl) {
  527. // For LDS and CDS we must send back all subscribed resources
  528. // (even the unchanged ones)
  529. for (const auto& p : subscription_name_map) {
  530. const std::string& resource_name = p.first;
  531. if (resources_added_to_response.find(resource_name) ==
  532. resources_added_to_response.end()) {
  533. ResourceNameMap& resource_name_map =
  534. parent_->resource_map_[resource_type].resource_name_map;
  535. const ResourceState& resource_state =
  536. resource_name_map[resource_name];
  537. if (resource_state.resource.has_value()) {
  538. auto* resource = response->add_resources();
  539. resource->CopyFrom(resource_state.resource.value());
  540. if (is_v2_) {
  541. resource->set_type_url(v2_resource_type);
  542. }
  543. }
  544. }
  545. }
  546. }
  547. sent_state->resource_type_version = version;
  548. }
  549. static std::string TypeUrlToV2(const std::string& resource_type) {
  550. if (resource_type == kLdsTypeUrl) return kLdsV2TypeUrl;
  551. if (resource_type == kRdsTypeUrl) return kRdsV2TypeUrl;
  552. if (resource_type == kCdsTypeUrl) return kCdsV2TypeUrl;
  553. if (resource_type == kEdsTypeUrl) return kEdsV2TypeUrl;
  554. return resource_type;
  555. }
  556. static std::string TypeUrlToV3(const std::string& resource_type) {
  557. if (resource_type == kLdsV2TypeUrl) return kLdsTypeUrl;
  558. if (resource_type == kRdsV2TypeUrl) return kRdsTypeUrl;
  559. if (resource_type == kCdsV2TypeUrl) return kCdsTypeUrl;
  560. if (resource_type == kEdsV2TypeUrl) return kEdsTypeUrl;
  561. return resource_type;
  562. }
  563. static void CheckBuildVersion(
  564. const ::envoy::api::v2::DiscoveryRequest& request) {
  565. EXPECT_FALSE(request.node().build_version().empty());
  566. }
  567. static void CheckBuildVersion(
  568. const ::envoy::service::discovery::v3::DiscoveryRequest& /*request*/) {}
  569. AdsServiceImpl* parent_;
  570. const bool is_v2_;
  571. };
  572. // Checks whether the client needs to receive a newer version of
  573. // the resource.
  574. static bool ClientNeedsResourceUpdate(
  575. const ResourceTypeState& resource_type_state,
  576. const ResourceState& resource_state, int client_resource_type_version);
  577. // Subscribes to a resource if not already subscribed:
  578. // 1. Sets the update_queue field in subscription_state.
  579. // 2. Adds subscription_state to resource_state->subscriptions.
  580. bool MaybeSubscribe(const std::string& resource_type,
  581. const std::string& resource_name,
  582. SubscriptionState* subscription_state,
  583. ResourceState* resource_state, UpdateQueue* update_queue);
  584. // Removes subscriptions for resources no longer present in the
  585. // current request.
  586. void ProcessUnsubscriptions(
  587. const std::string& resource_type,
  588. const std::set<std::string>& resources_in_current_request,
  589. SubscriptionNameMap* subscription_name_map,
  590. ResourceNameMap* resource_name_map);
  591. void AddClient(const std::string& client) {
  592. grpc_core::MutexLock lock(&clients_mu_);
  593. clients_.insert(client);
  594. }
  595. void RemoveClient(const std::string& client) {
  596. grpc_core::MutexLock lock(&clients_mu_);
  597. clients_.erase(client);
  598. }
  599. RpcService<::envoy::service::discovery::v2::AggregatedDiscoveryService,
  600. ::envoy::api::v2::DiscoveryRequest,
  601. ::envoy::api::v2::DiscoveryResponse>
  602. v2_rpc_service_;
  603. RpcService<::envoy::service::discovery::v3::AggregatedDiscoveryService,
  604. ::envoy::service::discovery::v3::DiscoveryRequest,
  605. ::envoy::service::discovery::v3::DiscoveryResponse>
  606. v3_rpc_service_;
  607. std::atomic_bool seen_v2_client_{false};
  608. std::atomic_bool seen_v3_client_{false};
  609. grpc_core::CondVar ads_cond_;
  610. grpc_core::Mutex ads_mu_;
  611. bool ads_done_ ABSL_GUARDED_BY(ads_mu_) = false;
  612. std::map<std::string /* type_url */, std::deque<ResponseState>>
  613. resource_type_response_state_ ABSL_GUARDED_BY(ads_mu_);
  614. std::set<std::string /*resource_type*/> resource_types_to_ignore_
  615. ABSL_GUARDED_BY(ads_mu_);
  616. std::map<std::string /*resource_type*/, int> resource_type_min_versions_
  617. ABSL_GUARDED_BY(ads_mu_);
  618. // An instance data member containing the current state of all resources.
  619. // Note that an entry will exist whenever either of the following is true:
  620. // - The resource exists (i.e., has been created by SetResource() and has not
  621. // yet been destroyed by UnsetResource()).
  622. // - There is at least one subscription for the resource.
  623. ResourceMap resource_map_ ABSL_GUARDED_BY(ads_mu_);
  624. absl::optional<Status> forced_ads_failure_ ABSL_GUARDED_BY(ads_mu_);
  625. grpc_core::Mutex clients_mu_;
  626. std::set<std::string> clients_ ABSL_GUARDED_BY(clients_mu_);
  627. };
  628. // An LRS service implementation.
  629. class LrsServiceImpl : public std::enable_shared_from_this<LrsServiceImpl> {
  630. public:
  631. // Stats reported by client.
  632. class ClientStats {
  633. public:
  634. // Stats for a given locality.
  635. struct LocalityStats {
  636. LocalityStats() {}
  637. // Converts from proto message class.
  638. template <class UpstreamLocalityStats>
  639. explicit LocalityStats(
  640. const UpstreamLocalityStats& upstream_locality_stats)
  641. : total_successful_requests(
  642. upstream_locality_stats.total_successful_requests()),
  643. total_requests_in_progress(
  644. upstream_locality_stats.total_requests_in_progress()),
  645. total_error_requests(
  646. upstream_locality_stats.total_error_requests()),
  647. total_issued_requests(
  648. upstream_locality_stats.total_issued_requests()) {}
  649. LocalityStats& operator+=(const LocalityStats& other) {
  650. total_successful_requests += other.total_successful_requests;
  651. total_requests_in_progress += other.total_requests_in_progress;
  652. total_error_requests += other.total_error_requests;
  653. total_issued_requests += other.total_issued_requests;
  654. return *this;
  655. }
  656. uint64_t total_successful_requests = 0;
  657. uint64_t total_requests_in_progress = 0;
  658. uint64_t total_error_requests = 0;
  659. uint64_t total_issued_requests = 0;
  660. };
  661. ClientStats() {}
  662. // Converts from proto message class.
  663. template <class ClusterStats>
  664. explicit ClientStats(const ClusterStats& cluster_stats)
  665. : cluster_name_(cluster_stats.cluster_name()),
  666. total_dropped_requests_(cluster_stats.total_dropped_requests()) {
  667. for (const auto& input_locality_stats :
  668. cluster_stats.upstream_locality_stats()) {
  669. locality_stats_.emplace(input_locality_stats.locality().sub_zone(),
  670. LocalityStats(input_locality_stats));
  671. }
  672. for (const auto& input_dropped_requests :
  673. cluster_stats.dropped_requests()) {
  674. dropped_requests_.emplace(input_dropped_requests.category(),
  675. input_dropped_requests.dropped_count());
  676. }
  677. }
  678. const std::string& cluster_name() const { return cluster_name_; }
  679. const std::map<std::string, LocalityStats>& locality_stats() const {
  680. return locality_stats_;
  681. }
  682. uint64_t total_successful_requests() const;
  683. uint64_t total_requests_in_progress() const;
  684. uint64_t total_error_requests() const;
  685. uint64_t total_issued_requests() const;
  686. uint64_t total_dropped_requests() const { return total_dropped_requests_; }
  687. uint64_t dropped_requests(const std::string& category) const;
  688. ClientStats& operator+=(const ClientStats& other);
  689. private:
  690. std::string cluster_name_;
  691. std::map<std::string, LocalityStats> locality_stats_;
  692. uint64_t total_dropped_requests_ = 0;
  693. std::map<std::string, uint64_t> dropped_requests_;
  694. };
  695. LrsServiceImpl(int client_load_reporting_interval_seconds,
  696. std::set<std::string> cluster_names)
  697. : v2_rpc_service_(this),
  698. v3_rpc_service_(this),
  699. client_load_reporting_interval_seconds_(
  700. client_load_reporting_interval_seconds),
  701. cluster_names_(std::move(cluster_names)) {}
  702. ::envoy::service::load_stats::v2::LoadReportingService::Service*
  703. v2_rpc_service() {
  704. return &v2_rpc_service_;
  705. }
  706. ::envoy::service::load_stats::v3::LoadReportingService::Service*
  707. v3_rpc_service() {
  708. return &v3_rpc_service_;
  709. }
  710. size_t request_count() {
  711. return v2_rpc_service_.request_count() + v3_rpc_service_.request_count();
  712. }
  713. size_t response_count() {
  714. return v2_rpc_service_.response_count() + v3_rpc_service_.response_count();
  715. }
  716. // Must be called before the LRS call is started.
  717. void set_send_all_clusters(bool send_all_clusters) {
  718. send_all_clusters_ = send_all_clusters;
  719. }
  720. void set_cluster_names(const std::set<std::string>& cluster_names) {
  721. cluster_names_ = cluster_names;
  722. }
  723. void Start() ABSL_LOCKS_EXCLUDED(lrs_mu_, load_report_mu_);
  724. void Shutdown();
  725. std::vector<ClientStats> WaitForLoadReport();
  726. private:
  727. // Templated RPC service implementation, works for both v2 and v3.
  728. template <class RpcApi, class LoadStatsRequest, class LoadStatsResponse>
  729. class RpcService : public CountedService<typename RpcApi::Service> {
  730. public:
  731. using Stream = ServerReaderWriter<LoadStatsResponse, LoadStatsRequest>;
  732. explicit RpcService(LrsServiceImpl* parent) : parent_(parent) {}
  733. Status StreamLoadStats(ServerContext* /*context*/,
  734. Stream* stream) override {
  735. gpr_log(GPR_INFO, "LRS[%p]: StreamLoadStats starts", this);
  736. EXPECT_GT(parent_->client_load_reporting_interval_seconds_, 0);
  737. // Take a reference of the LrsServiceImpl object, reference will go
  738. // out of scope after this method exits.
  739. std::shared_ptr<LrsServiceImpl> lrs_service_impl =
  740. parent_->shared_from_this();
  741. // Read initial request.
  742. LoadStatsRequest request;
  743. if (stream->Read(&request)) {
  744. CountedService<typename RpcApi::Service>::IncreaseRequestCount();
  745. // Verify client features.
  746. EXPECT_THAT(
  747. request.node().client_features(),
  748. ::testing::Contains("envoy.lrs.supports_send_all_clusters"));
  749. // Send initial response.
  750. LoadStatsResponse response;
  751. if (parent_->send_all_clusters_) {
  752. response.set_send_all_clusters(true);
  753. } else {
  754. for (const std::string& cluster_name : parent_->cluster_names_) {
  755. response.add_clusters(cluster_name);
  756. }
  757. }
  758. response.mutable_load_reporting_interval()->set_seconds(
  759. parent_->client_load_reporting_interval_seconds_);
  760. stream->Write(response);
  761. CountedService<typename RpcApi::Service>::IncreaseResponseCount();
  762. // Wait for report.
  763. request.Clear();
  764. while (stream->Read(&request)) {
  765. gpr_log(GPR_INFO, "LRS[%p]: received client load report message: %s",
  766. this, request.DebugString().c_str());
  767. std::vector<ClientStats> stats;
  768. for (const auto& cluster_stats : request.cluster_stats()) {
  769. stats.emplace_back(cluster_stats);
  770. }
  771. grpc_core::MutexLock lock(&parent_->load_report_mu_);
  772. parent_->result_queue_.emplace_back(std::move(stats));
  773. if (parent_->load_report_cond_ != nullptr) {
  774. parent_->load_report_cond_->Signal();
  775. }
  776. }
  777. // Wait until notified done.
  778. grpc_core::MutexLock lock(&parent_->lrs_mu_);
  779. while (!parent_->lrs_done_) {
  780. parent_->lrs_cv_.Wait(&parent_->lrs_mu_);
  781. }
  782. }
  783. gpr_log(GPR_INFO, "LRS[%p]: StreamLoadStats done", this);
  784. return Status::OK;
  785. }
  786. private:
  787. LrsServiceImpl* parent_;
  788. };
  789. RpcService<::envoy::service::load_stats::v2::LoadReportingService,
  790. ::envoy::service::load_stats::v2::LoadStatsRequest,
  791. ::envoy::service::load_stats::v2::LoadStatsResponse>
  792. v2_rpc_service_;
  793. RpcService<::envoy::service::load_stats::v3::LoadReportingService,
  794. ::envoy::service::load_stats::v3::LoadStatsRequest,
  795. ::envoy::service::load_stats::v3::LoadStatsResponse>
  796. v3_rpc_service_;
  797. const int client_load_reporting_interval_seconds_;
  798. bool send_all_clusters_ = false;
  799. std::set<std::string> cluster_names_;
  800. grpc_core::CondVar lrs_cv_;
  801. grpc_core::Mutex lrs_mu_;
  802. bool lrs_done_ ABSL_GUARDED_BY(lrs_mu_) = false;
  803. grpc_core::Mutex load_report_mu_;
  804. grpc_core::CondVar* load_report_cond_ ABSL_GUARDED_BY(load_report_mu_) =
  805. nullptr;
  806. std::deque<std::vector<ClientStats>> result_queue_
  807. ABSL_GUARDED_BY(load_report_mu_);
  808. };
  809. } // namespace testing
  810. } // namespace grpc
  811. #endif // GRPC_TEST_CPP_END2END_XDS_XDS_SERVER_H