123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- # Copyright 2021 gRPC authors.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import logging
- from typing import List
- from absl import flags
- from absl.testing import absltest
- from framework import xds_k8s_testcase
- from framework.infrastructure import k8s
- from framework.test_app import server_app
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Declare the key flags of xds_k8s_testcase as key flags of this module too.
flags.adopt_module_key_flags(xds_k8s_testcase)

# Type aliases
_XdsTestServer = xds_k8s_testcase.XdsTestServer
_XdsTestClient = xds_k8s_testcase.XdsTestClient
class FailoverTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
    """Failover scenario between a primary and a secondary set of servers.

    The default server runner hosts ``REPLICA_COUNT`` replicas (called the
    primary locality in the subtest names). ``setUp`` creates a second
    runner, built on ``secondary_k8s_api_manager``, that hosts the alternate
    servers (the secondary locality).
    """

    # Replicas started for the default deployment. test_failover takes them
    # out of service one by one using indices 0, 1 and 2, so the scenario
    # relies on this being 3.
    REPLICA_COUNT = 3
    # Forwarded as max_rate_per_endpoint when the server backends are added
    # to the backend service.
    MAX_RATE_PER_ENDPOINT = 100

    def setUp(self):
        """Run the base setup, then create the secondary server runner."""
        super().setUp()
        # Same namespace name as the primary servers, but reached through
        # the secondary k8s API manager.
        secondary_namespace = k8s.KubernetesNamespace(
            self.secondary_k8s_api_manager, self.server_namespace)
        self.secondary_server_runner = server_app.KubernetesServerRunner(
            secondary_namespace,
            deployment_name=self.server_name + '-alt',
            image_name=self.server_image,
            gcp_service_account=self.gcp_service_account,
            td_bootstrap_image=self.td_bootstrap_image,
            gcp_project=self.project,
            gcp_api_manager=self.gcp_api_manager,
            xds_server_uri=self.xds_server_uri,
            network=self.network,
            debug_use_port_forwarding=self.debug_use_port_forwarding,
            reuse_namespace=True)

    def tearDown(self):
        """Clean up the secondary runner, then run the base-class teardown.

        The base-class teardown runs even if the secondary cleanup raises,
        so it is never skipped; the original exception still propagates.
        """
        try:
            # The attribute only exists once setUp has assigned it.
            if hasattr(self, 'secondary_server_runner'):
                self.secondary_server_runner.cleanup()
        finally:
            super().tearDown()

    def test_failover(self) -> None:
        """Walk through the failover scenario as a sequence of subtests.

        Primary servers are switched to not-serving one at a time, and after
        each step the test asserts which servers RPCs eventually reach.
        Finally all primary servers are switched back to serving.
        """
        # Stages 00-04: create the traffic-director resources.
        with self.subTest('00_create_health_check'):
            self.td.create_health_check()

        with self.subTest('01_create_backend_services'):
            self.td.create_backend_service()

        with self.subTest('02_create_url_map'):
            self.td.create_url_map(self.server_xds_host, self.server_xds_port)

        with self.subTest('03_create_target_proxy'):
            self.td.create_target_proxy()

        with self.subTest('04_create_forwarding_rule'):
            self.td.create_forwarding_rule(self.server_xds_port)

        # Stages 05-06: start both server sets and register them as backends.
        with self.subTest('05_start_test_servers'):
            self.default_test_servers: List[_XdsTestServer] = (
                self.startTestServers(replica_count=self.REPLICA_COUNT))

            self.alternate_test_servers: List[_XdsTestServer] = (
                self.startTestServers(
                    server_runner=self.secondary_server_runner))

        with self.subTest('06_add_server_backends_to_backend_services'):
            self.setupServerBackends(
                max_rate_per_endpoint=self.MAX_RATE_PER_ENDPOINT)
            self.setupServerBackends(
                server_runner=self.secondary_server_runner,
                max_rate_per_endpoint=self.MAX_RATE_PER_ENDPOINT)

        # Stages 07-08: start the client and check it has xDS config.
        with self.subTest('07_start_test_client'):
            self.test_client: _XdsTestClient = self.startTestClient(
                self.default_test_servers[0])

        with self.subTest('08_test_client_xds_config_exists'):
            self.assertXdsConfigExists(self.test_client)

        # Baseline: only the primary servers are expected to get RPCs.
        with self.subTest('09_primary_locality_receives_requests'):
            self.assertRpcsEventuallyGoToGivenServers(
                self.test_client, self.default_test_servers)

        # One primary server down: the remaining primaries take the traffic.
        with self.subTest(
                '10_secondary_locality_receives_no_requests_on_partial_primary_failure'
        ):
            self.default_test_servers[0].set_not_serving()
            self.assertRpcsEventuallyGoToGivenServers(
                self.test_client, self.default_test_servers[1:])

        # Two primary servers down: the last primary shares traffic with
        # the alternate servers.
        with self.subTest('11_gentle_failover'):
            self.default_test_servers[1].set_not_serving()
            self.assertRpcsEventuallyGoToGivenServers(
                self.test_client,
                self.default_test_servers[2:] + self.alternate_test_servers)

        # All primary servers down: only the alternate servers get RPCs.
        with self.subTest(
                '12_secondary_locality_receives_requests_on_primary_failure'):
            self.default_test_servers[2].set_not_serving()
            self.assertRpcsEventuallyGoToGivenServers(
                self.test_client, self.alternate_test_servers)

        # Recovery: restore every primary server, i.e. exactly the servers
        # the assertion below expects to receive traffic again.
        with self.subTest('13_traffic_resumes_to_healthy_backends'):
            for server in self.default_test_servers:
                server.set_serving()
            self.assertRpcsEventuallyGoToGivenServers(
                self.test_client, self.default_test_servers)
# Script entry point: hand control to absl's test runner. failfast=True asks
# the runner to stop at the first failing test.
if __name__ == '__main__':
    absltest.main(failfast=True)
|