Skip to content

feat(auth): Update get_client_ssl_credentials to support X.509 workload certs #1558

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 29 additions & 11 deletions google/auth/transport/_mtls_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from google.auth import exceptions

CONTEXT_AWARE_METADATA_PATH = "~/.secureConnect/context_aware_metadata.json"
_CERTIFICATE_CONFIGURATION_DEFAULT_PATH = "~/.config/gcloud/certificate_config.json"
CERTIFICATE_CONFIGURATION_DEFAULT_PATH = "~/.config/gcloud/certificate_config.json"
_CERTIFICATE_CONFIGURATION_ENV = "GOOGLE_API_CERTIFICATE_CONFIG"
_CERT_PROVIDER_COMMAND = "cert_provider_command"
_CERT_REGEX = re.compile(
Expand All @@ -48,21 +48,21 @@
)


def _check_dca_metadata_path(metadata_path):
"""Checks for context aware metadata. If it exists, returns the absolute path;
def _check_config_path(config_path):
"""Checks for config file path. If it exists, returns the absolute path with user expansion;
otherwise returns None.

Args:
metadata_path (str): context aware metadata path.
config_path (str): The config file path for either context_aware_metadata.json or certificate_config.json for example

Returns:
str: absolute path if exists and None otherwise.
"""
metadata_path = path.expanduser(metadata_path)
if not path.exists(metadata_path):
_LOGGER.debug("%s is not found, skip client SSL authentication.", metadata_path)
config_path = path.expanduser(config_path)
if not path.exists(config_path):
_LOGGER.debug("%s is not found.", config_path)
return None
return metadata_path
return config_path


def _load_json_file(path):
Expand Down Expand Up @@ -136,7 +136,7 @@ def _get_cert_config_path(certificate_config_path=None):
if env_path is not None and env_path != "":
certificate_config_path = env_path
else:
certificate_config_path = _CERTIFICATE_CONFIGURATION_DEFAULT_PATH
certificate_config_path = CERTIFICATE_CONFIGURATION_DEFAULT_PATH

certificate_config_path = path.expanduser(certificate_config_path)
if not path.exists(certificate_config_path):
Expand Down Expand Up @@ -279,14 +279,22 @@ def _run_cert_provider_command(command, expect_encrypted_key=False):
def get_client_ssl_credentials(
generate_encrypted_key=False,
context_aware_metadata_path=CONTEXT_AWARE_METADATA_PATH,
certificate_config_path=CERTIFICATE_CONFIGURATION_DEFAULT_PATH,
):
"""Returns the client side certificate, private key and passphrase.

We look for certificates and keys with the following order of priority:
1. Certificate and key specified by certificate_config.json.
Currently, only X.509 workload certificates are supported.
2. Certificate and key specified by context aware metadata (i.e. SecureConnect).

Args:
generate_encrypted_key (bool): If set to True, encrypted private key
and passphrase will be generated; otherwise, unencrypted private key
will be generated and passphrase will be None.
will be generated and passphrase will be None. This option only
affects keys obtained via context_aware_metadata.json.
context_aware_metadata_path (str): The context_aware_metadata.json file path.
certificate_config_path (str): The certificate_config.json file path.

Returns:
Tuple[bool, bytes, bytes, bytes]:
Expand All @@ -297,7 +305,17 @@ def get_client_ssl_credentials(
google.auth.exceptions.ClientCertError: if problems occurs when getting
the cert, key and passphrase.
"""
metadata_path = _check_dca_metadata_path(context_aware_metadata_path)

# 1. Check for certificate config json.
cert_config_path = _check_config_path(certificate_config_path)
if cert_config_path:
# Attempt to retrieve X.509 Workload cert and key.
cert, key = _get_workload_cert_and_key(cert_config_path)
if cert and key:
return True, cert, key, None

# 2. Check for context aware metadata json
metadata_path = _check_config_path(context_aware_metadata_path)

if metadata_path:
metadata_json = _load_json_file(metadata_path)
Expand Down
2 changes: 1 addition & 1 deletion google/auth/transport/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ def __init__(self):
self._is_mtls = False
else:
# Load client SSL credentials.
metadata_path = _mtls_helper._check_dca_metadata_path(
metadata_path = _mtls_helper._check_config_path(
_mtls_helper.CONTEXT_AWARE_METADATA_PATH
)
self._is_mtls = metadata_path is not None
Expand Down
17 changes: 13 additions & 4 deletions google/auth/transport/mtls.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,19 @@ def has_default_client_cert_source():
Returns:
bool: indicating if the default client cert source exists.
"""
metadata_path = _mtls_helper._check_dca_metadata_path(
_mtls_helper.CONTEXT_AWARE_METADATA_PATH
)
return metadata_path is not None
if (
_mtls_helper._check_config_path(_mtls_helper.CONTEXT_AWARE_METADATA_PATH)
is not None
):
return True
if (
_mtls_helper._check_config_path(
_mtls_helper.CERTIFICATE_CONFIGURATION_DEFAULT_PATH
)
is not None
):
return True
return False


def default_client_cert_source():
Expand Down
97 changes: 69 additions & 28 deletions tests/transport/test__mtls_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,15 @@ def test_key(self):
)


class TestCheckaMetadataPath(object):
class TestCheckConfigPath(object):
def test_success(self):
metadata_path = os.path.join(pytest.data_dir, "context_aware_metadata.json")
returned_path = _mtls_helper._check_dca_metadata_path(metadata_path)
returned_path = _mtls_helper._check_config_path(metadata_path)
assert returned_path is not None

def test_failure(self):
metadata_path = os.path.join(pytest.data_dir, "not_exists.json")
returned_path = _mtls_helper._check_dca_metadata_path(metadata_path)
returned_path = _mtls_helper._check_config_path(metadata_path)
assert returned_path is None


Expand Down Expand Up @@ -275,54 +275,92 @@ def test_popen_raise_exception(self, mock_popen):

class TestGetClientSslCredentials(object):
@mock.patch(
"google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
"google.auth.transport._mtls_helper._get_workload_cert_and_key", autospec=True
)
@mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
@mock.patch(
"google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
"google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
)
def test_success(
@mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
@mock.patch("google.auth.transport._mtls_helper._check_config_path", autospec=True)
def test_success_with_context_aware_metadata(
self,
mock_check_dca_metadata_path,
mock_check_config_path,
mock_load_json_file,
mock_run_cert_provider_command,
mock_get_workload_cert_and_key,
):
mock_check_dca_metadata_path.return_value = True
mock_check_config_path.return_value = "/path/to/config"
mock_load_json_file.return_value = {"cert_provider_command": ["command"]}
mock_run_cert_provider_command.return_value = (b"cert", b"key", None)
mock_get_workload_cert_and_key.return_value = (None, None)
has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials()
assert has_cert
assert cert == b"cert"
assert key == b"key"
assert passphrase is None

@mock.patch(
"google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
"google.auth.transport._mtls_helper._read_cert_and_key_files", autospec=True
)
def test_success_without_metadata(self, mock_check_dca_metadata_path):
mock_check_dca_metadata_path.return_value = False
@mock.patch(
"google.auth.transport._mtls_helper._get_cert_config_path", autospec=True
)
@mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
@mock.patch("google.auth.transport._mtls_helper._check_config_path", autospec=True)
def test_success_with_certificate_config(
self,
mock_check_config_path,
mock_load_json_file,
mock_get_cert_config_path,
mock_read_cert_and_key_files,
):
cert_config_path = "/path/to/config"
mock_check_config_path.return_value = cert_config_path
mock_load_json_file.return_value = {
"cert_configs": {
"workload": {"cert_path": "cert/path", "key_path": "key/path"}
}
}
mock_get_cert_config_path.return_value = cert_config_path
mock_read_cert_and_key_files.return_value = (
pytest.public_cert_bytes,
pytest.private_key_bytes,
)

has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials()
assert has_cert
assert cert == pytest.public_cert_bytes
assert key == pytest.private_key_bytes
assert passphrase is None

@mock.patch("google.auth.transport._mtls_helper._check_config_path", autospec=True)
def test_success_without_metadata(self, mock_check_config_path):
mock_check_config_path.return_value = False
has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials()
assert not has_cert
assert cert is None
assert key is None
assert passphrase is None

@mock.patch(
"google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
"google.auth.transport._mtls_helper._get_workload_cert_and_key", autospec=True
)
@mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
@mock.patch(
"google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
"google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
)
@mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
@mock.patch("google.auth.transport._mtls_helper._check_config_path", autospec=True)
def test_success_with_encrypted_key(
self,
mock_check_dca_metadata_path,
mock_check_config_path,
mock_load_json_file,
mock_run_cert_provider_command,
mock_get_workload_cert_and_key,
):
mock_check_dca_metadata_path.return_value = True
mock_check_config_path.return_value = "/path/to/config"
mock_load_json_file.return_value = {"cert_provider_command": ["command"]}
mock_run_cert_provider_command.return_value = (b"cert", b"key", b"passphrase")
mock_get_workload_cert_and_key.return_value = (None, None)
has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials(
generate_encrypted_key=True
)
Expand All @@ -334,33 +372,36 @@ def test_success_with_encrypted_key(
["command", "--with_passphrase"], expect_encrypted_key=True
)

@mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
@mock.patch(
"google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
"google.auth.transport._mtls_helper._get_workload_cert_and_key", autospec=True
)
@mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
@mock.patch("google.auth.transport._mtls_helper._check_config_path", autospec=True)
def test_missing_cert_command(
self, mock_check_dca_metadata_path, mock_load_json_file
self,
mock_check_config_path,
mock_load_json_file,
mock_get_workload_cert_and_key,
):
mock_check_dca_metadata_path.return_value = True
mock_check_config_path.return_value = "/path/to/config"
mock_load_json_file.return_value = {}
mock_get_workload_cert_and_key.return_value = (None, None)
with pytest.raises(exceptions.ClientCertError):
_mtls_helper.get_client_ssl_credentials()

@mock.patch(
"google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True
)
@mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True)
@mock.patch(
"google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True
)
@mock.patch("google.auth.transport._mtls_helper._check_config_path", autospec=True)
def test_customize_context_aware_metadata_path(
self,
mock_check_dca_metadata_path,
mock_check_config_path,
mock_load_json_file,
mock_run_cert_provider_command,
):
context_aware_metadata_path = "/path/to/metata/data"
mock_check_dca_metadata_path.return_value = context_aware_metadata_path
mock_check_config_path.return_value = context_aware_metadata_path
mock_load_json_file.return_value = {"cert_provider_command": ["command"]}
mock_run_cert_provider_command.return_value = (b"cert", b"key", None)

Expand All @@ -372,7 +413,7 @@ def test_customize_context_aware_metadata_path(
assert cert == b"cert"
assert key == b"key"
assert passphrase is None
mock_check_dca_metadata_path.assert_called_with(context_aware_metadata_path)
mock_check_config_path.assert_called_with(context_aware_metadata_path)
mock_load_json_file.assert_called_with(context_aware_metadata_path)


Expand Down Expand Up @@ -520,7 +561,7 @@ def test_default(self, mock_path_exists):
mock_path_exists.return_value = True
returned_path = _mtls_helper._get_cert_config_path()
expected_path = os.path.expanduser(
_mtls_helper._CERTIFICATE_CONFIGURATION_DEFAULT_PATH
_mtls_helper.CERTIFICATE_CONFIGURATION_DEFAULT_PATH
)
assert returned_path == expected_path

Expand Down
Loading