forked from aws-controllers-k8s/sagemaker-controller
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_feature_group.py
159 lines (133 loc) · 5.26 KB
/
test_feature_group.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may
# not use this file except in compliance with the License. A copy of the
# License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Integration tests for the SageMaker Endpoint API.
"""
import boto3
import botocore
import pytest
import logging
from typing import Dict
from acktest.resources import random_suffix_name
from acktest.k8s import resource as k8s
from e2e import (
service_marker,
create_sagemaker_resource,
delete_custom_resource,
wait_for_status,
sagemaker_client,
assert_tags_in_sync,
)
from e2e.replacement_values import REPLACEMENT_VALUES
RESOURCE_NAME_PREFIX = "feature-group"
RESOURCE_PLURAL = "featuregroups"
SPEC_FILE = "feature_group"
FEATURE_GROUP_STATUS_CREATING = "Creating"
FEATURE_GROUP_STATUS_CREATED = "Created"
# longer wait is used because we sometimes see server taking time to create/delete
WAIT_PERIOD_COUNT = 9
WAIT_PERIOD_LENGTH = 30
STATUS = "status"
RESOURCE_STATUS = "featureGroupStatus"
@pytest.fixture(scope="module")
def feature_group():
"""Creates a feature group from a SPEC_FILE."""
feature_group_name = random_suffix_name(RESOURCE_NAME_PREFIX, 32)
replacements = REPLACEMENT_VALUES.copy()
replacements["FEATURE_GROUP_NAME"] = feature_group_name
reference, spec, resource = create_sagemaker_resource(
resource_plural=RESOURCE_PLURAL,
resource_name=feature_group_name,
spec_file=SPEC_FILE,
replacements=replacements,
)
assert resource is not None
yield (reference, resource)
# Delete the k8s resource if not already deleted by tests
assert delete_custom_resource(reference, WAIT_PERIOD_COUNT, WAIT_PERIOD_LENGTH)
def get_sagemaker_feature_group(feature_group_name: str):
"""Used to check if there is an existing feature group with a given feature_group_name."""
try:
return sagemaker_client().describe_feature_group(
FeatureGroupName=feature_group_name
)
except botocore.exceptions.ClientError as error:
logging.error(
f"SageMaker could not find a feature group with the name {feature_group_name}. Error {error}"
)
return None
def get_feature_group_status(feature_group_name: str):
feature_group_describe_response = get_sagemaker_feature_group(feature_group_name)
return feature_group_describe_response["FeatureGroupStatus"]
def get_resource_feature_group_status(reference: k8s.CustomResourceReference):
resource = k8s.get_resource(reference)
assert RESOURCE_STATUS in resource[STATUS]
return resource[STATUS][RESOURCE_STATUS]
@service_marker
@pytest.mark.canary
class TestFeatureGroup:
def _wait_feature_group_status(
self,
feature_group_name,
expected_status: str,
wait_periods: int = WAIT_PERIOD_COUNT,
period_length: int = WAIT_PERIOD_LENGTH,
):
return wait_for_status(
expected_status,
wait_periods,
period_length,
get_feature_group_status,
feature_group_name,
)
def _wait_resource_feature_group_status(
self,
reference: k8s.CustomResourceReference,
expected_status: str,
wait_periods: int = WAIT_PERIOD_COUNT,
period_length: int = WAIT_PERIOD_LENGTH,
):
return wait_for_status(
expected_status,
wait_periods,
period_length,
get_resource_feature_group_status,
reference,
)
def _assert_feature_group_status_in_sync(
self, feature_group_name, reference, expected_status
):
assert (
self._wait_feature_group_status(feature_group_name, expected_status)
== self._wait_resource_feature_group_status(reference, expected_status)
== expected_status
)
def test_create_feature_group(self, feature_group):
"""Tests that a feature group can be created and deleted
using the Feature Group Controller.
"""
(reference, resource) = feature_group
assert k8s.get_resource_exists(reference)
feature_group_name = resource["spec"].get("featureGroupName", None)
assert feature_group_name is not None
feature_group_sm_desc = get_sagemaker_feature_group(feature_group_name)
feature_group_arn = feature_group_sm_desc["FeatureGroupArn"]
assert k8s.get_resource_arn(resource) == feature_group_arn
self._assert_feature_group_status_in_sync(
feature_group_name, reference, FEATURE_GROUP_STATUS_CREATED
)
assert k8s.wait_on_condition(reference, "ACK.ResourceSynced", "True")
resource_tags = resource["spec"].get("tags", None)
assert_tags_in_sync(feature_group_arn, resource_tags)
# Delete the k8s resource.
assert delete_custom_resource(reference, WAIT_PERIOD_COUNT, WAIT_PERIOD_LENGTH)
assert get_sagemaker_feature_group(feature_group_name) is None