-
Notifications
You must be signed in to change notification settings - Fork 284
/
Copy pathreconcile_cli.go
147 lines (132 loc) · 5.47 KB
/
reconcile_cli.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package controllers
import (
"context"
"fmt"
"time"
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
"github.com/rabbitmq/cluster-operator/internal/resource"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// queueRebalanceAnnotation is the RabbitmqCluster annotation that requests a
// `rabbitmq-queues rebalance all` run. runRabbitmqCLICommandsIfAnnotated acts on
// it once all replicas are ready and updated; runQueueRebalanceCommand removes
// it again after the command succeeds.
const (
	queueRebalanceAnnotation = "rabbitmq.com/queueRebalanceNeededAt"
)
// runRabbitmqCLICommandsIfAnnotated runs the RabbitMQ CLI commands that have
// been requested through annotations, but only once the server StatefulSet has
// every replica ready and no revision update in progress:
//   - `rabbitmq-plugins set` on every node, when the plugins ConfigMap exists
//     and carries pluginsUpdateAnnotation;
//   - enabling all disabled stable feature flags, when the StatefulSet carries
//     stsCreateAnnotation;
//   - `rabbitmq-queues rebalance all`, when the RabbitmqCluster carries
//     queueRebalanceAnnotation.
//
// A non-zero requeueAfter asks the caller to retry later: 15s while replicas
// are not ready, 2s when the plugins ConfigMap was updated very recently. A
// non-nil err is returned when fetching a resource or running a command fails.
func (r *RabbitmqClusterReconciler) runRabbitmqCLICommandsIfAnnotated(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) (requeueAfter time.Duration, err error) {
	sts, err := r.statefulSet(ctx, rmq)
	if err != nil {
		return 0, err
	}
	if !allReplicasReadyAndUpdated(sts) {
		r.Log.Info("not all replicas ready yet; requeuing request to run RabbitMQ CLI commands",
			"namespace", rmq.Namespace,
			"name", rmq.Name)
		return 15 * time.Second, nil
	}

	// Retrieve the plugins config map, if it exists. NotFound is tolerated, so
	// pluginsConfig may be nil here (NOTE(review): depends on what r.configMap
	// returns on NotFound). Guard every dereference instead of panicking.
	pluginsConfig, err := r.configMap(ctx, rmq, rmq.ChildResourceName(resource.PluginsConfigName))
	if client.IgnoreNotFound(err) != nil {
		return 0, err
	}
	if pluginsConfig != nil {
		updatedRecently, err := pluginsConfigUpdatedRecently(pluginsConfig)
		if err != nil {
			return 0, err
		}
		if updatedRecently {
			// The plugins ConfigMap was updated very recently. Give the
			// StatefulSet controller time to restart the StatefulSet if
			// necessary; otherwise we could exec into containers that are about
			// to be terminated and lose the connection.
			r.Log.Info("requeuing request to set plugins",
				"namespace", rmq.Namespace,
				"name", rmq.Name)
			return 2 * time.Second, nil
		}
		// Indexing a nil annotations map yields "", so no separate nil check is needed.
		if pluginsConfig.ObjectMeta.Annotations[pluginsUpdateAnnotation] != "" {
			if err := r.runSetPluginsCommand(ctx, rmq, pluginsConfig); err != nil {
				return 0, err
			}
		}
	}

	// If the RabbitMQ cluster is newly created, enable all feature flags since
	// some are disabled by default.
	if sts.ObjectMeta.Annotations[stsCreateAnnotation] != "" {
		if err := r.runEnableFeatureFlagsCommand(ctx, rmq, sts); err != nil {
			return 0, err
		}
	}

	// If the cluster has been marked as needing it, run `rabbitmq-queues rebalance all`.
	if rmq.ObjectMeta.Annotations[queueRebalanceAnnotation] != "" {
		if err := r.runQueueRebalanceCommand(ctx, rmq); err != nil {
			return 0, err
		}
	}
	return 0, nil
}
// runEnableFeatureFlagsCommand enables every feature flag that
// `rabbitmqctl list_feature_flags` reports as disabled and stable. It runs the
// pipeline with bash in the "rabbitmq" container of the first server pod.
//
// On success it logs, then removes stsCreateAnnotation from sts so the step is
// not repeated. On failure it logs the error together with the command and its
// output, returns the error, and leaves the annotation in place.
func (r *RabbitmqClusterReconciler) runEnableFeatureFlagsCommand(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, sts *appsv1.StatefulSet) error {
	pod := fmt.Sprintf("%s-0", rmq.ChildResourceName("server"))
	// `(grep ... || true)` stops pipefail from failing the pipeline when no flag
	// matches; `xargs -r` then skips calling enable_feature_flag entirely.
	script := "set -eo pipefail; rabbitmqctl -s list_feature_flags name state stability | (grep 'disabled\\sstable$' || true) | cut -f 1 | xargs -r -n1 rabbitmqctl enable_feature_flag"

	out, errOut, execErr := r.exec(rmq.Namespace, pod, "rabbitmq", "bash", "-c", script)
	if execErr == nil {
		r.Log.Info("successfully enabled all feature flags",
			"namespace", rmq.Namespace,
			"name", rmq.Name)
		return r.deleteAnnotation(ctx, sts, stsCreateAnnotation)
	}

	r.Log.Error(execErr, "failed to enable all feature flags",
		"namespace", rmq.Namespace,
		"name", rmq.Name,
		"pod", pod,
		"command", script,
		"stdout", out,
		"stderr", errOut)
	return execErr
}
// runSetPluginsCommand applies the desired plugin list to every RabbitMQ node by
// running `rabbitmq-plugins set` in each server pod.
//
// There are two paths by which plugins are set:
//  1. When the StatefulSet is (re)started, RabbitMQ nodes read the up-to-date
//     plugins list (the ConfigMap copied by the init container) during start up.
//  2. When the plugins ConfigMap changes, 'rabbitmq-plugins set' updates the
//     plugins on every node, without the need to restart the nodes.
//
// This method implements the second path. Pods are addressed as
// "<server>-0" … "<server>-(replicas-1)".
//
// On the first failure it logs the command output and returns the error,
// leaving pluginsUpdateAnnotation on configMap so the command can be attempted
// again later. It also stops early with ctx.Err() if ctx is cancelled between
// pods. After every pod succeeds it removes pluginsUpdateAnnotation from
// configMap.
func (r *RabbitmqClusterReconciler) runSetPluginsCommand(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster, configMap *corev1.ConfigMap) error {
	plugins := resource.NewRabbitmqPlugins(rmq.Spec.Rabbitmq.AdditionalPlugins)
	// The plugin list is identical for every node, so build the command once.
	cmd := fmt.Sprintf("rabbitmq-plugins set %s", plugins.AsString(" "))
	serverName := rmq.ChildResourceName("server")
	for i := int32(0); i < *rmq.Spec.Replicas; i++ {
		// Each exec round-trips to a pod; bail out promptly if the reconcile
		// has been cancelled instead of continuing through the remaining pods.
		if err := ctx.Err(); err != nil {
			return err
		}
		podName := fmt.Sprintf("%s-%d", serverName, i)
		stdout, stderr, err := r.exec(rmq.Namespace, podName, "rabbitmq", "sh", "-c", cmd)
		if err != nil {
			r.Log.Error(err, "failed to set plugins",
				"namespace", rmq.Namespace,
				"name", rmq.Name,
				"pod", podName,
				"command", cmd,
				"stdout", stdout,
				"stderr", stderr)
			return err
		}
	}
	r.Log.Info("successfully set plugins",
		"namespace", rmq.Namespace,
		"name", rmq.Name)
	return r.deleteAnnotation(ctx, configMap, pluginsUpdateAnnotation)
}
// runQueueRebalanceCommand runs `rabbitmq-queues rebalance all` in the
// "rabbitmq" container of the first server pod ("<server>-0").
//
// On failure it logs the command and its output, returns the error, and leaves
// queueRebalanceAnnotation on the RabbitmqCluster. On success it logs (matching
// the other run*Command methods) and removes queueRebalanceAnnotation so the
// rebalance is not repeated.
func (r *RabbitmqClusterReconciler) runQueueRebalanceCommand(ctx context.Context, rmq *rabbitmqv1beta1.RabbitmqCluster) error {
	podName := fmt.Sprintf("%s-0", rmq.ChildResourceName("server"))
	cmd := "rabbitmq-queues rebalance all"
	stdout, stderr, err := r.exec(rmq.Namespace, podName, "rabbitmq", "sh", "-c", cmd)
	if err != nil {
		r.Log.Error(err, "failed to run queue rebalance",
			"namespace", rmq.Namespace,
			"name", rmq.Name,
			"pod", podName,
			"command", cmd,
			"stdout", stdout,
			"stderr", stderr)
		return err
	}
	r.Log.Info("successfully rebalanced queues",
		"namespace", rmq.Namespace,
		"name", rmq.Name)
	return r.deleteAnnotation(ctx, rmq, queueRebalanceAnnotation)
}
// statefulSetNeedsQueueRebalance reports whether queues should be rebalanced
// once the current rollout completes. That requires all three of:
//   - the StatefulSet is mid-update (current and update revisions differ);
//   - the cluster does not skip post-deploy steps;
//   - the cluster has more than one replica.
func statefulSetNeedsQueueRebalance(sts *appsv1.StatefulSet, rmq *rabbitmqv1beta1.RabbitmqCluster) bool {
	// Checks run in the same short-circuit order as a single && chain.
	if !statefulSetBeingUpdated(sts) {
		return false
	}
	if rmq.Spec.SkipPostDeploySteps {
		return false
	}
	return *rmq.Spec.Replicas > 1
}
// allReplicasReadyAndUpdated reports whether the StatefulSet has finished
// rolling out. That means the number of ready replicas equals the desired
// replica count and no revision update is in progress.
//
// An unset spec.replicas is treated as 1, the Kubernetes default for
// StatefulSets, instead of causing a nil-pointer panic.
func allReplicasReadyAndUpdated(sts *appsv1.StatefulSet) bool {
	desired := int32(1)
	if sts.Spec.Replicas != nil {
		desired = *sts.Spec.Replicas
	}
	return sts.Status.ReadyReplicas == desired && !statefulSetBeingUpdated(sts)
}
// statefulSetBeingUpdated reports whether the StatefulSet's status shows an
// update still in progress, i.e. its update revision differs from its current
// revision.
func statefulSetBeingUpdated(sts *appsv1.StatefulSet) bool {
	status := &sts.Status
	return status.UpdateRevision != status.CurrentRevision
}