affinity_test.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. # Copyright 2021 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. import time
  16. from typing import List, Optional
  17. from absl import flags
  18. from absl.testing import absltest
  19. from google.protobuf import json_format
  20. from framework import xds_k8s_testcase
  21. from framework import xds_url_map_testcase
  22. from framework.helpers import skips
  23. from framework.infrastructure import k8s
  24. from framework.rpc import grpc_channelz
  25. from framework.test_app import server_app
  26. logger = logging.getLogger(__name__)
  27. flags.adopt_module_key_flags(xds_k8s_testcase)
  28. # Type aliases
  29. _XdsTestServer = xds_k8s_testcase.XdsTestServer
  30. _XdsTestClient = xds_k8s_testcase.XdsTestClient
  31. _ChannelzChannelState = grpc_channelz.ChannelState
  32. # Testing consts
  33. _TEST_AFFINITY_METADATA_KEY = 'xds_md'
  34. _TD_PROPAGATE_CHECK_INTERVAL_SEC = 10
  35. _TD_PROPAGATE_TIMEOUT = 600
  36. _REPLICA_COUNT = 3
  37. _RPC_COUNT = 100
  38. class AffinityTest(xds_k8s_testcase.RegularXdsKubernetesTestCase):
  39. @staticmethod
  40. def isSupported(config: skips.TestConfig) -> bool:
  41. if config.client_lang in ['cpp', 'java', 'python', 'go']:
  42. return config.version_ge('v1.40.x')
  43. return False
  44. def test_affinity(self) -> None:
  45. with self.subTest('00_create_health_check'):
  46. self.td.create_health_check()
  47. with self.subTest('01_create_backend_services'):
  48. self.td.create_backend_service(
  49. affinity_header=_TEST_AFFINITY_METADATA_KEY)
  50. with self.subTest('02_create_url_map'):
  51. self.td.create_url_map(self.server_xds_host, self.server_xds_port)
  52. with self.subTest('03_create_target_proxy'):
  53. self.td.create_target_proxy()
  54. with self.subTest('04_create_forwarding_rule'):
  55. self.td.create_forwarding_rule(self.server_xds_port)
  56. with self.subTest('05_start_test_servers'):
  57. self.test_servers: List[_XdsTestServer] = self.startTestServers(
  58. replica_count=_REPLICA_COUNT)
  59. with self.subTest('06_add_server_backends_to_backend_services'):
  60. self.setupServerBackends()
  61. with self.subTest('07_start_test_client'):
  62. self.test_client: _XdsTestClient = self.startTestClient(
  63. self.test_servers[0],
  64. rpc='EmptyCall',
  65. metadata='EmptyCall:%s:123' % _TEST_AFFINITY_METADATA_KEY)
  66. # Validate the number of received endpoints and affinity configs.
  67. config = self.test_client.csds.fetch_client_status(
  68. log_level=logging.INFO)
  69. self.assertIsNotNone(config)
  70. json_config = json_format.MessageToDict(config)
  71. parsed = xds_url_map_testcase.DumpedXdsConfig(json_config)
  72. logging.info('Client received CSDS response: %s', parsed)
  73. self.assertLen(parsed.endpoints, _REPLICA_COUNT)
  74. self.assertEqual(
  75. parsed.rds['virtualHosts'][0]['routes'][0]['route']
  76. ['hashPolicy'][0]['header']['headerName'],
  77. _TEST_AFFINITY_METADATA_KEY)
  78. self.assertEqual(parsed.cds[0]['lbPolicy'], 'RING_HASH')
  79. with self.subTest('08_test_client_xds_config_exists'):
  80. self.assertXdsConfigExists(self.test_client)
  81. with self.subTest('09_test_server_received_rpcs_from_test_client'):
  82. self.assertSuccessfulRpcs(self.test_client)
  83. with self.subTest('10_first_100_affinity_rpcs_pick_same_backend'):
  84. rpc_stats = self.getClientRpcStats(self.test_client, _RPC_COUNT)
  85. json_lb_stats = json_format.MessageToDict(rpc_stats)
  86. rpc_distribution = xds_url_map_testcase.RpcDistributionStats(
  87. json_lb_stats)
  88. self.assertEqual(1, rpc_distribution.num_peers)
  89. self.assertLen(
  90. self.test_client.find_subchannels_with_state(
  91. _ChannelzChannelState.READY),
  92. 1,
  93. )
  94. self.assertLen(
  95. self.test_client.find_subchannels_with_state(
  96. _ChannelzChannelState.IDLE),
  97. 2,
  98. )
  99. # Remember the backend inuse, and turn it down later.
  100. self.first_backend_inuse = list(
  101. rpc_distribution.raw['rpcsByPeer'].keys())[0]
  102. with self.subTest('11_turn_down_server_in_use'):
  103. for s in self.test_servers:
  104. if s.pod_name == self.first_backend_inuse:
  105. logging.info('setting backend %s to NOT_SERVING',
  106. s.pod_name)
  107. s.set_not_serving()
  108. with self.subTest('12_wait_for_unhealth_status_propagation'):
  109. deadline = time.time() + _TD_PROPAGATE_TIMEOUT
  110. parsed = None
  111. try:
  112. while time.time() < deadline:
  113. config = self.test_client.csds.fetch_client_status(
  114. log_level=logging.INFO)
  115. self.assertIsNotNone(config)
  116. json_config = json_format.MessageToDict(config)
  117. parsed = xds_url_map_testcase.DumpedXdsConfig(json_config)
  118. if len(parsed.endpoints) == _REPLICA_COUNT - 1:
  119. break
  120. logging.info(
  121. 'CSDS got unexpected endpoints, will retry after %d seconds',
  122. _TD_PROPAGATE_CHECK_INTERVAL_SEC)
  123. time.sleep(_TD_PROPAGATE_CHECK_INTERVAL_SEC)
  124. else:
  125. self.fail(
  126. 'unhealthy status did not propagate after 600 seconds')
  127. finally:
  128. logging.info('Client received CSDS response: %s', parsed)
  129. with self.subTest('12_next_100_affinity_rpcs_pick_different_backend'):
  130. rpc_stats = self.getClientRpcStats(self.test_client, _RPC_COUNT)
  131. json_lb_stats = json_format.MessageToDict(rpc_stats)
  132. rpc_distribution = xds_url_map_testcase.RpcDistributionStats(
  133. json_lb_stats)
  134. self.assertEqual(1, rpc_distribution.num_peers)
  135. new_backend_inuse = list(
  136. rpc_distribution.raw['rpcsByPeer'].keys())[0]
  137. self.assertNotEqual(new_backend_inuse, self.first_backend_inuse)
  138. if __name__ == '__main__':
  139. absltest.main(failfast=True)