-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathmanager.go
384 lines (353 loc) · 13.6 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.
// Code generated by ack-generate. DO NOT EDIT.
package global_table
import (
"context"
"fmt"
"time"
ackv1alpha1 "github.com/aws-controllers-k8s/runtime/apis/core/v1alpha1"
ackcompare "github.com/aws-controllers-k8s/runtime/pkg/compare"
ackcondition "github.com/aws-controllers-k8s/runtime/pkg/condition"
ackcfg "github.com/aws-controllers-k8s/runtime/pkg/config"
ackerr "github.com/aws-controllers-k8s/runtime/pkg/errors"
ackmetrics "github.com/aws-controllers-k8s/runtime/pkg/metrics"
ackrequeue "github.com/aws-controllers-k8s/runtime/pkg/requeue"
ackrt "github.com/aws-controllers-k8s/runtime/pkg/runtime"
ackrtlog "github.com/aws-controllers-k8s/runtime/pkg/runtime/log"
acktags "github.com/aws-controllers-k8s/runtime/pkg/tags"
acktypes "github.com/aws-controllers-k8s/runtime/pkg/types"
ackutil "github.com/aws-controllers-k8s/runtime/pkg/util"
"github.com/aws/aws-sdk-go-v2/aws"
svcsdk "github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
svcapitypes "github.com/aws-controllers-k8s/dynamodb-controller/apis/v1alpha1"
)
var (
_ = ackutil.InStrings
_ = acktags.NewTags()
_ = ackrt.MissingImageTagValue
_ = svcapitypes.GlobalTable{}
)
// +kubebuilder:rbac:groups=dynamodb.services.k8s.aws,resources=globaltables,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=dynamodb.services.k8s.aws,resources=globaltables/status,verbs=get;update;patch
var lateInitializeFieldNames = []string{}
// resourceManager is responsible for providing a consistent way to perform
// CRUD operations in a backend AWS service API for Book custom resources.
type resourceManager struct {
// cfg is a copy of the ackcfg.Config object passed on start of the service
// controller
cfg ackcfg.Config
// clientcfg is a copy of the client configuration passed on start of the
// service controller
clientcfg aws.Config
// log refers to the logr.Logger object handling logging for the service
// controller
log logr.Logger
// metrics contains a collection of Prometheus metric objects that the
// service controller and its reconcilers track
metrics *ackmetrics.Metrics
// rr is the Reconciler which can be used for various utility
// functions such as querying for Secret values given a SecretReference
rr acktypes.Reconciler
// awsAccountID is the AWS account identifier that contains the resources
// managed by this resource manager
awsAccountID ackv1alpha1.AWSAccountID
// The AWS Region that this resource manager targets
awsRegion ackv1alpha1.AWSRegion
// sdk is a pointer to the AWS service API client exposed by the
// aws-sdk-go-v2/services/{alias} package.
sdkapi *svcsdk.Client
}
// concreteResource returns a pointer to a resource from the supplied
// generic AWSResource interface
func (rm *resourceManager) concreteResource(
res acktypes.AWSResource,
) *resource {
// cast the generic interface into a pointer type specific to the concrete
// implementing resource type managed by this resource manager
return res.(*resource)
}
// ReadOne returns the currently-observed state of the supplied AWSResource in
// the backend AWS service API.
func (rm *resourceManager) ReadOne(
ctx context.Context,
res acktypes.AWSResource,
) (acktypes.AWSResource, error) {
r := rm.concreteResource(res)
if r.ko == nil {
// Should never happen... if it does, it's buggy code.
panic("resource manager's ReadOne() method received resource with nil CR object")
}
observed, err := rm.sdkFind(ctx, r)
mirrorAWSTags(r, observed)
if err != nil {
if observed != nil {
return rm.onError(observed, err)
}
return rm.onError(r, err)
}
return rm.onSuccess(observed)
}
// Create attempts to create the supplied AWSResource in the backend AWS
// service API, returning an AWSResource representing the newly-created
// resource
func (rm *resourceManager) Create(
ctx context.Context,
res acktypes.AWSResource,
) (acktypes.AWSResource, error) {
r := rm.concreteResource(res)
if r.ko == nil {
// Should never happen... if it does, it's buggy code.
panic("resource manager's Create() method received resource with nil CR object")
}
created, err := rm.sdkCreate(ctx, r)
if err != nil {
if created != nil {
return rm.onError(created, err)
}
return rm.onError(r, err)
}
return rm.onSuccess(created)
}
// Update attempts to mutate the supplied desired AWSResource in the backend AWS
// service API, returning an AWSResource representing the newly-mutated
// resource.
// Note for specialized logic implementers can check to see how the latest
// observed resource differs from the supplied desired state. The
// higher-level reonciler determines whether or not the desired differs
// from the latest observed and decides whether to call the resource
// manager's Update method
func (rm *resourceManager) Update(
ctx context.Context,
resDesired acktypes.AWSResource,
resLatest acktypes.AWSResource,
delta *ackcompare.Delta,
) (acktypes.AWSResource, error) {
desired := rm.concreteResource(resDesired)
latest := rm.concreteResource(resLatest)
if desired.ko == nil || latest.ko == nil {
// Should never happen... if it does, it's buggy code.
panic("resource manager's Update() method received resource with nil CR object")
}
updated, err := rm.sdkUpdate(ctx, desired, latest, delta)
if err != nil {
if updated != nil {
return rm.onError(updated, err)
}
return rm.onError(latest, err)
}
return rm.onSuccess(updated)
}
// Delete attempts to destroy the supplied AWSResource in the backend AWS
// service API, returning an AWSResource representing the
// resource being deleted (if delete is asynchronous and takes time)
func (rm *resourceManager) Delete(
ctx context.Context,
res acktypes.AWSResource,
) (acktypes.AWSResource, error) {
r := rm.concreteResource(res)
if r.ko == nil {
// Should never happen... if it does, it's buggy code.
panic("resource manager's Update() method received resource with nil CR object")
}
observed, err := rm.sdkDelete(ctx, r)
if err != nil {
if observed != nil {
return rm.onError(observed, err)
}
return rm.onError(r, err)
}
return rm.onSuccess(observed)
}
// ARNFromName returns an AWS Resource Name from a given string name. This
// is useful for constructing ARNs for APIs that require ARNs in their
// GetAttributes operations but all we have (for new CRs at least) is a
// name for the resource
func (rm *resourceManager) ARNFromName(name string) string {
return fmt.Sprintf(
"arn:aws:dynamodb:%s:%s:%s",
rm.awsRegion,
rm.awsAccountID,
name,
)
}
// LateInitialize returns an acktypes.AWSResource after setting the late initialized
// fields from the readOne call. This method will initialize the optional fields
// which were not provided by the k8s user but were defaulted by the AWS service.
// If there are no such fields to be initialized, the returned object is similar to
// object passed in the parameter.
func (rm *resourceManager) LateInitialize(
ctx context.Context,
latest acktypes.AWSResource,
) (acktypes.AWSResource, error) {
rlog := ackrtlog.FromContext(ctx)
// If there are no fields to late initialize, do nothing
if len(lateInitializeFieldNames) == 0 {
rlog.Debug("no late initialization required.")
return latest, nil
}
latestCopy := latest.DeepCopy()
lateInitConditionReason := ""
lateInitConditionMessage := ""
observed, err := rm.ReadOne(ctx, latestCopy)
if err != nil {
lateInitConditionMessage = "Unable to complete Read operation required for late initialization"
lateInitConditionReason = "Late Initialization Failure"
ackcondition.SetLateInitialized(latestCopy, corev1.ConditionFalse, &lateInitConditionMessage, &lateInitConditionReason)
ackcondition.SetSynced(latestCopy, corev1.ConditionFalse, nil, nil)
return latestCopy, err
}
lateInitializedRes := rm.lateInitializeFromReadOneOutput(observed, latestCopy)
incompleteInitialization := rm.incompleteLateInitialization(lateInitializedRes)
if incompleteInitialization {
// Add the condition with LateInitialized=False
lateInitConditionMessage = "Late initialization did not complete, requeuing with delay of 5 seconds"
lateInitConditionReason = "Delayed Late Initialization"
ackcondition.SetLateInitialized(lateInitializedRes, corev1.ConditionFalse, &lateInitConditionMessage, &lateInitConditionReason)
ackcondition.SetSynced(lateInitializedRes, corev1.ConditionFalse, nil, nil)
return lateInitializedRes, ackrequeue.NeededAfter(nil, time.Duration(5)*time.Second)
}
// Set LateInitialized condition to True
lateInitConditionMessage = "Late initialization successful"
lateInitConditionReason = "Late initialization successful"
ackcondition.SetLateInitialized(lateInitializedRes, corev1.ConditionTrue, &lateInitConditionMessage, &lateInitConditionReason)
return lateInitializedRes, nil
}
// incompleteLateInitialization return true if there are fields which were supposed to be
// late initialized but are not. If all the fields are late initialized, false is returned
func (rm *resourceManager) incompleteLateInitialization(
res acktypes.AWSResource,
) bool {
return false
}
// lateInitializeFromReadOneOutput late initializes the 'latest' resource from the 'observed'
// resource and returns 'latest' resource
func (rm *resourceManager) lateInitializeFromReadOneOutput(
observed acktypes.AWSResource,
latest acktypes.AWSResource,
) acktypes.AWSResource {
return latest
}
// IsSynced returns true if the resource is synced.
func (rm *resourceManager) IsSynced(ctx context.Context, res acktypes.AWSResource) (bool, error) {
r := rm.concreteResource(res)
if r.ko == nil {
// Should never happen... if it does, it's buggy code.
panic("resource manager's IsSynced() method received resource with nil CR object")
}
if r.ko.Status.GlobalTableStatus == nil {
return false, nil
}
globalTableStatusCandidates := []string{"ACTIVE"}
if !ackutil.InStrings(*r.ko.Status.GlobalTableStatus, globalTableStatusCandidates) {
return false, nil
}
return true, nil
}
// EnsureTags ensures that tags are present inside the AWSResource.
// If the AWSResource does not have any existing resource tags, the 'tags'
// field is initialized and the controller tags are added.
// If the AWSResource has existing resource tags, then controller tags are
// added to the existing resource tags without overriding them.
// If the AWSResource does not support tags, only then the controller tags
// will not be added to the AWSResource.
func (rm *resourceManager) EnsureTags(
ctx context.Context,
res acktypes.AWSResource,
md acktypes.ServiceControllerMetadata,
) error {
return nil
}
// FilterAWSTags ignores tags that have keys that start with "aws:"
// is needed to ensure the controller does not attempt to remove
// tags set by AWS. This function needs to be called after each Read
// operation.
// Eg. resources created with cloudformation have tags that cannot be
// removed by an ACK controller
func (rm *resourceManager) FilterSystemTags(res acktypes.AWSResource) {
}
// mirrorAWSTags ensures that AWS tags are included in the desired resource
// if they are present in the latest resource. This will ensure that the
// aws tags are not present in a diff. The logic of the controller will
// ensure these tags aren't patched to the resource in the cluster, and
// will only be present to make sure we don't try to remove these tags.
//
// Although there are a lot of similarities between this function and
// EnsureTags, they are very much different.
// While EnsureTags tries to make sure the resource contains the controller
// tags, mirrowAWSTags tries to make sure tags injected by AWS are mirrored
// from the latest resoruce to the desired resource.
func mirrorAWSTags(a *resource, b *resource) {
}
// newResourceManager returns a new struct implementing
// acktypes.AWSResourceManager
// This is for AWS-SDK-GO-V2 - Created newResourceManager With AWS sdk-Go-ClientV2
func newResourceManager(
cfg ackcfg.Config,
clientcfg aws.Config,
log logr.Logger,
metrics *ackmetrics.Metrics,
rr acktypes.Reconciler,
id ackv1alpha1.AWSAccountID,
region ackv1alpha1.AWSRegion,
) (*resourceManager, error) {
return &resourceManager{
cfg: cfg,
clientcfg: clientcfg,
log: log,
metrics: metrics,
rr: rr,
awsAccountID: id,
awsRegion: region,
sdkapi: svcsdk.NewFromConfig(clientcfg),
}, nil
}
// onError updates resource conditions and returns updated resource
// it returns nil if no condition is updated.
func (rm *resourceManager) onError(
r *resource,
err error,
) (acktypes.AWSResource, error) {
if r == nil {
return nil, err
}
r1, updated := rm.updateConditions(r, false, err)
if !updated {
return r, err
}
for _, condition := range r1.Conditions() {
if condition.Type == ackv1alpha1.ConditionTypeTerminal &&
condition.Status == corev1.ConditionTrue {
// resource is in Terminal condition
// return Terminal error
return r1, ackerr.Terminal
}
}
return r1, err
}
// onSuccess updates resource conditions and returns updated resource
// it returns the supplied resource if no condition is updated.
func (rm *resourceManager) onSuccess(
r *resource,
) (acktypes.AWSResource, error) {
if r == nil {
return nil, nil
}
r1, updated := rm.updateConditions(r, true, nil)
if !updated {
return r, nil
}
return r1, nil
}