Skip to content

Commit 30c058f

Browse files
committed
Enable all feature flags for new clusters
Closes #414
1 parent 7244faa commit 30c058f

9 files changed

+245
-132
lines changed

controllers/rabbitmqcluster_controller.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@ import (
1414
"context"
1515
"encoding/json"
1616
"fmt"
17+
"reflect"
18+
"strings"
19+
1720
"github.com/rabbitmq/cluster-operator/internal/resource"
1821
"github.com/rabbitmq/cluster-operator/internal/status"
1922
"k8s.io/apimachinery/pkg/api/errors"
2023
"k8s.io/client-go/kubernetes"
2124
"k8s.io/client-go/rest"
2225
"k8s.io/client-go/tools/record"
23-
"reflect"
24-
"strings"
2526

2627
"k8s.io/apimachinery/pkg/types"
2728

@@ -182,7 +183,7 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
182183
return ctrl.Result{}, err
183184
}
184185

185-
if err = r.annotateConfigMapIfUpdated(ctx, builder, operationResult, rabbitmqCluster); err != nil {
186+
if err = r.annotateIfNeeded(ctx, builder, operationResult, rabbitmqCluster); err != nil {
186187
return ctrl.Result{}, err
187188
}
188189
}

controllers/rabbitmqcluster_controller_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -1188,12 +1188,11 @@ func createSecret(ctx context.Context, secretName string, namespace string, data
11881188
func waitForClusterCreation(ctx context.Context, rabbitmqCluster *rabbitmqv1beta1.RabbitmqCluster, client runtimeClient.Client) {
11891189
EventuallyWithOffset(1, func() string {
11901190
rabbitmqClusterCreated := rabbitmqv1beta1.RabbitmqCluster{}
1191-
err := client.Get(
1191+
if err := client.Get(
11921192
ctx,
11931193
types.NamespacedName{Name: rabbitmqCluster.Name, Namespace: rabbitmqCluster.Namespace},
11941194
&rabbitmqClusterCreated,
1195-
)
1196-
if err != nil {
1195+
); err != nil {
11971196
return fmt.Sprintf("%v+", err)
11981197
}
11991198

controllers/reconcile_queue_rebalance.go renamed to controllers/reconcile_post_deploy.go

+68-9
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ package controllers
33
import (
44
"context"
55
"fmt"
6+
"time"
7+
68
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
79
"github.com/rabbitmq/cluster-operator/internal/resource"
810
appsv1 "k8s.io/api/apps/v1"
11+
corev1 "k8s.io/api/core/v1"
912
"sigs.k8s.io/controller-runtime/pkg/client"
10-
"time"
1113
)
1214

1315
const queueRebalanceAnnotation = "rabbitmq.com/queueRebalanceNeededAt"
@@ -53,7 +55,7 @@ func (r *RabbitmqClusterReconciler) runPostDeployStepsIfNeeded(ctx context.Conte
5355
// plugins configMap was updated very recently
5456
// give StatefulSet controller some time to trigger restart of StatefulSet if necessary
5557
// otherwise, there would be race conditions where we exec into containers losing the connection due to pods being terminated
56-
r.Log.Info("requeuing request to set plugins on RabbitmqCluster",
58+
r.Log.Info("requeuing request to set plugins",
5759
"namespace", rmq.Namespace,
5860
"name", rmq.Name)
5961
return 2 * time.Second, nil
@@ -65,28 +67,85 @@ func (r *RabbitmqClusterReconciler) runPostDeployStepsIfNeeded(ctx context.Conte
6567
}
6668
}
6769

70+
// If RabbitMQ cluster is newly created, enable all feature flags since some are disabled by default
71+
if sts.ObjectMeta.Annotations != nil && sts.ObjectMeta.Annotations[stsCreateAnnotation] != "" {
72+
if err := r.enableAllFeatureFlags(ctx, rmq, sts); err != nil {
73+
return 0, err
74+
}
75+
}
76+
6877
// If the cluster has been marked as needing it, run rabbitmq-queues rebalance all
69-
if rmq.ObjectMeta.Annotations != nil && len(rmq.ObjectMeta.Annotations[queueRebalanceAnnotation]) > 0 {
70-
err = r.runQueueRebalanceCommand(ctx, rmq)
78+
if rmq.ObjectMeta.Annotations != nil && rmq.ObjectMeta.Annotations[queueRebalanceAnnotation] != "" {
79+
if err := r.runQueueRebalanceCommand(ctx, rmq); err != nil {
80+
return 0, err
81+
}
82+
}
83+
84+
return 0, nil
85+
}
86+
87+
func (r *RabbitmqClusterReconciler) enableAllFeatureFlags(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, sts *appsv1.StatefulSet) error {
88+
podName := fmt.Sprintf("%s-0", rmq.ChildResourceName("server"))
89+
cmd := "set -eo pipefail; rabbitmqctl -s list_feature_flags name state stability | (grep 'disabled\\sstable$' || true) | cut -f 1 | xargs -r -n1 rabbitmqctl enable_feature_flag"
90+
stdout, stderr, err := r.exec(rmq.Namespace, podName, "rabbitmq", "bash", "-c", cmd)
91+
if err != nil {
92+
r.Log.Error(err, "failed to enable all feature flags",
93+
"namespace", rmq.Namespace,
94+
"name", rmq.Name,
95+
"pod", podName,
96+
"command", cmd,
97+
"stdout", stdout,
98+
"stderr", stderr)
99+
return err
100+
}
101+
r.Log.Info("successfully enabled all feature flags",
102+
"namespace", rmq.Namespace,
103+
"name", rmq.Name)
104+
return r.deleteAnnotation(ctx, sts, stsCreateAnnotation)
105+
}
106+
107+
// There are 2 paths how plugins are set:
108+
// 1. When StatefulSet is (re)started, the up-to-date plugins list (ConfigMap copied by the init container) is read by RabbitMQ nodes during node start up.
109+
// 2. When the plugins ConfigMap is changed, 'rabbitmq-plugins set' updates the plugins on every node (without the need to re-start the nodes).
110+
// This method implements the 2nd path.
111+
func (r *RabbitmqClusterReconciler) runSetPluginsCommand(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, configMap *corev1.ConfigMap) error {
112+
plugins := resource.NewRabbitmqPlugins(rmq.Spec.Rabbitmq.AdditionalPlugins)
113+
for i := int32(0); i < *rmq.Spec.Replicas; i++ {
114+
podName := fmt.Sprintf("%s-%d", rmq.ChildResourceName("server"), i)
115+
rabbitCommand := fmt.Sprintf("rabbitmq-plugins set %s", plugins.AsString(" "))
116+
stdout, stderr, err := r.exec(rmq.Namespace, podName, "rabbitmq", "sh", "-c", rabbitCommand)
117+
if err != nil {
118+
r.Log.Error(err, "failed to set plugins",
119+
"namespace", rmq.Namespace,
120+
"name", rmq.Name,
121+
"pod", podName,
122+
"command", rabbitCommand,
123+
"stdout", stdout,
124+
"stderr", stderr)
125+
return err
126+
}
71127
}
72-
return 0, err
128+
r.Log.Info("successfully set plugins",
129+
"namespace", rmq.Namespace,
130+
"name", rmq.Name)
131+
return r.deleteAnnotation(ctx, configMap, pluginsUpdateAnnotation)
73132
}
74133

75134
func (r *RabbitmqClusterReconciler) runQueueRebalanceCommand(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) error {
76135
podName := fmt.Sprintf("%s-0", rmq.ChildResourceName("server"))
77-
stdout, stderr, err := r.exec(rmq.Namespace, podName, "rabbitmq", "sh", "-c", "rabbitmq-queues rebalance all")
136+
cmd := "rabbitmq-queues rebalance all"
137+
stdout, stderr, err := r.exec(rmq.Namespace, podName, "rabbitmq", "sh", "-c", cmd)
78138
if err != nil {
79139
r.Log.Error(err, "failed to run queue rebalance",
80140
"namespace", rmq.Namespace,
81141
"name", rmq.Name,
82142
"pod", podName,
83-
"command", "rabbitmq-queues rebalance all",
143+
"command", cmd,
84144
"stdout", stdout,
85145
"stderr", stderr)
86146
return err
87147
}
88-
delete(rmq.ObjectMeta.Annotations, queueRebalanceAnnotation)
89-
return r.Update(ctx, rmq)
148+
return r.deleteAnnotation(ctx, rmq, queueRebalanceAnnotation)
90149
}
91150

92151
func statefulSetNeedsQueueRebalance(sts *appsv1.StatefulSet, rmq *rabbitmqv1beta1.RabbitmqCluster) bool {

controllers/reconcile_queue_rebalance_test.go renamed to controllers/reconcile_post_deploy_test.go

+48-31
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package controllers_test
22

33
import (
4+
"time"
5+
46
"k8s.io/apimachinery/pkg/types"
57
"k8s.io/utils/pointer"
6-
"time"
78

89
appsv1 "k8s.io/api/apps/v1"
910
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -13,7 +14,7 @@ import (
1314
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
1415
)
1516

16-
var _ = Describe("Reconcile queue Rebalance", func() {
17+
var _ = Describe("Reconcile post deploy", func() {
1718
var (
1819
cluster *rabbitmqv1beta1.RabbitmqCluster
1920
annotations map[string]string
@@ -29,6 +30,41 @@ var _ = Describe("Reconcile queue Rebalance", func() {
2930
waitForClusterDeletion(ctx, cluster, client)
3031
})
3132

33+
When("cluster is created", func() {
34+
var sts *appsv1.StatefulSet
35+
BeforeEach(func() {
36+
cluster = &rabbitmqv1beta1.RabbitmqCluster{
37+
ObjectMeta: metav1.ObjectMeta{
38+
Name: "rabbitmq-feature-flags",
39+
Namespace: defaultNamespace,
40+
},
41+
}
42+
Expect(client.Create(ctx, cluster)).To(Succeed())
43+
waitForClusterCreation(ctx, cluster, client)
44+
})
45+
46+
It("enables all feature flags", func() {
47+
By("annotating that StatefulSet got created", func() {
48+
sts = statefulSet(ctx, cluster)
49+
value := sts.ObjectMeta.Annotations["rabbitmq.com/createdAt"]
50+
_, err := time.Parse(time.RFC3339, value)
51+
Expect(err).NotTo(HaveOccurred(), "annotation rabbitmq.com/createdAt was not a valid RFC3339 timestamp")
52+
})
53+
54+
By("enabling all feature flags once all Pods are up, and removing the annotation", func() {
55+
sts.Status.Replicas = 1
56+
sts.Status.ReadyReplicas = 1
57+
Expect(client.Status().Update(ctx, sts)).To(Succeed())
58+
Eventually(func() map[string]string {
59+
Expect(client.Get(ctx, types.NamespacedName{Namespace: cluster.Namespace, Name: cluster.ChildResourceName("server")}, sts)).To(Succeed())
60+
return sts.ObjectMeta.Annotations
61+
}, 5).ShouldNot(HaveKey("rabbitmq.com/createdAt"))
62+
Expect(fakeExecutor.ExecutedCommands()).To(ContainElement(command{"bash", "-c",
63+
"set -eo pipefail; rabbitmqctl -s list_feature_flags name state stability | (grep 'disabled\\sstable$' || true) | cut -f 1 | xargs -r -n1 rabbitmqctl enable_feature_flag"}))
64+
})
65+
})
66+
})
67+
3268
When("the cluster is configured to run post-deploy steps", func() {
3369
BeforeEach(func() {
3470
cluster = &rabbitmqv1beta1.RabbitmqCluster{
@@ -54,22 +90,19 @@ var _ = Describe("Reconcile queue Rebalance", func() {
5490
sts.Status.UpdatedReplicas = 1
5591
sts.Status.UpdateRevision = "some-new-revision"
5692

57-
statusWriter := client.Status()
58-
err := statusWriter.Update(ctx, sts)
59-
Expect(err).NotTo(HaveOccurred())
93+
Expect(client.Status().Update(ctx, sts)).To(Succeed())
6094
})
6195

6296
It("triggers the controller to run rabbitmq-queues rebalance all", func() {
6397
By("setting an annotation on the CR", func() {
6498
Eventually(func() map[string]string {
6599
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
66-
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
67-
Expect(err).To(BeNil())
100+
Expect(client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)).To(Succeed())
68101
annotations = rmq.ObjectMeta.Annotations
69102
return annotations
70103
}, 5).Should(HaveKey("rabbitmq.com/queueRebalanceNeededAt"))
71104
_, err := time.Parse(time.RFC3339, annotations["rabbitmq.com/queueRebalanceNeededAt"])
72-
Expect(err).NotTo(HaveOccurred(), "Annotation rabbitmq.com/queueRebalanceNeededAt was not a valid RFC3339 timestamp")
105+
Expect(err).NotTo(HaveOccurred(), "annotation rabbitmq.com/queueRebalanceNeededAt was not a valid RFC3339 timestamp")
73106
})
74107

75108
By("not removing the annotation when all replicas are updated but not yet ready", func() {
@@ -78,9 +111,7 @@ var _ = Describe("Reconcile queue Rebalance", func() {
78111
sts.Status.UpdatedReplicas = 3
79112
sts.Status.UpdateRevision = "some-new-revision"
80113
sts.Status.ReadyReplicas = 2
81-
statusWriter := client.Status()
82-
err := statusWriter.Update(ctx, sts)
83-
Expect(err).NotTo(HaveOccurred())
114+
Expect(client.Status().Update(ctx, sts)).To(Succeed())
84115
Eventually(func() map[string]string {
85116
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
86117
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
@@ -89,15 +120,13 @@ var _ = Describe("Reconcile queue Rebalance", func() {
89120
return annotations
90121
}, 5).Should(HaveKey("rabbitmq.com/queueRebalanceNeededAt"))
91122
Expect(fakeExecutor.ExecutedCommands()).NotTo(ContainElement(command{"sh", "-c", "rabbitmq-queues rebalance all"}))
92-
_, err = time.Parse(time.RFC3339, annotations["rabbitmq.com/queueRebalanceNeededAt"])
123+
_, err := time.Parse(time.RFC3339, annotations["rabbitmq.com/queueRebalanceNeededAt"])
93124
Expect(err).NotTo(HaveOccurred(), "Annotation rabbitmq.com/queueRebalanceNeededAt was not a valid RFC3339 timestamp")
94125
})
95126

96127
By("removing the annotation once all Pods are up, and triggering the queue rebalance", func() {
97128
sts.Status.ReadyReplicas = 3
98-
statusWriter := client.Status()
99-
err := statusWriter.Update(ctx, sts)
100-
Expect(err).NotTo(HaveOccurred())
129+
Expect(client.Status().Update(ctx, sts)).To(Succeed())
101130
Eventually(func() map[string]string {
102131
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
103132
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
@@ -136,9 +165,7 @@ var _ = Describe("Reconcile queue Rebalance", func() {
136165
sts.Status.UpdatedReplicas = 1
137166
sts.Status.UpdateRevision = "some-new-revision"
138167

139-
statusWriter := client.Status()
140-
err := statusWriter.Update(ctx, sts)
141-
Expect(err).NotTo(HaveOccurred())
168+
Expect(client.Status().Update(ctx, sts)).To(Succeed())
142169
})
143170

144171
It("does not trigger the controller to run rabbitmq-queues rebalance all", func() {
@@ -157,9 +184,7 @@ var _ = Describe("Reconcile queue Rebalance", func() {
157184
sts.Status.UpdatedReplicas = 3
158185
sts.Status.UpdateRevision = "some-new-revision"
159186
sts.Status.ReadyReplicas = 3
160-
statusWriter := client.Status()
161-
err := statusWriter.Update(ctx, sts)
162-
Expect(err).NotTo(HaveOccurred())
187+
Expect(client.Status().Update(ctx, sts)).To(Succeed())
163188
Consistently(func() map[string]string {
164189
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
165190
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
@@ -168,9 +193,7 @@ var _ = Describe("Reconcile queue Rebalance", func() {
168193
}, 5).ShouldNot(HaveKey("rabbitmq.com/queueRebalanceNeededAt"))
169194
Expect(fakeExecutor.ExecutedCommands()).NotTo(ContainElement(command{"sh", "-c", "rabbitmq-queues rebalance all"}))
170195
})
171-
172196
})
173-
174197
})
175198
})
176199

@@ -201,9 +224,7 @@ var _ = Describe("Reconcile queue Rebalance", func() {
201224
sts.Status.UpdateRevision = "some-new-revision"
202225
sts.Status.ReadyReplicas = 0
203226

204-
statusWriter := client.Status()
205-
err := statusWriter.Update(ctx, sts)
206-
Expect(err).NotTo(HaveOccurred())
227+
Expect(client.Status().Update(ctx, sts)).To(Succeed())
207228
})
208229

209230
It("does not trigger the controller to run rabbitmq-queues rebalance all", func() {
@@ -222,9 +243,7 @@ var _ = Describe("Reconcile queue Rebalance", func() {
222243
sts.Status.UpdatedReplicas = 1
223244
sts.Status.UpdateRevision = "some-new-revision"
224245
sts.Status.ReadyReplicas = 1
225-
statusWriter := client.Status()
226-
err := statusWriter.Update(ctx, sts)
227-
Expect(err).NotTo(HaveOccurred())
246+
Expect(client.Status().Update(ctx, sts)).To(Succeed())
228247
Consistently(func() map[string]string {
229248
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
230249
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
@@ -233,9 +252,7 @@ var _ = Describe("Reconcile queue Rebalance", func() {
233252
}, 5).ShouldNot(HaveKey("rabbitmq.com/queueRebalanceNeededAt"))
234253
Expect(fakeExecutor.ExecutedCommands()).NotTo(ContainElement(command{"sh", "-c", "rabbitmq-queues rebalance all"}))
235254
})
236-
237255
})
238-
239256
})
240257
})
241258
})

0 commit comments

Comments
 (0)