Skip to content

Commit a27503b

Browse files
committedDec 10, 2020
Spport pausing reconciliation by setting label on the CR
- when label rabbitmq.com/pauseReconciliation is set to true on a CR, reconciliation is skipped - publish event, write logs, and set NoWarnings status.condition to false when reconciliation is paused for a CR - tested in integration tests
1 parent d84f9e0 commit a27503b

File tree

3 files changed

+154
-72
lines changed

3 files changed

+154
-72
lines changed
 

‎controllers/rabbitmqcluster_controller.go

+20-2
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,9 @@ var (
4848
)
4949

5050
const (
51-
ownerKey = ".metadata.controller"
52-
ownerKind = "RabbitmqCluster"
51+
ownerKey = ".metadata.controller"
52+
ownerKind = "RabbitmqCluster"
53+
pauseReconciliationLabel = "rabbitmq.com/pauseReconciliation"
5354
)
5455

5556
// RabbitmqClusterReconciler reconciles a RabbitmqCluster object
@@ -101,6 +102,23 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
101102
return ctrl.Result{}, r.prepareForDeletion(ctx, rabbitmqCluster)
102103
}
103104

105+
// exit if pause reconciliation label is set to true
106+
if v, ok := rabbitmqCluster.Labels[pauseReconciliationLabel]; ok && v == "true" {
107+
logger.Info("Not reconciling RabbitmqCluster",
108+
"namespace", rabbitmqCluster.Namespace,
109+
"name", rabbitmqCluster.Name)
110+
r.Recorder.Event(rabbitmqCluster, corev1.EventTypeWarning,
111+
"PausedReconciliation", fmt.Sprintf("label '%s' is set to true", pauseReconciliationLabel))
112+
113+
rabbitmqCluster.Status.SetCondition(status.NoWarnings, corev1.ConditionFalse, "reconciliation paused")
114+
if writerErr := r.Status().Update(ctx, rabbitmqCluster); writerErr != nil {
115+
r.Log.Error(writerErr, "Error trying to Update NoWarnings condition state",
116+
"namespace", rabbitmqCluster.Namespace,
117+
"name", rabbitmqCluster.Name)
118+
}
119+
return ctrl.Result{}, nil
120+
}
121+
104122
// Ensure the resource have a deletion marker
105123
if err := r.addFinalizerIfNeeded(ctx, rabbitmqCluster); err != nil {
106124
return ctrl.Result{}, err

‎controllers/rabbitmqcluster_controller_test.go

+126-58
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import (
3434
)
3535

3636
const (
37-
ClusterCreationTimeout = 5 * time.Second
37+
ClusterCreationTimeout = 10 * time.Second
3838
ClusterDeletionTimeout = 5 * time.Second
3939
)
4040

@@ -79,19 +79,15 @@ var _ = Describe("RabbitmqClusterController", func() {
7979
})
8080

8181
By("creating the server conf configmap", func() {
82-
configMapName := cluster.ChildResourceName("server-conf")
83-
configMap, err := clientSet.CoreV1().ConfigMaps(cluster.Namespace).Get(ctx, configMapName, metav1.GetOptions{})
84-
Expect(err).NotTo(HaveOccurred())
85-
Expect(configMap.Name).To(Equal(configMapName))
86-
Expect(configMap.OwnerReferences[0].Name).To(Equal(cluster.Name))
82+
cfm := configMap(ctx, cluster, "server-conf")
83+
Expect(cfm.Name).To(Equal(cluster.ChildResourceName("server-conf")))
84+
Expect(cfm.OwnerReferences[0].Name).To(Equal(cluster.Name))
8785
})
8886

8987
By("creating the plugins conf configmap", func() {
90-
configMapName := cluster.ChildResourceName("plugins-conf")
91-
configMap, err := clientSet.CoreV1().ConfigMaps(cluster.Namespace).Get(ctx, configMapName, metav1.GetOptions{})
92-
Expect(err).NotTo(HaveOccurred())
93-
Expect(configMap.Name).To(Equal(configMapName))
94-
Expect(configMap.OwnerReferences[0].Name).To(Equal(cluster.Name))
88+
cfm := configMap(ctx, cluster, "plugins-conf")
89+
Expect(cfm.Name).To(Equal(cluster.ChildResourceName("plugins-conf")))
90+
Expect(cfm.OwnerReferences[0].Name).To(Equal(cluster.Name))
9591
})
9692

9793
By("creating a rabbitmq default-user secret", func() {
@@ -352,19 +348,13 @@ var _ = Describe("RabbitmqClusterController", func() {
352348
})
353349

354350
Context("Custom Resource updates", func() {
355-
var (
356-
svcName string
357-
stsName string
358-
)
359351
BeforeEach(func() {
360352
cluster = &rabbitmqv1beta1.RabbitmqCluster{
361353
ObjectMeta: metav1.ObjectMeta{
362354
Name: "rabbitmq-cr-update",
363355
Namespace: defaultNamespace,
364356
},
365357
}
366-
svcName = cluster.ChildResourceName("")
367-
stsName = cluster.ChildResourceName("server")
368358

369359
Expect(client.Create(ctx, cluster)).To(Succeed())
370360
waitForClusterCreation(ctx, cluster, client)
@@ -381,8 +371,7 @@ var _ = Describe("RabbitmqClusterController", func() {
381371
})).To(Succeed())
382372

383373
Eventually(func() map[string]string {
384-
svcName := cluster.ChildResourceName("")
385-
svc, _ := clientSet.CoreV1().Services(cluster.Namespace).Get(ctx, svcName, metav1.GetOptions{})
374+
svc := service(ctx, cluster, "")
386375
return svc.Annotations
387376
}, 3).Should(HaveKeyWithValue("test-key", "test-value"))
388377

@@ -409,8 +398,7 @@ var _ = Describe("RabbitmqClusterController", func() {
409398
})).To(Succeed())
410399

411400
Eventually(func() corev1.ResourceList {
412-
sts, err := clientSet.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, stsName, metav1.GetOptions{})
413-
Expect(err).NotTo(HaveOccurred())
401+
sts := statefulSet(ctx, cluster)
414402
resourceRequirements = sts.Spec.Template.Spec.Containers[0].Resources
415403
return resourceRequirements.Requests
416404
}, 3).Should(HaveKeyWithValue(corev1.ResourceCPU, expectedRequirements.Requests[corev1.ResourceCPU]))
@@ -430,7 +418,7 @@ var _ = Describe("RabbitmqClusterController", func() {
430418
})).To(Succeed())
431419

432420
Eventually(func() string {
433-
sts, _ := clientSet.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, stsName, metav1.GetOptions{})
421+
sts := statefulSet(ctx, cluster)
434422
return sts.Spec.Template.Spec.Containers[0].Image
435423
}, 3).Should(Equal("rabbitmq:3.8.0"))
436424
})
@@ -441,7 +429,7 @@ var _ = Describe("RabbitmqClusterController", func() {
441429
})).To(Succeed())
442430

443431
Eventually(func() []corev1.LocalObjectReference {
444-
sts, _ := clientSet.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, stsName, metav1.GetOptions{})
432+
sts := statefulSet(ctx, cluster)
445433
return sts.Spec.Template.Spec.ImagePullSecrets
446434
}, 3).Should(ConsistOf(corev1.LocalObjectReference{Name: "my-new-secret"}))
447435
})
@@ -453,13 +441,11 @@ var _ = Describe("RabbitmqClusterController", func() {
453441
})).To(Succeed())
454442

455443
Eventually(func() map[string]string {
456-
service, err := clientSet.CoreV1().Services(cluster.Namespace).Get(ctx, svcName, metav1.GetOptions{})
457-
Expect(err).NotTo(HaveOccurred())
458-
return service.Labels
444+
svc := service(ctx, cluster, "")
445+
return svc.Labels
459446
}, 3).Should(HaveKeyWithValue("foo", "bar"))
460-
var sts *appsv1.StatefulSet
461447
Eventually(func() map[string]string {
462-
sts, _ = clientSet.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, stsName, metav1.GetOptions{})
448+
sts := statefulSet(ctx, cluster)
463449
return sts.Labels
464450
}, 3).Should(HaveKeyWithValue("foo", "bar"))
465451
})
@@ -477,22 +463,19 @@ var _ = Describe("RabbitmqClusterController", func() {
477463

478464
It("updates annotations for services", func() {
479465
Eventually(func() map[string]string {
480-
service, err := clientSet.CoreV1().Services(cluster.Namespace).Get(ctx, cluster.ChildResourceName("nodes"), metav1.GetOptions{})
481-
Expect(err).NotTo(HaveOccurred())
482-
return service.Annotations
466+
svc := service(ctx, cluster, "nodes")
467+
return svc.Annotations
483468
}, 3).Should(HaveKeyWithValue(annotationKey, annotationValue))
484469

485470
Eventually(func() map[string]string {
486-
service, err := clientSet.CoreV1().Services(cluster.Namespace).Get(ctx, cluster.ChildResourceName(""), metav1.GetOptions{})
487-
Expect(err).NotTo(HaveOccurred())
488-
return service.Annotations
471+
svc := service(ctx, cluster, "")
472+
return svc.Annotations
489473
}, 3).Should(HaveKeyWithValue(annotationKey, annotationValue))
490474
})
491475

492476
It("updates annotations for stateful set", func() {
493477
Eventually(func() map[string]string {
494-
sts, err := clientSet.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, stsName, metav1.GetOptions{})
495-
Expect(err).NotTo(HaveOccurred())
478+
sts := statefulSet(ctx, cluster)
496479
return sts.Annotations
497480
}, 3).Should(HaveKeyWithValue(annotationKey, annotationValue))
498481
})
@@ -542,9 +525,8 @@ var _ = Describe("RabbitmqClusterController", func() {
542525
})).To(Succeed())
543526

544527
Eventually(func() string {
545-
service, err := clientSet.CoreV1().Services(cluster.Namespace).Get(ctx, cluster.ChildResourceName(""), metav1.GetOptions{})
546-
Expect(err).NotTo(HaveOccurred())
547-
return string(service.Spec.Type)
528+
svc := service(ctx, cluster, "")
529+
return string(svc.Spec.Type)
548530
}, 3).Should(Equal("NodePort"))
549531
})
550532

@@ -572,7 +554,7 @@ var _ = Describe("RabbitmqClusterController", func() {
572554
})).To(Succeed())
573555

574556
Eventually(func() *corev1.Affinity {
575-
sts, _ := clientSet.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, stsName, metav1.GetOptions{})
557+
sts := statefulSet(ctx, cluster)
576558
return sts.Spec.Template.Spec.Affinity
577559
}, 3).Should(Equal(affinity))
578560

@@ -587,7 +569,7 @@ var _ = Describe("RabbitmqClusterController", func() {
587569
r.Spec.Affinity = affinity
588570
})).To(Succeed())
589571
Eventually(func() *corev1.Affinity {
590-
sts, _ := clientSet.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, stsName, metav1.GetOptions{})
572+
sts := statefulSet(ctx, cluster)
591573
return sts.Spec.Template.Spec.Affinity
592574
}, 3).Should(BeNil())
593575
})
@@ -621,13 +603,9 @@ var _ = Describe("RabbitmqClusterController", func() {
621603
})
622604

623605
It("recreates child resources after deletion", func() {
624-
oldConfMap, err := clientSet.CoreV1().ConfigMaps(defaultNamespace).Get(ctx, configMapName, metav1.GetOptions{})
625-
Expect(err).NotTo(HaveOccurred())
626-
606+
oldConfMap := configMap(ctx, cluster, "server-conf")
627607
oldClientSvc := service(ctx, cluster, "")
628-
629608
oldHeadlessSvc := service(ctx, cluster, "nodes")
630-
631609
oldSts := statefulSet(ctx, cluster)
632610

633611
Expect(clientSet.AppsV1().StatefulSets(defaultNamespace).Delete(ctx, stsName, metav1.DeleteOptions{})).NotTo(HaveOccurred())
@@ -636,26 +614,17 @@ var _ = Describe("RabbitmqClusterController", func() {
636614
Expect(clientSet.CoreV1().Services(defaultNamespace).Delete(ctx, headlessServiceName, metav1.DeleteOptions{})).NotTo(HaveOccurred())
637615

638616
Eventually(func() bool {
639-
sts, err := clientSet.AppsV1().StatefulSets(defaultNamespace).Get(ctx, stsName, metav1.GetOptions{})
640-
if err != nil {
641-
return false
642-
}
617+
sts := statefulSet(ctx, cluster)
643618
return string(sts.UID) != string(oldSts.UID)
644619
}, 10).Should(BeTrue())
645620

646621
Eventually(func() bool {
647-
clientSvc, err := clientSet.CoreV1().Services(defaultNamespace).Get(ctx, svcName, metav1.GetOptions{})
648-
if err != nil {
649-
return false
650-
}
622+
clientSvc := service(ctx, cluster, "")
651623
return string(clientSvc.UID) != string(oldClientSvc.UID)
652624
}, 5).Should(BeTrue())
653625

654626
Eventually(func() bool {
655-
headlessSvc, err := clientSet.CoreV1().Services(defaultNamespace).Get(ctx, headlessServiceName, metav1.GetOptions{})
656-
if err != nil {
657-
return false
658-
}
627+
headlessSvc := service(ctx, cluster, "nodes")
659628
return string(headlessSvc.UID) != string(oldHeadlessSvc.UID)
660629
}, 5).Should(Not(Equal(oldHeadlessSvc.UID)))
661630

@@ -1096,6 +1065,94 @@ var _ = Describe("RabbitmqClusterController", func() {
10961065
})
10971066
})
10981067

1068+
Context("Pause reconciliation", func() {
1069+
BeforeEach(func() {
1070+
cluster = &rabbitmqv1beta1.RabbitmqCluster{
1071+
ObjectMeta: metav1.ObjectMeta{
1072+
Name: "rabbitmq-pause-reconcile",
1073+
Namespace: defaultNamespace,
1074+
},
1075+
}
1076+
Expect(client.Create(ctx, cluster)).To(Succeed())
1077+
waitForClusterCreation(ctx, cluster, client)
1078+
})
1079+
1080+
AfterEach(func() {
1081+
Expect(client.Delete(ctx, cluster)).To(Succeed())
1082+
})
1083+
1084+
It("works", func() {
1085+
By("skipping reconciling if label is set to true", func() {
1086+
Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
1087+
r.Labels = map[string]string{"rabbitmq.com/pauseReconciliation": "true"}
1088+
r.Spec.Service.Type = "LoadBalancer"
1089+
r.Spec.Rabbitmq.AdditionalConfig = "test=test"
1090+
})).To(Succeed())
1091+
1092+
// service type is unchanged
1093+
Consistently(func() corev1.ServiceType {
1094+
return service(ctx, cluster, "").Spec.Type
1095+
}, 10*time.Second).Should(Equal(corev1.ServiceTypeClusterIP))
1096+
1097+
// configMap and statefulSet do not have their update and restart annotations set
1098+
Expect(configMap(ctx, cluster, "server-conf").Annotations).ShouldNot(HaveKey("rabbitmq.com/serverConfUpdatedAt"))
1099+
Expect(statefulSet(ctx, cluster).Annotations).ShouldNot(HaveKey("rabbitmq.com/lastRestartAt"))
1100+
1101+
// PausedReconciliation event is published
1102+
Expect(aggregateEventMsgs(ctx, cluster, "PausedReconciliation")).To(
1103+
ContainSubstring("label 'rabbitmq.com/pauseReconciliation' is set to true"))
1104+
1105+
// NoWarnings status.condition is set to false with reason 'reconciliation paused'
1106+
Eventually(func() string {
1107+
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
1108+
Expect(client.Get(ctx,
1109+
types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)).To(Succeed())
1110+
for i := range rmq.Status.Conditions {
1111+
if rmq.Status.Conditions[i].Type == status.NoWarnings {
1112+
return fmt.Sprintf("NoWarnings status: %s with reason: %s",
1113+
rmq.Status.Conditions[i].Status,
1114+
rmq.Status.Conditions[i].Reason)
1115+
}
1116+
}
1117+
return "NoWarnings status: condition not present"
1118+
}, 5).Should(Equal("NoWarnings status: False with reason: reconciliation paused"))
1119+
})
1120+
1121+
By("resuming reconciliation when label is removed", func() {
1122+
Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
1123+
r.Labels = map[string]string{}
1124+
})).To(Succeed())
1125+
1126+
// service type is updated
1127+
Eventually(func() corev1.ServiceType {
1128+
svc := service(ctx, cluster, "")
1129+
return svc.Spec.Type
1130+
}, 10*time.Second).Should(Equal(corev1.ServiceTypeLoadBalancer))
1131+
1132+
// configMap and statefulSet now have their update and restart annotations set
1133+
Eventually(func() map[string]string {
1134+
return configMap(ctx, cluster, "server-conf").Annotations
1135+
}, 10*time.Second).Should(HaveKey("rabbitmq.com/serverConfUpdatedAt"))
1136+
Eventually(func() map[string]string {
1137+
return statefulSet(ctx, cluster).Spec.Template.Annotations
1138+
}, 10*time.Second).Should(HaveKey("rabbitmq.com/lastRestartAt"))
1139+
1140+
// NoWarnings status.condition is set to true
1141+
Eventually(func() string {
1142+
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
1143+
Expect(client.Get(ctx,
1144+
types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)).To(Succeed())
1145+
for i := range rmq.Status.Conditions {
1146+
if rmq.Status.Conditions[i].Type == status.NoWarnings {
1147+
return fmt.Sprintf("NoWarnings status: %s", rmq.Status.Conditions[i].Status)
1148+
}
1149+
}
1150+
return "NoWarnings status: condition not present"
1151+
}, 5).Should(Equal("NoWarnings status: True"))
1152+
})
1153+
})
1154+
})
1155+
10991156
})
11001157

11011158
func extractContainer(containers []corev1.Container, containerName string) corev1.Container {
@@ -1144,6 +1201,17 @@ func service(ctx context.Context, rabbitmqCluster *rabbitmqv1beta1.RabbitmqClust
11441201
return svc
11451202
}
11461203

1204+
func configMap(ctx context.Context, rabbitmqCluster *rabbitmqv1beta1.RabbitmqCluster, configMapName string) *corev1.ConfigMap {
1205+
cfmName := rabbitmqCluster.ChildResourceName(configMapName)
1206+
var cfm *corev1.ConfigMap
1207+
EventuallyWithOffset(1, func() error {
1208+
var err error
1209+
cfm, err = clientSet.CoreV1().ConfigMaps(rabbitmqCluster.Namespace).Get(ctx, cfmName, metav1.GetOptions{})
1210+
return err
1211+
}, 10).Should(Succeed())
1212+
return cfm
1213+
}
1214+
11471215
func createSecret(ctx context.Context, secretName string, namespace string, data map[string]string) (corev1.Secret, error) {
11481216
secret := corev1.Secret{
11491217
ObjectMeta: metav1.ObjectMeta{

‎controllers/reconcile_rabbitmq_configurations_test.go

+8-12
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,11 @@ var _ = Describe("Reconcile rabbitmq Configurations", func() {
2828
Expect(client.Create(ctx, cluster)).To(Succeed())
2929
waitForClusterCreation(ctx, cluster, client)
3030

31-
// ensure that configMap and statefulSet does not have annotations set when configurations haven't changed
32-
configMap, err := clientSet.CoreV1().ConfigMaps(cluster.Namespace).Get(ctx, cluster.ChildResourceName("server-conf"), metav1.GetOptions{})
33-
Expect(err).To(Not(HaveOccurred()))
34-
Expect(configMap.Annotations).ShouldNot(HaveKey("rabbitmq.com/serverConfUpdatedAt"))
31+
// ensure that cfm and statefulSet does not have annotations set when configurations haven't changed
32+
cfm := configMap(ctx, cluster, "server-conf")
33+
Expect(cfm.Annotations).ShouldNot(HaveKey("rabbitmq.com/serverConfUpdatedAt"))
3534

36-
sts, err := clientSet.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, cluster.ChildResourceName("server"), metav1.GetOptions{})
37-
Expect(err).To(Not(HaveOccurred()))
35+
sts := statefulSet(ctx, cluster)
3836
Expect(sts.Annotations).ShouldNot(HaveKey("rabbitmq.com/lastRestartAt"))
3937

4038
// update rabbitmq server configurations
@@ -54,19 +52,17 @@ var _ = Describe("Reconcile rabbitmq Configurations", func() {
5452
// ensure annotations from the server-conf ConfigMap
5553
var annotations map[string]string
5654
Eventually(func() map[string]string {
57-
configMap, err := clientSet.CoreV1().ConfigMaps(cluster.Namespace).Get(ctx, cluster.ChildResourceName("server-conf"), metav1.GetOptions{})
58-
Expect(err).To(Not(HaveOccurred()))
59-
annotations = configMap.Annotations
55+
cfm := configMap(ctx, cluster, "server-conf")
56+
annotations = cfm.Annotations
6057
return annotations
6158
}, 5).Should(HaveKey("rabbitmq.com/serverConfUpdatedAt"))
62-
_, err = time.Parse(time.RFC3339, annotations["rabbitmq.com/serverConfUpdatedAt"])
59+
_, err := time.Parse(time.RFC3339, annotations["rabbitmq.com/serverConfUpdatedAt"])
6360
Expect(err).NotTo(HaveOccurred(), "Annotation rabbitmq.com/serverConfUpdatedAt was not a valid RFC3339 timestamp")
6461

6562
By("annotating the sts podTemplate")
6663
// ensure statefulSet annotations
6764
Eventually(func() map[string]string {
68-
sts, err := clientSet.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, cluster.ChildResourceName("server"), metav1.GetOptions{})
69-
Expect(err).To(Not(HaveOccurred()))
65+
sts := statefulSet(ctx, cluster)
7066
annotations = sts.Spec.Template.Annotations
7167
return annotations
7268
}, 5).Should(HaveKey("rabbitmq.com/lastRestartAt"))

0 commit comments

Comments
 (0)
Please sign in to comment.