Skip to content

fix: Add type hints for iam api-client samples #9977

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions iam/api-client/access.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import argparse
import os

from google.oauth2 import service_account
import googleapiclient.discovery
from google.oauth2 import service_account # type: ignore
import googleapiclient.discovery # type: ignore


# [START iam_get_policy]
def get_policy(project_id, version=1):
def get_policy(project_id: str, version: int = 1) -> dict:
"""Gets IAM policy for a project."""

credentials = service_account.Credentials.from_service_account_file(
Expand All @@ -52,7 +52,7 @@ def get_policy(project_id, version=1):


# [START iam_modify_policy_add_member]
def modify_policy_add_member(policy, role, member):
def modify_policy_add_member(policy: dict, role: str, member: str) -> dict:
"""Adds a new member to a role binding."""

binding = next(b for b in policy["bindings"] if b["role"] == role)
Expand All @@ -65,7 +65,7 @@ def modify_policy_add_member(policy, role, member):


# [START iam_modify_policy_add_role]
def modify_policy_add_role(policy, role, member):
def modify_policy_add_role(policy: dict, role: str, member: str) -> dict:
"""Adds a new role binding to a policy."""

binding = {"role": role, "members": [member]}
Expand All @@ -78,7 +78,7 @@ def modify_policy_add_role(policy, role, member):


# [START iam_modify_policy_remove_member]
def modify_policy_remove_member(policy, role, member):
def modify_policy_remove_member(policy: dict, role: str, member: str) -> dict:
"""Removes a member from a role binding."""
binding = next(b for b in policy["bindings"] if b["role"] == role)
if "members" in binding and member in binding["members"]:
Expand All @@ -91,7 +91,7 @@ def modify_policy_remove_member(policy, role, member):


# [START iam_set_policy]
def set_policy(project_id, policy):
def set_policy(project_id: str, policy: dict) -> dict:
"""Sets IAM policy for a project."""

credentials = service_account.Credentials.from_service_account_file(
Expand All @@ -115,7 +115,7 @@ def set_policy(project_id, policy):


# [START iam_test_permissions]
def test_permissions(project_id):
def test_permissions(project_id: str) -> dict:
"""Tests IAM permissions of the caller"""

credentials = service_account.Credentials.from_service_account_file(
Expand Down Expand Up @@ -144,7 +144,7 @@ def test_permissions(project_id):
# [END iam_test_permissions]


def main():
def main() -> None:
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
Expand Down
66 changes: 41 additions & 25 deletions iam/api-client/access_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,85 +13,101 @@
# limitations under the License.

import os
from typing import Iterator
import uuid

from googleapiclient import errors
from googleapiclient import errors # type: ignore
import pytest
from retrying import retry
from retrying import retry # type: ignore

import access
import service_accounts


# Setting up variables for testing
GCLOUD_PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"]

# specifying a sample role to be assigned
GCP_ROLE = "roles/owner"


def retry_if_conflict(exception):
return (isinstance(exception, errors.HttpError)
and 'There were concurrent policy changes' in str(exception))
def retry_if_conflict(exception: Exception) -> bool:
return isinstance(
exception, errors.HttpError
) and "There were concurrent policy changes" in str(exception)


@pytest.fixture(scope="module")
def test_member():
def test_member() -> Iterator[str]:
# section to create service account to test policy updates.
# we use the first portion of uuid4 because full version is too long.
name = "python-test-" + str(uuid.uuid4()).split('-')[0]
name = "python-test-" + str(uuid.uuid4()).split("-")[0]
email = name + "@" + GCLOUD_PROJECT + ".iam.gserviceaccount.com"
member = "serviceAccount:" + email
service_accounts.create_service_account(
GCLOUD_PROJECT, name, "Py Test Account"
)
service_accounts.create_service_account(GCLOUD_PROJECT, name, "Py Test Account")

yield member

# deleting the service account created above
service_accounts.delete_service_account(email)


def test_get_policy(capsys):
def test_get_policy(capsys: pytest.LogCaptureFixture) -> None:
access.get_policy(GCLOUD_PROJECT, version=3)
out, _ = capsys.readouterr()
assert "etag" in out


def test_modify_policy_add_role(test_member, capsys):
@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000,
stop_max_attempt_number=5, retry_on_exception=retry_if_conflict)
def test_call():
def test_modify_policy_add_role(
test_member: str, capsys: pytest.LogCaptureFixture
) -> None:
@retry(
wait_exponential_multiplier=1000,
wait_exponential_max=10000,
stop_max_attempt_number=5,
retry_on_exception=retry_if_conflict,
)
def test_call() -> None:
policy = access.get_policy(GCLOUD_PROJECT, version=3)
access.modify_policy_add_role(policy, GCLOUD_PROJECT, test_member)
out, _ = capsys.readouterr()
assert "etag" in out

test_call()


def test_modify_policy_remove_member(test_member, capsys):
@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000,
stop_max_attempt_number=5, retry_on_exception=retry_if_conflict)
def test_call():
def test_modify_policy_remove_member(test_member: str, capsys: pytest.LogCaptureFixture) -> None:
@retry(
wait_exponential_multiplier=1000,
wait_exponential_max=10000,
stop_max_attempt_number=5,
retry_on_exception=retry_if_conflict,
)
def test_call() -> None:
policy = access.get_policy(GCLOUD_PROJECT, version=3)
access.modify_policy_remove_member(policy, GCP_ROLE, test_member)
out, _ = capsys.readouterr()
assert "iam.gserviceaccount.com" in out

test_call()


def test_set_policy(capsys):
@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000,
stop_max_attempt_number=5, retry_on_exception=retry_if_conflict)
def test_call():
def test_set_policy(capsys: pytest.LogCaptureFixture) -> None:
@retry(
wait_exponential_multiplier=1000,
wait_exponential_max=10000,
stop_max_attempt_number=5,
retry_on_exception=retry_if_conflict,
)
def test_call() -> None:
policy = access.get_policy(GCLOUD_PROJECT, version=3)
access.set_policy(GCLOUD_PROJECT, policy)
out, _ = capsys.readouterr()
assert "etag" in out

test_call()


def test_permissions(capsys):
def test_permissions(capsys: pytest.LogCaptureFixture) -> None:
access.test_permissions(GCLOUD_PROJECT)
out, _ = capsys.readouterr()
assert "permissions" in out
23 changes: 11 additions & 12 deletions iam/api-client/custom_roles.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
import argparse
import os

from google.oauth2 import service_account
import googleapiclient.discovery

from google.oauth2 import service_account # type: ignore
import googleapiclient.discovery # type: ignore

credentials = service_account.Credentials.from_service_account_file(
filename=os.environ['GOOGLE_APPLICATION_CREDENTIALS'],
Expand All @@ -36,7 +35,7 @@


# [START iam_query_testable_permissions]
def query_testable_permissions(resource):
def query_testable_permissions(resource: str) -> None:
"""Lists valid permissions for a resource."""

# pylint: disable=no-member
Expand All @@ -49,7 +48,7 @@ def query_testable_permissions(resource):


# [START iam_get_role]
def get_role(name):
def get_role(name: str) -> None:
"""Gets a role."""

# pylint: disable=no-member
Expand All @@ -61,7 +60,7 @@ def get_role(name):


# [START iam_create_role]
def create_role(name, project, title, description, permissions, stage):
def create_role(name: str, project: str, title: str, description: str, permissions: str, stage: str) -> dict:
"""Creates a role."""

# pylint: disable=no-member
Expand All @@ -83,7 +82,7 @@ def create_role(name, project, title, description, permissions, stage):


# [START iam_edit_role]
def edit_role(name, project, title, description, permissions, stage):
def edit_role(name: str, project: str, title: str, description: str, permissions: str, stage: str) -> dict:
"""Creates a role."""

# pylint: disable=no-member
Expand All @@ -102,7 +101,7 @@ def edit_role(name, project, title, description, permissions, stage):


# [START iam_list_roles]
def list_roles(project_id):
def list_roles(project_id: str) -> None:
"""Lists roles."""

# pylint: disable=no-member
Expand All @@ -114,7 +113,7 @@ def list_roles(project_id):


# [START iam_disable_role]
def disable_role(name, project):
def disable_role(name: str, project: str) -> dict:
"""Disables a role."""

# pylint: disable=no-member
Expand All @@ -130,7 +129,7 @@ def disable_role(name, project):


# [START iam_delete_role]
def delete_role(name, project):
def delete_role(name: str, project: str) -> dict:
"""Deletes a role."""

# pylint: disable=no-member
Expand All @@ -143,7 +142,7 @@ def delete_role(name, project):


# [START iam_undelete_role]
def undelete_role(name, project):
def undelete_role(name: str, project: str) -> dict:
"""Undeletes a role."""

# pylint: disable=no-member
Expand All @@ -158,7 +157,7 @@ def undelete_role(name, project):
# [END iam_undelete_role]


def main():
def main() -> None:
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
Expand Down
14 changes: 7 additions & 7 deletions iam/api-client/custom_roles_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
# limitations under the License.

import os
from typing import Iterator
import uuid

import pytest

import custom_roles


GCLOUD_PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"]


Expand All @@ -31,7 +31,7 @@
# Since this fixture will throw an exception upon failing to create or delete
# a custom role, there are no separatetests for those activities needed.
@pytest.fixture(scope="module")
def custom_role():
def custom_role() -> Iterator[str]:
role_name = "pythonTestCustomRole" + str(uuid.uuid4().hex)
custom_roles.create_role(
role_name,
Expand All @@ -47,7 +47,7 @@ def custom_role():
custom_roles.delete_role(role_name, GCLOUD_PROJECT)


def test_query_testable_permissions(capsys):
def test_query_testable_permissions(capsys: pytest.CaptureFixture) -> None:
custom_roles.query_testable_permissions(
"//cloudresourcemanager.googleapis.com/projects/" + GCLOUD_PROJECT
)
Expand All @@ -56,19 +56,19 @@ def test_query_testable_permissions(capsys):
assert "\n" in out


def test_list_roles(capsys):
def test_list_roles(capsys: pytest.CaptureFixture) -> None:
custom_roles.list_roles(GCLOUD_PROJECT)
out, _ = capsys.readouterr()
assert "roles/" in out


def test_get_role(capsys):
def test_get_role(capsys: pytest.CaptureFixture) -> None:
custom_roles.get_role("roles/appengine.appViewer")
out, _ = capsys.readouterr()
assert "roles/" in out


def test_edit_role(custom_role, capsys):
def test_edit_role(custom_role: dict, capsys: pytest.CaptureFixture) -> None:
custom_roles.edit_role(
custom_role,
GCLOUD_PROJECT,
Expand All @@ -81,7 +81,7 @@ def test_edit_role(custom_role, capsys):
assert "Updated role:" in out


def test_disable_role(custom_role, capsys):
def test_disable_role(custom_role: dict, capsys: pytest.CaptureFixture) -> None:
custom_roles.disable_role(custom_role, GCLOUD_PROJECT)
out, _ = capsys.readouterr()
assert "Disabled role:" in out
6 changes: 3 additions & 3 deletions iam/api-client/grantable_roles.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import argparse
import os

from google.oauth2 import service_account
import googleapiclient.discovery
from google.oauth2 import service_account # type: ignore
import googleapiclient.discovery # type: ignore

credentials = service_account.Credentials.from_service_account_file(
filename=os.environ['GOOGLE_APPLICATION_CREDENTIALS'],
Expand All @@ -28,7 +28,7 @@


# [START iam_view_grantable_roles]
def view_grantable_roles(full_resource_name):
def view_grantable_roles(full_resource_name: str) -> None:
roles = service.roles().queryGrantableRoles(body={
'fullResourceName': full_resource_name
}).execute()
Expand Down
4 changes: 3 additions & 1 deletion iam/api-client/grantable_roles_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@

import os

import pytest

import grantable_roles


def test_grantable_roles(capsys):
def test_grantable_roles(capsys: pytest.CaptureFixture) -> None:
project = os.environ['GOOGLE_CLOUD_PROJECT']
resource = '//cloudresourcemanager.googleapis.com/projects/' + project
grantable_roles.view_grantable_roles(resource)
Expand Down
Loading