@@ -15,17 +15,14 @@ package table
15
15
16
16
import (
17
17
"context"
18
- "strings"
19
- "time"
20
18
21
19
ackrtlog "github.com/aws-controllers-k8s/runtime/pkg/runtime/log"
22
- ackerr "github.com/aws-controllers-k8s/runtime/pkg/errors"
23
- ackrequeue "github.com/aws-controllers-k8s/runtime/pkg/requeue"
24
20
"github.com/aws/aws-sdk-go-v2/aws"
25
21
svcsdk "github.com/aws/aws-sdk-go-v2/service/dynamodb"
26
22
svcsdktypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
27
23
28
24
"github.com/aws-controllers-k8s/dynamodb-controller/apis/v1alpha1"
25
+ svcapitypes "github.com/aws-controllers-k8s/dynamodb-controller/apis/v1alpha1"
29
26
)
30
27
31
28
// equalCreateReplicationGroupMemberActions compares two CreateReplicationGroupMemberAction objects
@@ -178,7 +175,7 @@ func createReplicaUpdate(replica *v1alpha1.CreateReplicationGroupMemberAction) s
178
175
for _ , gsi := range replica .GlobalSecondaryIndexes {
179
176
replicaGSI := svcsdktypes.ReplicaGlobalSecondaryIndex {}
180
177
if gsi .IndexName != nil {
181
- replicaGSI .IndexName = aws . String ( * gsi .IndexName )
178
+ replicaGSI .IndexName = gsi .IndexName
182
179
}
183
180
if gsi .ProvisionedThroughputOverride != nil {
184
181
replicaGSI .ProvisionedThroughputOverride = & svcsdktypes.ProvisionedThroughputOverride {}
@@ -250,7 +247,9 @@ func updateReplicaUpdate(replica *v1alpha1.CreateReplicationGroupMemberAction) s
250
247
}
251
248
252
249
// If no valid updates, return an empty ReplicationGroupUpdate
253
- return svcsdktypes.ReplicationGroupUpdate {}
250
+ return svcsdktypes.ReplicationGroupUpdate {
251
+ Update : nil ,
252
+ }
254
253
}
255
254
256
255
// deleteReplicaUpdate creates a ReplicationGroupUpdate for deleting an existing replica
@@ -262,25 +261,6 @@ func deleteReplicaUpdate(regionName string) svcsdktypes.ReplicationGroupUpdate {
262
261
}
263
262
}
264
263
265
- // canUpdateTableReplicas returns true if it's possible to update table replicas.
266
- // We can only modify replicas when they are in ACTIVE state.
267
- func canUpdateTableReplicas (r * resource ) bool {
268
- // Check if Status or Replicas is nil
269
- // needed when called by sdkdelete
270
- if r == nil || r .ko == nil || r .ko .Status .Replicas == nil {
271
- return true // If no replicas exist, we can proceed with updates
272
- }
273
- // Check if any replica is not in ACTIVE state
274
- for _ , replicaDesc := range r .ko .Status .Replicas {
275
- if replicaDesc .RegionName != nil && replicaDesc .ReplicaStatus != nil {
276
- if * replicaDesc .ReplicaStatus != string (svcsdktypes .ReplicaStatusActive ) {
277
- return false
278
- }
279
- }
280
- }
281
- return true
282
- }
283
-
284
264
// hasStreamSpecificationWithNewAndOldImages checks if the table has DynamoDB Streams enabled
285
265
// with the stream containing both the new and the old images of the item.
286
266
func hasStreamSpecificationWithNewAndOldImages (r * resource ) bool {
@@ -314,33 +294,7 @@ func (rm *resourceManager) syncReplicas(
314
294
_ , err = rm .sdkapi .UpdateTable (ctx , input )
315
295
rm .metrics .RecordAPICall ("UPDATE" , "UpdateTable" , err )
316
296
if err != nil {
317
- // Handle specific errors
318
- if awsErr , ok := ackerr .AWSError (err ); ok {
319
- // Handle ValidationException - when replicas are not part of the global table
320
- if awsErr .ErrorCode () == "ValidationException" &&
321
- strings .Contains (awsErr .ErrorMessage (), "not part of the global table" ) {
322
- // A replica was already deleted
323
- rlog .Debug ("replica already deleted from global table" ,
324
- "table" , * latest .ko .Spec .TableName ,
325
- "error" , awsErr .ErrorMessage ())
326
- return ackrequeue .NeededAfter (
327
- ErrTableUpdating ,
328
- 30 * time .Second ,
329
- )
330
- }
331
-
332
- // Handle ResourceInUseException - when the table is being updated
333
- if awsErr .ErrorCode () == "ResourceInUseException" {
334
- rlog .Debug ("table is currently in use, will retry" ,
335
- "table" , * latest .ko .Spec .TableName ,
336
- "error" , awsErr .ErrorMessage ())
337
- return ackrequeue .NeededAfter (
338
- ErrTableUpdating ,
339
- 30 * time .Second ,
340
- )
341
- }
342
- return err
343
- }
297
+ return err
344
298
}
345
299
346
300
// If there are more replicas to process, requeue
@@ -367,7 +321,7 @@ func (rm *resourceManager) newUpdateTableReplicaUpdatesOneAtATimePayload(
367
321
exit (err )
368
322
}()
369
323
370
- createReplicas , updateReplicas , deleteRegions := calculateReplicaUpdates (latest , desired )
324
+ createReplicas , updateReplicas , deleteRegions := computeReplicaupdatesDelta (latest , desired )
371
325
372
326
input = & svcsdk.UpdateTableInput {
373
327
TableName : aws .String (* desired .ko .Spec .TableName ),
@@ -382,20 +336,33 @@ func (rm *resourceManager) newUpdateTableReplicaUpdatesOneAtATimePayload(
382
336
383
337
if len (createReplicas ) > 0 {
384
338
replica := * createReplicas [0 ]
339
+ if checkIfReplicasInProgress (latest .ko .Status .Replicas , * replica .RegionName ) {
340
+ return nil , 0 , requeueWaitReplicasActive
341
+ }
385
342
rlog .Debug ("creating replica in region" , "table" , * desired .ko .Spec .TableName , "region" , * replica .RegionName )
386
343
input .ReplicaUpdates = append (input .ReplicaUpdates , createReplicaUpdate (createReplicas [0 ]))
387
344
return input , replicasInQueue , nil
388
345
}
389
346
390
347
if len (updateReplicas ) > 0 {
391
348
replica := * updateReplicas [0 ]
349
+ if checkIfReplicasInProgress (latest .ko .Status .Replicas , * replica .RegionName ) {
350
+ return nil , 0 , requeueWaitReplicasActive
351
+ }
392
352
rlog .Debug ("updating replica in region" , "table" , * desired .ko .Spec .TableName , "region" , * replica .RegionName )
393
- input .ReplicaUpdates = append (input .ReplicaUpdates , updateReplicaUpdate (updateReplicas [0 ]))
353
+ updateReplica := updateReplicaUpdate (updateReplicas [0 ])
354
+ if updateReplica .Update == nil {
355
+ return nil , 0 , requeueWaitReplicasActive
356
+ }
357
+ input .ReplicaUpdates = append (input .ReplicaUpdates , updateReplica )
394
358
return input , replicasInQueue , nil
395
359
}
396
360
397
361
if len (deleteRegions ) > 0 {
398
362
replica := deleteRegions [0 ]
363
+ if checkIfReplicasInProgress (latest .ko .Status .Replicas , replica ) {
364
+ return nil , 0 , requeueWaitReplicasActive
365
+ }
399
366
rlog .Debug ("deleting replica in region" , "table" , * desired .ko .Spec .TableName , "region" , replica )
400
367
input .ReplicaUpdates = append (input .ReplicaUpdates , deleteReplicaUpdate (deleteRegions [0 ]))
401
368
return input , replicasInQueue , nil
@@ -404,9 +371,9 @@ func (rm *resourceManager) newUpdateTableReplicaUpdatesOneAtATimePayload(
404
371
return input , replicasInQueue , nil
405
372
}
406
373
407
- // calculateReplicaUpdates calculates the replica updates needed to reconcile the latest state with the desired state
374
+ // computeReplicaupdatesDelta calculates the replica updates needed to reconcile the latest state with the desired state
408
375
// Returns three slices: replicas to create, replicas to update, and region names to delete
409
- func calculateReplicaUpdates (
376
+ func computeReplicaupdatesDelta (
410
377
latest * resource ,
411
378
desired * resource ,
412
379
) (
@@ -451,3 +418,58 @@ func calculateReplicaUpdates(
451
418
452
419
return createReplicas , updateReplicas , deleteRegions
453
420
}
421
+
422
+ func setTableReplicas (ko * svcapitypes.Table , replicas []svcsdktypes.ReplicaDescription ) {
423
+ if len (replicas ) > 0 {
424
+ tableReplicas := []* v1alpha1.CreateReplicationGroupMemberAction {}
425
+ for _ , replica := range replicas {
426
+ replicaElem := & v1alpha1.CreateReplicationGroupMemberAction {}
427
+ if replica .RegionName != nil {
428
+ replicaElem .RegionName = replica .RegionName
429
+ }
430
+ if replica .KMSMasterKeyId != nil {
431
+ replicaElem .KMSMasterKeyID = replica .KMSMasterKeyId
432
+ }
433
+ if replica .ProvisionedThroughputOverride != nil {
434
+ replicaElem .ProvisionedThroughputOverride = & v1alpha1.ProvisionedThroughputOverride {
435
+ ReadCapacityUnits : replica .ProvisionedThroughputOverride .ReadCapacityUnits ,
436
+ }
437
+ }
438
+ if replica .GlobalSecondaryIndexes != nil {
439
+ gsiList := []* v1alpha1.ReplicaGlobalSecondaryIndex {}
440
+ for _ , gsi := range replica .GlobalSecondaryIndexes {
441
+ gsiElem := & v1alpha1.ReplicaGlobalSecondaryIndex {
442
+ IndexName : gsi .IndexName ,
443
+ }
444
+ if gsi .ProvisionedThroughputOverride != nil {
445
+ gsiElem .ProvisionedThroughputOverride = & v1alpha1.ProvisionedThroughputOverride {
446
+ ReadCapacityUnits : gsi .ProvisionedThroughputOverride .ReadCapacityUnits ,
447
+ }
448
+ }
449
+ gsiList = append (gsiList , gsiElem )
450
+ }
451
+ replicaElem .GlobalSecondaryIndexes = gsiList
452
+ }
453
+ if replica .ReplicaTableClassSummary != nil && replica .ReplicaTableClassSummary .TableClass != "" {
454
+ replicaElem .TableClassOverride = aws .String (string (replica .ReplicaTableClassSummary .TableClass ))
455
+ }
456
+ tableReplicas = append (tableReplicas , replicaElem )
457
+ }
458
+ ko .Spec .TableReplicas = tableReplicas
459
+ } else {
460
+ ko .Spec .TableReplicas = nil
461
+ }
462
+ }
463
+
464
+ func checkIfReplicasInProgress (ReplicaDescription []* svcapitypes.ReplicaDescription , regionName string ) bool {
465
+ for _ , replica := range ReplicaDescription {
466
+ if * replica .RegionName == regionName {
467
+ replicaStatus := replica .ReplicaStatus
468
+ if * replicaStatus == string (svcsdktypes .ReplicaStatusCreating ) || * replicaStatus == string (svcsdktypes .ReplicaStatusDeleting ) || * replicaStatus == string (svcsdktypes .ReplicaStatusUpdating ) {
469
+ return true
470
+ }
471
+ }
472
+ }
473
+
474
+ return false
475
+ }
0 commit comments