Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 33 additions & 4 deletions packages/google-auth/google/auth/_agent_identity_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import logging
import os
import re
import stat
import time
from urllib.parse import quote, urlparse

Expand Down Expand Up @@ -58,8 +59,20 @@


def _is_certificate_file_ready(path):
"""Checks if a file exists and is not empty."""
return path and os.path.exists(path) and os.path.getsize(path) > 0
"""Checks if a file exists, is a regular file, and is not empty."""
if not path:
return False
try:
# Check if the path points to a regular file and is not empty.
# stat.S_ISREG is used instead of os.path.isfile to avoid swallowing
# PermissionError exceptions, which the caller needs to propagate.
st = os.stat(path)
return stat.S_ISREG(st.st_mode) and st.st_size > 0
except PermissionError:
# Propagate PermissionError to let caller handle it (fail-fast or fallback)
raise
except OSError:
return False


def get_agent_identity_certificate_path():
Expand Down Expand Up @@ -148,6 +161,13 @@ def get_agent_identity_certificate_path():
)
has_logged_cert_warning = True

except PermissionError as e:
Comment thread
lsirac marked this conversation as resolved.
_LOGGER.warning(
"Permission denied when accessing certificate config or certificate file: %s. "
"Token binding protection cannot be enabled. Falling back to unbound tokens.",
e,
)
return None
Comment thread
lsirac marked this conversation as resolved.
except (IOError, ValueError, KeyError) as e:
if cert_config_path and os.path.exists(cert_config_path):
# If the file exists but has invalid JSON or is unreadable,
Expand Down Expand Up @@ -205,8 +225,17 @@ def get_and_parse_agent_identity_certificate():
if not cert_path:
return None

with open(cert_path, "rb") as cert_file:
cert_bytes = cert_file.read()
try:
with open(cert_path, "rb") as cert_file:
cert_bytes = cert_file.read()
except OSError as e:

@lsirac lsirac Jun 26, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we keep the fallback limited to PermissionError?

_LOGGER.warning(
"Failed to read agent identity certificate file at %s: %s. "
"Token binding protection cannot be enabled. Falling back to unbound tokens.",
cert_path,
e,
)
return None

return parse_certificate(cert_bytes)

Expand Down
6 changes: 3 additions & 3 deletions packages/google-auth/tests/compute_engine/test__mtls.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ def test__parse_mds_mode_invalid(monkeypatch):
_mtls._parse_mds_mode()


@mock.patch("os.path.exists")
@mock.patch("google.auth.compute_engine._mtls.os.path.exists")
def test__certs_exist_true(mock_exists, mock_mds_mtls_config):
mock_exists.return_value = True
assert _mtls._certs_exist(mock_mds_mtls_config) is True


@mock.patch("os.path.exists")
@mock.patch("google.auth.compute_engine._mtls.os.path.exists")
def test__certs_exist_false(mock_exists, mock_mds_mtls_config):
mock_exists.return_value = False
assert _mtls._certs_exist(mock_mds_mtls_config) is False
Expand All @@ -101,7 +101,7 @@ def test__certs_exist_false(mock_exists, mock_mds_mtls_config):
("default", False, False),
],
)
@mock.patch("os.path.exists")
@mock.patch("google.auth.compute_engine._mtls.os.path.exists")
def test_should_use_mds_mtls(
mock_exists, monkeypatch, mtls_mode, certs_exist, expected_result
):
Expand Down
87 changes: 82 additions & 5 deletions packages/google-auth/tests/test_agent_identity_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,27 @@ def test_parse_certificate(self, mock_load_cert):
mock_load_cert.assert_called_once_with(b"cert_bytes")
assert result == mock_load_cert.return_value

@mock.patch("google.auth._agent_identity_utils.os.stat")
def test_is_certificate_file_ready_permission_error(self, mock_stat):
mock_stat.side_effect = PermissionError("Permission denied")
with pytest.raises(PermissionError):
_agent_identity_utils._is_certificate_file_ready("/path/to/cert")

@mock.patch("google.auth._agent_identity_utils.os.stat")
def test_is_certificate_file_ready_os_error(self, mock_stat):
mock_stat.side_effect = OSError("Not found")
# Should swallow the OSError and return False
result = _agent_identity_utils._is_certificate_file_ready("/path/to/cert")
assert result is False

@mock.patch("google.auth._agent_identity_utils.os.stat")
def test_is_certificate_file_ready_not_a_file(self, mock_stat):
import stat

mock_stat.return_value = mock.MagicMock(st_mode=stat.S_IFDIR, st_size=4096)
result = _agent_identity_utils._is_certificate_file_ready("/path/to/cert")
assert result is False

def test__is_agent_identity_certificate_invalid(self):
cert = _agent_identity_utils.parse_certificate(NON_AGENT_IDENTITY_CERT_BYTES)
assert not _agent_identity_utils._is_agent_identity_certificate(cert)
Expand Down Expand Up @@ -230,7 +251,7 @@ def test_get_agent_identity_certificate_path_workstation_fail_fast(
assert result is None

@mock.patch("time.sleep")
@mock.patch("os.path.exists")
@mock.patch("google.auth._agent_identity_utils.os.path.exists")
def test_get_agent_identity_certificate_path_cert_not_found(
self, mock_exists, mock_sleep, tmpdir, monkeypatch
):
Expand Down Expand Up @@ -320,7 +341,7 @@ def test_get_agent_identity_certificate_path_workload_config_missing_cert_path(
mock_sleep.assert_not_called()

@mock.patch("time.sleep")
@mock.patch("os.path.exists")
@mock.patch("google.auth._agent_identity_utils.os.path.exists")
@mock.patch("google.auth._agent_identity_utils._is_certificate_file_ready")
def test_get_agent_identity_certificate_path_no_config_but_has_well_known_dir(
self, mock_is_ready, mock_exists, mock_sleep, monkeypatch
Expand All @@ -340,7 +361,7 @@ def test_get_agent_identity_certificate_path_no_config_but_has_well_known_dir(
mock_sleep.assert_not_called()

@mock.patch("time.sleep")
@mock.patch("os.path.exists")
@mock.patch("google.auth._agent_identity_utils.os.path.exists")
def test_get_agent_identity_certificate_path_no_config_no_well_known_dir(
self, mock_exists, mock_sleep, monkeypatch
):
Expand All @@ -358,7 +379,7 @@ def test_get_agent_identity_certificate_path_no_config_no_well_known_dir(
mock_sleep.assert_not_called()

@mock.patch("time.sleep")
@mock.patch("os.path.exists")
@mock.patch("google.auth._agent_identity_utils.os.path.exists")
@mock.patch("google.auth._agent_identity_utils._is_certificate_file_ready")
def test_get_agent_identity_certificate_path_no_config_well_known_polling_success(
self, mock_is_ready, mock_exists, mock_sleep, monkeypatch
Expand All @@ -377,7 +398,7 @@ def test_get_agent_identity_certificate_path_no_config_well_known_polling_succes
assert mock_sleep.call_count == 1

@mock.patch("time.sleep")
@mock.patch("os.path.exists")
@mock.patch("google.auth._agent_identity_utils.os.path.exists")
@mock.patch("google.auth._agent_identity_utils._is_certificate_file_ready")
def test_get_agent_identity_certificate_path_no_config_well_known_polling_timeout(
self, mock_is_ready, mock_exists, mock_sleep, monkeypatch
Expand All @@ -395,6 +416,45 @@ def test_get_agent_identity_certificate_path_no_config_well_known_polling_timeou

assert mock_sleep.call_count == len(_agent_identity_utils._POLLING_INTERVALS)

@mock.patch("time.sleep")
@mock.patch("google.auth._agent_identity_utils._is_certificate_file_ready")
@mock.patch("google.auth._agent_identity_utils.os.path.exists")
def test_get_agent_identity_certificate_path_permission_error_well_known(
self, mock_exists, mock_is_ready, mock_sleep, monkeypatch
):
monkeypatch.delenv(
environment_vars.GOOGLE_API_CERTIFICATE_CONFIG, raising=False
)
mock_exists.return_value = True
mock_is_ready.side_effect = PermissionError("Permission denied")

# It should fail-fast and return None immediately
result = _agent_identity_utils.get_agent_identity_certificate_path()
assert result is None
mock_sleep.assert_not_called()

@mock.patch("time.sleep")
@mock.patch("google.auth._agent_identity_utils.os.path.exists")
def test_get_agent_identity_certificate_path_permission_error_config(
self, mock_exists, mock_sleep, tmpdir, monkeypatch
):
config_path = tmpdir.join("config.json")
monkeypatch.setenv(
environment_vars.GOOGLE_API_CERTIFICATE_CONFIG, str(config_path)
)
# Mock os.path.exists so ECP workstation fail-fast is not triggered
mock_exists.return_value = True

# Mocking open to raise PermissionError
mock_open = mock.mock_open()
mock_open.side_effect = PermissionError("Permission denied")

with mock.patch("builtins.open", mock_open):
result = _agent_identity_utils.get_agent_identity_certificate_path()

assert result is None
mock_sleep.assert_not_called()

@mock.patch("google.auth._agent_identity_utils.get_agent_identity_certificate_path")
def test_get_and_parse_agent_identity_certificate_opted_out(
self, mock_get_path, monkeypatch
Expand Down Expand Up @@ -439,6 +499,23 @@ def test_get_and_parse_agent_identity_certificate_success(
mock_parse_certificate.assert_called_once_with(b"cert_bytes")
assert result == mock_parse_certificate.return_value

@mock.patch("google.auth._agent_identity_utils.get_agent_identity_certificate_path")
def test_get_and_parse_agent_identity_certificate_file_read_error(
self, mock_get_path, monkeypatch
):
monkeypatch.setenv(
environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
"true",
)
mock_get_path.return_value = "/fake/cert.pem"
mock_open = mock.mock_open()
mock_open.side_effect = PermissionError("Permission denied")

with mock.patch("builtins.open", mock_open):
result = _agent_identity_utils.get_and_parse_agent_identity_certificate()

assert result is None

def test_get_cached_cert_fingerprint_no_cert(self):
with pytest.raises(ValueError, match="mTLS connection is not configured."):
_agent_identity_utils.get_cached_cert_fingerprint(None)
Expand Down
Loading