diff --git a/apis/v1alpha1/ack-generate-metadata.yaml b/apis/v1alpha1/ack-generate-metadata.yaml index 13f2367..a456140 100755 --- a/apis/v1alpha1/ack-generate-metadata.yaml +++ b/apis/v1alpha1/ack-generate-metadata.yaml @@ -1,13 +1,13 @@ ack_generate_info: - build_date: "2025-02-20T18:05:49Z" - build_hash: a326346bd3a6973254d247c9ab2dc76790c36241 + build_date: "2025-03-24T15:20:40Z" + build_hash: 3722729cebe6d3c03c7e442655ef0846f91566a2 go_version: go1.24.0 - version: v0.43.2 -api_directory_checksum: 3bc0637159c94a74d2402d9a707f9c21339e9b45 + version: v0.43.2-7-g3722729 +api_directory_checksum: cb49386ebd7bb50e2521072a76262c72b9dbd285 api_version: v1alpha1 aws_sdk_go_version: v1.32.6 generator_config_info: - file_checksum: f6d68afa724d9e1d8fb6ce58da11ed0e5635f9d5 + file_checksum: 4533fa8aca3b134b5895ad6ce9a093c3446d99da original_file_name: generator.yaml last_modification: reason: API generation diff --git a/apis/v1alpha1/generator.yaml b/apis/v1alpha1/generator.yaml index 22f91e8..4ed38b6 100644 --- a/apis/v1alpha1/generator.yaml +++ b/apis/v1alpha1/generator.yaml @@ -25,6 +25,11 @@ operations: resources: Table: fields: + TableReplicas: + custom_field: + list_of: CreateReplicationGroupMemberAction + compare: + is_ignored: true GlobalSecondaryIndexesDescriptions: custom_field: list_of: GlobalSecondaryIndexDescription @@ -71,6 +76,7 @@ resources: code: ResourceNotFoundException terminal_codes: - InvalidParameter + - ValidationException update_operation: custom_method_name: customUpdateTable hooks: diff --git a/apis/v1alpha1/table.go b/apis/v1alpha1/table.go index f2cc8b1..4865df0 100644 --- a/apis/v1alpha1/table.go +++ b/apis/v1alpha1/table.go @@ -163,7 +163,8 @@ type TableSpec struct { // The name of the table to create. You can also provide the Amazon Resource // Name (ARN) of the table in this parameter. // +kubebuilder:validation:Required - TableName *string `json:"tableName"` + TableName *string `json:"tableName"` + TableReplicas []*CreateReplicationGroupMemberAction `json:"tableReplicas,omitempty"` // A list of key-value pairs to label the table. For more information, see Tagging // for DynamoDB (https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Tagging.html). Tags []*Tag `json:"tags,omitempty"` diff --git a/apis/v1alpha1/zz_generated.deepcopy.go b/apis/v1alpha1/zz_generated.deepcopy.go index 0d584eb..46bd371 100644 --- a/apis/v1alpha1/zz_generated.deepcopy.go +++ b/apis/v1alpha1/zz_generated.deepcopy.go @@ -2759,6 +2759,17 @@ func (in *TableSpec) DeepCopyInto(out *TableSpec) { *out = new(string) **out = **in } + if in.TableReplicas != nil { + in, out := &in.TableReplicas, &out.TableReplicas + *out = make([]*CreateReplicationGroupMemberAction, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(CreateReplicationGroupMemberAction) + (*in).DeepCopyInto(*out) + } + } + } if in.Tags != nil { in, out := &in.Tags, &out.Tags *out = make([]*Tag, len(*in)) diff --git a/config/crd/bases/dynamodb.services.k8s.aws_tables.yaml b/config/crd/bases/dynamodb.services.k8s.aws_tables.yaml index ee3e5a4..196263e 100644 --- a/config/crd/bases/dynamodb.services.k8s.aws_tables.yaml +++ b/config/crd/bases/dynamodb.services.k8s.aws_tables.yaml @@ -376,6 +376,45 @@ spec: The name of the table to create. You can also provide the Amazon Resource Name (ARN) of the table in this parameter. type: string + tableReplicas: + items: + description: Represents a replica to be created. + properties: + globalSecondaryIndexes: + items: + description: Represents the properties of a replica global + secondary index. + properties: + indexName: + type: string + provisionedThroughputOverride: + description: |- + Replica-specific provisioned throughput settings. If not specified, uses + the source table's provisioned throughput settings. + properties: + readCapacityUnits: + format: int64 + type: integer + type: object + type: object + type: array + kmsMasterKeyID: + type: string + provisionedThroughputOverride: + description: |- + Replica-specific provisioned throughput settings. If not specified, uses + the source table's provisioned throughput settings. + properties: + readCapacityUnits: + format: int64 + type: integer + type: object + regionName: + type: string + tableClassOverride: + type: string + type: object + type: array tags: description: |- A list of key-value pairs to label the table. For more information, see Tagging diff --git a/generator.yaml b/generator.yaml index 22f91e8..4ed38b6 100644 --- a/generator.yaml +++ b/generator.yaml @@ -25,6 +25,11 @@ operations: resources: Table: fields: + TableReplicas: + custom_field: + list_of: CreateReplicationGroupMemberAction + compare: + is_ignored: true GlobalSecondaryIndexesDescriptions: custom_field: list_of: GlobalSecondaryIndexDescription @@ -71,6 +76,7 @@ resources: code: ResourceNotFoundException terminal_codes: - InvalidParameter + - ValidationException update_operation: custom_method_name: customUpdateTable hooks: diff --git a/helm/crds/dynamodb.services.k8s.aws_tables.yaml b/helm/crds/dynamodb.services.k8s.aws_tables.yaml index 36dfe26..feaa235 100644 --- a/helm/crds/dynamodb.services.k8s.aws_tables.yaml +++ b/helm/crds/dynamodb.services.k8s.aws_tables.yaml @@ -380,6 +380,45 @@ spec: The name of the table to create. You can also provide the Amazon Resource Name (ARN) of the table in this parameter. type: string + tableReplicas: + items: + description: Represents a replica to be created. + properties: + globalSecondaryIndexes: + items: + description: Represents the properties of a replica global + secondary index. + properties: + indexName: + type: string + provisionedThroughputOverride: + description: |- + Replica-specific provisioned throughput settings. If not specified, uses + the source table's provisioned throughput settings. + properties: + readCapacityUnits: + format: int64 + type: integer + type: object + type: object + type: array + kmsMasterKeyID: + type: string + provisionedThroughputOverride: + description: |- + Replica-specific provisioned throughput settings. If not specified, uses + the source table's provisioned throughput settings. + properties: + readCapacityUnits: + format: int64 + type: integer + type: object + regionName: + type: string + tableClassOverride: + type: string + type: object + type: array tags: description: |- A list of key-value pairs to label the table. For more information, see Tagging diff --git a/pkg/resource/table/hooks.go b/pkg/resource/table/hooks.go index 22a2577..16d51eb 100644 --- a/pkg/resource/table/hooks.go +++ b/pkg/resource/table/hooks.go @@ -15,6 +15,7 @@ package table import ( "context" + "errors" "fmt" "strings" "time" @@ -34,21 +35,25 @@ import ( var ( ErrTableDeleting = fmt.Errorf( - "Table in '%v' state, cannot be modified or deleted", + "table in '%v' state, cannot be modified or deleted", svcsdktypes.TableStatusDeleting, ) ErrTableCreating = fmt.Errorf( - "Table in '%v' state, cannot be modified or deleted", + "table in '%v' state, cannot be modified or deleted", svcsdktypes.TableStatusCreating, ) ErrTableUpdating = fmt.Errorf( - "Table in '%v' state, cannot be modified or deleted", + "table in '%v' state, cannot be modified or deleted", svcsdktypes.TableStatusUpdating, ) ErrTableGSIsUpdating = fmt.Errorf( - "Table GSIs in '%v' state, cannot be modified or deleted", + "table GSIs in '%v' state, cannot be modified or deleted", svcsdktypes.IndexStatusCreating, ) + ErrTableReplicasUpdating = fmt.Errorf( + "table replica in '%v' state, cannot be modified or deleted", + svcsdktypes.ReplicaStatusUpdating, + ) ) // TerminalStatuses are the status strings that are terminal states for a @@ -74,12 +79,16 @@ var ( ) requeueWaitWhileUpdating = ackrequeue.NeededAfter( ErrTableUpdating, - 5*time.Second, + 10*time.Second, ) requeueWaitGSIReady = ackrequeue.NeededAfter( ErrTableGSIsUpdating, 10*time.Second, ) + requeueWaitReplicasActive = ackrequeue.NeededAfter( + ErrTableReplicasUpdating, + 10*time.Second, + ) ) // tableHasTerminalStatus returns whether the supplied Dynamodb table is in a @@ -224,6 +233,17 @@ func (rm *resourceManager) customUpdateTable( } return nil, err } + case delta.DifferentAt("Spec.TableReplicas"): + // Enabling replicas required streams enabled and StreamViewType to be NEW_AND_OLD_IMAGES + // Version 2019.11.21 TableUpdate API requirement + if !hasStreamSpecificationWithNewAndOldImages(desired) { + msg := "table must have DynamoDB Streams enabled with StreamViewType set to NEW_AND_OLD_IMAGES for replica updates" + rlog.Debug(msg) + return nil, ackerr.NewTerminalError(errors.New(msg)) + } + if err := rm.syncReplicas(ctx, latest, desired); err != nil { + return nil, err + } } } @@ -558,6 +578,15 @@ func customPreCompare( } } + // Handle ReplicaUpdates API comparison + if len(a.ko.Spec.TableReplicas) != len(b.ko.Spec.TableReplicas) { + delta.Add("Spec.TableReplicas", a.ko.Spec.TableReplicas, b.ko.Spec.TableReplicas) + } else if a.ko.Spec.TableReplicas != nil && b.ko.Spec.TableReplicas != nil { + if !equalReplicaArrays(a.ko.Spec.TableReplicas, b.ko.Spec.TableReplicas) { + delta.Add("Spec.TableReplicas", a.ko.Spec.TableReplicas, b.ko.Spec.TableReplicas) + } + } + if a.ko.Spec.DeletionProtectionEnabled == nil { a.ko.Spec.DeletionProtectionEnabled = aws.Bool(false) } diff --git a/pkg/resource/table/hooks_replica_updates.go b/pkg/resource/table/hooks_replica_updates.go new file mode 100644 index 0000000..f6cc180 --- /dev/null +++ b/pkg/resource/table/hooks_replica_updates.go @@ -0,0 +1,476 @@ +// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// http://aws.amazon.com/apache2.0/ +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + +package table + +import ( + "context" + + ackrtlog "github.com/aws-controllers-k8s/runtime/pkg/runtime/log" + "github.com/aws/aws-sdk-go-v2/aws" + svcsdk "github.com/aws/aws-sdk-go-v2/service/dynamodb" + svcsdktypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + + "github.com/aws-controllers-k8s/dynamodb-controller/apis/v1alpha1" + svcapitypes "github.com/aws-controllers-k8s/dynamodb-controller/apis/v1alpha1" +) + +// equalCreateReplicationGroupMemberActions compares two CreateReplicationGroupMemberAction objects +func equalCreateReplicationGroupMemberActions(a, b *v1alpha1.CreateReplicationGroupMemberAction) bool { + if !equalStrings(a.RegionName, b.RegionName) { + return false + } + if !equalStrings(a.KMSMasterKeyID, b.KMSMasterKeyID) { + return false + } + if !equalStrings(a.TableClassOverride, b.TableClassOverride) { + return false + } + if a.ProvisionedThroughputOverride != nil && b.ProvisionedThroughputOverride != nil { + if !equalInt64s(a.ProvisionedThroughputOverride.ReadCapacityUnits, b.ProvisionedThroughputOverride.ReadCapacityUnits) { + return false + } + } else if (a.ProvisionedThroughputOverride == nil) != (b.ProvisionedThroughputOverride == nil) { + return false + } + + return equalReplicaGlobalSecondaryIndexArrays(a.GlobalSecondaryIndexes, b.GlobalSecondaryIndexes) +} + +// equalReplicaGlobalSecondaryIndexes compares two ReplicaGlobalSecondaryIndex objects +func equalReplicaGlobalSecondaryIndexes( + a *v1alpha1.ReplicaGlobalSecondaryIndex, + b *v1alpha1.ReplicaGlobalSecondaryIndex, +) bool { + if !equalStrings(a.IndexName, b.IndexName) { + return false + } + + if a.ProvisionedThroughputOverride != nil && b.ProvisionedThroughputOverride != nil { + if !equalInt64s(a.ProvisionedThroughputOverride.ReadCapacityUnits, b.ProvisionedThroughputOverride.ReadCapacityUnits) { + return false + } + } else if (a.ProvisionedThroughputOverride == nil) != (b.ProvisionedThroughputOverride == nil) { + return false + } + + return true +} + +// equalReplicaGlobalSecondaryIndexArrays compares two arrays of ReplicaGlobalSecondaryIndex objects +func equalReplicaGlobalSecondaryIndexArrays( + a []*v1alpha1.ReplicaGlobalSecondaryIndex, + b []*v1alpha1.ReplicaGlobalSecondaryIndex, +) bool { + if len(a) != len(b) { + return false + } + + aGSIMap := make(map[string]*v1alpha1.ReplicaGlobalSecondaryIndex) + bGSIMap := make(map[string]*v1alpha1.ReplicaGlobalSecondaryIndex) + + for _, gsi := range a { + if gsi.IndexName != nil { + aGSIMap[*gsi.IndexName] = gsi + } + } + + for _, gsi := range b { + if gsi.IndexName != nil { + bGSIMap[*gsi.IndexName] = gsi + } + } + + for indexName, aGSI := range aGSIMap { + bGSI, exists := bGSIMap[indexName] + if !exists { + return false + } + + if !equalReplicaGlobalSecondaryIndexes(aGSI, bGSI) { + return false + } + } + + return true +} + +// equalReplicaArrays returns whether two CreateReplicationGroupMemberAction arrays are equal or not. +func equalReplicaArrays(a, b []*v1alpha1.CreateReplicationGroupMemberAction) bool { + if len(a) != len(b) { + return false + } + + aMap := make(map[string]*v1alpha1.CreateReplicationGroupMemberAction) + bMap := make(map[string]*v1alpha1.CreateReplicationGroupMemberAction) + + for _, replica := range a { + if replica.RegionName != nil { + aMap[*replica.RegionName] = replica + } + } + + for _, replica := range b { + if replica.RegionName != nil { + bMap[*replica.RegionName] = replica + } + } + + for regionName, aReplica := range aMap { + bReplica, exists := bMap[regionName] + if !exists { + return false + } + + if !equalCreateReplicationGroupMemberActions(aReplica, bReplica) { + return false + } + } + + for regionName := range bMap { + if _, exists := aMap[regionName]; !exists { + return false + } + } + + return true +} + +// createReplicaUpdate creates a ReplicationGroupUpdate for creating a new replica +func createReplicaUpdate(replica *v1alpha1.CreateReplicationGroupMemberAction) svcsdktypes.ReplicationGroupUpdate { + replicaUpdate := svcsdktypes.ReplicationGroupUpdate{} + createAction := &svcsdktypes.CreateReplicationGroupMemberAction{} + + if replica.RegionName != nil { + createAction.RegionName = aws.String(*replica.RegionName) + } + + if replica.KMSMasterKeyID != nil { + createAction.KMSMasterKeyId = aws.String(*replica.KMSMasterKeyID) + } + + if replica.TableClassOverride != nil { + createAction.TableClassOverride = svcsdktypes.TableClass(*replica.TableClassOverride) + } + + if replica.ProvisionedThroughputOverride != nil { + createAction.ProvisionedThroughputOverride = &svcsdktypes.ProvisionedThroughputOverride{} + if replica.ProvisionedThroughputOverride.ReadCapacityUnits != nil { + createAction.ProvisionedThroughputOverride.ReadCapacityUnits = replica.ProvisionedThroughputOverride.ReadCapacityUnits + } + } + + if replica.GlobalSecondaryIndexes != nil { + gsiList := []svcsdktypes.ReplicaGlobalSecondaryIndex{} + for _, gsi := range replica.GlobalSecondaryIndexes { + replicaGSI := svcsdktypes.ReplicaGlobalSecondaryIndex{} + if gsi.IndexName != nil { + replicaGSI.IndexName = gsi.IndexName + } + if gsi.ProvisionedThroughputOverride != nil { + replicaGSI.ProvisionedThroughputOverride = &svcsdktypes.ProvisionedThroughputOverride{} + if gsi.ProvisionedThroughputOverride.ReadCapacityUnits != nil { + replicaGSI.ProvisionedThroughputOverride.ReadCapacityUnits = gsi.ProvisionedThroughputOverride.ReadCapacityUnits + } + } + gsiList = append(gsiList, replicaGSI) + } + createAction.GlobalSecondaryIndexes = gsiList + } + + replicaUpdate.Create = createAction + return replicaUpdate +} + +// updateReplicaUpdate creates a ReplicationGroupUpdate for updating an existing replica +func updateReplicaUpdate(replica *v1alpha1.CreateReplicationGroupMemberAction) svcsdktypes.ReplicationGroupUpdate { + replicaUpdate := svcsdktypes.ReplicationGroupUpdate{} + updateAction := &svcsdktypes.UpdateReplicationGroupMemberAction{} + + if replica.RegionName != nil { + updateAction.RegionName = aws.String(*replica.RegionName) + // RegionName is required but doesn't count as a update + } + + if replica.KMSMasterKeyID != nil { + updateAction.KMSMasterKeyId = aws.String(*replica.KMSMasterKeyID) + } + + if replica.TableClassOverride != nil { + updateAction.TableClassOverride = svcsdktypes.TableClass(*replica.TableClassOverride) + } + + if replica.ProvisionedThroughputOverride != nil && + replica.ProvisionedThroughputOverride.ReadCapacityUnits != nil { + updateAction.ProvisionedThroughputOverride = &svcsdktypes.ProvisionedThroughputOverride{ + ReadCapacityUnits: replica.ProvisionedThroughputOverride.ReadCapacityUnits, + } + } + + // Only include GSIs that have provisioned throughput overrides + var gsisWithOverrides []svcsdktypes.ReplicaGlobalSecondaryIndex + for _, gsi := range replica.GlobalSecondaryIndexes { + if gsi.IndexName != nil && gsi.ProvisionedThroughputOverride != nil && + gsi.ProvisionedThroughputOverride.ReadCapacityUnits != nil { + gsisWithOverrides = append(gsisWithOverrides, svcsdktypes.ReplicaGlobalSecondaryIndex{ + IndexName: aws.String(*gsi.IndexName), + ProvisionedThroughputOverride: &svcsdktypes.ProvisionedThroughputOverride{ + ReadCapacityUnits: gsi.ProvisionedThroughputOverride.ReadCapacityUnits, + }, + }) + } + } + + if len(gsisWithOverrides) > 0 { + updateAction.GlobalSecondaryIndexes = gsisWithOverrides + } + + // Check if there are any actual updates to perform + // replica GSI updates are invalid updates since the GSI already exists on the source table + hasUpdates := updateAction.KMSMasterKeyId != nil || + updateAction.TableClassOverride != "" || + updateAction.ProvisionedThroughputOverride != nil || + len(updateAction.GlobalSecondaryIndexes) > 0 + + if hasUpdates { + replicaUpdate.Update = updateAction + return replicaUpdate + } + + // If no valid updates, return an empty ReplicationGroupUpdate + return svcsdktypes.ReplicationGroupUpdate{ + Update: nil, + } +} + +// deleteReplicaUpdate creates a ReplicationGroupUpdate for deleting an existing replica +func deleteReplicaUpdate(regionName string) svcsdktypes.ReplicationGroupUpdate { + return svcsdktypes.ReplicationGroupUpdate{ + Delete: &svcsdktypes.DeleteReplicationGroupMemberAction{ + RegionName: aws.String(regionName), + }, + } +} + +// hasStreamSpecificationWithNewAndOldImages checks if the table has DynamoDB Streams enabled +// with the stream containing both the new and the old images of the item. +func hasStreamSpecificationWithNewAndOldImages(r *resource) bool { + StreamEnabled := r.ko.Spec.StreamSpecification != nil && + r.ko.Spec.StreamSpecification.StreamEnabled != nil && + *r.ko.Spec.StreamSpecification.StreamEnabled + StreamViewType := r.ko.Spec.StreamSpecification != nil && + r.ko.Spec.StreamSpecification.StreamViewType != nil && + *r.ko.Spec.StreamSpecification.StreamViewType == "NEW_AND_OLD_IMAGES" + return StreamEnabled && StreamViewType +} + +// syncReplicas updates the replica configuration for a table +func (rm *resourceManager) syncReplicas( + ctx context.Context, + latest *resource, + desired *resource, +) (err error) { + rlog := ackrtlog.FromContext(ctx) + exit := rlog.Trace("rm.syncReplicas") + defer func() { + exit(err) + }() + + input, replicasInQueue, err := rm.newUpdateTableReplicaUpdatesOneAtATimePayload(ctx, latest, desired) + if err != nil { + return err + } + + // Call the UpdateTable API + _, err = rm.sdkapi.UpdateTable(ctx, input) + rm.metrics.RecordAPICall("UPDATE", "UpdateTable", err) + if err != nil { + return err + } + + // If there are more replicas to process, requeue + if replicasInQueue > 0 { + rlog.Debug("more replica updates pending, will requeue", + "table", *latest.ko.Spec.TableName, + "remaining_updates", replicasInQueue) + return requeueWaitWhileUpdating + } + + return nil +} + +// newUpdateTableReplicaUpdatesOneAtATimePayload creates the UpdateTable input payload for replica updates, +// processing only one replica at a time +func (rm *resourceManager) newUpdateTableReplicaUpdatesOneAtATimePayload( + ctx context.Context, + latest *resource, + desired *resource, +) (input *svcsdk.UpdateTableInput, replicasInQueue int, err error) { + rlog := ackrtlog.FromContext(ctx) + exit := rlog.Trace("rm.newUpdateTableReplicaUpdatesOneAtATimePayload") + defer func() { + exit(err) + }() + + createReplicas, updateReplicas, deleteRegions := computeReplicaupdatesDelta(latest, desired) + + input = &svcsdk.UpdateTableInput{ + TableName: aws.String(*desired.ko.Spec.TableName), + ReplicaUpdates: []svcsdktypes.ReplicationGroupUpdate{}, + } + + totalReplicasOperations := len(createReplicas) + len(updateReplicas) + len(deleteRegions) + replicasInQueue = totalReplicasOperations - 1 + + // Process replica updates in order: create, update, delete + // We'll only perform one replica action at a time + + if len(createReplicas) > 0 { + replica := *createReplicas[0] + if checkIfReplicasInProgress(latest.ko.Status.Replicas, *replica.RegionName) { + return nil, 0, requeueWaitReplicasActive + } + rlog.Debug("creating replica in region", "table", *desired.ko.Spec.TableName, "region", *replica.RegionName) + input.ReplicaUpdates = append(input.ReplicaUpdates, createReplicaUpdate(createReplicas[0])) + return input, replicasInQueue, nil + } + + if len(updateReplicas) > 0 { + replica := *updateReplicas[0] + if checkIfReplicasInProgress(latest.ko.Status.Replicas, *replica.RegionName) { + return nil, 0, requeueWaitReplicasActive + } + rlog.Debug("updating replica in region", "table", *desired.ko.Spec.TableName, "region", *replica.RegionName) + updateReplica := updateReplicaUpdate(updateReplicas[0]) + if updateReplica.Update == nil { + return nil, 0, requeueWaitReplicasActive + } + input.ReplicaUpdates = append(input.ReplicaUpdates, updateReplica) + return input, replicasInQueue, nil + } + + if len(deleteRegions) > 0 { + replica := deleteRegions[0] + if checkIfReplicasInProgress(latest.ko.Status.Replicas, replica) { + return nil, 0, requeueWaitReplicasActive + } + rlog.Debug("deleting replica in region", "table", *desired.ko.Spec.TableName, "region", replica) + input.ReplicaUpdates = append(input.ReplicaUpdates, deleteReplicaUpdate(deleteRegions[0])) + return input, replicasInQueue, nil + } + + return input, replicasInQueue, nil +} + +// computeReplicaupdatesDelta calculates the replica updates needed to reconcile the latest state with the desired state +// Returns three slices: replicas to create, replicas to update, and region names to delete +func computeReplicaupdatesDelta( + latest *resource, + desired *resource, +) ( + createReplicas []*v1alpha1.CreateReplicationGroupMemberAction, + updateReplicas []*v1alpha1.CreateReplicationGroupMemberAction, + deleteRegions []string, +) { + latestReplicas := make(map[string]*v1alpha1.CreateReplicationGroupMemberAction) + if latest.ko.Spec.TableReplicas != nil { + for _, replica := range latest.ko.Spec.TableReplicas { + if replica.RegionName != nil { + latestReplicas[*replica.RegionName] = replica + } + } + } + + desiredReplicas := make(map[string]*v1alpha1.CreateReplicationGroupMemberAction) + if desired != nil && desired.ko.Spec.TableReplicas != nil { + for _, replica := range desired.ko.Spec.TableReplicas { + if replica.RegionName != nil { + desiredReplicas[*replica.RegionName] = replica + } + } + } + + // Calculate replicas to create or update + for desiredRegion, desiredReplica := range desiredReplicas { + existingReplica, exists := latestReplicas[desiredRegion] + if !exists { + createReplicas = append(createReplicas, desiredReplica) + } else if !equalCreateReplicationGroupMemberActions(existingReplica, desiredReplica) { + updateReplicas = append(updateReplicas, desiredReplica) + } + } + + // Calculate regions to delete + for regionName := range latestReplicas { + if _, exists := desiredReplicas[regionName]; !exists { + deleteRegions = append(deleteRegions, regionName) + } + } + + return createReplicas, updateReplicas, deleteRegions +} + +func setTableReplicas(ko *svcapitypes.Table, replicas []svcsdktypes.ReplicaDescription) { + if len(replicas) > 0 { + tableReplicas := []*v1alpha1.CreateReplicationGroupMemberAction{} + for _, replica := range replicas { + replicaElem := &v1alpha1.CreateReplicationGroupMemberAction{} + if replica.RegionName != nil { + replicaElem.RegionName = replica.RegionName + } + if replica.KMSMasterKeyId != nil { + replicaElem.KMSMasterKeyID = replica.KMSMasterKeyId + } + if replica.ProvisionedThroughputOverride != nil { + replicaElem.ProvisionedThroughputOverride = &v1alpha1.ProvisionedThroughputOverride{ + ReadCapacityUnits: replica.ProvisionedThroughputOverride.ReadCapacityUnits, + } + } + if replica.GlobalSecondaryIndexes != nil { + gsiList := []*v1alpha1.ReplicaGlobalSecondaryIndex{} + for _, gsi := range replica.GlobalSecondaryIndexes { + gsiElem := &v1alpha1.ReplicaGlobalSecondaryIndex{ + IndexName: gsi.IndexName, + } + if gsi.ProvisionedThroughputOverride != nil { + gsiElem.ProvisionedThroughputOverride = &v1alpha1.ProvisionedThroughputOverride{ + ReadCapacityUnits: gsi.ProvisionedThroughputOverride.ReadCapacityUnits, + } + } + gsiList = append(gsiList, gsiElem) + } + replicaElem.GlobalSecondaryIndexes = gsiList + } + if replica.ReplicaTableClassSummary != nil && replica.ReplicaTableClassSummary.TableClass != "" { + replicaElem.TableClassOverride = aws.String(string(replica.ReplicaTableClassSummary.TableClass)) + } + tableReplicas = append(tableReplicas, replicaElem) + } + ko.Spec.TableReplicas = tableReplicas + } else { + ko.Spec.TableReplicas = nil + } +} + +func checkIfReplicasInProgress(ReplicaDescription []*svcapitypes.ReplicaDescription, regionName string) bool { + for _, replica := range ReplicaDescription { + if *replica.RegionName == regionName { + replicaStatus := replica.ReplicaStatus + if *replicaStatus == string(svcsdktypes.ReplicaStatusCreating) || *replicaStatus == string(svcsdktypes.ReplicaStatusDeleting) || *replicaStatus == string(svcsdktypes.ReplicaStatusUpdating) { + return true + } + } + } + + return false +} diff --git a/pkg/resource/table/manager.go b/pkg/resource/table/manager.go index dda8240..04a3b87 100644 --- a/pkg/resource/table/manager.go +++ b/pkg/resource/table/manager.go @@ -299,9 +299,9 @@ func (rm *resourceManager) EnsureTags( defaultTags := ackrt.GetDefaultTags(&rm.cfg, r.ko, md) var existingTags []*svcapitypes.Tag existingTags = r.ko.Spec.Tags - resourceTags := ToACKTags(existingTags) + resourceTags, keyOrder := convertToOrderedACKTags(existingTags) tags := acktags.Merge(resourceTags, defaultTags) - r.ko.Spec.Tags = FromACKTags(tags) + r.ko.Spec.Tags = fromACKTags(tags, keyOrder) return nil } @@ -318,9 +318,9 @@ func (rm *resourceManager) FilterSystemTags(res acktypes.AWSResource) { } var existingTags []*svcapitypes.Tag existingTags = r.ko.Spec.Tags - resourceTags := ToACKTags(existingTags) + resourceTags, tagKeyOrder := convertToOrderedACKTags(existingTags) ignoreSystemTags(resourceTags) - r.ko.Spec.Tags = FromACKTags(resourceTags) + r.ko.Spec.Tags = fromACKTags(resourceTags, tagKeyOrder) } // mirrorAWSTags ensures that AWS tags are included in the desired resource @@ -342,10 +342,10 @@ func mirrorAWSTags(a *resource, b *resource) { var existingDesiredTags []*svcapitypes.Tag existingDesiredTags = a.ko.Spec.Tags existingLatestTags = b.ko.Spec.Tags - desiredTags := ToACKTags(existingDesiredTags) - latestTags := ToACKTags(existingLatestTags) + desiredTags, desiredTagKeyOrder := convertToOrderedACKTags(existingDesiredTags) + latestTags, _ := convertToOrderedACKTags(existingLatestTags) syncAWSTags(desiredTags, latestTags) - a.ko.Spec.Tags = FromACKTags(desiredTags) + a.ko.Spec.Tags = fromACKTags(desiredTags, desiredTagKeyOrder) } // newResourceManager returns a new struct implementing diff --git a/pkg/resource/table/sdk.go b/pkg/resource/table/sdk.go index ce93f8a..659bfae 100644 --- a/pkg/resource/table/sdk.go +++ b/pkg/resource/table/sdk.go @@ -440,6 +440,7 @@ func (rm *resourceManager) sdkFind( } else { ko.Spec.BillingMode = aws.String("PROVISIONED") } + setTableReplicas(ko, resp.Table.Replicas) if isTableCreating(&resource{ko}) { return &resource{ko}, requeueWaitWhileCreating } @@ -1016,6 +1017,23 @@ func (rm *resourceManager) sdkDelete( if isTableUpdating(r) { return nil, requeueWaitWhileUpdating } + + // If there are replicas, we need to remove them before deleting the table + if len(r.ko.Spec.TableReplicas) > 0 { + desired := &resource{ + ko: r.ko.DeepCopy(), + } + desired.ko.Spec.TableReplicas = nil + + err := rm.syncReplicas(ctx, r, desired) + if err != nil { + return nil, err + } + // Requeue to wait for replica removal to complete before attempting table deletion + // When syncReplicas returns an error other than requeue + return r, requeueWaitWhileDeleting + } + input, err := rm.newDeleteRequestPayload(r) if err != nil { return nil, err @@ -1149,7 +1167,8 @@ func (rm *resourceManager) terminalAWSError(err error) bool { return false } switch terminalErr.ErrorCode() { - case "InvalidParameter": + case "InvalidParameter", + "ValidationException": return true default: return false diff --git a/pkg/resource/table/tags.go b/pkg/resource/table/tags.go index fa49d2a..0b84ac7 100644 --- a/pkg/resource/table/tags.go +++ b/pkg/resource/table/tags.go @@ -30,39 +30,51 @@ var ( ACKSystemTags = []string{"services.k8s.aws/namespace", "services.k8s.aws/controller-version"} ) -// ToACKTags converts the tags parameter into 'acktags.Tags' shape. +// convertToOrderedACKTags converts the tags parameter into 'acktags.Tags' shape. // This method helps in creating the hub(acktags.Tags) for merging -// default controller tags with existing resource tags. -func ToACKTags(tags []*svcapitypes.Tag) acktags.Tags { +// default controller tags with existing resource tags. It also returns a slice +// of keys maintaining the original key Order when the tags are a list +func convertToOrderedACKTags(tags []*svcapitypes.Tag) (acktags.Tags, []string) { result := acktags.NewTags() - if tags == nil || len(tags) == 0 { - return result - } + keyOrder := []string{} + if len(tags) == 0 { + return result, keyOrder + } for _, t := range tags { if t.Key != nil { - if t.Value == nil { - result[*t.Key] = "" - } else { + keyOrder = append(keyOrder, *t.Key) + if t.Value != nil { result[*t.Key] = *t.Value + } else { + result[*t.Key] = "" } } } - return result + return result, keyOrder } -// FromACKTags converts the tags parameter into []*svcapitypes.Tag shape. +// fromACKTags converts the tags parameter into []*svcapitypes.Tag shape. // This method helps in setting the tags back inside AWSResource after merging -// default controller tags with existing resource tags. -func FromACKTags(tags acktags.Tags) []*svcapitypes.Tag { +// default controller tags with existing resource tags. When a list, +// it maintains the order from original +func fromACKTags(tags acktags.Tags, keyOrder []string) []*svcapitypes.Tag { result := []*svcapitypes.Tag{} + + for _, k := range keyOrder { + v, ok := tags[k] + if ok { + tag := svcapitypes.Tag{Key: &k, Value: &v} + result = append(result, &tag) + delete(tags, k) + } + } for k, v := range tags { - kCopy := k - vCopy := v - tag := svcapitypes.Tag{Key: &kCopy, Value: &vCopy} + tag := svcapitypes.Tag{Key: &k, Value: &v} result = append(result, &tag) } + return result } diff --git a/templates/hooks/table/sdk_delete_pre_build_request.go.tpl b/templates/hooks/table/sdk_delete_pre_build_request.go.tpl index fa720f3..476452f 100644 --- a/templates/hooks/table/sdk_delete_pre_build_request.go.tpl +++ b/templates/hooks/table/sdk_delete_pre_build_request.go.tpl @@ -3,4 +3,20 @@ } if isTableUpdating(r) { return nil, requeueWaitWhileUpdating - } \ No newline at end of file + } + + // If there are replicas, we need to remove them before deleting the table + if len(r.ko.Spec.TableReplicas) > 0 { + desired := &resource{ + ko: r.ko.DeepCopy(), + } + desired.ko.Spec.TableReplicas = nil + + err := rm.syncReplicas(ctx, r, desired) + if err != nil { + return nil, err + } + // Requeue to wait for replica removal to complete before attempting table deletion + // When syncReplicas returns an error other than requeue + return r, requeueWaitWhileDeleting + } diff --git a/templates/hooks/table/sdk_read_one_post_set_output.go.tpl b/templates/hooks/table/sdk_read_one_post_set_output.go.tpl index 4525c25..c8a174d 100644 --- a/templates/hooks/table/sdk_read_one_post_set_output.go.tpl +++ b/templates/hooks/table/sdk_read_one_post_set_output.go.tpl @@ -53,6 +53,7 @@ } else { ko.Spec.BillingMode = aws.String("PROVISIONED") } + setTableReplicas(r, resp.Table.Replicas) if isTableCreating(&resource{ko}) { return &resource{ko}, requeueWaitWhileCreating } diff --git a/test/e2e/resources/table_with_gsi_and_replicas.yaml b/test/e2e/resources/table_with_gsi_and_replicas.yaml new file mode 100644 index 0000000..bb8298c --- /dev/null +++ b/test/e2e/resources/table_with_gsi_and_replicas.yaml @@ -0,0 +1,41 @@ +apiVersion: dynamodb.services.k8s.aws/v1alpha1 +kind: Table +metadata: + name: $TABLE_NAME +spec: + tableName: $TABLE_NAME + tableClass: STANDARD + attributeDefinitions: + - attributeName: PK + attributeType: S + - attributeName: SK + attributeType: S + - attributeName: GSI1PK + attributeType: S + - attributeName: GSI1SK + attributeType: S + keySchema: + - attributeName: PK + keyType: HASH + - attributeName: SK + keyType: RANGE + billingMode: PAY_PER_REQUEST + streamSpecification: + streamEnabled: true + streamViewType: "NEW_AND_OLD_IMAGES" + tableReplicas: + - regionName: $REPLICA_REGION_1 + globalSecondaryIndexes: + - indexName: GSI1 + - regionName: $REPLICA_REGION_2 + globalSecondaryIndexes: + - indexName: GSI1 + globalSecondaryIndexes: + - indexName: GSI1 + keySchema: + - attributeName: GSI1PK + keyType: HASH + - attributeName: GSI1SK + keyType: RANGE + projection: + projectionType: ALL \ No newline at end of file diff --git a/test/e2e/resources/table_with_replicas.yaml b/test/e2e/resources/table_with_replicas.yaml new file mode 100644 index 0000000..dbca03f --- /dev/null +++ b/test/e2e/resources/table_with_replicas.yaml @@ -0,0 +1,24 @@ +apiVersion: dynamodb.services.k8s.aws/v1alpha1 +kind: Table +metadata: + name: $TABLE_NAME +spec: + tableName: $TABLE_NAME + tableClass: STANDARD + attributeDefinitions: + - attributeName: PK + attributeType: S + - attributeName: SK + attributeType: S + keySchema: + - attributeName: PK + keyType: HASH + - attributeName: SK + keyType: RANGE + billingMode: PAY_PER_REQUEST + streamSpecification: + streamEnabled: true + streamViewType: "NEW_AND_OLD_IMAGES" + tableReplicas: + - regionName: $REPLICA_REGION_1 + - regionName: $REPLICA_REGION_2 diff --git a/test/e2e/resources/table_with_replicas_invalid.yaml b/test/e2e/resources/table_with_replicas_invalid.yaml new file mode 100644 index 0000000..bd4d870 --- /dev/null +++ b/test/e2e/resources/table_with_replicas_invalid.yaml @@ -0,0 +1,20 @@ +apiVersion: dynamodb.services.k8s.aws/v1alpha1 +kind: Table +metadata: + name: $TABLE_NAME +spec: + tableName: $TABLE_NAME + tableClass: STANDARD + attributeDefinitions: + - attributeName: PK + attributeType: S + - attributeName: SK + attributeType: S + keySchema: + - attributeName: PK + keyType: HASH + - attributeName: SK + keyType: RANGE + billingMode: PAY_PER_REQUEST + tableReplicas: + - regionName: $REPLICA_REGION_1 diff --git a/test/e2e/table.py b/test/e2e/table.py index 5deb005..fff5b00 100644 --- a/test/e2e/table.py +++ b/test/e2e/table.py @@ -260,3 +260,61 @@ def get_point_in_time_recovery_enabled(table_name): return resp['ContinuousBackupsDescription']['PointInTimeRecoveryDescription']['PointInTimeRecoveryStatus'] == 'ENABLED' except c.exceptions.ResourceNotFoundException: return None + + +class ReplicaMatcher: + def __init__(self, expected_regions): + self.expected_regions = expected_regions + + def __call__(self, record: dict) -> bool: + if 'Replicas' not in record: + return False + + actual_regions = set() + for replica in record['Replicas']: + if 'RegionName' in replica: + actual_regions.add(replica['RegionName']) + + return set(self.expected_regions) == actual_regions + + +def replicas_match(expected_regions) -> TableMatchFunc: + return ReplicaMatcher(expected_regions) + + +class ReplicaStatusMatcher: + def __init__(self, region, status): + self.region = region + self.status = status + + def __call__(self, record: dict) -> bool: + if 'Replicas' not in record: + return False + + for replica in record['Replicas']: + if 'RegionName' in replica and replica['RegionName'] == self.region: + return 'ReplicaStatus' in replica and replica['ReplicaStatus'] == self.status + + return False + + +def replica_status_matches(region, status) -> TableMatchFunc: + return ReplicaStatusMatcher(region, status) + + +def get_replicas(table_name): + """Returns the replicas for a DynamoDB table + + Args: + table_name: the name of the DynamoDB table to get replicas for + Returns: + A list of replicas or None if the table doesn't exist + """ + dynamodb = boto3.client('dynamodb') + try: + response = dynamodb.describe_table(TableName=table_name) + if 'Table' in response and 'Replicas' in response['Table']: + return response['Table']['Replicas'] + return [] + except dynamodb.exceptions.ResourceNotFoundException: + return None diff --git a/test/e2e/tests/test_table_replicas.py b/test/e2e/tests/test_table_replicas.py new file mode 100644 index 0000000..76e0743 --- /dev/null +++ b/test/e2e/tests/test_table_replicas.py @@ -0,0 +1,447 @@ +# Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). You may +# not use this file except in compliance with the License. A copy of the +# License is located at +# +# http://aws.amazon.com/apache2.0/ +# +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing +# permissions and limitations under the License. + +"""Integration tests for the DynamoDB Table Replicas. +""" + +import logging +import time +from typing import Dict, Tuple + +import boto3 +import pytest +from acktest import tags +from acktest.k8s import resource as k8s +from acktest.resources import random_suffix_name +from e2e import (CRD_GROUP, CRD_VERSION, condition, + load_dynamodb_resource, service_marker, table) +from e2e.replacement_values import REPLACEMENT_VALUES +from acktest.k8s import condition + +RESOURCE_PLURAL = "tables" + +DELETE_WAIT_AFTER_SECONDS = 30 +MODIFY_WAIT_AFTER_SECONDS = 600 +REPLICA_WAIT_AFTER_SECONDS = 600 + +REPLICA_REGION_1 = "us-east-1" +REPLICA_REGION_2 = "eu-west-1" +REPLICA_REGION_3 = "eu-central-1" +REPLICA_REGION_4 = "ap-southeast-1" +REPLICA_REGION_5 = "eu-north-1" + + +def create_table_with_replicas(name: str, resource_template, regions=None): + if regions is None: + regions = [REPLICA_REGION_1, REPLICA_REGION_2] + + replacements = REPLACEMENT_VALUES.copy() + replacements["TABLE_NAME"] = name + replacements["REPLICA_REGION_1"] = regions[0] + replacements["REPLICA_REGION_2"] = regions[1] + + resource_data = load_dynamodb_resource( + resource_template, + additional_replacements=replacements, + ) + logging.debug(resource_data) + + # Create the k8s resource + ref = k8s.CustomResourceReference( + CRD_GROUP, CRD_VERSION, RESOURCE_PLURAL, + name, namespace="default", + ) + k8s.create_custom_resource(ref, resource_data) + cr = k8s.wait_resource_consumed_by_controller(ref) + + assert cr is not None + assert k8s.get_resource_exists(ref) + + return (ref, cr) + + +@pytest.fixture(scope="function") +def table_with_replicas(): + table_name = random_suffix_name("table-replicas", 32) + + (ref, res) = create_table_with_replicas( + table_name, + "table_with_replicas", + [REPLICA_REGION_1, REPLICA_REGION_2] + ) + + # Wait for table to be ACTIVE before proceeding + table.wait_until( + table_name, + table.status_matches("ACTIVE"), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + # Wait for initial replicas to be ACTIVE before yielding + table.wait_until( + table_name, + table.replicas_match([REPLICA_REGION_1, REPLICA_REGION_2]), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + yield (ref, res) + + + deleted = k8s.delete_custom_resource(ref) + assert deleted + +def create_table_with_invalid_replicas(name: str): + replacements = REPLACEMENT_VALUES.copy() + replacements["TABLE_NAME"] = name + replacements["REPLICA_REGION_1"] = REPLICA_REGION_1 + + resource_data = load_dynamodb_resource( + "table_with_replicas_invalid", + additional_replacements=replacements, + ) + logging.debug(resource_data) + + # Create the k8s resource + ref = k8s.CustomResourceReference( + CRD_GROUP, CRD_VERSION, RESOURCE_PLURAL, + name, namespace="default", + ) + k8s.create_custom_resource(ref, resource_data) + cr = k8s.wait_resource_consumed_by_controller(ref) + + assert cr is not None + assert k8s.get_resource_exists(ref) + + return (ref, cr) + + +@pytest.fixture(scope="function") +def table_with_invalid_replicas(): + table_name = random_suffix_name("table-invalid-replicas", 32) + + (ref, res) = create_table_with_invalid_replicas(table_name) + + yield (ref, res) + + # Delete the k8s resource if it still exists + if k8s.get_resource_exists(ref): + k8s.delete_custom_resource(ref) + time.sleep(DELETE_WAIT_AFTER_SECONDS) + + +@pytest.fixture(scope="function") +def table_replicas_gsi(): + table_name = random_suffix_name("table-replicas-gsi", 32) + replacements = REPLACEMENT_VALUES.copy() + replacements["TABLE_NAME"] = table_name + replacements["REPLICA_REGION_1"] = REPLICA_REGION_1 + replacements["REPLICA_REGION_2"] = REPLICA_REGION_2 + + resource_data = load_dynamodb_resource( + "table_with_gsi_and_replicas", + additional_replacements=replacements, + ) + + ref = k8s.CustomResourceReference( + CRD_GROUP, CRD_VERSION, RESOURCE_PLURAL, + table_name, namespace="default", + ) + k8s.create_custom_resource(ref, resource_data) + cr = k8s.wait_resource_consumed_by_controller(ref) + + table.wait_until( + table_name, + table.status_matches("ACTIVE"), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + yield (ref, cr) + + deleted = k8s.delete_custom_resource(ref) + time.sleep(DELETE_WAIT_AFTER_SECONDS) + assert deleted + +@service_marker +@pytest.mark.canary +class TestTableReplicas: + def table_exists(self, table_name: str) -> bool: + return table.get(table_name) is not None + + def test_create_table_with_replicas(self, table_with_replicas): + (_, res) = table_with_replicas + table_name = res["spec"]["tableName"] + + # Table should already be ACTIVE from fixture + assert table.get(table_name) is not None + + # Verify replicas exist and are ACTIVE + for region in [REPLICA_REGION_1, REPLICA_REGION_2]: + table.wait_until( + table_name, + table.replica_status_matches(region, "ACTIVE"), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + def test_add_replica(self, table_with_replicas): + (ref, res) = table_with_replicas + table_name = res["spec"]["tableName"] + + assert table.get(table_name) is not None + table.wait_until( + table_name, + table.status_matches("ACTIVE"), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + # Update replicas + cr = k8s.get_resource(ref) + cr["spec"]["tableReplicas"] = [ + {"regionName": REPLICA_REGION_3}, + {"regionName": REPLICA_REGION_4}, + {"regionName": REPLICA_REGION_5} + ] + k8s.patch_custom_resource(ref, cr) + table.wait_until( + table_name, + table.replicas_match( + [REPLICA_REGION_3, REPLICA_REGION_4, REPLICA_REGION_5]), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + # Verify all replicas are ACTIVE + for region in [REPLICA_REGION_3, REPLICA_REGION_4, REPLICA_REGION_5]: + table.wait_until( + table_name, + table.replica_status_matches(region, "ACTIVE"), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + def test_remove_replica(self, table_with_replicas): + (ref, res) = table_with_replicas + table_name = res["spec"]["tableName"] + + assert self.table_exists(table_name) + + table.wait_until( + table_name, + table.replicas_match([REPLICA_REGION_1, REPLICA_REGION_2]), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + for region in [REPLICA_REGION_1, REPLICA_REGION_2]: + table.wait_until( + table_name, + table.replica_status_matches(region, "ACTIVE"), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + cr = k8s.wait_resource_consumed_by_controller(ref) + current_replicas = table.get_replicas(table_name) + assert current_replicas is not None + assert len(current_replicas) >= 1 + + current_regions = [r["RegionName"] for r in current_replicas] + logging.info(f"Current replicas: {current_regions}") + + regions_to_keep = current_regions[:-1] + regions_to_remove = [current_regions[-1]] + + cr["spec"]["tableReplicas"] = [ + {"regionName": region} for region in regions_to_keep + ] + + k8s.patch_custom_resource(ref, cr) + + # Wait for the replica to be removed + table.wait_until( + table_name, + table.replicas_match(regions_to_keep), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + # Verify remaining replicas + replicas = table.get_replicas(table_name) + assert replicas is not None + assert len(replicas) == len(regions_to_keep) + + region_names = [r["RegionName"] for r in replicas] + for region in regions_to_keep: + assert region in region_names + for region in regions_to_remove: + assert region not in region_names + + def test_delete_table_with_replicas(self, table_with_replicas): + (ref, res) = table_with_replicas + + table_name = res["spec"]["tableName"] + assert self.table_exists(table_name) + + def test_terminal_condition_for_invalid_stream_specification(self, table_with_invalid_replicas): + (ref, res) = table_with_invalid_replicas + + table_name = res["spec"]["tableName"] + assert self.table_exists(table_name) + + max_wait_seconds = 120 + interval_seconds = 10 + start_time = time.time() + terminal_condition_found = False + + while time.time() - start_time < max_wait_seconds: + try: + condition.assert_type_status( + ref, + condition.CONDITION_TYPE_TERMINAL, + True) + + terminal_condition_found = True + cond = k8s.get_resource_condition( + ref, condition.CONDITION_TYPE_TERMINAL) + assert "table must have DynamoDB Streams enabled with StreamViewType set to NEW_AND_OLD_IMAGES" in cond[ + "message"] + break + except: + time.sleep(interval_seconds) + + assert terminal_condition_found, "Terminal condition was not set for invalid StreamSpecification" + + def test_staged_replicas_and_gsi_updates(self, table_replicas_gsi): + (ref, cr) = table_replicas_gsi + table_name = cr["spec"]["tableName"] + max_wait_seconds = REPLICA_WAIT_AFTER_SECONDS + interval_seconds = 30 + start_time = time.time() + + while time.time() - start_time < max_wait_seconds: + if self.table_exists(table_name): + break + time.sleep(interval_seconds) + assert self.table_exists(table_name) + + table.wait_until( + table_name, + table.gsi_matches([{ + "indexName": "GSI1", + "keySchema": [ + {"attributeName": "GSI1PK", "keyType": "HASH"}, + {"attributeName": "GSI1SK", "keyType": "RANGE"} + ], + "projection": { + "projectionType": "ALL" + } + }]), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + # Step 2: Update - add second GSI and two more replicas + cr = k8s.wait_resource_consumed_by_controller(ref) + + # Add attribute definition needed for GSI2 + cr["spec"]["attributeDefinitions"].append( + {"attributeName": "GSI2PK", "attributeType": "S"} + ) + + # Add GSI2 + cr["spec"]["globalSecondaryIndexes"].append({ + "indexName": "GSI2", + "keySchema": [ + {"attributeName": "GSI2PK", "keyType": "HASH"} + ], + "projection": { + "projectionType": "KEYS_ONLY" + } + }) + + # Add two more replicas + cr["spec"]["tableReplicas"] = [ + {"regionName": REPLICA_REGION_1, + "globalSecondaryIndexes": [{"indexName": "GSI1"}, {"indexName": "GSI2"}]}, + {"regionName": REPLICA_REGION_2, + "globalSecondaryIndexes": [{"indexName": "GSI1"}, {"indexName": "GSI2"}]}, + {"regionName": REPLICA_REGION_3, + "globalSecondaryIndexes": [{"indexName": "GSI1"}, {"indexName": "GSI2"}]} + ] + + # Update the resource + k8s.patch_custom_resource(ref, cr) + + # Wait for the new GSI to be created + table.wait_until( + table_name, + table.gsi_matches([ + { + "indexName": "GSI1", + "keySchema": [ + {"attributeName": "GSI1PK", "keyType": "HASH"}, + {"attributeName": "GSI1SK", "keyType": "RANGE"} + ], + "projection": { + "projectionType": "ALL" + } + }, + { + "indexName": "GSI2", + "keySchema": [ + {"attributeName": "GSI2PK", "keyType": "HASH"} + ], + "projection": { + "projectionType": "KEYS_ONLY" + } + } + ]), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS*2, + interval_seconds=30, + ) + + table.wait_until( + table_name, + table.replicas_match( + [REPLICA_REGION_1, REPLICA_REGION_2, REPLICA_REGION_3]), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS*2, + interval_seconds=30, + ) + + for region in [REPLICA_REGION_1, REPLICA_REGION_2, REPLICA_REGION_3]: + table.wait_until( + table_name, + table.replica_status_matches(region, "ACTIVE"), + timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, + interval_seconds=30, + ) + + table_info = table.get(table_name) + assert "GlobalSecondaryIndexes" in table_info + assert len(table_info["GlobalSecondaryIndexes"]) == 2 + gsi_names = [gsi["IndexName"] + for gsi in table_info["GlobalSecondaryIndexes"]] + assert "GSI1" in gsi_names + assert "GSI2" in gsi_names + + replicas = table.get_replicas(table_name) + assert replicas is not None + assert len(replicas) == 3 + region_names = [r["RegionName"] for r in replicas] + assert REPLICA_REGION_1 in region_names + assert REPLICA_REGION_2 in region_names + assert REPLICA_REGION_3 in region_names \ No newline at end of file