-
Notifications
You must be signed in to change notification settings - Fork 134
/
Copy pathcondition.py
138 lines (113 loc) · 4.73 KB
/
condition.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may
# not use this file except in compliance with the License. A copy of the
# License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Utility functions to help processing Kubernetes resource conditions"""
import pytest
from . import resource
# Condition type names, as string constants, for use as the ``cond_type_match``
# argument of the assertion helpers in this module.
# NOTE(review): only CONDITION_TYPE_RESOURCE_SYNCED is referenced by the
# helpers visible in this file; the remaining names are presumably consumed by
# callers elsewhere -- confirm before removing any of them.
CONDITION_TYPE_ADOPTED = "ACK.Adopted"
CONDITION_TYPE_RESOURCE_SYNCED = "ACK.ResourceSynced"
CONDITION_TYPE_TERMINAL = "ACK.Terminal"
CONDITION_TYPE_RECOVERABLE = "ACK.Recoverable"
CONDITION_TYPE_ADVISORY = "ACK.Advisory"
CONDITION_TYPE_LATE_INITIALIZED = "ACK.LateInitialized"
CONDITION_TYPE_REFERENCES_RESOLVED = "ACK.ReferencesResolved"
def assert_type_status(
    ref: resource.CustomResourceReference,
    cond_type_match: str = CONDITION_TYPE_RESOURCE_SYNCED,
    cond_status_match: bool = True,
):
    """Asserts that the supplied resource has a condition of the given type
    and that the status of this condition matches the expected status.

    The condition is fetched with ``resource.get_resource_condition`` and its
    ``status`` entry is compared against ``cond_status_match``. Both values
    are converted with ``str()`` before the comparison, so the boolean
    ``True`` matches a reported status of ``"True"``.

    Usage:
        from acktest.k8s import resource
        from acktest.k8s import condition

        ref = resource.CustomResourceReference(
            CRD_GROUP, CRD_VERSION, RESOURCE_PLURAL,
            db_cluster_id, namespace="default",
        )
        resource.create_custom_resource(ref, resource_data)
        resource.wait_resource_consumed_by_controller(ref)
        condition.assert_type_status(
            ref,
            condition.CONDITION_TYPE_RESOURCE_SYNCED,
            False)

    Args:
        ref: reference to the custom resource whose condition is checked.
        cond_type_match: type of the condition to look up; defaults to
            CONDITION_TYPE_RESOURCE_SYNCED.
        cond_status_match: status the condition is expected to have;
            defaults to True.

    Raises:
        pytest.fail when a condition of the specified type is not found or
        is not in the supplied status.
    """
    found = resource.get_resource_condition(ref, cond_type_match)
    if found is None:
        # No condition of the requested type: pytest.fail aborts the test.
        pytest.fail(
            f"Failed to find {cond_type_match} condition in "
            f"resource {ref}"
        )

    actual_status = found.get("status", None)
    # Compare string forms so a bool expectation can match a string status.
    if str(actual_status) == str(cond_status_match):
        return

    pytest.fail(
        f"Expected {cond_type_match} condition to "
        f"have status {cond_status_match} but found {actual_status}"
    )
def assert_synced_status(
    ref: resource.CustomResourceReference,
    cond_status_match: bool,
):
    """Asserts that the supplied resource has a condition of type
    ACK.ResourceSynced and that the status of this condition matches
    ``cond_status_match``.

    This delegates to ``assert_type_status`` with the condition type fixed
    to CONDITION_TYPE_RESOURCE_SYNCED.

    Usage:
        from acktest.k8s import resource
        from acktest.k8s import condition

        ref = resource.CustomResourceReference(
            CRD_GROUP, CRD_VERSION, RESOURCE_PLURAL,
            db_cluster_id, namespace="default",
        )
        resource.create_custom_resource(ref, resource_data)
        resource.wait_resource_consumed_by_controller(ref)
        condition.assert_synced_status(ref, False)

    Args:
        ref: reference to the custom resource whose condition is checked.
        cond_status_match: status the ACK.ResourceSynced condition is
            expected to have.

    Raises:
        pytest.fail when the ACK.ResourceSynced condition is not found or is
        not in the supplied status.
    """
    assert_type_status(
        ref,
        cond_type_match=CONDITION_TYPE_RESOURCE_SYNCED,
        cond_status_match=cond_status_match,
    )
def assert_synced(ref: resource.CustomResourceReference):
    """Asserts that the supplied resource reports ACK.ResourceSynced as True.

    Shorthand for ``assert_synced_status(ref, True)``.

    Usage:
        from acktest.k8s import resource
        from acktest.k8s import condition

        ref = resource.CustomResourceReference(
            CRD_GROUP, CRD_VERSION, RESOURCE_PLURAL,
            db_cluster_id, namespace="default",
        )
        resource.create_custom_resource(ref, resource_data)
        resource.wait_resource_consumed_by_controller(ref)
        condition.assert_synced(ref)

    Args:
        ref: reference to the custom resource whose condition is checked.

    Raises:
        pytest.fail when the ACK.ResourceSynced condition is not found or is
        not in a True status.
    """
    return assert_synced_status(ref, cond_status_match=True)
def assert_not_synced(ref: resource.CustomResourceReference):
    """Asserts that the supplied resource reports ACK.ResourceSynced as False.

    Shorthand for ``assert_synced_status(ref, False)``.

    Usage:
        from acktest.k8s import resource
        from acktest.k8s import condition

        ref = resource.CustomResourceReference(
            CRD_GROUP, CRD_VERSION, RESOURCE_PLURAL,
            db_cluster_id, namespace="default",
        )
        resource.create_custom_resource(ref, resource_data)
        resource.wait_resource_consumed_by_controller(ref)
        condition.assert_not_synced(ref)

    Args:
        ref: reference to the custom resource whose condition is checked.

    Raises:
        pytest.fail when the ACK.ResourceSynced condition is not found or is
        not in a False status.
    """
    return assert_synced_status(ref, cond_status_match=False)