Skip to content

Commit bab4c6e

Browse files
authoredNov 3, 2020
Enable all feature flags for new clusters (#434)
* Enable all feature flags for new clusters Closes #414 * Rename post deploy to CLI because the RabbitMQ CLI commands get executed at the end of the reconcile loop which does not necessarily mean that a deployment just finished. This commit also fixes a bug where the rabbitmq-plugins command was run on every successful reconile loop.
1 parent d0298dc commit bab4c6e

10 files changed

+342
-232
lines changed
 

‎controllers/rabbitmqcluster_controller.go

+22-4
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,16 @@ import (
1414
"context"
1515
"encoding/json"
1616
"fmt"
17+
"reflect"
18+
"strings"
19+
"time"
20+
1721
"github.com/rabbitmq/cluster-operator/internal/resource"
1822
"github.com/rabbitmq/cluster-operator/internal/status"
1923
"k8s.io/apimachinery/pkg/api/errors"
2024
"k8s.io/client-go/kubernetes"
2125
"k8s.io/client-go/rest"
2226
"k8s.io/client-go/tools/record"
23-
"reflect"
24-
"strings"
2527

2628
"k8s.io/apimachinery/pkg/types"
2729

@@ -182,7 +184,7 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
182184
return ctrl.Result{}, err
183185
}
184186

185-
if err = r.annotateConfigMapIfUpdated(ctx, builder, operationResult, rabbitmqCluster); err != nil {
187+
if err = r.annotateIfNeeded(ctx, builder, operationResult, rabbitmqCluster); err != nil {
186188
return ctrl.Result{}, err
187189
}
188190
}
@@ -210,7 +212,7 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
210212

211213
// By this point the StatefulSet may have finished deploying. Run any
212214
// post-deploy steps if so, or requeue until the deployment is finished.
213-
requeueAfter, err = r.runPostDeployStepsIfNeeded(ctx, rabbitmqCluster)
215+
requeueAfter, err = r.runRabbitmqCLICommandsIfAnnotated(ctx, rabbitmqCluster)
214216
if err != nil {
215217
return ctrl.Result{}, err
216218
}
@@ -338,3 +340,19 @@ func validateAndGetOwner(owner *metav1.OwnerReference) []string {
338340
}
339341
return []string{owner.Name}
340342
}
343+
344+
func (r *RabbitmqClusterReconciler) markForQueueRebalance(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) error {
345+
if rmq.ObjectMeta.Annotations == nil {
346+
rmq.ObjectMeta.Annotations = make(map[string]string)
347+
}
348+
349+
if len(rmq.ObjectMeta.Annotations[queueRebalanceAnnotation]) > 0 {
350+
return nil
351+
}
352+
353+
rmq.ObjectMeta.Annotations[queueRebalanceAnnotation] = time.Now().Format(time.RFC3339)
354+
if err := r.Update(ctx, rmq); err != nil {
355+
return err
356+
}
357+
return nil
358+
}

‎controllers/rabbitmqcluster_controller_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -1188,12 +1188,11 @@ func createSecret(ctx context.Context, secretName string, namespace string, data
11881188
func waitForClusterCreation(ctx context.Context, rabbitmqCluster *rabbitmqv1beta1.RabbitmqCluster, client runtimeClient.Client) {
11891189
EventuallyWithOffset(1, func() string {
11901190
rabbitmqClusterCreated := rabbitmqv1beta1.RabbitmqCluster{}
1191-
err := client.Get(
1191+
if err := client.Get(
11921192
ctx,
11931193
types.NamespacedName{Name: rabbitmqCluster.Name, Namespace: rabbitmqCluster.Namespace},
11941194
&rabbitmqClusterCreated,
1195-
)
1196-
if err != nil {
1195+
); err != nil {
11971196
return fmt.Sprintf("%v+", err)
11981197
}
11991198

‎controllers/reconcile_cli.go

+147
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
package controllers
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
9+
"github.com/rabbitmq/cluster-operator/internal/resource"
10+
appsv1 "k8s.io/api/apps/v1"
11+
corev1 "k8s.io/api/core/v1"
12+
"sigs.k8s.io/controller-runtime/pkg/client"
13+
)
14+
15+
const queueRebalanceAnnotation = "rabbitmq.com/queueRebalanceNeededAt"
16+
17+
func (r *RabbitmqClusterReconciler) runRabbitmqCLICommandsIfAnnotated(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) (requeueAfter time.Duration, err error) {
18+
sts, err := r.statefulSet(ctx, rmq)
19+
if err != nil {
20+
return 0, err
21+
}
22+
if !allReplicasReadyAndUpdated(sts) {
23+
r.Log.Info("not all replicas ready yet; requeuing request to run RabbitMQ CLI commands",
24+
"namespace", rmq.Namespace,
25+
"name", rmq.Name)
26+
return 15 * time.Second, nil
27+
}
28+
29+
// Retrieve the plugins config map, if it exists.
30+
pluginsConfig, err := r.configMap(ctx, rmq, rmq.ChildResourceName(resource.PluginsConfigName))
31+
if client.IgnoreNotFound(err) != nil {
32+
return 0, err
33+
}
34+
updatedRecently, err := pluginsConfigUpdatedRecently(pluginsConfig)
35+
if err != nil {
36+
return 0, err
37+
}
38+
if updatedRecently {
39+
// plugins configMap was updated very recently
40+
// give StatefulSet controller some time to trigger restart of StatefulSet if necessary
41+
// otherwise, there would be race conditions where we exec into containers losing the connection due to pods being terminated
42+
r.Log.Info("requeuing request to set plugins",
43+
"namespace", rmq.Namespace,
44+
"name", rmq.Name)
45+
return 2 * time.Second, nil
46+
}
47+
48+
if pluginsConfig.ObjectMeta.Annotations != nil && pluginsConfig.ObjectMeta.Annotations[pluginsUpdateAnnotation] != "" {
49+
if err = r.runSetPluginsCommand(ctx, rmq, pluginsConfig); err != nil {
50+
return 0, err
51+
}
52+
}
53+
54+
// If RabbitMQ cluster is newly created, enable all feature flags since some are disabled by default
55+
if sts.ObjectMeta.Annotations != nil && sts.ObjectMeta.Annotations[stsCreateAnnotation] != "" {
56+
if err := r.runEnableFeatureFlagsCommand(ctx, rmq, sts); err != nil {
57+
return 0, err
58+
}
59+
}
60+
61+
// If the cluster has been marked as needing it, run rabbitmq-queues rebalance all
62+
if rmq.ObjectMeta.Annotations != nil && rmq.ObjectMeta.Annotations[queueRebalanceAnnotation] != "" {
63+
if err := r.runQueueRebalanceCommand(ctx, rmq); err != nil {
64+
return 0, err
65+
}
66+
}
67+
68+
return 0, nil
69+
}
70+
71+
func (r *RabbitmqClusterReconciler) runEnableFeatureFlagsCommand(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, sts *appsv1.StatefulSet) error {
72+
podName := fmt.Sprintf("%s-0", rmq.ChildResourceName("server"))
73+
cmd := "set -eo pipefail; rabbitmqctl -s list_feature_flags name state stability | (grep 'disabled\\sstable$' || true) | cut -f 1 | xargs -r -n1 rabbitmqctl enable_feature_flag"
74+
stdout, stderr, err := r.exec(rmq.Namespace, podName, "rabbitmq", "bash", "-c", cmd)
75+
if err != nil {
76+
r.Log.Error(err, "failed to enable all feature flags",
77+
"namespace", rmq.Namespace,
78+
"name", rmq.Name,
79+
"pod", podName,
80+
"command", cmd,
81+
"stdout", stdout,
82+
"stderr", stderr)
83+
return err
84+
}
85+
r.Log.Info("successfully enabled all feature flags",
86+
"namespace", rmq.Namespace,
87+
"name", rmq.Name)
88+
return r.deleteAnnotation(ctx, sts, stsCreateAnnotation)
89+
}
90+
91+
// There are 2 paths how plugins are set:
92+
// 1. When StatefulSet is (re)started, the up-to-date plugins list (ConfigMap copied by the init container) is read by RabbitMQ nodes during node start up.
93+
// 2. When the plugins ConfigMap is changed, 'rabbitmq-plugins set' updates the plugins on every node (without the need to re-start the nodes).
94+
// This method implements the 2nd path.
95+
func (r *RabbitmqClusterReconciler) runSetPluginsCommand(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, configMap *corev1.ConfigMap) error {
96+
plugins := resource.NewRabbitmqPlugins(rmq.Spec.Rabbitmq.AdditionalPlugins)
97+
for i := int32(0); i < *rmq.Spec.Replicas; i++ {
98+
podName := fmt.Sprintf("%s-%d", rmq.ChildResourceName("server"), i)
99+
cmd := fmt.Sprintf("rabbitmq-plugins set %s", plugins.AsString(" "))
100+
stdout, stderr, err := r.exec(rmq.Namespace, podName, "rabbitmq", "sh", "-c", cmd)
101+
if err != nil {
102+
r.Log.Error(err, "failed to set plugins",
103+
"namespace", rmq.Namespace,
104+
"name", rmq.Name,
105+
"pod", podName,
106+
"command", cmd,
107+
"stdout", stdout,
108+
"stderr", stderr)
109+
return err
110+
}
111+
}
112+
r.Log.Info("successfully set plugins",
113+
"namespace", rmq.Namespace,
114+
"name", rmq.Name)
115+
return r.deleteAnnotation(ctx, configMap, pluginsUpdateAnnotation)
116+
}
117+
118+
func (r *RabbitmqClusterReconciler) runQueueRebalanceCommand(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) error {
119+
podName := fmt.Sprintf("%s-0", rmq.ChildResourceName("server"))
120+
cmd := "rabbitmq-queues rebalance all"
121+
stdout, stderr, err := r.exec(rmq.Namespace, podName, "rabbitmq", "sh", "-c", cmd)
122+
if err != nil {
123+
r.Log.Error(err, "failed to run queue rebalance",
124+
"namespace", rmq.Namespace,
125+
"name", rmq.Name,
126+
"pod", podName,
127+
"command", cmd,
128+
"stdout", stdout,
129+
"stderr", stderr)
130+
return err
131+
}
132+
return r.deleteAnnotation(ctx, rmq, queueRebalanceAnnotation)
133+
}
134+
135+
func statefulSetNeedsQueueRebalance(sts *appsv1.StatefulSet, rmq *rabbitmqv1beta1.RabbitmqCluster) bool {
136+
return statefulSetBeingUpdated(sts) &&
137+
!rmq.Spec.SkipPostDeploySteps &&
138+
*rmq.Spec.Replicas > 1
139+
}
140+
141+
func allReplicasReadyAndUpdated(sts *appsv1.StatefulSet) bool {
142+
return sts.Status.ReadyReplicas == *sts.Spec.Replicas && !statefulSetBeingUpdated(sts)
143+
}
144+
145+
func statefulSetBeingUpdated(sts *appsv1.StatefulSet) bool {
146+
return sts.Status.CurrentRevision != sts.Status.UpdateRevision
147+
}

0 commit comments

Comments
 (0)
Please sign in to comment.