123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- # Copyright 2020 gRPC authors.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import logging
- from absl import flags
- from absl.testing import absltest
- from framework import xds_k8s_testcase
- from framework.helpers import rand
- from framework.helpers import skips
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Make the key flags declared by xds_k8s_testcase key flags of this module too.
flags.adopt_module_key_flags(xds_k8s_testcase)

# Type aliases: short local names for the framework types used in this file.
_XdsTestServer = xds_k8s_testcase.XdsTestServer
_XdsTestClient = xds_k8s_testcase.XdsTestClient
_SecurityMode = xds_k8s_testcase.SecurityXdsKubernetesTestCase.SecurityMode
class SecurityTest(xds_k8s_testcase.SecurityXdsKubernetesTestCase):
    """Security-mode tests: mTLS, TLS, plaintext fallback and two negatives.

    The positive tests set up Traffic Director and the security policies,
    start a secure test server and client, then assert the expected security
    mode and that RPCs succeed. The negative tests build the TD configuration
    in a specific order and assert that the client cannot reach the server.
    """

    @staticmethod
    def isSupported(config: skips.TestConfig) -> bool:
        """Tell whether this suite should run for the given test config.

        Args:
            config: Test configuration providing ``client_lang`` and
                ``version_ge()``.

        Returns:
            The result of ``config.version_ge('v1.41.x')`` when the client
            language is cpp, python or go; ``False`` for any other language.
        """
        if config.client_lang not in ('cpp', 'python', 'go'):
            return False
        return config.version_ge('v1.41.x')

    def test_mtls(self):
        """mTLS test.

        TLS and mTLS are enabled on both the client and the server.
        """
        self.setupTrafficDirectorGrpc()
        self.setupSecurityPolicies(
            server_tls=True,
            server_mtls=True,
            client_tls=True,
            client_mtls=True,
        )

        server: _XdsTestServer = self.startSecureTestServer()
        self.setupServerBackends()
        client: _XdsTestClient = self.startSecureTestClient(server)

        self.assertTestAppSecurity(_SecurityMode.MTLS, client, server)
        self.assertSuccessfulRpcs(client)
        logger.info('[SUCCESS] mTLS security mode confirmed.')

    def test_tls(self):
        """TLS test.

        TLS is enabled and mTLS is disabled on both the client and the server.
        """
        self.setupTrafficDirectorGrpc()
        self.setupSecurityPolicies(
            server_tls=True,
            server_mtls=False,
            client_tls=True,
            client_mtls=False,
        )

        server: _XdsTestServer = self.startSecureTestServer()
        self.setupServerBackends()
        client: _XdsTestClient = self.startSecureTestClient(server)

        self.assertTestAppSecurity(_SecurityMode.TLS, client, server)
        self.assertSuccessfulRpcs(client)
        logger.info('[SUCCESS] TLS security mode confirmed.')

    def test_plaintext_fallback(self):
        """Plain-text fallback test.

        The control plane supplies no security config, so the client and the
        server both fall back to plaintext based on fallback-credentials.
        """
        self.setupTrafficDirectorGrpc()
        self.setupSecurityPolicies(
            server_tls=False,
            server_mtls=False,
            client_tls=False,
            client_mtls=False,
        )

        server: _XdsTestServer = self.startSecureTestServer()
        self.setupServerBackends()
        client: _XdsTestClient = self.startSecureTestClient(server)

        self.assertTestAppSecurity(_SecurityMode.PLAINTEXT, client, server)
        self.assertSuccessfulRpcs(client)
        logger.info('[SUCCESS] Plaintext security mode confirmed.')

    def test_mtls_error(self):
        """Negative test: mTLS Error.

        The server expects a client mTLS cert, but the client is configured
        for TLS only.

        Note: since this is a negative test, the mTLS failure must happen
        after the client has received the correct configuration. To ensure
        that, the steps are performed in this sequence:

        - Creation of a backendService, and attaching the backend (NEG)
        - Creation of the Server mTLS Policy, and attaching to the ECS
        - Creation of the Client TLS Policy, and attaching to the
          backendService
        - Creation of the urlMap, targetProxy, and forwardingRule

        With this sequence, by the time the client receives the endpoints of
        the backendService it has also received the security-config, as
        confirmed by the TD team.
        """
        # Step 1: the backend service comes first.
        self.td.setup_backend_for_grpc(
            health_check_port=self.server_maintenance_port)

        # Step 2: bring up the server and attach its NEGs to the backend
        # service, but don't wait until they become healthy.
        server: _XdsTestServer = self.startSecureTestServer()
        self.setupServerBackends(wait_for_healthy_status=False)

        # Step 3: create the security policies and attach them. The server
        # side requires mTLS while the client side only does TLS.
        self.setupSecurityPolicies(
            server_tls=True,
            server_mtls=True,
            client_tls=True,
            client_mtls=False,
        )

        # Step 4: the routing rule map is created last.
        self.td.setup_routing_rule_map_for_grpc(
            self.server_xds_host,
            self.server_xds_port,
        )

        # TD setup is complete, so the Backend Service can now be populated
        # with healthy backends (NEGs).
        self.td.wait_for_backends_healthy_status()

        # Launch the client without waiting for a healthy channel report.
        client: _XdsTestClient = self.startSecureTestClient(
            server,
            wait_for_active_server_channel=False,
        )

        self.assertClientCannotReachServerRepeatedly(client)
        logger.info(
            "[SUCCESS] Client's connectivity state is consistent with a mTLS "
            "error caused by not presenting mTLS certificate to the server.")

    def test_server_authz_error(self):
        """Negative test: AuthZ error.

        The client refuses to authorize the server because the SAN name does
        not match. Operations run in the same order as in `test_mtls_error`.
        """
        # Step 1: the backend service comes first.
        self.td.setup_backend_for_grpc(
            health_check_port=self.server_maintenance_port)

        # Step 2: bring up the server and attach its NEGs to the backend
        # service, but don't wait until they become healthy.
        server: _XdsTestServer = self.startSecureTestServer()
        self.setupServerBackends(wait_for_healthy_status=False)

        # Step 3: a regular TLS setup, except that the client policy is
        # configured with an intentionally incorrect server_namespace.
        self.td.setup_server_security(
            server_namespace=self.server_namespace,
            server_name=self.server_name,
            server_port=self.server_port,
            tls=True,
            mtls=False,
        )
        bogus_namespace = f'incorrect-namespace-{rand.rand_string()}'
        self.td.setup_client_security(
            server_namespace=bogus_namespace,
            server_name=self.server_name,
            tls=True,
            mtls=False,
        )

        # Step 4: the routing rule map is created last.
        self.td.setup_routing_rule_map_for_grpc(
            self.server_xds_host,
            self.server_xds_port,
        )

        # TD setup is complete, so the Backend Service can now be populated
        # with healthy backends (NEGs).
        self.td.wait_for_backends_healthy_status()

        # Launch the client without waiting for a healthy channel report.
        client: _XdsTestClient = self.startSecureTestClient(
            server,
            wait_for_active_server_channel=False,
        )

        self.assertClientCannotReachServerRepeatedly(client)
        logger.info("[SUCCESS] Client's connectivity state is consistent with "
                    "AuthZ error caused by server presenting incorrect SAN.")
# Hand control to absltest's runner when this file is executed as a script.
if __name__ == "__main__":
    absltest.main()
|