@@ -18,14 +18,15 @@ import (
18
18
"strings"
19
19
"time"
20
20
21
- ackrtlog "github.com/aws-controllers-k8s/runtime/pkg/runtime/log"
22
21
ackerr "github.com/aws-controllers-k8s/runtime/pkg/errors"
23
22
ackrequeue "github.com/aws-controllers-k8s/runtime/pkg/requeue"
23
+ ackrtlog "github.com/aws-controllers-k8s/runtime/pkg/runtime/log"
24
24
"github.com/aws/aws-sdk-go-v2/aws"
25
25
svcsdk "github.com/aws/aws-sdk-go-v2/service/dynamodb"
26
26
svcsdktypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
27
27
28
28
"github.com/aws-controllers-k8s/dynamodb-controller/apis/v1alpha1"
29
+ svcapitypes "github.com/aws-controllers-k8s/dynamodb-controller/apis/v1alpha1"
29
30
)
30
31
31
32
// equalCreateReplicationGroupMemberActions compares two CreateReplicationGroupMemberAction objects
@@ -262,25 +263,6 @@ func deleteReplicaUpdate(regionName string) svcsdktypes.ReplicationGroupUpdate {
262
263
}
263
264
}
264
265
265
- // canUpdateTableReplicas returns true if it's possible to update table replicas.
266
- // We can only modify replicas when they are in ACTIVE state.
267
- func canUpdateTableReplicas (r * resource ) bool {
268
- // Check if Status or Replicas is nil
269
- // needed when called by sdkdelete
270
- if r == nil || r .ko == nil || r .ko .Status .Replicas == nil {
271
- return true // If no replicas exist, we can proceed with updates
272
- }
273
- // Check if any replica is not in ACTIVE state
274
- for _ , replicaDesc := range r .ko .Status .Replicas {
275
- if replicaDesc .RegionName != nil && replicaDesc .ReplicaStatus != nil {
276
- if * replicaDesc .ReplicaStatus != string (svcsdktypes .ReplicaStatusActive ) {
277
- return false
278
- }
279
- }
280
- }
281
- return true
282
- }
283
-
284
266
// hasStreamSpecificationWithNewAndOldImages checks if the table has DynamoDB Streams enabled
285
267
// with the stream containing both the new and the old images of the item.
286
268
func hasStreamSpecificationWithNewAndOldImages (r * resource ) bool {
@@ -317,8 +299,8 @@ func (rm *resourceManager) syncReplicas(
317
299
// Handle specific errors
318
300
if awsErr , ok := ackerr .AWSError (err ); ok {
319
301
// Handle ValidationException - when replicas are not part of the global table
320
- if awsErr .ErrorCode () == "ValidationException" &&
321
- strings .Contains (awsErr .ErrorMessage (), "not part of the global table" ) {
302
+ if awsErr .ErrorCode () == "ValidationException" &&
303
+ strings .Contains (awsErr .ErrorMessage (), "not part of the global table" ) {
322
304
// A replica was already deleted
323
305
rlog .Debug ("replica already deleted from global table" ,
324
306
"table" , * latest .ko .Spec .TableName ,
@@ -328,7 +310,7 @@ func (rm *resourceManager) syncReplicas(
328
310
30 * time .Second ,
329
311
)
330
312
}
331
-
313
+
332
314
// Handle ResourceInUseException - when the table is being updated
333
315
if awsErr .ErrorCode () == "ResourceInUseException" {
334
316
rlog .Debug ("table is currently in use, will retry" ,
@@ -382,20 +364,29 @@ func (rm *resourceManager) newUpdateTableReplicaUpdatesOneAtATimePayload(
382
364
383
365
if len (createReplicas ) > 0 {
384
366
replica := * createReplicas [0 ]
367
+ if checkReplicaStatus (latest .ko .Status .Replicas , * replica .RegionName ) {
368
+ return nil , 0 , requeueWaitReplicasActive
369
+ }
385
370
rlog .Debug ("creating replica in region" , "table" , * desired .ko .Spec .TableName , "region" , * replica .RegionName )
386
371
input .ReplicaUpdates = append (input .ReplicaUpdates , createReplicaUpdate (createReplicas [0 ]))
387
372
return input , replicasInQueue , nil
388
373
}
389
374
390
375
if len (updateReplicas ) > 0 {
391
376
replica := * updateReplicas [0 ]
377
+ if checkReplicaStatus (latest .ko .Status .Replicas , * replica .RegionName ) {
378
+ return nil , 0 , requeueWaitReplicasActive
379
+ }
392
380
rlog .Debug ("updating replica in region" , "table" , * desired .ko .Spec .TableName , "region" , * replica .RegionName )
393
381
input .ReplicaUpdates = append (input .ReplicaUpdates , updateReplicaUpdate (updateReplicas [0 ]))
394
382
return input , replicasInQueue , nil
395
383
}
396
384
397
385
if len (deleteRegions ) > 0 {
398
386
replica := deleteRegions [0 ]
387
+ if checkReplicaStatus (latest .ko .Status .Replicas , replica ) {
388
+ return nil , 0 , requeueWaitReplicasActive
389
+ }
399
390
rlog .Debug ("deleting replica in region" , "table" , * desired .ko .Spec .TableName , "region" , replica )
400
391
input .ReplicaUpdates = append (input .ReplicaUpdates , deleteReplicaUpdate (deleteRegions [0 ]))
401
392
return input , replicasInQueue , nil
@@ -451,3 +442,58 @@ func calculateReplicaUpdates(
451
442
452
443
return createReplicas , updateReplicas , deleteRegions
453
444
}
445
+
446
+ func setTableReplicas (ko * svcapitypes.Table , replicas []svcsdktypes.ReplicaDescription ) {
447
+ if len (replicas ) > 0 {
448
+ tableReplicas := []* v1alpha1.CreateReplicationGroupMemberAction {}
449
+ for _ , replica := range replicas {
450
+ replicaElem := & v1alpha1.CreateReplicationGroupMemberAction {}
451
+ if replica .RegionName != nil {
452
+ replicaElem .RegionName = replica .RegionName
453
+ }
454
+ if replica .KMSMasterKeyId != nil {
455
+ replicaElem .KMSMasterKeyID = replica .KMSMasterKeyId
456
+ }
457
+ if replica .ProvisionedThroughputOverride != nil {
458
+ replicaElem .ProvisionedThroughputOverride = & v1alpha1.ProvisionedThroughputOverride {
459
+ ReadCapacityUnits : replica .ProvisionedThroughputOverride .ReadCapacityUnits ,
460
+ }
461
+ }
462
+ if replica .GlobalSecondaryIndexes != nil {
463
+ gsiList := []* v1alpha1.ReplicaGlobalSecondaryIndex {}
464
+ for _ , gsi := range replica .GlobalSecondaryIndexes {
465
+ gsiElem := & v1alpha1.ReplicaGlobalSecondaryIndex {
466
+ IndexName : gsi .IndexName ,
467
+ }
468
+ if gsi .ProvisionedThroughputOverride != nil {
469
+ gsiElem .ProvisionedThroughputOverride = & v1alpha1.ProvisionedThroughputOverride {
470
+ ReadCapacityUnits : gsi .ProvisionedThroughputOverride .ReadCapacityUnits ,
471
+ }
472
+ }
473
+ gsiList = append (gsiList , gsiElem )
474
+ }
475
+ replicaElem .GlobalSecondaryIndexes = gsiList
476
+ }
477
+ if replica .ReplicaTableClassSummary != nil && replica .ReplicaTableClassSummary .TableClass != "" {
478
+ replicaElem .TableClassOverride = aws .String (string (replica .ReplicaTableClassSummary .TableClass ))
479
+ }
480
+ tableReplicas = append (tableReplicas , replicaElem )
481
+ }
482
+ ko .Spec .TableReplicas = tableReplicas
483
+ } else {
484
+ ko .Spec .TableReplicas = nil
485
+ }
486
+ }
487
+
488
+ func checkReplicaStatus (ReplicaDescription []* svcapitypes.ReplicaDescription , regionName string ) bool {
489
+ for _ , replica := range ReplicaDescription {
490
+ if * replica .RegionName == regionName {
491
+ replicaStatus := replica .ReplicaStatus
492
+ if * replicaStatus == string (svcsdktypes .ReplicaStatusCreating ) || * replicaStatus == string (svcsdktypes .ReplicaStatusDeleting ) || * replicaStatus == string (svcsdktypes .ReplicaStatusUpdating ) {
493
+ return true
494
+ }
495
+ }
496
+ }
497
+
498
+ return false
499
+ }
0 commit comments