Skip to content

Commit 30e8852

Browse files
committed
add continuous backup support (#1790)
1 parent 148510a commit 30e8852

File tree

3 files changed

+184
-28
lines changed

3 files changed

+184
-28
lines changed

Diff for: generator.yaml

+6-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ resources:
2323
from:
2424
operation: UpdateTimeToLive
2525
path: TimeToLiveSpecification
26+
ContinuousBackups:
27+
is_required: false
28+
from:
29+
operation: UpdateContinuousBackups
30+
path: PointInTimeRecoverySpecification
2631
AttributeDefinitions:
2732
compare:
2833
is_ignored: true
@@ -134,4 +139,4 @@ resources:
134139
type: string
135140
- name: STATUS
136141
json_path: .status.backupStatus
137-
type: string
142+
type: string

Diff for: pkg/resource/table/hooks.go

+95-27
Original file line numberDiff line numberDiff line change
@@ -32,23 +32,36 @@ import (
3232
)
3333

3434
var (
35-
ErrTableDeleting = fmt.Errorf("Table in '%v' state, cannot be modified or deleted", svcsdk.TableStatusDeleting)
36-
ErrTableCreating = fmt.Errorf("Table in '%v' state, cannot be modified or deleted", svcsdk.TableStatusCreating)
37-
ErrTableUpdating = fmt.Errorf("Table in '%v' state, cannot be modified or deleted", svcsdk.TableStatusUpdating)
38-
ErrTableGSIsUpdating = fmt.Errorf("Table GSIs in '%v' state, cannot be modified or deleted", svcsdk.IndexStatusCreating)
35+
ErrTableDeleting = fmt.Errorf(
36+
"Table in '%v' state, cannot be modified or deleted",
37+
svcsdk.TableStatusDeleting,
38+
)
39+
ErrTableCreating = fmt.Errorf(
40+
"Table in '%v' state, cannot be modified or deleted",
41+
svcsdk.TableStatusCreating,
42+
)
43+
ErrTableUpdating = fmt.Errorf(
44+
"Table in '%v' state, cannot be modified or deleted",
45+
svcsdk.TableStatusUpdating,
46+
)
47+
ErrTableGSIsUpdating = fmt.Errorf(
48+
"Table GSIs in '%v' state, cannot be modified or deleted",
49+
svcsdk.IndexStatusCreating,
50+
)
3951
)
4052

53+
// TerminalStatuses are the status strings that are terminal states for a
54+
// DynamoDB table
55+
var TerminalStatuses = []v1alpha1.TableStatus_SDK{
56+
v1alpha1.TableStatus_SDK_ARCHIVING,
57+
v1alpha1.TableStatus_SDK_DELETING,
58+
}
59+
4160
var (
42-
// TerminalStatuses are the status strings that are terminal states for a
43-
// DynamoDB table
44-
TerminalStatuses = []v1alpha1.TableStatus_SDK{
45-
v1alpha1.TableStatus_SDK_ARCHIVING,
46-
v1alpha1.TableStatus_SDK_DELETING,
47-
}
61+
DefaultTTLEnabledValue = false
62+
DefaultPITREnabledValue = false
4863
)
4964

50-
var DefaultTTLEnabledValue = false
51-
5265
var (
5366
requeueWaitWhileDeleting = ackrequeue.NeededAfter(
5467
ErrTableDeleting,
@@ -124,7 +137,10 @@ func (rm *resourceManager) customUpdateTable(
124137
defer func(err error) { exit(err) }(err)
125138

126139
if immutableFieldChanges := rm.getImmutableFieldChanges(delta); len(immutableFieldChanges) > 0 {
127-
msg := fmt.Sprintf("Immutable Spec fields have been modified: %s", strings.Join(immutableFieldChanges, ","))
140+
msg := fmt.Sprintf(
141+
"Immutable Spec fields have been modified: %s",
142+
strings.Join(immutableFieldChanges, ","),
143+
)
128144
return nil, ackerr.NewTerminalError(fmt.Errorf(msg))
129145
}
130146

@@ -187,6 +203,14 @@ func (rm *resourceManager) customUpdateTable(
187203
}
188204
}
189205

206+
if delta.DifferentAt("Spec.ContinuousBackups") ||
207+
delta.DifferentAt("Spec.ContinuousBackups.PointInTimeRecoveryEnabled") {
208+
err = rm.syncContinuousBackup(ctx, desired)
209+
if err != nil {
210+
return nil, fmt.Errorf("cannot update table %v", err)
211+
}
212+
}
213+
190214
// We want to update fast fields first
191215
// Then attributes
192216
// then GSI
@@ -202,7 +226,8 @@ func (rm *resourceManager) customUpdateTable(
202226
}
203227
case delta.DifferentAt("Spec.GlobalSecondaryIndexes") && delta.DifferentAt("Spec.AttributeDefinitions"):
204228
if err := rm.syncTableGlobalSecondaryIndexes(ctx, latest, desired); err != nil {
205-
if awsErr, ok := ackerr.AWSError(err); ok && awsErr.Code() == "LimitExceededException" {
229+
if awsErr, ok := ackerr.AWSError(err); ok &&
230+
awsErr.Code() == "LimitExceededException" {
206231
return nil, requeueWaitGSIReady
207232
}
208233
return nil, err
@@ -257,13 +282,17 @@ func (rm *resourceManager) newUpdateTablePayload(
257282
input.ProvisionedThroughput = &svcsdk.ProvisionedThroughput{}
258283
if r.ko.Spec.ProvisionedThroughput != nil {
259284
if r.ko.Spec.ProvisionedThroughput.ReadCapacityUnits != nil {
260-
input.ProvisionedThroughput.ReadCapacityUnits = aws.Int64(*r.ko.Spec.ProvisionedThroughput.ReadCapacityUnits)
285+
input.ProvisionedThroughput.ReadCapacityUnits = aws.Int64(
286+
*r.ko.Spec.ProvisionedThroughput.ReadCapacityUnits,
287+
)
261288
} else {
262289
input.ProvisionedThroughput.ReadCapacityUnits = aws.Int64(0)
263290
}
264291

265292
if r.ko.Spec.ProvisionedThroughput.WriteCapacityUnits != nil {
266-
input.ProvisionedThroughput.WriteCapacityUnits = aws.Int64(*r.ko.Spec.ProvisionedThroughput.WriteCapacityUnits)
293+
input.ProvisionedThroughput.WriteCapacityUnits = aws.Int64(
294+
*r.ko.Spec.ProvisionedThroughput.WriteCapacityUnits,
295+
)
267296
} else {
268297
input.ProvisionedThroughput.WriteCapacityUnits = aws.Int64(0)
269298
}
@@ -277,8 +306,11 @@ func (rm *resourceManager) newUpdateTablePayload(
277306
StreamEnabled: aws.Bool(*r.ko.Spec.StreamSpecification.StreamEnabled),
278307
}
279308
// Only set streamViewType when streamSpefication is enabled and streamViewType is non-nil.
280-
if *r.ko.Spec.StreamSpecification.StreamEnabled && r.ko.Spec.StreamSpecification.StreamViewType != nil {
281-
input.StreamSpecification.StreamViewType = aws.String(*r.ko.Spec.StreamSpecification.StreamViewType)
309+
if *r.ko.Spec.StreamSpecification.StreamEnabled &&
310+
r.ko.Spec.StreamSpecification.StreamViewType != nil {
311+
input.StreamSpecification.StreamViewType = aws.String(
312+
*r.ko.Spec.StreamSpecification.StreamViewType,
313+
)
282314
}
283315
} else {
284316
input.StreamSpecification = &svcsdk.StreamSpecification{
@@ -317,7 +349,9 @@ func (rm *resourceManager) syncTableSSESpecification(
317349
input.SSESpecification.SSEType = aws.String(*r.ko.Spec.SSESpecification.SSEType)
318350
}
319351
if r.ko.Spec.SSESpecification.KMSMasterKeyID != nil {
320-
input.SSESpecification.KMSMasterKeyId = aws.String(*r.ko.Spec.SSESpecification.KMSMasterKeyID)
352+
input.SSESpecification.KMSMasterKeyId = aws.String(
353+
*r.ko.Spec.SSESpecification.KMSMasterKeyID,
354+
)
321355
}
322356
}
323357
} else {
@@ -350,13 +384,17 @@ func (rm *resourceManager) syncTableProvisionedThroughput(
350384
}
351385
if r.ko.Spec.ProvisionedThroughput != nil {
352386
if r.ko.Spec.ProvisionedThroughput.ReadCapacityUnits != nil {
353-
input.ProvisionedThroughput.ReadCapacityUnits = aws.Int64(*r.ko.Spec.ProvisionedThroughput.ReadCapacityUnits)
387+
input.ProvisionedThroughput.ReadCapacityUnits = aws.Int64(
388+
*r.ko.Spec.ProvisionedThroughput.ReadCapacityUnits,
389+
)
354390
} else {
355391
input.ProvisionedThroughput.ReadCapacityUnits = aws.Int64(0)
356392
}
357393

358394
if r.ko.Spec.ProvisionedThroughput.WriteCapacityUnits != nil {
359-
input.ProvisionedThroughput.WriteCapacityUnits = aws.Int64(*r.ko.Spec.ProvisionedThroughput.WriteCapacityUnits)
395+
input.ProvisionedThroughput.WriteCapacityUnits = aws.Int64(
396+
*r.ko.Spec.ProvisionedThroughput.WriteCapacityUnits,
397+
)
360398
} else {
361399
input.ProvisionedThroughput.WriteCapacityUnits = aws.Int64(0)
362400
}
@@ -395,6 +433,12 @@ func (rm *resourceManager) setResourceAdditionalFields(
395433
ko.Spec.TimeToLive = ttlSpec
396434
}
397435

436+
if pitrSpec, err := rm.getResourcePointInTimeRecoveryWithContext(ctx, ko.Spec.TableName); err != nil {
437+
return err
438+
} else {
439+
ko.Spec.ContinuousBackups = pitrSpec
440+
}
441+
398442
return nil
399443
}
400444

@@ -403,11 +447,14 @@ func customPreCompare(
403447
a *resource,
404448
b *resource,
405449
) {
406-
407450
if ackcompare.HasNilDifference(a.ko.Spec.SSESpecification, b.ko.Spec.SSESpecification) {
408451
if a.ko.Spec.SSESpecification != nil && b.ko.Spec.SSESpecification == nil {
409452
if *a.ko.Spec.SSESpecification.Enabled {
410-
delta.Add("Spec.SSESpecification", a.ko.Spec.SSESpecification, b.ko.Spec.SSESpecification)
453+
delta.Add(
454+
"Spec.SSESpecification",
455+
a.ko.Spec.SSESpecification,
456+
b.ko.Spec.SSESpecification,
457+
)
411458
}
412459
} else {
413460
delta.Add("Spec.SSESpecification", a.ko.Spec.SSESpecification, b.ko.Spec.SSESpecification)
@@ -447,23 +494,35 @@ func customPreCompare(
447494
}
448495

449496
if len(a.ko.Spec.AttributeDefinitions) != len(b.ko.Spec.AttributeDefinitions) {
450-
delta.Add("Spec.AttributeDefinitions", a.ko.Spec.AttributeDefinitions, b.ko.Spec.AttributeDefinitions)
497+
delta.Add(
498+
"Spec.AttributeDefinitions",
499+
a.ko.Spec.AttributeDefinitions,
500+
b.ko.Spec.AttributeDefinitions,
501+
)
451502
} else if a.ko.Spec.AttributeDefinitions != nil && b.ko.Spec.AttributeDefinitions != nil {
452503
if !equalAttributeDefinitions(a.ko.Spec.AttributeDefinitions, b.ko.Spec.AttributeDefinitions) {
453504
delta.Add("Spec.AttributeDefinitions", a.ko.Spec.AttributeDefinitions, b.ko.Spec.AttributeDefinitions)
454505
}
455506
}
456507

457508
if len(a.ko.Spec.GlobalSecondaryIndexes) != len(b.ko.Spec.GlobalSecondaryIndexes) {
458-
delta.Add("Spec.GlobalSecondaryIndexes", a.ko.Spec.GlobalSecondaryIndexes, b.ko.Spec.GlobalSecondaryIndexes)
509+
delta.Add(
510+
"Spec.GlobalSecondaryIndexes",
511+
a.ko.Spec.GlobalSecondaryIndexes,
512+
b.ko.Spec.GlobalSecondaryIndexes,
513+
)
459514
} else if a.ko.Spec.GlobalSecondaryIndexes != nil && b.ko.Spec.GlobalSecondaryIndexes != nil {
460515
if !equalGlobalSecondaryIndexesArrays(a.ko.Spec.GlobalSecondaryIndexes, b.ko.Spec.GlobalSecondaryIndexes) {
461516
delta.Add("Spec.GlobalSecondaryIndexes", a.ko.Spec.GlobalSecondaryIndexes, b.ko.Spec.GlobalSecondaryIndexes)
462517
}
463518
}
464519

465520
if len(a.ko.Spec.LocalSecondaryIndexes) != len(b.ko.Spec.LocalSecondaryIndexes) {
466-
delta.Add("Spec.LocalSecondaryIndexes", a.ko.Spec.LocalSecondaryIndexes, b.ko.Spec.LocalSecondaryIndexes)
521+
delta.Add(
522+
"Spec.LocalSecondaryIndexes",
523+
a.ko.Spec.LocalSecondaryIndexes,
524+
b.ko.Spec.LocalSecondaryIndexes,
525+
)
467526
} else if a.ko.Spec.LocalSecondaryIndexes != nil && b.ko.Spec.LocalSecondaryIndexes != nil {
468527
if !equalLocalSecondaryIndexesArrays(a.ko.Spec.LocalSecondaryIndexes, b.ko.Spec.LocalSecondaryIndexes) {
469528
delta.Add("Spec.LocalSecondaryIndexes", a.ko.Spec.LocalSecondaryIndexes, b.ko.Spec.LocalSecondaryIndexes)
@@ -496,6 +555,12 @@ func customPreCompare(
496555
Enabled: &DefaultTTLEnabledValue,
497556
}
498557
}
558+
if a.ko.Spec.ContinuousBackups == nil && b.ko.Spec.ContinuousBackups != nil &&
559+
b.ko.Spec.ContinuousBackups.PointInTimeRecoveryEnabled != nil {
560+
a.ko.Spec.ContinuousBackups = &v1alpha1.PointInTimeRecoverySpecification{
561+
PointInTimeRecoveryEnabled: &DefaultPITREnabledValue,
562+
}
563+
}
499564
}
500565

501566
// equalAttributeDefinitions return whether two AttributeDefinition arrays are equal or not.
@@ -614,7 +679,10 @@ func equalLocalSecondaryIndexes(
614679
if !equalStrings(a.Projection.ProjectionType, b.Projection.ProjectionType) {
615680
return false
616681
}
617-
if !ackcompare.SliceStringPEqual(a.Projection.NonKeyAttributes, b.Projection.NonKeyAttributes) {
682+
if !ackcompare.SliceStringPEqual(
683+
a.Projection.NonKeyAttributes,
684+
b.Projection.NonKeyAttributes,
685+
) {
618686
return false
619687
}
620688
}

Diff for: pkg/resource/table/hooks_continuous_backup.go

+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"). You may
4+
// not use this file except in compliance with the License. A copy of the
5+
// License is located at
6+
//
7+
// http://aws.amazon.com/apache2.0/
8+
//
9+
// or in the "license" file accompanying this file. This file is distributed
10+
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
// express or implied. See the License for the specific language governing
12+
// permissions and limitations under the License.
13+
14+
package table
15+
16+
import (
17+
"context"
18+
19+
ackrtlog "github.com/aws-controllers-k8s/runtime/pkg/runtime/log"
20+
svcsdk "github.com/aws/aws-sdk-go/service/dynamodb"
21+
22+
"github.com/aws-controllers-k8s/dynamodb-controller/apis/v1alpha1"
23+
)
24+
25+
// syncContinuousBackup syncs the PointInTimeRecoverySpecification of the dynamodb table.
26+
func (rm *resourceManager) syncContinuousBackup(
27+
ctx context.Context,
28+
desired *resource,
29+
) (err error) {
30+
rlog := ackrtlog.FromContext(ctx)
31+
exit := rlog.Trace("rm.syncContinuousBackup")
32+
defer func(err error) { exit(err) }(err)
33+
34+
pitrSpec := &svcsdk.PointInTimeRecoverySpecification{}
35+
if desired.ko.Spec.ContinuousBackups != nil &&
36+
desired.ko.Spec.ContinuousBackups.PointInTimeRecoveryEnabled != nil {
37+
pitrSpec.SetPointInTimeRecoveryEnabled(
38+
*desired.ko.Spec.ContinuousBackups.PointInTimeRecoveryEnabled,
39+
)
40+
}
41+
42+
_, err = rm.sdkapi.UpdateContinuousBackupsWithContext(
43+
ctx,
44+
&svcsdk.UpdateContinuousBackupsInput{
45+
TableName: desired.ko.Spec.TableName,
46+
PointInTimeRecoverySpecification: pitrSpec,
47+
},
48+
)
49+
rm.metrics.RecordAPICall("UPDATE", "UpdateContinuousBackups", err)
50+
return err
51+
}
52+
53+
// getResourcePointInTimeRecoveryWithContext gets the PointInTimeRecoverySpecification of the dynamodb table.
54+
func (rm *resourceManager) getResourcePointInTimeRecoveryWithContext(
55+
ctx context.Context,
56+
tableName *string,
57+
) (*v1alpha1.PointInTimeRecoverySpecification, error) {
58+
var err error
59+
rlog := ackrtlog.FromContext(ctx)
60+
exit := rlog.Trace("rm.getResourcePointInTimeRecoveryWithContext")
61+
defer func(err error) { exit(err) }(err)
62+
63+
res, err := rm.sdkapi.DescribeContinuousBackupsWithContext(
64+
ctx,
65+
&svcsdk.DescribeContinuousBackupsInput{
66+
TableName: tableName,
67+
},
68+
)
69+
70+
rm.metrics.RecordAPICall("GET", "DescribeContinuousBackups", err)
71+
if err != nil {
72+
return nil, err
73+
}
74+
75+
isEnabled := false
76+
if res.ContinuousBackupsDescription != nil {
77+
isEnabled = *res.ContinuousBackupsDescription.PointInTimeRecoveryDescription.PointInTimeRecoveryStatus == svcsdk.PointInTimeRecoveryStatusEnabled
78+
}
79+
80+
return &v1alpha1.PointInTimeRecoverySpecification{
81+
PointInTimeRecoveryEnabled: &isEnabled,
82+
}, nil
83+
}

0 commit comments

Comments
 (0)