Skip to content

Commit 79a594c

Browse files
committed
WIP: Enable all feature flags for new clusters
see #414
1 parent cc96594 commit 79a594c

7 files changed

+365
-292
lines changed

controllers/rabbitmqcluster_controller.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@ import (
1414
"context"
1515
"encoding/json"
1616
"fmt"
17+
"reflect"
18+
"strings"
19+
1720
"github.com/rabbitmq/cluster-operator/internal/resource"
1821
"github.com/rabbitmq/cluster-operator/internal/status"
1922
"k8s.io/apimachinery/pkg/api/errors"
2023
"k8s.io/client-go/kubernetes"
2124
"k8s.io/client-go/rest"
2225
"k8s.io/client-go/tools/record"
23-
"reflect"
24-
"strings"
2526

2627
"k8s.io/apimachinery/pkg/types"
2728

@@ -182,7 +183,7 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
182183
return ctrl.Result{}, err
183184
}
184185

185-
if err = r.annotateConfigMapIfUpdated(ctx, builder, operationResult, rabbitmqCluster); err != nil {
186+
if err = r.annotateIfNeeded(ctx, builder, operationResult, rabbitmqCluster); err != nil {
186187
return ctrl.Result{}, err
187188
}
188189
}

controllers/rabbitmqcluster_controller_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -1191,12 +1191,11 @@ func createSecret(ctx context.Context, secretName string, namespace string, data
11911191
func waitForClusterCreation(ctx context.Context, rabbitmqCluster *rabbitmqv1beta1.RabbitmqCluster, client runtimeClient.Client) {
11921192
EventuallyWithOffset(1, func() string {
11931193
rabbitmqClusterCreated := rabbitmqv1beta1.RabbitmqCluster{}
1194-
err := client.Get(
1194+
if err := client.Get(
11951195
ctx,
11961196
types.NamespacedName{Name: rabbitmqCluster.Name, Namespace: rabbitmqCluster.Namespace},
11971197
&rabbitmqClusterCreated,
1198-
)
1199-
if err != nil {
1198+
); err != nil {
12001199
return fmt.Sprintf("%v+", err)
12011200
}
12021201

controllers/reconcile_queue_rebalance.go renamed to controllers/reconcile_post_deploy.go

+40-7
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ package controllers
33
import (
44
"context"
55
"fmt"
6+
"time"
7+
68
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
79
"github.com/rabbitmq/cluster-operator/internal/resource"
810
appsv1 "k8s.io/api/apps/v1"
911
"sigs.k8s.io/controller-runtime/pkg/client"
10-
"time"
1112
)
1213

1314
// queueRebalanceAnnotation is the annotation key looked up on the RabbitmqCluster
// object; a non-empty value makes the post-deploy step run the queue rebalance command.
const queueRebalanceAnnotation = "rabbitmq.com/queueRebalanceNeededAt"
@@ -53,7 +54,7 @@ func (r *RabbitmqClusterReconciler) runPostDeployStepsIfNeeded(ctx context.Conte
5354
// plugins configMap was updated very recently
5455
// give StatefulSet controller some time to trigger restart of StatefulSet if necessary
5556
// otherwise, there would be race conditions where we exec into containers losing the connection due to pods being terminated
56-
r.Log.Info("requeuing request to set plugins on RabbitmqCluster",
57+
r.Log.Info("requeuing request to set plugins",
5758
"namespace", rmq.Namespace,
5859
"name", rmq.Name)
5960
return 2 * time.Second, nil
@@ -65,22 +66,54 @@ func (r *RabbitmqClusterReconciler) runPostDeployStepsIfNeeded(ctx context.Conte
6566
}
6667
}
6768

69+
// If RabbitMQ cluster is newly created, enable all feature flags since some are disabled by default
70+
if sts.ObjectMeta.Annotations != nil && sts.ObjectMeta.Annotations[stsCreateAnnotation] != "" {
71+
if err := r.enableAllFeatureFlags(ctx, rmq, sts); err != nil {
72+
return 0, err
73+
}
74+
}
75+
6876
// If the cluster has been marked as needing it, run rabbitmq-queues rebalance all
69-
if rmq.ObjectMeta.Annotations != nil && len(rmq.ObjectMeta.Annotations[queueRebalanceAnnotation]) > 0 {
70-
err = r.runQueueRebalanceCommand(ctx, rmq)
77+
if rmq.ObjectMeta.Annotations != nil && rmq.ObjectMeta.Annotations[queueRebalanceAnnotation] != "" {
78+
if err := r.runQueueRebalanceCommand(ctx, rmq); err != nil {
79+
return 0, err
80+
}
81+
}
82+
83+
return 0, nil
84+
}
85+
86+
// enableAllFeatureFlags execs into the first server pod ("<server child resource name>-0")
// and enables every RabbitMQ feature flag that is currently listed as disabled.
//
// On exec failure it logs the command together with its stdout/stderr and returns
// the error without touching the StatefulSet. On success it removes
// stsCreateAnnotation from sts and persists the change via r.Update, returning
// whatever error that update produces.
func (r *RabbitmqClusterReconciler) enableAllFeatureFlags(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, sts *appsv1.StatefulSet) error {
87+
podName := fmt.Sprintf("%s-0", rmq.ChildResourceName("server"))
88+
// Pipeline: list flags, keep the "disabled" rows, take the first tab-separated
// column (the flag name), and run enable_feature_flag once per name (-n1).
// "grep ... || true" stops pipefail from aborting when grep matches nothing,
// and "xargs -r" skips the command entirely when there is no input.
cmd := "set -eo pipefail; rabbitmqctl --silent list_feature_flags | (grep disabled || true) | cut -f 1 | xargs -r -n1 rabbitmqctl enable_feature_flag"
89+
// bash (not sh) is used here; the command sets the pipefail option.
stdout, stderr, err := r.exec(rmq.Namespace, podName, "rabbitmq", "bash", "-c", cmd)
90+
if err != nil {
91+
r.Log.Error(err, "failed to enable all feature flags",
92+
"namespace", rmq.Namespace,
93+
"name", rmq.Name,
94+
"pod", podName,
95+
"command", cmd,
96+
"stdout", stdout,
97+
"stderr", stderr)
98+
return err
7199
}
72-
return 0, err
100+
r.Log.Info("successfully enabled all feature flags",
101+
"namespace", rmq.Namespace,
102+
"name", rmq.Name)
103+
// Remove the creation marker so the caller's annotation check no longer
// triggers this step on later reconciles.
delete(sts.ObjectMeta.Annotations, stsCreateAnnotation)
104+
return r.Update(ctx, sts)
73105
}
74106

75107
func (r *RabbitmqClusterReconciler) runQueueRebalanceCommand(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) error {
76108
podName := fmt.Sprintf("%s-0", rmq.ChildResourceName("server"))
77-
stdout, stderr, err := r.exec(rmq.Namespace, podName, "rabbitmq", "sh", "-c", "rabbitmq-queues rebalance all")
109+
cmd := "rabbitmq-queues rebalance all"
110+
stdout, stderr, err := r.exec(rmq.Namespace, podName, "rabbitmq", "sh", "-c", cmd)
78111
if err != nil {
79112
r.Log.Error(err, "failed to run queue rebalance",
80113
"namespace", rmq.Namespace,
81114
"name", rmq.Name,
82115
"pod", podName,
83-
"command", "rabbitmq-queues rebalance all",
116+
"command", cmd,
84117
"stdout", stdout,
85118
"stderr", stderr)
86119
return err
+250
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
package controllers_test
2+
3+
import (
4+
"time"
5+
6+
"k8s.io/apimachinery/pkg/types"
7+
"k8s.io/utils/pointer"
8+
9+
appsv1 "k8s.io/api/apps/v1"
10+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
12+
. "github.com/onsi/ginkgo"
13+
. "github.com/onsi/gomega"
14+
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
15+
)
16+
17+
var _ = Describe("Reconcile post deploy", func() {
18+
// Fixtures shared by every nested container below. Each When assigns cluster in
// its own BeforeEach; annotations holds the most recent annotation map read
// back from the RabbitmqCluster object.
var (
19+
cluster *rabbitmqv1beta1.RabbitmqCluster
20+
annotations map[string]string
21+
defaultNamespace = "default"
22+
)
23+
24+
BeforeEach(func() {
25+
// Start every spec with an empty annotation snapshot.
annotations = map[string]string{}
26+
})
27+
28+
AfterEach(func() {
29+
// Delete whichever cluster the nested BeforeEach created.
Expect(client.Delete(ctx, cluster)).To(Succeed())
30+
// Presumably blocks until the deletion has been observed — helper is defined elsewhere; TODO confirm.
waitForClusterDeletion(ctx, cluster, client)
31+
})
32+
33+
// Newly created cluster: the StatefulSet is expected to carry a
// rabbitmq.com/createdAt RFC3339 timestamp annotation; once the StatefulSet
// status reports a ready replica, the annotation must disappear and the
// enable-feature-flags command must have been recorded by fakeExecutor.
When("cluster is created", func() {
34+
var sts *appsv1.StatefulSet
35+
BeforeEach(func() {
36+
// No Spec is set, so the cluster is created with whatever defaults apply.
cluster = &rabbitmqv1beta1.RabbitmqCluster{
37+
ObjectMeta: metav1.ObjectMeta{
38+
Name: "rabbitmq-feature-flags",
39+
Namespace: defaultNamespace,
40+
},
41+
}
42+
Expect(client.Create(ctx, cluster)).To(Succeed())
43+
waitForClusterCreation(ctx, cluster, client)
44+
})
45+
46+
It("enables all feature flags", func() {
47+
By("annotating that StatefulSet got created")
48+
sts = statefulSet(ctx, cluster)
49+
// A missing annotation yields "", which fails the RFC3339 parse below.
value := sts.ObjectMeta.Annotations["rabbitmq.com/createdAt"]
50+
_, err := time.Parse(time.RFC3339, value)
51+
Expect(err).NotTo(HaveOccurred(), "annotation rabbitmq.com/createdAt was not a valid RFC3339 timestamp")
52+
53+
By("enabling all feature flags once all Pods are up, and removing the annotation")
54+
// The status is written by hand to report one ready replica — presumably
// because nothing else updates StatefulSet status in this test environment (TODO confirm).
sts.Status.Replicas = 1
55+
sts.Status.ReadyReplicas = 1
56+
Expect(client.Status().Update(ctx, sts)).To(Succeed())
57+
// Re-fetch the StatefulSet until the annotation is gone; the bare 5 is the
// Gomega timeout (a plain number is interpreted as seconds).
Eventually(func() map[string]string {
58+
Expect(client.Get(ctx, types.NamespacedName{Namespace: cluster.Namespace, Name: cluster.ChildResourceName("server")}, sts)).To(Succeed())
59+
return sts.ObjectMeta.Annotations
60+
}, 5).ShouldNot(HaveKey("rabbitmq.com/createdAt"))
61+
// The expected command must match, character for character, the string the
// controller passes to exec in enableAllFeatureFlags.
Expect(fakeExecutor.ExecutedCommands()).To(ContainElement(command{"bash", "-c",
62+
"set -eo pipefail; rabbitmqctl --silent list_feature_flags | (grep disabled || true) | cut -f 1 | xargs -r -n1 rabbitmqctl enable_feature_flag"}))
63+
})
64+
})
65+
66+
// Three-replica cluster with post-deploy steps left enabled (SkipPostDeploySteps
// is not set). The spec walks the StatefulSet status through a simulated rolling
// update and checks when the queueRebalanceNeededAt annotation appears on the CR,
// that it stays while replicas are not all ready, and that it is removed (and
// the rebalance command executed) once all replicas are ready.
When("the cluster is configured to run post-deploy steps", func() {
67+
BeforeEach(func() {
68+
cluster = &rabbitmqv1beta1.RabbitmqCluster{
69+
ObjectMeta: metav1.ObjectMeta{
70+
Name: "rabbitmq-three",
71+
Namespace: defaultNamespace,
72+
},
73+
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
74+
Replicas: pointer.Int32Ptr(3),
75+
},
76+
}
77+
Expect(client.Create(ctx, cluster)).To(Succeed())
78+
waitForClusterCreation(ctx, cluster, client)
79+
})
80+
When("the cluster is updated", func() {
81+
var sts *appsv1.StatefulSet
82+
83+
BeforeEach(func() {
84+
sts = statefulSet(ctx, cluster)
85+
// Simulate a rollout in progress: current and update revisions differ
// and only some replicas are on the new revision.
sts.Status.Replicas = 3
86+
sts.Status.CurrentReplicas = 2
87+
sts.Status.CurrentRevision = "some-old-revision"
88+
sts.Status.UpdatedReplicas = 1
89+
sts.Status.UpdateRevision = "some-new-revision"
90+
91+
Expect(client.Status().Update(ctx, sts)).To(Succeed())
92+
})
93+
94+
It("triggers the controller to run rabbitmq-queues rebalance all", func() {
95+
By("setting an annotation on the CR")
96+
// Poll the CR (timeout 5; Gomega reads a plain number as seconds) and keep
// the last annotation map in the shared variable for the parse below.
Eventually(func() map[string]string {
97+
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
98+
Expect(client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)).To(Succeed())
99+
annotations = rmq.ObjectMeta.Annotations
100+
return annotations
101+
}, 5).Should(HaveKey("rabbitmq.com/queueRebalanceNeededAt"))
102+
_, err := time.Parse(time.RFC3339, annotations["rabbitmq.com/queueRebalanceNeededAt"])
103+
Expect(err).NotTo(HaveOccurred(), "annotation rabbitmq.com/queueRebalanceNeededAt was not a valid RFC3339 timestamp")
104+
105+
By("not removing the annotation when all replicas are updated but not yet ready")
106+
// Rollout finished (revisions match, all replicas updated) but only 2 of 3 ready.
sts.Status.CurrentReplicas = 3
107+
sts.Status.CurrentRevision = "some-new-revision"
108+
sts.Status.UpdatedReplicas = 3
109+
sts.Status.UpdateRevision = "some-new-revision"
110+
sts.Status.ReadyReplicas = 2
111+
Expect(client.Status().Update(ctx, sts)).To(Succeed())
112+
Eventually(func() map[string]string {
113+
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
114+
// This err shadows the outer one and is scoped to the closure.
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
115+
Expect(err).To(BeNil())
116+
annotations = rmq.ObjectMeta.Annotations
117+
return annotations
118+
}, 5).Should(HaveKey("rabbitmq.com/queueRebalanceNeededAt"))
119+
// The rebalance must not have been executed while a replica is still unready.
Expect(fakeExecutor.ExecutedCommands()).NotTo(ContainElement(command{"sh", "-c", "rabbitmq-queues rebalance all"}))
120+
_, err = time.Parse(time.RFC3339, annotations["rabbitmq.com/queueRebalanceNeededAt"])
121+
Expect(err).NotTo(HaveOccurred(), "Annotation rabbitmq.com/queueRebalanceNeededAt was not a valid RFC3339 timestamp")
122+
123+
By("removing the annotation once all Pods are up, and triggering the queue rebalance")
124+
// All three replicas ready: the annotation should now be cleared.
sts.Status.ReadyReplicas = 3
125+
Expect(client.Status().Update(ctx, sts)).To(Succeed())
126+
Eventually(func() map[string]string {
127+
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
128+
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
129+
Expect(err).To(BeNil())
130+
return rmq.ObjectMeta.Annotations
131+
}, 5).ShouldNot(HaveKey("rabbitmq.com/queueRebalanceNeededAt"))
132+
Expect(fakeExecutor.ExecutedCommands()).To(ContainElement(command{"sh", "-c", "rabbitmq-queues rebalance all"}))
133+
})
134+
})
135+
})
136+
137+
// Three-replica cluster created with SkipPostDeploySteps: true. The spec drives
// the StatefulSet status through the same simulated rollout and asserts that the
// queueRebalanceNeededAt annotation never appears on the CR and the rebalance
// command is never recorded by fakeExecutor.
When("the cluster is not configured to run post-deploy steps", func() {
138+
BeforeEach(func() {
139+
cluster = &rabbitmqv1beta1.RabbitmqCluster{
140+
ObjectMeta: metav1.ObjectMeta{
141+
Name: "rabbitmq-three-no-post-deploy",
142+
Namespace: defaultNamespace,
143+
},
144+
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
145+
Replicas: pointer.Int32Ptr(3),
146+
SkipPostDeploySteps: true,
147+
},
148+
}
149+
Expect(client.Create(ctx, cluster)).To(Succeed())
150+
waitForClusterCreation(ctx, cluster, client)
151+
})
152+
When("the cluster is updated", func() {
153+
var sts *appsv1.StatefulSet
154+
155+
BeforeEach(func() {
156+
sts = statefulSet(ctx, cluster)
157+
// Simulate a rollout in progress (revisions differ, replicas partly updated).
sts.Status.Replicas = 3
158+
sts.Status.CurrentReplicas = 2
159+
sts.Status.CurrentRevision = "some-old-revision"
160+
sts.Status.UpdatedReplicas = 1
161+
sts.Status.UpdateRevision = "some-new-revision"
162+
163+
Expect(client.Status().Update(ctx, sts)).To(Succeed())
164+
})
165+
166+
It("does not trigger the controller to run rabbitmq-queues rebalance all", func() {
167+
// This By takes a callback, unlike the plain By calls used elsewhere in the file.
By("never setting the annotation on the CR", func() {
168+
// Consistently asserts for the whole window (5; a plain number is
// seconds in Gomega) that the annotation is absent.
Consistently(func() map[string]string {
169+
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
170+
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
171+
Expect(err).To(BeNil())
172+
return rmq.ObjectMeta.Annotations
173+
}, 5).ShouldNot(HaveKey("rabbitmq.com/queueRebalanceNeededAt"))
174+
})
175+
176+
By("not running the rebalance command once all nodes are up")
177+
// Rollout complete and every replica ready — still no rebalance expected.
sts.Status.CurrentReplicas = 3
178+
sts.Status.CurrentRevision = "some-new-revision"
179+
sts.Status.UpdatedReplicas = 3
180+
sts.Status.UpdateRevision = "some-new-revision"
181+
sts.Status.ReadyReplicas = 3
182+
Expect(client.Status().Update(ctx, sts)).To(Succeed())
183+
Consistently(func() map[string]string {
184+
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
185+
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
186+
Expect(err).To(BeNil())
187+
return rmq.ObjectMeta.Annotations
188+
}, 5).ShouldNot(HaveKey("rabbitmq.com/queueRebalanceNeededAt"))
189+
Expect(fakeExecutor.ExecutedCommands()).NotTo(ContainElement(command{"sh", "-c", "rabbitmq-queues rebalance all"}))
190+
})
191+
})
192+
})
193+
194+
// Single-replica cluster with SkipPostDeploySteps explicitly false. The spec
// asserts that, even with post-deploy steps not skipped, the
// queueRebalanceNeededAt annotation is never set and the rebalance command is
// never recorded — i.e. the expectation here rests on the replica count of 1.
When("the cluster is a single node cluster", func() {
195+
BeforeEach(func() {
196+
cluster = &rabbitmqv1beta1.RabbitmqCluster{
197+
ObjectMeta: metav1.ObjectMeta{
198+
Name: "rabbitmq-one-rebalance",
199+
Namespace: defaultNamespace,
200+
},
201+
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
202+
Replicas: pointer.Int32Ptr(1),
203+
SkipPostDeploySteps: false,
204+
},
205+
}
206+
Expect(client.Create(ctx, cluster)).To(Succeed())
207+
waitForClusterCreation(ctx, cluster, client)
208+
})
209+
When("the cluster is updated", func() {
210+
var sts *appsv1.StatefulSet
211+
212+
BeforeEach(func() {
213+
sts = statefulSet(ctx, cluster)
214+
// Simulate the single pod mid-rollout: revisions differ, nothing updated or ready yet.
sts.Status.Replicas = 1
215+
sts.Status.CurrentReplicas = 1
216+
sts.Status.CurrentRevision = "some-old-revision"
217+
sts.Status.UpdatedReplicas = 0
218+
sts.Status.UpdateRevision = "some-new-revision"
219+
sts.Status.ReadyReplicas = 0
220+
221+
Expect(client.Status().Update(ctx, sts)).To(Succeed())
222+
})
223+
224+
It("does not trigger the controller to run rabbitmq-queues rebalance all", func() {
225+
By("never setting the annotation on the CR")
226+
// Holds for the whole Consistently window (5; a plain number is seconds in Gomega).
Consistently(func() map[string]string {
227+
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
228+
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
229+
Expect(err).To(BeNil())
230+
return rmq.ObjectMeta.Annotations
231+
}, 5).ShouldNot(HaveKey("rabbitmq.com/queueRebalanceNeededAt"))
232+
233+
By("not running the rebalance command once all nodes are up")
234+
// Rollout complete and the single replica ready — still no rebalance expected.
sts.Status.CurrentReplicas = 1
235+
sts.Status.CurrentRevision = "some-new-revision"
236+
sts.Status.UpdatedReplicas = 1
237+
sts.Status.UpdateRevision = "some-new-revision"
238+
sts.Status.ReadyReplicas = 1
239+
Expect(client.Status().Update(ctx, sts)).To(Succeed())
240+
Consistently(func() map[string]string {
241+
rmq := &rabbitmqv1beta1.RabbitmqCluster{}
242+
err := client.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, rmq)
243+
Expect(err).To(BeNil())
244+
return rmq.ObjectMeta.Annotations
245+
}, 5).ShouldNot(HaveKey("rabbitmq.com/queueRebalanceNeededAt"))
246+
Expect(fakeExecutor.ExecutedCommands()).NotTo(ContainElement(command{"sh", "-c", "rabbitmq-queues rebalance all"}))
247+
})
248+
})
249+
})
250+
})

0 commit comments

Comments
 (0)