diff --git a/test/e2e/tests/helper.py b/test/e2e/tests/helper.py new file mode 100644 index 00000000..61bb700d --- /dev/null +++ b/test/e2e/tests/helper.py @@ -0,0 +1,106 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may +# not use this file except in compliance with the License. A copy of the +# License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing +# permissions and limitations under the License. + +"""Helper functions for ec2 tests +""" + +class EC2Validator: + def __init__(self, ec2_client): + self.ec2_client = ec2_client + + def assert_dhcp_options(self, dhcp_options_id: str, exists=True): + res_found = False + try: + aws_res = self.ec2_client.describe_dhcp_options(DhcpOptionsIds=[dhcp_options_id]) + res_found = len(aws_res["DhcpOptions"]) > 0 + except self.ec2_client.exceptions.ClientError: + pass + assert res_found is exists + + def assert_internet_gateway(self, ig_id: str, exists=True): + res_found = False + try: + aws_res = self.ec2_client.describe_internet_gateways(InternetGatewayIds=[ig_id]) + res_found = len(aws_res["InternetGateways"]) > 0 + except self.ec2_client.exceptions.ClientError: + pass + assert res_found is exists + + def assert_route(self, route_table_id: str, gateway_id: str, origin: str, exists=True): + res_found = False + try: + aws_res = self.ec2_client.describe_route_tables(RouteTableIds=[route_table_id]) + routes = aws_res["RouteTables"][0]["Routes"] + for route in routes: + if route["Origin"] == origin and route["GatewayId"] == gateway_id: + res_found = True + except self.ec2_client.exceptions.ClientError: + pass + assert res_found is exists + + def assert_route_table(self, route_table_id: str, exists=True): + res_found = False + try: + aws_res = self.ec2_client.describe_route_tables(RouteTableIds=[route_table_id]) + res_found = len(aws_res["RouteTables"]) > 0 + except self.ec2_client.exceptions.ClientError: + pass + assert res_found is exists + + def assert_security_group(self, sg_id: str, exists=True): + res_found = False + try: + aws_res = self.ec2_client.describe_security_groups(GroupIds=[sg_id]) + res_found = len(aws_res["SecurityGroups"]) > 0 + except self.ec2_client.exceptions.ClientError: + pass + assert res_found is exists + + def assert_subnet(self, subnet_id: str, exists=True): + res_found = False + try: + aws_res = self.ec2_client.describe_subnets(SubnetIds=[subnet_id]) + res_found = len(aws_res["Subnets"]) > 0 + except self.ec2_client.exceptions.ClientError: + pass + assert res_found is exists + + def assert_transit_gateway(self, tgw_id: str, exists=True): + res_found = False + try: + aws_res = self.ec2_client.describe_transit_gateways(TransitGatewayIds=[tgw_id]) + tgw = aws_res["TransitGateways"][0] + # TransitGateway may take awhile to be removed server-side, so + # treat 'deleting' and 'deleted' states as resource no longer existing + res_found = tgw is not None and tgw['State'] != "deleting" and tgw['State'] != "deleted" + except self.ec2_client.exceptions.ClientError: + pass + assert res_found is exists + + def assert_vpc(self, vpc_id: str, exists=True): + res_found = False + try: + aws_res = self.ec2_client.describe_vpcs(VpcIds=[vpc_id]) + res_found = len(aws_res["Vpcs"]) > 0 + except self.ec2_client.exceptions.ClientError: + pass + assert res_found is exists + + def assert_vpc_endpoint(self, vpc_endpoint_id: str, exists=True): + res_found = False + try: + aws_res = self.ec2_client.describe_vpc_endpoints(VpcEndpointIds=[vpc_endpoint_id]) + res_found = len(aws_res["VpcEndpoints"]) > 0 + except self.ec2_client.exceptions.ClientError: + pass + assert res_found is exists \ No newline at end of file diff --git a/test/e2e/tests/test_dhcp_options.py b/test/e2e/tests/test_dhcp_options.py index 098da813..4d444b25 100755 --- a/test/e2e/tests/test_dhcp_options.py +++ b/test/e2e/tests/test_dhcp_options.py @@ -22,6 +22,7 @@ from acktest.k8s import resource as k8s from e2e import service_marker, CRD_GROUP, CRD_VERSION, load_ec2_resource from e2e.replacement_values import REPLACEMENT_VALUES +from e2e.tests.helper import EC2Validator RESOURCE_PLURAL = "dhcpoptions" @@ -29,24 +30,6 @@ CREATE_WAIT_AFTER_SECONDS = 10 DELETE_WAIT_AFTER_SECONDS = 10 - -def get_dhcp_options(ec2_client, dhcp_options_id: str) -> dict: - try: - resp = ec2_client.describe_dhcp_options( - Filters=[{"Name": "dhcp-options-id", "Values": [dhcp_options_id]}] - ) - except Exception as e: - logging.debug(e) - return None - - if len(resp["DhcpOptions"]) == 0: - return None - return resp["DhcpOptions"][0] - - -def dhcp_options_exist(ec2_client, dhcp_options_id: str) -> bool: - return get_dhcp_options(ec2_client, dhcp_options_id) is not None - @service_marker @pytest.mark.canary class TestDhcpOptions: @@ -84,8 +67,9 @@ def test_create_delete(self, ec2_client): time.sleep(CREATE_WAIT_AFTER_SECONDS) - # Check DHCP Options exists - assert dhcp_options_exist(ec2_client, resource_id) + # Check DHCP Options exists in AWS + ec2_validator = EC2Validator(ec2_client) + ec2_validator.assert_dhcp_options(resource_id) # Delete k8s resource _, deleted = k8s.delete_custom_resource(ref) @@ -93,5 +77,5 @@ def test_create_delete(self, ec2_client): time.sleep(DELETE_WAIT_AFTER_SECONDS) - # Check DHCP Options doesn't exist - assert not dhcp_options_exist(ec2_client, resource_id) + # Check DHCP Options no longer exists in AWS + ec2_validator.assert_dhcp_options(resource_id, exists=False) \ No newline at end of file diff --git a/test/e2e/tests/test_internet_gateway.py b/test/e2e/tests/test_internet_gateway.py index 0146f328..35b91a68 100644 --- a/test/e2e/tests/test_internet_gateway.py +++ b/test/e2e/tests/test_internet_gateway.py @@ -22,30 +22,13 @@ from acktest.k8s import resource as k8s from e2e import service_marker, CRD_GROUP, CRD_VERSION, load_ec2_resource from e2e.replacement_values import REPLACEMENT_VALUES +from e2e.tests.helper import EC2Validator RESOURCE_PLURAL = "internetgateways" CREATE_WAIT_AFTER_SECONDS = 10 DELETE_WAIT_AFTER_SECONDS = 10 - -def get_internet_gateway(ec2_client, ig_id: str) -> dict: - try: - resp = ec2_client.describe_internet_gateways( - Filters=[{"Name": "internet-gateway-id", "Values": [ig_id]}] - ) - except Exception as e: - logging.debug(e) - return None - - if len(resp["InternetGateways"]) == 0: - return None - return resp["InternetGateways"][0] - - -def internet_gateway_exists(ec2_client, ig_id: str) -> bool: - return get_internet_gateway(ec2_client, ig_id) is not None - @service_marker @pytest.mark.canary class TestInternetGateway: @@ -77,9 +60,9 @@ def test_create_delete(self, ec2_client): time.sleep(CREATE_WAIT_AFTER_SECONDS) - # Check Internet Gateway exists - exists = internet_gateway_exists(ec2_client, resource_id) - assert exists + # Check Internet Gateway exists in AWS + ec2_validator = EC2Validator(ec2_client) + ec2_validator.assert_internet_gateway(resource_id) # Delete k8s resource _, deleted = k8s.delete_custom_resource(ref, 2, 5) @@ -87,6 +70,5 @@ def test_create_delete(self, ec2_client): time.sleep(DELETE_WAIT_AFTER_SECONDS) - # Check Internet Gateway doesn't exist - exists = internet_gateway_exists(ec2_client, resource_id) - assert not exists \ No newline at end of file + # Check Internet Gateway no longer exists in AWS + ec2_validator.assert_internet_gateway(resource_id, exists=False) \ No newline at end of file diff --git a/test/e2e/tests/test_route_table.py b/test/e2e/tests/test_route_table.py index d322a549..1982c9ab 100644 --- a/test/e2e/tests/test_route_table.py +++ b/test/e2e/tests/test_route_table.py @@ -23,6 +23,7 @@ from e2e import service_marker, CRD_GROUP, CRD_VERSION, load_ec2_resource from e2e.replacement_values import REPLACEMENT_VALUES from e2e.bootstrap_resources import get_bootstrap_resources +from e2e.tests.helper import EC2Validator RESOURCE_PLURAL = "routetables" @@ -30,44 +31,6 @@ CREATE_WAIT_AFTER_SECONDS = 10 DELETE_WAIT_AFTER_SECONDS = 10 - -def get_route_table(ec2_client, route_table_id: str) -> dict: - try: - resp = ec2_client.describe_route_tables( - Filters=[{"Name": "route-table-id", "Values": [route_table_id]}] - ) - except Exception as e: - logging.debug(e) - return None - - if len(resp["RouteTables"]) == 0: - return None - return resp["RouteTables"][0] - - -def route_table_exists(ec2_client, route_table_id: str) -> bool: - return get_route_table(ec2_client, route_table_id) is not None - -def get_routes(ec2_client, route_table_id: str) -> list: - try: - resp = ec2_client.describe_route_tables( - Filters=[{"Name": "route-table-id", "Values": [route_table_id]}] - ) - except Exception as e: - logging.debug(e) - return None - - if len(resp["RouteTables"]) == 0: - return None - return resp["RouteTables"][0]["Routes"] - -def route_exists(ec2_client, route_table_id: str, gateway_id: str, origin: str) -> bool: - routes = get_routes(ec2_client, route_table_id) - for route in routes: - if route["Origin"] == origin and route["GatewayId"] == gateway_id: - return True - return False - @service_marker @pytest.mark.canary class TestRouteTable: @@ -107,8 +70,9 @@ def test_create_delete(self, ec2_client): time.sleep(CREATE_WAIT_AFTER_SECONDS) - # Check Route Table exists - assert route_table_exists(ec2_client, resource_id) + # Check Route Table exists in AWS + ec2_validator = EC2Validator(ec2_client) + ec2_validator.assert_route_table(resource_id) # Delete k8s resource _, deleted = k8s.delete_custom_resource(ref) @@ -116,9 +80,8 @@ def test_create_delete(self, ec2_client): time.sleep(DELETE_WAIT_AFTER_SECONDS) - # Check Route Table doesn't exist - exists = route_table_exists(ec2_client, resource_id) - assert not exists + # Check Route Table no longer exists in AWS + ec2_validator.assert_route_table(resource_id, exists=False) def test_terminal_condition(self): @@ -189,21 +152,16 @@ def test_crud_route(self, ec2_client): time.sleep(CREATE_WAIT_AFTER_SECONDS) - # Check Route Table exists - assert route_table_exists(ec2_client, resource_id) - - # Check Routes exist (default and desired) - routes = get_routes(ec2_client, resource_id) - for route in routes: - if route["GatewayId"] == "local": - default_cidr = route["DestinationCidrBlock"] - assert route["Origin"] == "CreateRouteTable" - elif route["GatewayId"] == igw_id: - assert route["Origin"] == "CreateRoute" - else: - assert False + # Check Route Table exists in AWS + ec2_validator = EC2Validator(ec2_client) + ec2_validator.assert_route_table(resource_id) + + # Check Routes exist (default and desired) in AWS + ec2_validator.assert_route(resource_id, "local", "CreateRouteTable") + ec2_validator.assert_route(resource_id, igw_id, "CreateRoute") # Update Route + default_cidr = "10.0.0.0/16" updated_cidr = "192.168.1.0/24" patch = {"spec": {"routes": [ { @@ -224,15 +182,6 @@ def test_crud_route(self, ec2_client): # assert patched state resource = k8s.get_resource(ref) assert len(resource['status']['routeStatuses']) == 2 - for route in resource['status']['routeStatuses']: - if route["gatewayID"] == "local": - assert route_exists(ec2_client, resource_id, "local", "CreateRouteTable") - elif route["gatewayID"] == igw_id: - # origin and state are set server-side - assert route_exists(ec2_client, resource_id, igw_id, "CreateRoute") - assert route["state"] == "active" - else: - assert False # Delete Route patch = {"spec": {"routes": [ @@ -248,12 +197,10 @@ def test_crud_route(self, ec2_client): resource = k8s.get_resource(ref) assert len(resource['spec']['routes']) == 1 - for route in resource['spec']['routes']: - if route["gatewayID"] == "local": - assert route_exists(ec2_client, resource_id, "local", "CreateRouteTable") - else: - assert False + # Route should no longer exist in AWS (default will remain) + ec2_validator.assert_route(resource_id, "local", "CreateRouteTable") + ec2_validator.assert_route(resource_id, igw_id, "CreateRoute", exists=False) # Should not be able to delete default route patch = {"spec": {"routes": [ @@ -273,6 +220,5 @@ def test_crud_route(self, ec2_client): time.sleep(DELETE_WAIT_AFTER_SECONDS) - # Check Route Table doesn't exist - exists = route_table_exists(ec2_client, resource_id) - assert not exists \ No newline at end of file + # Check Route Table no longer exists in AWS + ec2_validator.assert_route_table(resource_id, exists=False) \ No newline at end of file diff --git a/test/e2e/tests/test_security_group.py b/test/e2e/tests/test_security_group.py index d69f08a5..d7261588 100644 --- a/test/e2e/tests/test_security_group.py +++ b/test/e2e/tests/test_security_group.py @@ -23,31 +23,13 @@ from e2e import service_marker, CRD_GROUP, CRD_VERSION, load_ec2_resource from e2e.replacement_values import REPLACEMENT_VALUES from e2e.bootstrap_resources import get_bootstrap_resources +from e2e.tests.helper import EC2Validator RESOURCE_PLURAL = "securitygroups" CREATE_WAIT_AFTER_SECONDS = 10 DELETE_WAIT_AFTER_SECONDS = 10 - -def get_security_group(ec2_client, sg_id: str) -> dict: - try: - resp = ec2_client.describe_security_groups( - GroupIds=[sg_id] - ) - except Exception as e: - logging.debug(e) - return None - - if len(resp["SecurityGroups"]) == 0: - return None - return resp["SecurityGroups"][0] - - -def security_group_exists(ec2_client, sg_id: str) -> bool: - return get_security_group(ec2_client, sg_id) is not None - - @service_marker @pytest.mark.canary class TestSecurityGroup: @@ -84,8 +66,9 @@ def test_create_delete(self, ec2_client): time.sleep(CREATE_WAIT_AFTER_SECONDS) - # Check Security Group exists - assert security_group_exists(ec2_client, resource_id) + # Check Security Group exists in AWS + ec2_validator = EC2Validator(ec2_client) + ec2_validator.assert_security_group(resource_id) # Delete k8s resource _, deleted = k8s.delete_custom_resource(ref) @@ -93,8 +76,8 @@ def test_create_delete(self, ec2_client): time.sleep(DELETE_WAIT_AFTER_SECONDS) - # Check Security Group doesn't exist - assert not security_group_exists(ec2_client, resource_id) + # Check Security Group no longer exists in AWS + ec2_validator.assert_security_group(resource_id, exists=False) def test_terminal_condition(self): test_resource_values = REPLACEMENT_VALUES.copy() diff --git a/test/e2e/tests/test_subnet.py b/test/e2e/tests/test_subnet.py index 433d2358..d8ac2fcb 100644 --- a/test/e2e/tests/test_subnet.py +++ b/test/e2e/tests/test_subnet.py @@ -23,31 +23,13 @@ from e2e import service_marker, CRD_GROUP, CRD_VERSION, load_ec2_resource from e2e.replacement_values import REPLACEMENT_VALUES from e2e.bootstrap_resources import get_bootstrap_resources +from e2e.tests.helper import EC2Validator RESOURCE_PLURAL = "subnets" CREATE_WAIT_AFTER_SECONDS = 10 DELETE_WAIT_AFTER_SECONDS = 10 - -def get_subnet(ec2_client, subnet_id: str) -> dict: - try: - resp = ec2_client.describe_subnets( - Filters=[{"Name": "subnet-id", "Values": [subnet_id]}] - ) - except Exception as e: - logging.debug(e) - return None - - if len(resp["Subnets"]) == 0: - return None - return resp["Subnets"][0] - - -def subnet_exists(ec2_client, subnet_id: str) -> bool: - return get_subnet(ec2_client, subnet_id) is not None - - @service_marker @pytest.mark.canary class TestSubnet: @@ -85,9 +67,9 @@ def test_create_delete(self, ec2_client): time.sleep(CREATE_WAIT_AFTER_SECONDS) - # Check Subnet exists - exists = subnet_exists(ec2_client, resource_id) - assert exists + # Check Subnet exists in AWS + ec2_validator = EC2Validator(ec2_client) + ec2_validator.assert_subnet(resource_id) # Delete k8s resource _, deleted = k8s.delete_custom_resource(ref) @@ -95,9 +77,8 @@ def test_create_delete(self, ec2_client): time.sleep(DELETE_WAIT_AFTER_SECONDS) - # Check Subnet doesn't exist - exists = subnet_exists(ec2_client, resource_id) - assert not exists + # Check Subnet no longer exists in AWS + ec2_validator.assert_subnet(resource_id, exists=False) def test_terminal_condition(self): test_resource_values = REPLACEMENT_VALUES.copy() diff --git a/test/e2e/tests/test_transit_gateway.py b/test/e2e/tests/test_transit_gateway.py index 0688619f..76fad706 100644 --- a/test/e2e/tests/test_transit_gateway.py +++ b/test/e2e/tests/test_transit_gateway.py @@ -23,6 +23,7 @@ from acktest.k8s import resource as k8s from e2e import service_marker, CRD_GROUP, CRD_VERSION, load_ec2_resource from e2e.replacement_values import REPLACEMENT_VALUES +from e2e.tests.helper import EC2Validator RESOURCE_PLURAL = "transitgateways" @@ -31,29 +32,6 @@ CREATE_WAIT_AFTER_SECONDS = 90 DELETE_WAIT_AFTER_SECONDS = 10 -@pytest.fixture(scope="module") -def ec2_client(): - return boto3.client("ec2") - - -def get_tgw(ec2_client, tgw_id: str) -> dict: - try: - resp = ec2_client.describe_transit_gateways( - TransitGatewayIds=[tgw_id] - ) - except Exception as e: - logging.debug(e) - return None - - if len(resp["TransitGateways"]) == 0: - return None - return resp["TransitGateways"][0] - - -def tgw_exists(ec2_client, tgw_id: str) -> bool: - tgw = get_tgw(ec2_client, tgw_id) - return tgw is not None and tgw['State'] != "deleting" and tgw['State'] != "deleted" - @service_marker @pytest.mark.canary class TestTGW: @@ -85,9 +63,9 @@ def test_create_delete(self, ec2_client): time.sleep(CREATE_WAIT_AFTER_SECONDS) - # Check TGW exists - exists = tgw_exists(ec2_client, resource_id) - assert exists + # Check TGW exists in AWS + ec2_validator = EC2Validator(ec2_client) + ec2_validator.assert_transit_gateway(resource_id) # Delete k8s resource _, deleted = k8s.delete_custom_resource(ref, 2, 5) @@ -95,6 +73,5 @@ def test_create_delete(self, ec2_client): time.sleep(DELETE_WAIT_AFTER_SECONDS) - # Check TGW doesn't exist - exists = tgw_exists(ec2_client, resource_id) - assert not exists \ No newline at end of file + # Check TGW no longer exists in AWS + ec2_validator.assert_transit_gateway(resource_id, exists=False) \ No newline at end of file diff --git a/test/e2e/tests/test_vpc.py b/test/e2e/tests/test_vpc.py index 069ab34f..7d5763d5 100644 --- a/test/e2e/tests/test_vpc.py +++ b/test/e2e/tests/test_vpc.py @@ -22,6 +22,7 @@ from acktest.k8s import resource as k8s from e2e import service_marker, CRD_GROUP, CRD_VERSION, load_ec2_resource from e2e.replacement_values import REPLACEMENT_VALUES +from e2e.tests.helper import EC2Validator RESOURCE_PLURAL = "vpcs" @@ -29,24 +30,6 @@ DELETE_WAIT_AFTER_SECONDS = 10 MODIFY_WAIT_AFTER_SECONDS = 5 - -def get_vpc(ec2_client, vpc_id: str) -> dict: - try: - resp = ec2_client.describe_vpcs( - Filters=[{"Name": "vpc-id", "Values": [vpc_id]}] - ) - except Exception as e: - logging.debug(e) - return None - - if len(resp["Vpcs"]) == 0: - return None - return resp["Vpcs"][0] - - -def vpc_exists(ec2_client, vpc_id: str) -> bool: - return get_vpc(ec2_client, vpc_id) is not None - def get_vpc_attribute(ec2_client, vpc_id: str, attribute_name: str) -> dict: return ec2_client.describe_vpc_attribute(Attribute=attribute_name, VpcId=vpc_id) @@ -90,9 +73,9 @@ def test_create_delete(self, ec2_client): time.sleep(CREATE_WAIT_AFTER_SECONDS) - # Check VPC exists - exists = vpc_exists(ec2_client, resource_id) - assert exists + # Check VPC exists in AWS + ec2_validator = EC2Validator(ec2_client) + ec2_validator.assert_vpc(resource_id) # Delete k8s resource _, deleted = k8s.delete_custom_resource(ref, 2, 5) @@ -100,9 +83,8 @@ def test_create_delete(self, ec2_client): time.sleep(DELETE_WAIT_AFTER_SECONDS) - # Check VPC doesn't exist - exists = vpc_exists(ec2_client, resource_id) - assert not exists + # Check VPC no longer exists in AWS + ec2_validator.assert_vpc(resource_id, exists=False) def test_enable_attributes(self, ec2_client): resource_name = random_suffix_name("vpc-ack-test", 24) @@ -135,9 +117,9 @@ def test_enable_attributes(self, ec2_client): time.sleep(CREATE_WAIT_AFTER_SECONDS) - # Check VPC exists - exists = vpc_exists(ec2_client, resource_id) - assert exists + # Check VPC exists in AWS + ec2_validator = EC2Validator(ec2_client) + ec2_validator.assert_vpc(resource_id) # Assert the attributes are set correctly assert get_dns_support(ec2_client, resource_id) @@ -171,6 +153,5 @@ def test_enable_attributes(self, ec2_client): time.sleep(DELETE_WAIT_AFTER_SECONDS) - # Check VPC doesn't exist - exists = vpc_exists(ec2_client, resource_id) - assert not exists \ No newline at end of file + # Check VPC no longer exists in AWS + ec2_validator.assert_vpc(resource_id, exists=False) diff --git a/test/e2e/tests/test_vpc_endpoint.py b/test/e2e/tests/test_vpc_endpoint.py index 938df9bb..054d17e0 100644 --- a/test/e2e/tests/test_vpc_endpoint.py +++ b/test/e2e/tests/test_vpc_endpoint.py @@ -24,6 +24,7 @@ from e2e import service_marker, CRD_GROUP, CRD_VERSION, load_ec2_resource from e2e.replacement_values import REPLACEMENT_VALUES from e2e.bootstrap_resources import get_bootstrap_resources +from e2e.tests.helper import EC2Validator # Default to us-west-2 since that's where prow is deployed REGION = "us-west-2" if environ.get('AWS_DEFAULT_REGION') is None else environ.get('AWS_DEFAULT_REGION') @@ -33,24 +34,6 @@ CREATE_WAIT_AFTER_SECONDS = 10 DELETE_WAIT_AFTER_SECONDS = 10 - -def get_vpc_endpoint(ec2_client, vpc_endpoint_id: str) -> dict: - try: - resp = ec2_client.describe_vpc_endpoints( - Filters=[{"Name": "vpc-endpoint-id", "Values": [vpc_endpoint_id]}] - ) - except Exception as e: - logging.debug(e) - return None - - if len(resp["VpcEndpoints"]) == 0: - return None - return resp["VpcEndpoints"][0] - - -def vpc_endpoint_exists(ec2_client, vpc_endpoint_id: str) -> bool: - return get_vpc_endpoint(ec2_client, vpc_endpoint_id) is not None - @service_marker @pytest.mark.canary class TestVpcEndpoint: @@ -83,14 +66,13 @@ def test_create_delete(self, ec2_client): assert k8s.get_resource_exists(ref) resource = k8s.get_resource(ref) - vpc_endpoint_services = ec2_client.describe_vpc_endpoint_services() resource_id = resource["status"]["vpcEndpointID"] time.sleep(CREATE_WAIT_AFTER_SECONDS) - # Check VPC Endpoint exists - exists = vpc_endpoint_exists(ec2_client, resource_id) - assert exists + # Check VPC Endpoint exists in AWS + ec2_validator = EC2Validator(ec2_client) + ec2_validator.assert_vpc_endpoint(resource_id) # Delete k8s resource _, deleted = k8s.delete_custom_resource(ref) @@ -98,9 +80,8 @@ def test_create_delete(self, ec2_client): time.sleep(DELETE_WAIT_AFTER_SECONDS) - # Check VPC Endpoint doesn't exist - exists = vpc_endpoint_exists(ec2_client, resource_id) - assert not exists + # Check VPC Endpoint no longer exists in AWS + ec2_validator.assert_vpc_endpoint(resource_id, exists=False) def test_terminal_condition_malformed_vpc(self): test_resource_values = REPLACEMENT_VALUES.copy()