Skip to content

Enable all feature flags for new clusters #434

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions controllers/rabbitmqcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@ import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
"time"

"github.com/rabbitmq/cluster-operator/internal/resource"
"github.com/rabbitmq/cluster-operator/internal/status"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"reflect"
"strings"

"k8s.io/apimachinery/pkg/types"

Expand Down Expand Up @@ -182,7 +184,7 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
return ctrl.Result{}, err
}

if err = r.annotateConfigMapIfUpdated(ctx, builder, operationResult, rabbitmqCluster); err != nil {
if err = r.annotateIfNeeded(ctx, builder, operationResult, rabbitmqCluster); err != nil {
return ctrl.Result{}, err
}
}
Expand Down Expand Up @@ -210,7 +212,7 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er

// By this point the StatefulSet may have finished deploying. Run any
// post-deploy steps if so, or requeue until the deployment is finished.
requeueAfter, err = r.runPostDeployStepsIfNeeded(ctx, rabbitmqCluster)
requeueAfter, err = r.runRabbitmqCLICommandsIfAnnotated(ctx, rabbitmqCluster)
if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -338,3 +340,19 @@ func validateAndGetOwner(owner *metav1.OwnerReference) []string {
}
return []string{owner.Name}
}

// markForQueueRebalance flags the given RabbitmqCluster as needing a queue
// rebalance by setting queueRebalanceAnnotation to the current time
// (RFC 3339) and persisting the object through r.Update.
//
// The annotations map is created when missing. If the annotation already holds
// a non-empty value the object is not updated and nil is returned.
func (r *RabbitmqClusterReconciler) markForQueueRebalance(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) error {
	annotations := rmq.ObjectMeta.Annotations
	if annotations == nil {
		annotations = make(map[string]string)
		rmq.ObjectMeta.Annotations = annotations
	}

	// Already marked: keep the original timestamp and avoid a needless update.
	if annotations[queueRebalanceAnnotation] != "" {
		return nil
	}

	annotations[queueRebalanceAnnotation] = time.Now().Format(time.RFC3339)
	return r.Update(ctx, rmq)
}
5 changes: 2 additions & 3 deletions controllers/rabbitmqcluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1188,12 +1188,11 @@ func createSecret(ctx context.Context, secretName string, namespace string, data
func waitForClusterCreation(ctx context.Context, rabbitmqCluster *rabbitmqv1beta1.RabbitmqCluster, client runtimeClient.Client) {
EventuallyWithOffset(1, func() string {
rabbitmqClusterCreated := rabbitmqv1beta1.RabbitmqCluster{}
err := client.Get(
if err := client.Get(
ctx,
types.NamespacedName{Name: rabbitmqCluster.Name, Namespace: rabbitmqCluster.Namespace},
&rabbitmqClusterCreated,
)
if err != nil {
); err != nil {
return fmt.Sprintf("%v+", err)
}

Expand Down
147 changes: 147 additions & 0 deletions controllers/reconcile_cli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package controllers

import (
"context"
"fmt"
"time"

rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
"github.com/rabbitmq/cluster-operator/internal/resource"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// queueRebalanceAnnotation is the annotation key on a RabbitmqCluster whose
// non-empty value causes runRabbitmqCLICommandsIfAnnotated to run the queue
// rebalance command; runQueueRebalanceCommand removes it again on success.
const queueRebalanceAnnotation = "rabbitmq.com/queueRebalanceNeededAt"

// runRabbitmqCLICommandsIfAnnotated runs the RabbitMQ CLI commands that have
// been requested through annotations, once the StatefulSet is fully rolled out.
//
// The following annotations are examined, in this order:
//   - pluginsUpdateAnnotation on the plugins ConfigMap: run 'rabbitmq-plugins set'.
//   - stsCreateAnnotation on the StatefulSet: enable all feature flags.
//   - queueRebalanceAnnotation on the RabbitmqCluster: rebalance all queues.
//
// Returns:
//   - requeueAfter: 15s when not all replicas are ready and updated yet, 2s when
//     the plugins ConfigMap was updated very recently, and 0 otherwise.
//   - err: any error from fetching the StatefulSet or ConfigMap (a NotFound
//     ConfigMap is tolerated) or from running one of the commands.
func (r *RabbitmqClusterReconciler) runRabbitmqCLICommandsIfAnnotated(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) (requeueAfter time.Duration, err error) {
	sts, err := r.statefulSet(ctx, rmq)
	if err != nil {
		return 0, err
	}
	if !allReplicasReadyAndUpdated(sts) {
		r.Log.Info("not all replicas ready yet; requeuing request to run RabbitMQ CLI commands",
			"namespace", rmq.Namespace,
			"name", rmq.Name)
		return 15 * time.Second, nil
	}

	// Retrieve the plugins config map, if it exists.
	pluginsConfig, err := r.configMap(ctx, rmq, rmq.ChildResourceName(resource.PluginsConfigName))
	if client.IgnoreNotFound(err) != nil {
		return 0, err
	}

	// A NotFound error is tolerated above, so there may be no ConfigMap to
	// inspect. Skip the plugin steps in that case instead of dereferencing a
	// nil pointer.
	if pluginsConfig != nil {
		updatedRecently, recentErr := pluginsConfigUpdatedRecently(pluginsConfig)
		if recentErr != nil {
			return 0, recentErr
		}
		if updatedRecently {
			// The plugins ConfigMap was updated very recently. Give the
			// StatefulSet controller some time to trigger a restart of the
			// StatefulSet if necessary; otherwise we could exec into containers
			// and lose the connection because the pods are being terminated.
			r.Log.Info("requeuing request to set plugins",
				"namespace", rmq.Namespace,
				"name", rmq.Name)
			return 2 * time.Second, nil
		}

		// Reading from a nil map yields the zero value, so no nil check is needed.
		if pluginsConfig.ObjectMeta.Annotations[pluginsUpdateAnnotation] != "" {
			if err = r.runSetPluginsCommand(ctx, rmq, pluginsConfig); err != nil {
				return 0, err
			}
		}
	}

	// If RabbitMQ cluster is newly created, enable all feature flags since some are disabled by default.
	if sts.ObjectMeta.Annotations[stsCreateAnnotation] != "" {
		if err = r.runEnableFeatureFlagsCommand(ctx, rmq, sts); err != nil {
			return 0, err
		}
	}

	// If the cluster has been marked as needing it, run rabbitmq-queues rebalance all.
	if rmq.ObjectMeta.Annotations[queueRebalanceAnnotation] != "" {
		if err = r.runQueueRebalanceCommand(ctx, rmq); err != nil {
			return 0, err
		}
	}

	return 0, nil
}

// runEnableFeatureFlagsCommand execs into the "rabbitmq" container of the
// first server pod ("<server>-0") and enables every feature flag that
// rabbitmqctl lists as disabled and stable.
//
// On failure the error is logged together with the command output and
// returned. On success stsCreateAnnotation is removed from the StatefulSet and
// the result of that removal is returned.
func (r *RabbitmqClusterReconciler) runEnableFeatureFlagsCommand(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, sts *appsv1.StatefulSet) error {
	// The grep is wrapped in "|| true" so that finding no disabled flags does
	// not fail the pipeline under "set -eo pipefail".
	const enableFlagsScript = "set -eo pipefail; rabbitmqctl -s list_feature_flags name state stability | (grep 'disabled\\sstable$' || true) | cut -f 1 | xargs -r -n1 rabbitmqctl enable_feature_flag"

	firstPod := fmt.Sprintf("%s-0", rmq.ChildResourceName("server"))

	out, errOut, execErr := r.exec(rmq.Namespace, firstPod, "rabbitmq", "bash", "-c", enableFlagsScript)
	if execErr != nil {
		r.Log.Error(execErr, "failed to enable all feature flags",
			"namespace", rmq.Namespace,
			"name", rmq.Name,
			"pod", firstPod,
			"command", enableFlagsScript,
			"stdout", out,
			"stderr", errOut)
		return execErr
	}

	r.Log.Info("successfully enabled all feature flags",
		"namespace", rmq.Namespace,
		"name", rmq.Name)

	return r.deleteAnnotation(ctx, sts, stsCreateAnnotation)
}

// runSetPluginsCommand applies the cluster's plugin list to every running node.
//
// Plugins reach a RabbitMQ node in one of two ways:
//  1. When the StatefulSet is (re)started, each node reads the up-to-date
//     plugins list (ConfigMap copied by the init container) during start up.
//  2. When the plugins ConfigMap changes, 'rabbitmq-plugins set' updates the
//     plugins on every node without restarting it.
//
// This method implements the second way: it runs the command in the
// "rabbitmq" container of each server pod, ordinals 0 to Replicas-1, and stops
// at the first failure. After all pods succeed it removes
// pluginsUpdateAnnotation from the given ConfigMap.
func (r *RabbitmqClusterReconciler) runSetPluginsCommand(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, configMap *corev1.ConfigMap) error {
	pluginList := resource.NewRabbitmqPlugins(rmq.Spec.Rabbitmq.AdditionalPlugins)
	replicas := *rmq.Spec.Replicas

	// The command and the server name are the same for every pod, so build them once.
	setPlugins := fmt.Sprintf("rabbitmq-plugins set %s", pluginList.AsString(" "))
	serverName := rmq.ChildResourceName("server")

	for ordinal := int32(0); ordinal < replicas; ordinal++ {
		pod := fmt.Sprintf("%s-%d", serverName, ordinal)
		out, errOut, execErr := r.exec(rmq.Namespace, pod, "rabbitmq", "sh", "-c", setPlugins)
		if execErr != nil {
			r.Log.Error(execErr, "failed to set plugins",
				"namespace", rmq.Namespace,
				"name", rmq.Name,
				"pod", pod,
				"command", setPlugins,
				"stdout", out,
				"stderr", errOut)
			return execErr
		}
	}

	r.Log.Info("successfully set plugins",
		"namespace", rmq.Namespace,
		"name", rmq.Name)

	return r.deleteAnnotation(ctx, configMap, pluginsUpdateAnnotation)
}

// runQueueRebalanceCommand runs 'rabbitmq-queues rebalance all' in the
// "rabbitmq" container of the first server pod ("<server>-0").
//
// On failure the error is logged together with the command output and
// returned. On success queueRebalanceAnnotation is removed from the
// RabbitmqCluster and the result of that removal is returned.
func (r *RabbitmqClusterReconciler) runQueueRebalanceCommand(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) error {
	const rebalance = "rabbitmq-queues rebalance all"

	firstPod := fmt.Sprintf("%s-0", rmq.ChildResourceName("server"))

	out, errOut, execErr := r.exec(rmq.Namespace, firstPod, "rabbitmq", "sh", "-c", rebalance)
	if execErr != nil {
		r.Log.Error(execErr, "failed to run queue rebalance",
			"namespace", rmq.Namespace,
			"name", rmq.Name,
			"pod", firstPod,
			"command", rebalance,
			"stdout", out,
			"stderr", errOut)
		return execErr
	}

	return r.deleteAnnotation(ctx, rmq, queueRebalanceAnnotation)
}

// statefulSetNeedsQueueRebalance reports whether a queue rebalance should be
// scheduled: the StatefulSet must be in the middle of an update, post-deploy
// steps must not be skipped on the cluster spec, and the cluster must have
// more than one replica.
func statefulSetNeedsQueueRebalance(sts *appsv1.StatefulSet, rmq *rabbitmqv1beta1.RabbitmqCluster) bool {
	if !statefulSetBeingUpdated(sts) {
		return false
	}
	if rmq.Spec.SkipPostDeploySteps {
		return false
	}
	return *rmq.Spec.Replicas > 1
}

// allReplicasReadyAndUpdated reports whether the number of ready replicas
// equals the desired replica count and no update of the StatefulSet is in
// progress.
func allReplicasReadyAndUpdated(sts *appsv1.StatefulSet) bool {
	if sts.Status.ReadyReplicas != *sts.Spec.Replicas {
		return false
	}
	return !statefulSetBeingUpdated(sts)
}

// statefulSetBeingUpdated reports whether the StatefulSet's current revision
// differs from its update revision.
func statefulSetBeingUpdated(sts *appsv1.StatefulSet) bool {
	current, target := sts.Status.CurrentRevision, sts.Status.UpdateRevision
	return current != target
}
Loading