Skip to content

Configuration updates #394

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Oct 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ list: ## list Makefile targets
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'

unit-tests: install-tools generate fmt vet manifests ## Run unit tests
ginkgo -r api/ internal/
ginkgo -r --randomizeAllSpecs api/ internal/

integration-tests: install-tools generate fmt vet manifests ## Run integration tests
ginkgo -r controllers/
Expand Down
126 changes: 84 additions & 42 deletions controllers/rabbitmqcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ const (
deletionFinalizer = "deletion.finalizers.rabbitmqclusters.rabbitmq.com"
pluginsUpdateAnnotation = "rabbitmq.com/pluginsUpdatedAt"
queueRebalanceAnnotation = "rabbitmq.com/queueRebalanceNeededAt"
serverConfAnnotation = "rabbitmq.com/serverConfUpdatedAt"
stsRestartAnnotation = "rabbitmq.com/lastRestartAt"
)

// RabbitmqClusterReconciler reconciles a RabbitmqCluster object
Expand Down Expand Up @@ -103,28 +105,27 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
return ctrl.Result{}, nil
}

// Resource has been marked for deletion
// Check if the resource has been marked for deletion
if !rabbitmqCluster.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Deleting RabbitmqCluster",
"namespace", rabbitmqCluster.Namespace,
"name", rabbitmqCluster.Name)
// Stop reconciliation as the item is being deleted
return ctrl.Result{}, r.prepareForDeletion(ctx, rabbitmqCluster)
}

// Ensure the resource have a deletion marker
if err := r.addFinalizerIfNeeded(ctx, rabbitmqCluster); err != nil {
return ctrl.Result{}, err
}

// TLS: check if specified, and if secret exists
if rabbitmqCluster.TLSEnabled() {
if result, err := r.checkTLSSecrets(ctx, rabbitmqCluster); err != nil {
return result, err
}
}

if err := r.addFinalizerIfNeeded(ctx, rabbitmqCluster); err != nil {
return ctrl.Result{}, err
}

childResources, err := r.getChildResources(ctx, *rabbitmqCluster)

if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -195,12 +196,19 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
return ctrl.Result{}, err
}

r.annotatePluginsConfigMapIfUpdated(ctx, builder, operationResult, rabbitmqCluster)
if restarted := r.restartStatefulSetIfNeeded(ctx, builder, operationResult, rabbitmqCluster); restarted {
return ctrl.Result{RequeueAfter: time.Second * 10}, nil
if err = r.annotateConfigMapIfUpdated(ctx, builder, operationResult, rabbitmqCluster); err != nil {
return ctrl.Result{}, err
}
}

requeueAfter, err := r.restartStatefulSetIfNeeded(ctx, rabbitmqCluster)
if err != nil {
return ctrl.Result{}, err
}
if requeueAfter > 0 {
return ctrl.Result{RequeueAfter: requeueAfter}, nil
}

// Set ReconcileSuccess to true here because all CRUD operations to Kube API related
// to child resources returned no error
rabbitmqCluster.Status.SetCondition(status.ReconcileSuccess, corev1.ConditionTrue, "Success", "Created or Updated all child resources")
Expand All @@ -216,7 +224,7 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er

// By this point the StatefulSet may have finished deploying. Run any
// post-deploy steps if so, or requeue until the deployment is finished.
requeueAfter, err := r.runPostDeployStepsIfNeeded(ctx, rabbitmqCluster)
requeueAfter, err = r.runPostDeployStepsIfNeeded(ctx, rabbitmqCluster)
if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -338,7 +346,7 @@ func (r *RabbitmqClusterReconciler) runPostDeployStepsIfNeeded(ctx context.Conte
}

// Retrieve the plugins config map, if it exists.
pluginsConfig, err := r.pluginsConfigMap(ctx, rmq)
pluginsConfig, err := r.configMap(ctx, rmq, rmq.ChildResourceName(resource.PluginsConfigName))
if client.IgnoreNotFound(err) != nil {
return 0, err
}
Expand Down Expand Up @@ -419,16 +427,32 @@ func (r *RabbitmqClusterReconciler) runSetPluginsCommand(ctx context.Context, rm
return nil
}

// Adds an arbitrary annotation (rabbitmq.com/lastRestartAt) to the StatefulSet PodTemplate to trigger a StatefulSet restart
// if builder requires StatefulSet to be updated.
func (r *RabbitmqClusterReconciler) restartStatefulSetIfNeeded(
ctx context.Context,
builder resource.ResourceBuilder,
operationResult controllerutil.OperationResult,
rmq *rabbitmqv1beta1.RabbitmqCluster) (restarted bool) {
// Adds an arbitrary annotation to the sts PodTemplate to trigger a sts restart
// it compares annotation "rabbitmq.com/serverConfUpdatedAt" from server-conf configMap and annotation "rabbitmq.com/lastRestartAt" from sts
// to determine whether to restart sts
func (r *RabbitmqClusterReconciler) restartStatefulSetIfNeeded(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) (time.Duration, error) {
serverConf, err := r.configMap(ctx, rmq, rmq.ChildResourceName(resource.ServerConfigMapName))
if err != nil {
// requeue request after 10s if unable to find server-conf configmap, else return the error
return 10 * time.Second, client.IgnoreNotFound(err)
}

if !(builder.UpdateRequiresStsRestart() && operationResult == controllerutil.OperationResultUpdated) {
return false
serverConfigUpdatedAt, ok := serverConf.Annotations[serverConfAnnotation]
if !ok {
// server-conf configmap hasn't been updated; no need to restart sts
return 0, nil
}

sts, err := r.statefulSet(ctx, rmq)
if err != nil {
// requeue request after 10s if unable to find sts, else return the error
return 10 * time.Second, client.IgnoreNotFound(err)
}

stsRestartedAt, ok := sts.Spec.Template.ObjectMeta.Annotations[stsRestartAnnotation]
if ok && stsRestartedAt > serverConfigUpdatedAt {
// sts was updated after the last server-conf configmap update; no need to restart sts
return 0, nil
}

if err := clientretry.RetryOnConflict(clientretry.DefaultRetry, func() error {
Expand All @@ -439,19 +463,21 @@ func (r *RabbitmqClusterReconciler) restartStatefulSetIfNeeded(
if sts.Spec.Template.ObjectMeta.Annotations == nil {
sts.Spec.Template.ObjectMeta.Annotations = make(map[string]string)
}
sts.Spec.Template.ObjectMeta.Annotations["rabbitmq.com/lastRestartAt"] = time.Now().Format(time.RFC3339)
sts.Spec.Template.ObjectMeta.Annotations[stsRestartAnnotation] = time.Now().Format(time.RFC3339)
return r.Update(ctx, sts)
}); err != nil {
msg := fmt.Sprintf("failed to restart StatefulSet %s of Namespace %s; rabbitmq.conf configuration may be outdated", rmq.ChildResourceName("server"), rmq.Namespace)
r.Log.Error(err, msg)
r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedUpdate", msg)
return false
// failed to restart sts; return error to requeue request
return 0, err
}

msg := fmt.Sprintf("restarted StatefulSet %s of Namespace %s", rmq.ChildResourceName("server"), rmq.Namespace)
r.Log.Info(msg)
r.Recorder.Event(rmq, corev1.EventTypeNormal, "SuccessfulUpdate", msg)
return true

return 0, nil
}

func (r *RabbitmqClusterReconciler) statefulSet(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) (*appsv1.StatefulSet, error) {
Expand All @@ -462,43 +488,59 @@ func (r *RabbitmqClusterReconciler) statefulSet(ctx context.Context, rmq *rabbit
return sts, nil
}

func (r *RabbitmqClusterReconciler) pluginsConfigMap(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) (*corev1.ConfigMap, error) {
func (r *RabbitmqClusterReconciler) configMap(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, name string) (*corev1.ConfigMap, error) {
configMap := &corev1.ConfigMap{}
if err := r.Get(ctx, types.NamespacedName{Namespace: rmq.Namespace, Name: rmq.ChildResourceName(resource.PluginsConfig)}, configMap); err != nil {
if err := r.Get(ctx, types.NamespacedName{Namespace: rmq.Namespace, Name: name}, configMap); err != nil {
return nil, err
}
return configMap, nil
}

// Annotates the plugins ConfigMap if it was updated such that 'rabbitmq-plugins set' will be called on the RabbitMQ nodes at a later point in time
func (r *RabbitmqClusterReconciler) annotatePluginsConfigMapIfUpdated(
ctx context.Context,
builder resource.ResourceBuilder,
operationResult controllerutil.OperationResult,
rmq *rabbitmqv1beta1.RabbitmqCluster) {
// Annotates the plugins ConfigMap or the server-conf ConfigMap
// annotations later used to indicate whether to call 'rabbitmq-plugins set' or to restart the sts
func (r *RabbitmqClusterReconciler) annotateConfigMapIfUpdated(ctx context.Context, builder resource.ResourceBuilder, operationResult controllerutil.OperationResult, rmq *rabbitmqv1beta1.RabbitmqCluster) error {
if operationResult != controllerutil.OperationResultUpdated {
return nil
}

if _, ok := builder.(*resource.RabbitmqPluginsConfigMapBuilder); !ok {
return
var configMap, annotationKey string
switch builder.(type) {
case *resource.RabbitmqPluginsConfigMapBuilder:
configMap = rmq.ChildResourceName(resource.PluginsConfigName)
annotationKey = pluginsUpdateAnnotation
case *resource.ServerConfigMapBuilder:
configMap = rmq.ChildResourceName(resource.ServerConfigMapName)
annotationKey = serverConfAnnotation
default:
return nil
}
if operationResult != controllerutil.OperationResultUpdated {
return

if err := r.annotateConfigMap(ctx, rmq.Namespace, configMap, annotationKey, time.Now().Format(time.RFC3339)); err != nil {
msg := fmt.Sprintf("Failed to annotate ConfigMap %s of Namespace %s; %s may be outdated", configMap, rmq.Namespace, rmq.Name)
r.Log.Error(err, msg)
r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedUpdate", msg)
return err
}

r.Log.Info("successfully annotated", "ConfigMap", configMap, "Namespace", rmq.Namespace)
return nil
}

func (r *RabbitmqClusterReconciler) annotateConfigMap(ctx context.Context, namespace, name, key, value string) error {
if retryOnConflictErr := clientretry.RetryOnConflict(clientretry.DefaultRetry, func() error {
configMap := corev1.ConfigMap{}
if err := r.Get(ctx, types.NamespacedName{Namespace: rmq.Namespace, Name: rmq.ChildResourceName(resource.PluginsConfig)}, &configMap); err != nil {
if err := r.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, &configMap); err != nil {
return client.IgnoreNotFound(err)
}
if configMap.Annotations == nil {
configMap.Annotations = make(map[string]string)
}
configMap.Annotations[pluginsUpdateAnnotation] = time.Now().Format(time.RFC3339)
configMap.Annotations[key] = value
return r.Update(ctx, &configMap)
}); retryOnConflictErr != nil {
msg := fmt.Sprintf("Failed to annotate ConfigMap %s of Namespace %s; enabled_plugins may be outdated", rmq.ChildResourceName(resource.PluginsConfig), rmq.Namespace)
r.Log.Error(retryOnConflictErr, msg)
r.Recorder.Event(rmq, corev1.EventTypeWarning, "FailedUpdate", msg)
return retryOnConflictErr
}
return nil
}

func (r *RabbitmqClusterReconciler) exec(namespace, podName, containerName string, command ...string) (string, string, error) {
Expand Down Expand Up @@ -600,8 +642,8 @@ func (r *RabbitmqClusterReconciler) addRabbitmqDeletionLabel(ctx context.Context
return nil
}

// addFinalizerIfNeeded adds a deletion finalizer if the RabbitmqCluster does not have one yet and is not marked for deletion
func (r *RabbitmqClusterReconciler) addFinalizerIfNeeded(ctx context.Context, rabbitmqCluster *rabbitmqv1beta1.RabbitmqCluster) error {
// The RabbitmqCluster is not marked for deletion (no deletion timestamp) but does not have the deletion finalizer
if rabbitmqCluster.ObjectMeta.DeletionTimestamp.IsZero() && !controllerutil.ContainsFinalizer(rabbitmqCluster, deletionFinalizer) {
controllerutil.AddFinalizer(rabbitmqCluster, deletionFinalizer)
if err := r.Client.Update(ctx, rabbitmqCluster); err != nil {
Expand Down
79 changes: 75 additions & 4 deletions controllers/rabbitmqcluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"

. "github.com/onsi/ginkgo"
. "github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
"github.com/rabbitmq/cluster-operator/internal/resource"
Expand Down Expand Up @@ -1414,7 +1415,7 @@ var _ = Describe("RabbitmqClusterController", func() {
})
})

Context("Cluster restarts", func() {
Context("Cluster post deploy steps", func() {
var annotations map[string]string

BeforeEach(func() {
Expand Down Expand Up @@ -1512,7 +1513,7 @@ var _ = Describe("RabbitmqClusterController", func() {
BeforeEach(func() {
cluster = &rabbitmqv1beta1.RabbitmqCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "rabbitmq-three",
Name: "rabbitmq-three-no-post-deploy",
Namespace: defaultNamespace,
},
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
Expand Down Expand Up @@ -1572,11 +1573,11 @@ var _ = Describe("RabbitmqClusterController", func() {
})
})

When("the cluster is only 1 node large", func() {
When("the cluster is a single node cluster", func() {
BeforeEach(func() {
cluster = &rabbitmqv1beta1.RabbitmqCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "rabbitmq-one",
Name: "rabbitmq-one-rebalance",
Namespace: defaultNamespace,
},
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
Expand Down Expand Up @@ -1638,6 +1639,76 @@ var _ = Describe("RabbitmqClusterController", func() {
})

})

DescribeTable("Server configurations updates",
func(testCase string) {
// create rabbitmqcluster
cluster = &rabbitmqv1beta1.RabbitmqCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "rabbitmq-" + testCase,
Namespace: defaultNamespace,
},
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
Replicas: &one,
},
}
Expect(client.Create(ctx, cluster)).To(Succeed())
waitForClusterCreation(ctx, cluster, client)

// ensure that configMap and statefulSet does not have annotations set when configurations haven't changed
configMap, err := clientSet.CoreV1().ConfigMaps(cluster.Namespace).Get(ctx, cluster.ChildResourceName("server-conf"), metav1.GetOptions{})
Expect(err).To(Not(HaveOccurred()))
Expect(configMap.Annotations).ShouldNot(HaveKey("rabbitmq.com/serverConfUpdatedAt"))

sts, err := clientSet.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, cluster.ChildResourceName("server"), metav1.GetOptions{})
Expect(err).To(Not(HaveOccurred()))
Expect(sts.Annotations).ShouldNot(HaveKey("rabbitmq.com/lastRestartAt"))

// update rabbitmq server configurations
Expect(updateWithRetry(cluster, func(r *rabbitmqv1beta1.RabbitmqCluster) {
if testCase == "additional-config" {
r.Spec.Rabbitmq.AdditionalConfig = "test_config=0"
}
if testCase == "advanced-config" {
r.Spec.Rabbitmq.AdvancedConfig = "sample-advanced-config."
}
if testCase == "env-config" {
r.Spec.Rabbitmq.EnvConfig = "some-env-variable"
}
})).To(Succeed())

By("annotating the server-conf ConfigMap")
// ensure annotations from the server-conf ConfigMap
var annotations map[string]string
Eventually(func() map[string]string {
configMap, err := clientSet.CoreV1().ConfigMaps(cluster.Namespace).Get(ctx, cluster.ChildResourceName("server-conf"), metav1.GetOptions{})
Expect(err).To(Not(HaveOccurred()))
annotations = configMap.Annotations
return annotations
}, 5).Should(HaveKey("rabbitmq.com/serverConfUpdatedAt"))
_, err = time.Parse(time.RFC3339, annotations["rabbitmq.com/serverConfUpdatedAt"])
Expect(err).NotTo(HaveOccurred(), "Annotation rabbitmq.com/serverConfUpdatedAt was not a valid RFC3339 timestamp")

By("annotating the sts podTemplate")
// ensure statefulSet annotations
Eventually(func() map[string]string {
sts, err := clientSet.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, cluster.ChildResourceName("server"), metav1.GetOptions{})
Expect(err).To(Not(HaveOccurred()))
annotations = sts.Spec.Template.Annotations
return annotations
}, 5).Should(HaveKey("rabbitmq.com/lastRestartAt"))
_, err = time.Parse(time.RFC3339, annotations["rabbitmq.com/lastRestartAt"])
Expect(err).NotTo(HaveOccurred(), "Annotation rabbitmq.com/lastRestartAt was not a valid RFC3339 timestamp")

// delete rmq cluster
Expect(client.Delete(ctx, cluster)).To(Succeed())
waitForClusterDeletion(ctx, cluster, client)
},

Entry("spec.rabbitmq.additionalConfig is updated", "additional-config"),
Entry("spec.rabbitmq.advancedConfig is updated", "advanced-config"),
Entry("spec.rabbitmq.envConfig is updated", "env-config"),
)
})

func extractContainer(containers []corev1.Container, containerName string) corev1.Container {
Expand Down
4 changes: 0 additions & 4 deletions internal/resource/client_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ type ClientServiceBuilder struct {
Scheme *runtime.Scheme
}

func (builder *ClientServiceBuilder) UpdateRequiresStsRestart() bool {
return false
}

func (builder *ClientServiceBuilder) Build() (runtime.Object, error) {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Expand Down
Loading