baseline_test.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. # Copyright 2020 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. from absl import flags
  16. from absl.testing import absltest
  17. from framework import xds_k8s_testcase
  18. logger = logging.getLogger(__name__)
  19. flags.adopt_module_key_flags(xds_k8s_testcase)
  20. # Type aliases
  21. _XdsTestServer = xds_k8s_testcase.XdsTestServer
  22. _XdsTestClient = xds_k8s_testcase.XdsTestClient
  23. class BaselineTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
  24. def test_traffic_director_grpc_setup(self):
  25. with self.subTest('0_create_health_check'):
  26. self.td.create_health_check()
  27. with self.subTest('1_create_backend_service'):
  28. self.td.create_backend_service()
  29. with self.subTest('2_create_url_map'):
  30. self.td.create_url_map(self.server_xds_host, self.server_xds_port)
  31. with self.subTest('3_create_target_proxy'):
  32. self.td.create_target_proxy()
  33. with self.subTest('4_create_forwarding_rule'):
  34. self.td.create_forwarding_rule(self.server_xds_port)
  35. with self.subTest('5_start_test_server'):
  36. test_servers: _XdsTestServer = self.startTestServers()
  37. with self.subTest('6_add_server_backends_to_backend_service'):
  38. self.setupServerBackends()
  39. with self.subTest('7_start_test_client'):
  40. test_client: _XdsTestClient = self.startTestClient(test_servers[0])
  41. with self.subTest('8_test_client_xds_config_exists'):
  42. self.assertXdsConfigExists(test_client)
  43. with self.subTest('9_test_server_received_rpcs_from_test_client'):
  44. self.assertSuccessfulRpcs(test_client)
  45. if __name__ == '__main__':
  46. absltest.main(failfast=True)