@@ -60,6 +60,8 @@ const (
60
60
deletionFinalizer = "deletion.finalizers.rabbitmqclusters.rabbitmq.com"
61
61
pluginsUpdateAnnotation = "rabbitmq.com/pluginsUpdatedAt"
62
62
queueRebalanceAnnotation = "rabbitmq.com/queueRebalanceNeededAt"
63
+ serverConfAnnotation = "rabbitmq.com/serverConfUpdatedAt"
64
+ stsRestartAnnotation = "rabbitmq.com/lastRestartAt"
63
65
)
64
66
65
67
// RabbitmqClusterReconciler reconciles a RabbitmqCluster object
@@ -103,28 +105,27 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
103
105
return ctrl.Result {}, nil
104
106
}
105
107
106
- // Resource has been marked for deletion
108
+ // Check if the resource has been marked for deletion
107
109
if ! rabbitmqCluster .ObjectMeta .DeletionTimestamp .IsZero () {
108
110
logger .Info ("Deleting RabbitmqCluster" ,
109
111
"namespace" , rabbitmqCluster .Namespace ,
110
112
"name" , rabbitmqCluster .Name )
111
- // Stop reconciliation as the item is being deleted
112
113
return ctrl.Result {}, r .prepareForDeletion (ctx , rabbitmqCluster )
113
114
}
114
115
116
+ // Ensure the resource have a deletion marker
117
+ if err := r .addFinalizerIfNeeded (ctx , rabbitmqCluster ); err != nil {
118
+ return ctrl.Result {}, err
119
+ }
120
+
115
121
// TLS: check if specified, and if secret exists
116
122
if rabbitmqCluster .TLSEnabled () {
117
123
if result , err := r .checkTLSSecrets (ctx , rabbitmqCluster ); err != nil {
118
124
return result , err
119
125
}
120
126
}
121
127
122
- if err := r .addFinalizerIfNeeded (ctx , rabbitmqCluster ); err != nil {
123
- return ctrl.Result {}, err
124
- }
125
-
126
128
childResources , err := r .getChildResources (ctx , * rabbitmqCluster )
127
-
128
129
if err != nil {
129
130
return ctrl.Result {}, err
130
131
}
@@ -195,12 +196,19 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
195
196
return ctrl.Result {}, err
196
197
}
197
198
198
- r .annotatePluginsConfigMapIfUpdated (ctx , builder , operationResult , rabbitmqCluster )
199
- if restarted := r .restartStatefulSetIfNeeded (ctx , builder , operationResult , rabbitmqCluster ); restarted {
200
- return ctrl.Result {RequeueAfter : time .Second * 10 }, nil
199
+ if err = r .annotateConfigMapIfUpdated (ctx , builder , operationResult , rabbitmqCluster ); err != nil {
200
+ return ctrl.Result {}, err
201
201
}
202
202
}
203
203
204
+ requeueAfter , err := r .restartStatefulSetIfNeeded (ctx , rabbitmqCluster )
205
+ if err != nil {
206
+ return ctrl.Result {}, err
207
+ }
208
+ if requeueAfter > 0 {
209
+ return ctrl.Result {RequeueAfter : requeueAfter }, nil
210
+ }
211
+
204
212
// Set ReconcileSuccess to true here because all CRUD operations to Kube API related
205
213
// to child resources returned no error
206
214
rabbitmqCluster .Status .SetCondition (status .ReconcileSuccess , corev1 .ConditionTrue , "Success" , "Created or Updated all child resources" )
@@ -216,7 +224,7 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
216
224
217
225
// By this point the StatefulSet may have finished deploying. Run any
218
226
// post-deploy steps if so, or requeue until the deployment is finished.
219
- requeueAfter , err : = r .runPostDeployStepsIfNeeded (ctx , rabbitmqCluster )
227
+ requeueAfter , err = r .runPostDeployStepsIfNeeded (ctx , rabbitmqCluster )
220
228
if err != nil {
221
229
return ctrl.Result {}, err
222
230
}
@@ -338,7 +346,7 @@ func (r *RabbitmqClusterReconciler) runPostDeployStepsIfNeeded(ctx context.Conte
338
346
}
339
347
340
348
// Retrieve the plugins config map, if it exists.
341
- pluginsConfig , err := r .pluginsConfigMap (ctx , rmq )
349
+ pluginsConfig , err := r .configMap (ctx , rmq , rmq . ChildResourceName ( resource . PluginsConfigName ) )
342
350
if client .IgnoreNotFound (err ) != nil {
343
351
return 0 , err
344
352
}
@@ -419,16 +427,32 @@ func (r *RabbitmqClusterReconciler) runSetPluginsCommand(ctx context.Context, rm
419
427
return nil
420
428
}
421
429
422
- // Adds an arbitrary annotation (rabbitmq.com/lastRestartAt) to the StatefulSet PodTemplate to trigger a StatefulSet restart
423
- // if builder requires StatefulSet to be updated.
424
- func (r * RabbitmqClusterReconciler ) restartStatefulSetIfNeeded (
425
- ctx context.Context ,
426
- builder resource.ResourceBuilder ,
427
- operationResult controllerutil.OperationResult ,
428
- rmq * rabbitmqv1beta1.RabbitmqCluster ) (restarted bool ) {
430
+ // Adds an arbitrary annotation to the sts PodTemplate to trigger a sts restart
431
+ // it compares annotation "rabbitmq.com/serverConfUpdatedAt" from server-conf configMap and annotation "rabbitmq.com/lastRestartAt" from sts
432
+ // to determine whether to restart sts
433
+ func (r * RabbitmqClusterReconciler ) restartStatefulSetIfNeeded (ctx context.Context , rmq * rabbitmqv1beta1.RabbitmqCluster ) (time.Duration , error ) {
434
+ serverConf , err := r .configMap (ctx , rmq , rmq .ChildResourceName (resource .ServerConfigMapName ))
435
+ if err != nil {
436
+ // requeue request after 10s if unable to find server-conf configmap, else return the error
437
+ return 10 * time .Second , client .IgnoreNotFound (err )
438
+ }
429
439
430
- if ! (builder .UpdateRequiresStsRestart () && operationResult == controllerutil .OperationResultUpdated ) {
431
- return false
440
+ serverConfigUpdatedAt , ok := serverConf .Annotations [serverConfAnnotation ]
441
+ if ! ok {
442
+ // server-conf configmap hasn't been updated; no need to restart sts
443
+ return 0 , nil
444
+ }
445
+
446
+ sts , err := r .statefulSet (ctx , rmq )
447
+ if err != nil {
448
+ // requeue request after 10s if unable to find sts, else return the error
449
+ return 10 * time .Second , client .IgnoreNotFound (err )
450
+ }
451
+
452
+ stsRestartedAt , ok := sts .Spec .Template .ObjectMeta .Annotations [stsRestartAnnotation ]
453
+ if ok && stsRestartedAt > serverConfigUpdatedAt {
454
+ // sts was updated after the last server-conf configmap update; no need to restart sts
455
+ return 0 , nil
432
456
}
433
457
434
458
if err := clientretry .RetryOnConflict (clientretry .DefaultRetry , func () error {
@@ -439,19 +463,21 @@ func (r *RabbitmqClusterReconciler) restartStatefulSetIfNeeded(
439
463
if sts .Spec .Template .ObjectMeta .Annotations == nil {
440
464
sts .Spec .Template .ObjectMeta .Annotations = make (map [string ]string )
441
465
}
442
- sts .Spec .Template .ObjectMeta .Annotations ["rabbitmq.com/lastRestartAt" ] = time .Now ().Format (time .RFC3339 )
466
+ sts .Spec .Template .ObjectMeta .Annotations [stsRestartAnnotation ] = time .Now ().Format (time .RFC3339 )
443
467
return r .Update (ctx , sts )
444
468
}); err != nil {
445
469
msg := fmt .Sprintf ("failed to restart StatefulSet %s of Namespace %s; rabbitmq.conf configuration may be outdated" , rmq .ChildResourceName ("server" ), rmq .Namespace )
446
470
r .Log .Error (err , msg )
447
471
r .Recorder .Event (rmq , corev1 .EventTypeWarning , "FailedUpdate" , msg )
448
- return false
472
+ // failed to restart sts; return error to requeue request
473
+ return 0 , err
449
474
}
450
475
451
476
msg := fmt .Sprintf ("restarted StatefulSet %s of Namespace %s" , rmq .ChildResourceName ("server" ), rmq .Namespace )
452
477
r .Log .Info (msg )
453
478
r .Recorder .Event (rmq , corev1 .EventTypeNormal , "SuccessfulUpdate" , msg )
454
- return true
479
+
480
+ return 0 , nil
455
481
}
456
482
457
483
func (r * RabbitmqClusterReconciler ) statefulSet (ctx context.Context , rmq * rabbitmqv1beta1.RabbitmqCluster ) (* appsv1.StatefulSet , error ) {
@@ -462,43 +488,59 @@ func (r *RabbitmqClusterReconciler) statefulSet(ctx context.Context, rmq *rabbit
462
488
return sts , nil
463
489
}
464
490
465
- func (r * RabbitmqClusterReconciler ) pluginsConfigMap (ctx context.Context , rmq * rabbitmqv1beta1.RabbitmqCluster ) (* corev1.ConfigMap , error ) {
491
+ func (r * RabbitmqClusterReconciler ) configMap (ctx context.Context , rmq * rabbitmqv1beta1.RabbitmqCluster , name string ) (* corev1.ConfigMap , error ) {
466
492
configMap := & corev1.ConfigMap {}
467
- if err := r .Get (ctx , types.NamespacedName {Namespace : rmq .Namespace , Name : rmq . ChildResourceName ( resource . PluginsConfig ) }, configMap ); err != nil {
493
+ if err := r .Get (ctx , types.NamespacedName {Namespace : rmq .Namespace , Name : name }, configMap ); err != nil {
468
494
return nil , err
469
495
}
470
496
return configMap , nil
471
497
}
472
498
473
- // Annotates the plugins ConfigMap if it was updated such that 'rabbitmq-plugins set' will be called on the RabbitMQ nodes at a later point in time
474
- func ( r * RabbitmqClusterReconciler ) annotatePluginsConfigMapIfUpdated (
475
- ctx context.Context ,
476
- builder resource. ResourceBuilder ,
477
- operationResult controllerutil. OperationResult ,
478
- rmq * rabbitmqv1beta1. RabbitmqCluster ) {
499
+ // Annotates the plugins ConfigMap or the server-conf ConfigMap
500
+ // annotations later used to indicate whether to call 'rabbitmq-plugins set' or to restart the sts
501
+ func ( r * RabbitmqClusterReconciler ) annotateConfigMapIfUpdated ( ctx context.Context , builder resource. ResourceBuilder , operationResult controllerutil. OperationResult , rmq * rabbitmqv1beta1. RabbitmqCluster ) error {
502
+ if operationResult != controllerutil . OperationResultUpdated {
503
+ return nil
504
+ }
479
505
480
- if _ , ok := builder .(* resource.RabbitmqPluginsConfigMapBuilder ); ! ok {
481
- return
506
+ var configMap , annotationKey string
507
+ switch builder .(type ) {
508
+ case * resource.RabbitmqPluginsConfigMapBuilder :
509
+ configMap = rmq .ChildResourceName (resource .PluginsConfigName )
510
+ annotationKey = pluginsUpdateAnnotation
511
+ case * resource.ServerConfigMapBuilder :
512
+ configMap = rmq .ChildResourceName (resource .ServerConfigMapName )
513
+ annotationKey = serverConfAnnotation
514
+ default :
515
+ return nil
482
516
}
483
- if operationResult != controllerutil .OperationResultUpdated {
484
- return
517
+
518
+ if err := r .annotateConfigMap (ctx , rmq .Namespace , configMap , annotationKey , time .Now ().Format (time .RFC3339 )); err != nil {
519
+ msg := fmt .Sprintf ("Failed to annotate ConfigMap %s of Namespace %s; %s may be outdated" , configMap , rmq .Namespace , rmq .Name )
520
+ r .Log .Error (err , msg )
521
+ r .Recorder .Event (rmq , corev1 .EventTypeWarning , "FailedUpdate" , msg )
522
+ return err
485
523
}
486
524
525
+ r .Log .Info ("successfully annotated" , "ConfigMap" , configMap , "Namespace" , rmq .Namespace )
526
+ return nil
527
+ }
528
+
529
+ func (r * RabbitmqClusterReconciler ) annotateConfigMap (ctx context.Context , namespace , name , key , value string ) error {
487
530
if retryOnConflictErr := clientretry .RetryOnConflict (clientretry .DefaultRetry , func () error {
488
531
configMap := corev1.ConfigMap {}
489
- if err := r .Get (ctx , types.NamespacedName {Namespace : rmq . Namespace , Name : rmq . ChildResourceName ( resource . PluginsConfig ) }, & configMap ); err != nil {
532
+ if err := r .Get (ctx , types.NamespacedName {Namespace : namespace , Name : name }, & configMap ); err != nil {
490
533
return client .IgnoreNotFound (err )
491
534
}
492
535
if configMap .Annotations == nil {
493
536
configMap .Annotations = make (map [string ]string )
494
537
}
495
- configMap .Annotations [pluginsUpdateAnnotation ] = time . Now (). Format ( time . RFC3339 )
538
+ configMap .Annotations [key ] = value
496
539
return r .Update (ctx , & configMap )
497
540
}); retryOnConflictErr != nil {
498
- msg := fmt .Sprintf ("Failed to annotate ConfigMap %s of Namespace %s; enabled_plugins may be outdated" , rmq .ChildResourceName (resource .PluginsConfig ), rmq .Namespace )
499
- r .Log .Error (retryOnConflictErr , msg )
500
- r .Recorder .Event (rmq , corev1 .EventTypeWarning , "FailedUpdate" , msg )
541
+ return retryOnConflictErr
501
542
}
543
+ return nil
502
544
}
503
545
504
546
func (r * RabbitmqClusterReconciler ) exec (namespace , podName , containerName string , command ... string ) (string , string , error ) {
@@ -600,8 +642,8 @@ func (r *RabbitmqClusterReconciler) addRabbitmqDeletionLabel(ctx context.Context
600
642
return nil
601
643
}
602
644
645
+ // addFinalizerIfNeeded adds a deletion finalizer if the RabbitmqCluster does not have one yet and is not marked for deletion
603
646
func (r * RabbitmqClusterReconciler ) addFinalizerIfNeeded (ctx context.Context , rabbitmqCluster * rabbitmqv1beta1.RabbitmqCluster ) error {
604
- // The RabbitmqCluster is not marked for deletion (no deletion timestamp) but does not have the deletion finalizer
605
647
if rabbitmqCluster .ObjectMeta .DeletionTimestamp .IsZero () && ! controllerutil .ContainsFinalizer (rabbitmqCluster , deletionFinalizer ) {
606
648
controllerutil .AddFinalizer (rabbitmqCluster , deletionFinalizer )
607
649
if err := r .Client .Update (ctx , rabbitmqCluster ); err != nil {
0 commit comments