-
Notifications
You must be signed in to change notification settings - Fork 20
Add TableReplicas
Support for DynamoDB Table Replicas
#120
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
ca89ad3
to
b53e944
Compare
replica update
supportreplica
support dynamoDB tables
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @rushmash91 !
f42a7c9
to
81e1cbd
Compare
0185830
to
75da9b2
Compare
replica
support dynamoDB tablesReplicationGroup
Support for DynamoDB Table Replicas
5cdbb69
to
72c06f0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Getting close!
ko.Spec.TableReplicas = tableReplicas | ||
} else { | ||
ko.Spec.TableReplicas = nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we supposed to also update the status in here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already being handled by sdkfind. So, not needed.
tableReplicas := []*svcapitypes.CreateReplicationGroupMemberAction{} | ||
for _, replica := range resp.Table.Replicas { | ||
replicaElem := &svcapitypes.CreateReplicationGroupMemberAction{} | ||
if replica.RegionName != nil { | ||
replicaElem.RegionName = replica.RegionName | ||
} | ||
if replica.KMSMasterKeyId != nil { | ||
replicaElem.KMSMasterKeyID = replica.KMSMasterKeyId | ||
} | ||
if replica.ProvisionedThroughputOverride != nil { | ||
replicaElem.ProvisionedThroughputOverride = &svcapitypes.ProvisionedThroughputOverride{ | ||
ReadCapacityUnits: replica.ProvisionedThroughputOverride.ReadCapacityUnits, | ||
} | ||
} | ||
if replica.GlobalSecondaryIndexes != nil { | ||
gsiList := []*svcapitypes.ReplicaGlobalSecondaryIndex{} | ||
for _, gsi := range replica.GlobalSecondaryIndexes { | ||
gsiElem := &svcapitypes.ReplicaGlobalSecondaryIndex{ | ||
IndexName: gsi.IndexName, | ||
} | ||
if gsi.ProvisionedThroughputOverride != nil { | ||
gsiElem.ProvisionedThroughputOverride = &svcapitypes.ProvisionedThroughputOverride{ | ||
ReadCapacityUnits: gsi.ProvisionedThroughputOverride.ReadCapacityUnits, | ||
} | ||
} | ||
gsiList = append(gsiList, gsiElem) | ||
} | ||
replicaElem.GlobalSecondaryIndexes = gsiList | ||
} | ||
if replica.ReplicaTableClassSummary != nil && replica.ReplicaTableClassSummary.TableClass != "" { | ||
replicaElem.TableClassOverride = aws.String(string(replica.ReplicaTableClassSummary.TableClass)) | ||
} | ||
tableReplicas = append(tableReplicas, replicaElem) | ||
} | ||
ko.Spec.TableReplicas = tableReplicas | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this logic can be move to one of the hooks files and we can just do:
if len(resp.Table.Replicas) > 0 {
setTableReplicas(...)
}
# Wait for both initial replicas to be created | ||
table.wait_until( | ||
table_name, | ||
table.replicas_match([REPLICA_REGION_1, REPLICA_REGION_2]), | ||
timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, | ||
interval_seconds=30, | ||
) | ||
|
||
# Wait for both replicas to be active | ||
for region in [REPLICA_REGION_1, REPLICA_REGION_2]: | ||
table.wait_until( | ||
table_name, | ||
table.replica_status_matches(region, "ACTIVE"), | ||
timeout_seconds=REPLICA_WAIT_AFTER_SECONDS, | ||
interval_seconds=30, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: isn't the wait in L145, sufficiant? will the table ever be marked as ACTIVE, when the replicas haven't been fully created?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, the table can reach ACTIVE status before all of its replicas are fully active.
So, I wait for table to become active -> check if the replicas exist -> wait for all replicas to be active
if cr and "status" in cr and "conditions" in cr["status"]: | ||
for condition_obj in cr["status"]["conditions"]: | ||
if condition_obj["type"] == "ACK.Terminal" and condition_obj["status"] == "True": | ||
terminal_condition_set = True | ||
# Verify the error message | ||
assert "table must have DynamoDB Streams enabled with StreamViewType set to NEW_AND_OLD_IMAGES" in condition_obj[ | ||
"message"] | ||
break | ||
|
||
if terminal_condition_set: | ||
break | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you could also use https://github.com/aws-controllers-k8s/test-infra/blob/main/src/acktest/k8s/condition.py#L29 (i know not all controllers are using them)
# Create the k8s resource | ||
ref = k8s.CustomResourceReference( | ||
CRD_GROUP, CRD_VERSION, RESOURCE_PLURAL, | ||
table_name, namespace="default", | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if this test fail, L528 will never be executed which will leave a dangling DDB Table in our accounts. Ideally you want to use a function that provisions the table, yields the info you need, and calls a delete function at the end.
c6facdd
to
0550f7a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work on this @rushmash91 !
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great job on this one @rushmash91 🚀
left a few nits
@@ -224,6 +233,15 @@ func (rm *resourceManager) customUpdateTable( | |||
} | |||
return nil, err | |||
} | |||
case delta.DifferentAt("Spec.TableReplicas"): | |||
if !hasStreamSpecificationWithNewAndOldImages(desired) { | |||
msg := "table must have DynamoDB Streams enabled with StreamViewType set to NEW_AND_OLD_IMAGES for replica updates" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this actually affect it? it would be nice to have comments explaining why...maybe documentation for the table as well
) | ||
|
||
// equalCreateReplicationGroupMemberActions compares two CreateReplicationGroupMemberAction objects | ||
func equalCreateReplicationGroupMemberActions(a, b *v1alpha1.CreateReplicationGroupMemberAction) bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
func equalCreateReplicationGroupMemberActions(a, b *v1alpha1.CreateReplicationGroupMemberAction) bool { | |
func areReplicaGroupsDifferent(a, b *v1alpha1.CreateReplicationGroupMemberAction) bool { |
same for others below?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's more intuitive to keep the 'equal' naming pattern with negation rather than introducing 'areXDifferent'. using something like '!equalX()' maintains consistency with the existing naming convention accross all ACK
} | ||
|
||
// equalReplicaGlobalSecondaryIndexes compares two ReplicaGlobalSecondaryIndex objects | ||
func equalReplicaGlobalSecondaryIndexes( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
another nit: Should we move the delta logic out of this file? maybe create a new one called delta_hooks.go?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the controller is currently following this convention there are hooks files for GSI and continuous backup as well.
I'll have a separate PR making a delta files and a few other optimizations for all hooks.
for _, gsi := range replica.GlobalSecondaryIndexes { | ||
replicaGSI := svcsdktypes.ReplicaGlobalSecondaryIndex{} | ||
if gsi.IndexName != nil { | ||
replicaGSI.IndexName = aws.String(*gsi.IndexName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: why not this? or is indexName a different type?
replicaGSI.IndexName = aws.String(*gsi.IndexName) | |
replicaGSI.IndexName = gsi.IndexName |
func updateReplicaUpdate(replica *v1alpha1.CreateReplicationGroupMemberAction) svcsdktypes.ReplicationGroupUpdate { | ||
replicaUpdate := svcsdktypes.ReplicationGroupUpdate{} | ||
updateAction := &svcsdktypes.UpdateReplicationGroupMemberAction{} | ||
isValidUpdate := false // updates to gsi without ProvisionedThroughputOverride are invalid |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should we set this to true and only change it to false when ProvisionedThroughputOverride is not defined?
or we could also not use it and just return nil when provisionedTroughputOverride is not defined
_, err = rm.sdkapi.UpdateTable(ctx, input) | ||
rm.metrics.RecordAPICall("UPDATE", "UpdateTable", err) | ||
if err != nil { | ||
return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: gofmt
exit(err) | ||
}() | ||
|
||
createReplicas, updateReplicas, deleteRegions := calculateReplicaUpdates(latest, desired) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: instead of calculating all here, what if we returned after we find a create, an update, or a delete, and call the update and requeue immediately? not sure if that would be overengineering tho..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just keeping consistency with the mess i created with GSIs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer this. Also consistent with the GSI hooks. Since the api pattern is same. But renaming functions to computeReplicaupdatesDelta
} | ||
} | ||
|
||
func checkReplicaStatus(ReplicaDescription []*svcapitypes.ReplicaDescription, regionName string) bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can this have a better name? maybe checkIfReplicasInProgress
or checkIfReplicasUpdating
@rushmash91: The following test failed, say
Full PR test history. Your PR dashboard. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. I understand the commands that are listed here. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @rushmash91 🚀
/lgtm
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: a-hilaly, michaelhtm, rushmash91 The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
…controllers-k8s#120)" This reverts commit 755ecd8.
…ntrollers-k8s#120) This reverts commit 755ecd8. Ensure we keep the field in helm
fixes aws-controllers-k8s/community#2077
This PR implements support for managing DynamoDB table replicas through the
tableReplicas
field. This enhancement allows users to manage multi-region table replicas that automatically remain in sync. This is complemented by areplicaDescription
field in the Table status for tracking replica states.Changes Overview
tableReplicas
field to Table spec for defining replica configurations across regions