Skip to content

Commit 41bbbbf

Browse files
ferozjillaableaseChunyiLyu
authoredNov 13, 2020
Share TLS config with management, amqp 1.0, (web) mqtt and (web) stomp plugins (#451)
* Share TLS config with the management, amqp 1.0, (web) mqtt and (web) stomp plugins (#451) This commit: - Supports TLS for management - Supports TLS for rabbtmq_(web)_mqtt and rabbitmq_(web)_stomp - Sets hardcoded path for CA certificate to 'ca.crt' - Adds system tests to verify remote access via HTTPS to the MGMT dashboard, mqtt, and stomp plugins Co-authored-by: Alex Blease <[email protected]> Co-authored-by: Chunyi Lyu <[email protected]>
1 parent f6f9bc8 commit 41bbbbf

15 files changed

+752
-128
lines changed
 

‎api/v1beta1/rabbitmqcluster_types.go

+2-5
Original file line numberDiff line numberDiff line change
@@ -238,12 +238,9 @@ type TLSSpec struct {
238238
// The Secret must store these as tls.key and tls.crt, respectively.
239239
SecretName string `json:"secretName,omitempty"`
240240
// Name of a Secret in the same Namespace as the RabbitmqCluster, containing the Certificate Authority's public certificate for TLS.
241-
// This can be the same as SecretName.
242-
// Used for mTLS.
241+
// The Secret must store this as ca.crt.
242+
// Used for mTLS, and TLS for rabbitmq_web_stomp and rabbitmq_web_mqtt.
243243
CaSecretName string `json:"caSecretName,omitempty"`
244-
// The Secret defined in CaSecretName must store the Certificate Authority's public certificate under the key specified in CaCertName.
245-
// Used for mTLS.
246-
CaCertName string `json:"caCertName,omitempty"`
247244
}
248245

249246
// kubebuilder validating tags 'Pattern' and 'MaxLength' must be specified on string type.

‎config/crd/bases/rabbitmq.com_rabbitmqclusters.yaml

+2-6
Original file line numberDiff line numberDiff line change
@@ -3734,15 +3734,11 @@ spec:
37343734
type: integer
37353735
tls:
37363736
properties:
3737-
caCertName:
3738-
description: The Secret defined in CaSecretName must store the
3739-
Certificate Authority's public certificate under the key specified
3740-
in CaCertName. Used for mTLS.
3741-
type: string
37423737
caSecretName:
37433738
description: Name of a Secret in the same Namespace as the RabbitmqCluster,
37443739
containing the Certificate Authority's public certificate for
3745-
TLS. This can be the same as SecretName. Used for mTLS.
3740+
TLS. The Secret must store this as ca.crt. Used for mTLS, and
3741+
TLS for rabbitmq_web_stomp and rabbitmq_web_mqtt.
37463742
type: string
37473743
secretName:
37483744
description: Name of a Secret in the same Namespace as the RabbitmqCluster,

‎controllers/rabbitmqcluster_controller_test.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@ package controllers_test
1313
import (
1414
"context"
1515
"fmt"
16-
"k8s.io/utils/pointer"
1716
"time"
1817

18+
"k8s.io/utils/pointer"
19+
1920
"k8s.io/apimachinery/pkg/util/intstr"
2021

2122
. "github.com/onsi/ginkgo"
@@ -1066,7 +1067,7 @@ var _ = Describe("RabbitmqClusterController", func() {
10661067
TargetPort: amqpTargetPort,
10671068
},
10681069
corev1.ServicePort{
1069-
Name: "management",
1070+
Name: "http",
10701071
Port: 15672,
10711072
Protocol: corev1.ProtocolTCP,
10721073
TargetPort: managementTargetPort,
@@ -1190,7 +1191,7 @@ func waitForClusterDeletion(ctx context.Context, rabbitmqCluster *rabbitmqv1beta
11901191

11911192
}
11921193

1193-
func verifyError(ctx context.Context, rabbitmqCluster *rabbitmqv1beta1.RabbitmqCluster, expectedError string) {
1194+
func verifyTLSErrorEvents(ctx context.Context, rabbitmqCluster *rabbitmqv1beta1.RabbitmqCluster, expectedError string) {
11941195
tlsEventTimeout := 5 * time.Second
11951196
tlsRetry := 1 * time.Second
11961197
Eventually(func() string { return aggregateEventMsgs(ctx, rabbitmqCluster, "TLSError") }, tlsEventTimeout, tlsRetry).Should(ContainSubstring(expectedError))

‎controllers/reconcile_tls.go

+16-14
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package controllers
33
import (
44
"context"
55
"fmt"
6+
67
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
78
corev1 "k8s.io/api/core/v1"
89
"k8s.io/apimachinery/pkg/api/errors"
@@ -11,48 +12,49 @@ import (
1112
)
1213

1314
func (r *RabbitmqClusterReconciler) checkTLSSecrets(ctx context.Context, rabbitmqCluster *rabbitmqv1beta1.RabbitmqCluster) (ctrl.Result, error) {
14-
logger := r.Log
1515
secretName := rabbitmqCluster.Spec.TLS.SecretName
16-
logger.Info("TLS set, looking for secret", "secret", secretName, "namespace", rabbitmqCluster.Namespace)
16+
r.Log.Info("TLS enabled, looking for secret", "secret", secretName, "namespace", rabbitmqCluster.Namespace)
1717

1818
// check if secret exists
1919
secret := &corev1.Secret{}
2020
if err := r.Get(ctx, types.NamespacedName{Namespace: rabbitmqCluster.Namespace, Name: secretName}, secret); err != nil {
2121
r.Recorder.Event(rabbitmqCluster, corev1.EventTypeWarning, "TLSError",
22-
fmt.Sprintf("Failed to get TLS secret %v in namespace %v: %v", secretName, rabbitmqCluster.Namespace, err.Error()))
22+
fmt.Sprintf("Failed to get TLS secret %s in namespace %s: %v", secretName, rabbitmqCluster.Namespace, err.Error()))
23+
r.Log.Error(err, "Error setting up TLS", "namespace", rabbitmqCluster.Namespace, "name", rabbitmqCluster.Name)
2324
return ctrl.Result{}, err
2425
}
2526
// check if secret has the right keys
2627
_, hasTLSKey := secret.Data["tls.key"]
2728
_, hasTLSCert := secret.Data["tls.crt"]
2829
if !hasTLSCert || !hasTLSKey {
29-
r.Recorder.Event(rabbitmqCluster, corev1.EventTypeWarning, "TLSError",
30-
fmt.Sprintf("The TLS secret %v in namespace %v must have the fields tls.crt and tls.key", secretName, rabbitmqCluster.Namespace))
31-
32-
return ctrl.Result{}, errors.NewBadRequest("The TLS secret must have the fields tls.crt and tls.key")
30+
err := errors.NewBadRequest(fmt.Sprintf("TLS secret %s in namespace %s does not have the fields tls.crt and tls.key", secretName, rabbitmqCluster.Namespace))
31+
r.Recorder.Event(rabbitmqCluster, corev1.EventTypeWarning, "TLSError", err.Error())
32+
r.Log.Error(err, "Error setting up TLS", "namespace", rabbitmqCluster.Namespace, "name", rabbitmqCluster.Name)
33+
return ctrl.Result{}, err
3334
}
3435

3536
// Mutual TLS: check if CA certificate is stored in a separate secret
3637
if rabbitmqCluster.MutualTLSEnabled() {
3738
if !rabbitmqCluster.SingleTLSSecret() {
3839
secretName := rabbitmqCluster.Spec.TLS.CaSecretName
39-
logger.Info("mutual TLS set, looking for CA certificate secret", "secret", secretName, "namespace", rabbitmqCluster.Namespace)
40+
r.Log.Info("mutual TLS enabled, looking for CA certificate secret", "secret", secretName, "namespace", rabbitmqCluster.Namespace)
4041

4142
// check if secret exists
4243
secret = &corev1.Secret{}
4344
if err := r.Get(ctx, types.NamespacedName{Namespace: rabbitmqCluster.Namespace, Name: secretName}, secret); err != nil {
4445
r.Recorder.Event(rabbitmqCluster, corev1.EventTypeWarning, "TLSError",
4546
fmt.Sprintf("Failed to get CA certificate secret %v in namespace %v: %v", secretName, rabbitmqCluster.Namespace, err.Error()))
47+
r.Log.Error(err, "Error setting up TLS", "namespace", rabbitmqCluster.Namespace, "name", rabbitmqCluster.Name)
4648
return ctrl.Result{}, err
4749
}
4850
}
49-
// Mutual TLS: verify that CA certificate is present in secret
50-
_, hasCaCert := secret.Data[rabbitmqCluster.Spec.TLS.CaCertName]
51-
if !hasCaCert {
52-
r.Recorder.Event(rabbitmqCluster, corev1.EventTypeWarning, "TLSError",
53-
fmt.Sprintf("The TLS secret %v in namespace %v must have the field %v", rabbitmqCluster.Spec.TLS.CaSecretName, rabbitmqCluster.Namespace, rabbitmqCluster.Spec.TLS.CaCertName))
5451

55-
return ctrl.Result{}, errors.NewBadRequest(fmt.Sprintf("The TLS secret must have the field %s", rabbitmqCluster.Spec.TLS.CaCertName))
52+
// Mutual TLS: verify that CA certificate is present in secret
53+
if _, hasCaCert := secret.Data["ca.crt"]; !hasCaCert {
54+
err := errors.NewBadRequest(fmt.Sprintf("TLS secret %s in namespace %s does not have the field ca.crt", rabbitmqCluster.Spec.TLS.CaSecretName, rabbitmqCluster.Namespace))
55+
r.Recorder.Event(rabbitmqCluster, corev1.EventTypeWarning, "TLSError", err.Error())
56+
r.Log.Error(err, "Error setting up TLS", "namespace", rabbitmqCluster.Namespace, "name", rabbitmqCluster.Name)
57+
return ctrl.Result{}, err
5658
}
5759
}
5860
return ctrl.Result{}, nil

‎controllers/reconcile_tls_test.go

+25-18
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package controllers_test
33
import (
44
"context"
55
"fmt"
6+
67
rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
78

89
. "github.com/onsi/ginkgo"
@@ -32,11 +33,10 @@ var _ = Describe("Reconcile TLS", func() {
3233

3334
Context("Mutual TLS with single secret", func() {
3435
It("Deploys successfully", func() {
35-
tlsSecretWithCACert(ctx, "tls-secret", defaultNamespace, "caCERT")
36+
tlsSecretWithCACert(ctx, "tls-secret", defaultNamespace)
3637
tlsSpec := rabbitmqv1beta1.TLSSpec{
3738
SecretName: "tls-secret",
3839
CaSecretName: "tls-secret",
39-
CaCertName: "caCERT",
4040
}
4141
cluster = rabbitmqClusterWithTLS(ctx, "mutual-tls-success", defaultNamespace, tlsSpec)
4242
waitForClusterCreation(ctx, cluster, client)
@@ -45,23 +45,33 @@ var _ = Describe("Reconcile TLS", func() {
4545
Expect(err).NotTo(HaveOccurred())
4646
volumeMount := corev1.VolumeMount{
4747
Name: "rabbitmq-tls",
48-
MountPath: "/etc/rabbitmq-tls/caCERT",
49-
SubPath: "caCERT",
48+
MountPath: "/etc/rabbitmq-tls/ca.crt",
49+
SubPath: "ca.crt",
5050
ReadOnly: true,
5151
}
5252
Expect(sts.Spec.Template.Spec.Containers[0].VolumeMounts).To(ContainElement(volumeMount))
5353
})
5454

5555
It("Does not deploy if the cert name does not match the contents of the secret", func() {
56-
tlsSecretWithCACert(ctx, "tls-secret-missing", defaultNamespace, "ca.c")
56+
tlsData := map[string]string{
57+
"tls.crt": "this is a tls cert",
58+
"tls.key": "this is a tls key",
59+
"wrong-key": "certificate",
60+
}
61+
62+
_, err := createSecret(ctx, "tls-secret-missing", defaultNamespace, tlsData)
63+
64+
if !apierrors.IsAlreadyExists(err) {
65+
Expect(err).NotTo(HaveOccurred())
66+
}
67+
5768
tlsSpec := rabbitmqv1beta1.TLSSpec{
5869
SecretName: "tls-secret-missing",
5970
CaSecretName: "tls-secret-missing",
60-
CaCertName: "ca.crt",
6171
}
6272
cluster = rabbitmqClusterWithTLS(ctx, "tls-secret-missing", defaultNamespace, tlsSpec)
6373

64-
verifyError(ctx, cluster, fmt.Sprintf("The TLS secret tls-secret-missing in namespace %s must have the field ca.crt", defaultNamespace))
74+
verifyTLSErrorEvents(ctx, cluster, fmt.Sprintf("TLS secret tls-secret-missing in namespace %s does not have the field ca.crt", defaultNamespace))
6575
})
6676
})
6777

@@ -72,10 +82,9 @@ var _ = Describe("Reconcile TLS", func() {
7282
tlsSpec := rabbitmqv1beta1.TLSSpec{
7383
SecretName: "rabbitmq-tls-secret-does-not-exist",
7484
CaSecretName: "ca-cert-secret",
75-
CaCertName: "ca.crt",
7685
}
7786
cluster = rabbitmqClusterWithTLS(ctx, "rabbitmq-tls-secret-does-not-exist", defaultNamespace, tlsSpec)
78-
verifyError(ctx, cluster, "Failed to get CA certificate secret")
87+
verifyTLSErrorEvents(ctx, cluster, "Failed to get CA certificate secret")
7988

8089
_, err := clientSet.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, cluster.ChildResourceName("server"), metav1.GetOptions{})
8190
Expect(err).To(HaveOccurred())
@@ -105,10 +114,9 @@ var _ = Describe("Reconcile TLS", func() {
105114
tlsSpec := rabbitmqv1beta1.TLSSpec{
106115
SecretName: "tls-secret",
107116
CaSecretName: "ca-cert-secret-invalid",
108-
CaCertName: "ca.crt",
109117
}
110118
cluster = rabbitmqClusterWithTLS(ctx, "rabbitmq-mutual-tls-missing", defaultNamespace, tlsSpec)
111-
verifyError(ctx, cluster, fmt.Sprintf("The TLS secret ca-cert-secret-invalid in namespace %s must have the field ca.crt", defaultNamespace))
119+
verifyTLSErrorEvents(ctx, cluster, fmt.Sprintf("TLS secret ca-cert-secret-invalid in namespace %s does not have the field ca.crt", defaultNamespace))
112120
})
113121
})
114122
})
@@ -163,7 +171,7 @@ var _ = Describe("Reconcile TLS", func() {
163171
})
164172

165173
It("fails to deploy the RabbitmqCluster", func() {
166-
verifyError(ctx, cluster, fmt.Sprintf("The TLS secret rabbitmq-tls-malformed in namespace %s must have the fields tls.crt and tls.key", defaultNamespace))
174+
verifyTLSErrorEvents(ctx, cluster, fmt.Sprintf("TLS secret rabbitmq-tls-malformed in namespace %s does not have the fields tls.crt and tls.key", defaultNamespace))
167175
})
168176
})
169177

@@ -175,7 +183,7 @@ var _ = Describe("Reconcile TLS", func() {
175183
}
176184
cluster = rabbitmqClusterWithTLS(ctx, "rabbitmq-tls-secret-does-not-exist", defaultNamespace, tlsSpec)
177185

178-
verifyError(ctx, cluster, "Failed to get TLS secret")
186+
verifyTLSErrorEvents(ctx, cluster, "Failed to get TLS secret")
179187

180188
_, err := clientSet.AppsV1().StatefulSets(cluster.Namespace).Get(ctx, cluster.ChildResourceName("server"), metav1.GetOptions{})
181189
Expect(err).To(HaveOccurred())
@@ -192,15 +200,14 @@ var _ = Describe("Reconcile TLS", func() {
192200
statefulSet(ctx, cluster)
193201
})
194202
})
195-
196203
})
197204
})
198205

199-
func tlsSecretWithCACert(ctx context.Context, secretName, namespace, caCertName string) {
206+
func tlsSecretWithCACert(ctx context.Context, secretName, namespace string) {
200207
tlsData := map[string]string{
201-
"tls.crt": "this is a tls cert",
202-
"tls.key": "this is a tls key",
203-
caCertName: "certificate",
208+
"tls.crt": "this is a tls cert",
209+
"tls.key": "this is a tls key",
210+
"ca.crt": "certificate",
204211
}
205212

206213
_, err := createSecret(ctx, secretName, namespace, tlsData)

‎go.mod

+1-3
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ module github.com/rabbitmq/cluster-operator
33
go 1.15
44

55
require (
6-
cloud.google.com/go v0.47.0 // indirect
76
github.com/Azure/go-autorest/autorest v0.9.2 // indirect
87
github.com/Azure/go-autorest/autorest/adal v0.8.0 // indirect
98
github.com/cespare/xxhash/v2 v2.1.1 // indirect
@@ -13,9 +12,9 @@ require (
1312
github.com/go-logr/logr v0.1.0
1413
github.com/go-logr/zapr v0.1.1 // indirect
1514
github.com/go-stomp/stomp v2.0.8+incompatible
16-
github.com/golang/groupcache v0.0.0-20191027212112-611e8accdfc9 // indirect
1715
github.com/gophercloud/gophercloud v0.5.0 // indirect
1816
github.com/mattn/go-colorable v0.1.4 // indirect
17+
github.com/michaelklishin/rabbit-hole/v2 v2.5.0
1918
github.com/onsi/ginkgo v1.14.2
2019
github.com/onsi/gomega v1.10.3
2120
github.com/pelletier/go-toml v1.8.1 // indirect
@@ -26,7 +25,6 @@ require (
2625
go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 // indirect
2726
go.uber.org/multierr v1.2.0 // indirect
2827
golang.org/x/net v0.0.0-20201010224723-4f7140c49acb
29-
golang.org/x/oauth2 v0.0.0-20191122200657-5d9234df094c // indirect
3028
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
3129
gopkg.in/ini.v1 v1.62.0
3230
k8s.io/api v0.18.6

‎go.sum

+163
Large diffs are not rendered by default.

‎internal/resource/configmap.go

+52-2
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,15 @@ disk_free_limit.absolute = 2GB`
3737
defaultTLSConf = `
3838
ssl_options.certfile = /etc/rabbitmq-tls/tls.crt
3939
ssl_options.keyfile = /etc/rabbitmq-tls/tls.key
40-
listeners.ssl.default = 5671`
40+
listeners.ssl.default = 5671
41+
42+
management.ssl.certfile = /etc/rabbitmq-tls/tls.crt
43+
management.ssl.keyfile = /etc/rabbitmq-tls/tls.key
44+
management.ssl.port = 15671
45+
`
46+
caCertPath = "/etc/rabbitmq-tls/ca.crt"
47+
tlsCertPath = "/etc/rabbitmq-tls/tls.crt"
48+
tlsKeyPath = "/etc/rabbitmq-tls/tls.key"
4149
)
4250

4351
type ServerConfigMapBuilder struct {
@@ -81,15 +89,57 @@ func (builder *ServerConfigMapBuilder) Update(object runtime.Object) error {
8189
if err := cfg.Append([]byte(defaultTLSConf)); err != nil {
8290
return err
8391
}
92+
if builder.Instance.AdditionalPluginEnabled("rabbitmq_mqtt") {
93+
if _, err := defaultSection.NewKey("mqtt.listeners.ssl.default", "8883"); err != nil {
94+
return err
95+
}
96+
}
97+
if builder.Instance.AdditionalPluginEnabled("rabbitmq_stomp") {
98+
if _, err := defaultSection.NewKey("stomp.listeners.ssl.1", "61614"); err != nil {
99+
return err
100+
}
101+
}
84102
}
85103

86104
if builder.Instance.MutualTLSEnabled() {
87-
if _, err := defaultSection.NewKey("ssl_options.cacertfile", "/etc/rabbitmq-tls/"+builder.Instance.Spec.TLS.CaCertName); err != nil {
105+
if _, err := defaultSection.NewKey("ssl_options.cacertfile", caCertPath); err != nil {
88106
return err
89107
}
90108
if _, err := defaultSection.NewKey("ssl_options.verify", "verify_peer"); err != nil {
91109
return err
92110
}
111+
112+
if _, err := defaultSection.NewKey("management.ssl.cacertfile", caCertPath); err != nil {
113+
return err
114+
}
115+
if builder.Instance.AdditionalPluginEnabled("rabbitmq_web_mqtt") {
116+
if _, err := defaultSection.NewKey("web_mqtt.ssl.port", "15676"); err != nil {
117+
return err
118+
}
119+
if _, err := defaultSection.NewKey("web_mqtt.ssl.cacertfile", caCertPath); err != nil {
120+
return err
121+
}
122+
if _, err := defaultSection.NewKey("web_mqtt.ssl.certfile", tlsCertPath); err != nil {
123+
return err
124+
}
125+
if _, err := defaultSection.NewKey("web_mqtt.ssl.keyfile", tlsKeyPath); err != nil {
126+
return err
127+
}
128+
}
129+
if builder.Instance.AdditionalPluginEnabled("rabbitmq_web_stomp") {
130+
if _, err := defaultSection.NewKey("web_stomp.ssl.port", "15673"); err != nil {
131+
return err
132+
}
133+
if _, err := defaultSection.NewKey("web_stomp.ssl.cacertfile", caCertPath); err != nil {
134+
return err
135+
}
136+
if _, err := defaultSection.NewKey("web_stomp.ssl.certfile", tlsCertPath); err != nil {
137+
return err
138+
}
139+
if _, err := defaultSection.NewKey("web_stomp.ssl.keyfile", tlsKeyPath); err != nil {
140+
return err
141+
}
142+
}
93143
}
94144

95145
if builder.Instance.MemoryLimited() {

‎internal/resource/configmap_test.go

+100-2
Original file line numberDiff line numberDiff line change
@@ -268,11 +268,54 @@ CONSOLE_LOG=new`
268268
ssl_options.certfile = /etc/rabbitmq-tls/tls.crt
269269
ssl_options.keyfile = /etc/rabbitmq-tls/tls.key
270270
listeners.ssl.default = 5671
271+
272+
management.ssl.certfile = /etc/rabbitmq-tls/tls.crt
273+
management.ssl.keyfile = /etc/rabbitmq-tls/tls.key
274+
management.ssl.port = 15671
271275
`)
272276

273277
Expect(configMapBuilder.Update(configMap)).To(Succeed())
274278
Expect(configMap.Data).To(HaveKeyWithValue("rabbitmq.conf", expectedRabbitmqConf))
275279
})
280+
281+
When("MQTT, STOMP and AMQP 1.0 plugins are enabled", func() {
282+
It("adds TLS config for the additional plugins", func() {
283+
instance = rabbitmqv1beta1.RabbitmqCluster{
284+
ObjectMeta: metav1.ObjectMeta{
285+
Name: "rabbit-tls",
286+
},
287+
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
288+
TLS: rabbitmqv1beta1.TLSSpec{
289+
SecretName: "tls-secret",
290+
},
291+
Rabbitmq: rabbitmqv1beta1.RabbitmqClusterConfigurationSpec{
292+
AdditionalPlugins: []rabbitmqv1beta1.Plugin{
293+
"rabbitmq_mqtt",
294+
"rabbitmq_stomp",
295+
"rabbitmq_amqp_1_0",
296+
},
297+
},
298+
},
299+
}
300+
301+
expectedRabbitmqConf := iniString(defaultRabbitmqConf(builder.Instance.Name) + `
302+
ssl_options.certfile = /etc/rabbitmq-tls/tls.crt
303+
ssl_options.keyfile = /etc/rabbitmq-tls/tls.key
304+
listeners.ssl.default = 5671
305+
306+
management.ssl.certfile = /etc/rabbitmq-tls/tls.crt
307+
management.ssl.keyfile = /etc/rabbitmq-tls/tls.key
308+
management.ssl.port = 15671
309+
310+
mqtt.listeners.ssl.default = 8883
311+
312+
stomp.listeners.ssl.1 = 61614
313+
`)
314+
315+
Expect(configMapBuilder.Update(configMap)).To(Succeed())
316+
Expect(configMap.Data).To(HaveKeyWithValue("rabbitmq.conf", expectedRabbitmqConf))
317+
})
318+
})
276319
})
277320

278321
Context("Mutual TLS", func() {
@@ -285,7 +328,6 @@ listeners.ssl.default = 5671
285328
TLS: rabbitmqv1beta1.TLSSpec{
286329
SecretName: "tls-secret",
287330
CaSecretName: "tls-mutual-secret",
288-
CaCertName: "ca.certificate",
289331
},
290332
},
291333
}
@@ -294,13 +336,69 @@ listeners.ssl.default = 5671
294336
ssl_options.certfile = /etc/rabbitmq-tls/tls.crt
295337
ssl_options.keyfile = /etc/rabbitmq-tls/tls.key
296338
listeners.ssl.default = 5671
297-
ssl_options.cacertfile = /etc/rabbitmq-tls/ca.certificate
339+
340+
management.ssl.certfile = /etc/rabbitmq-tls/tls.crt
341+
management.ssl.keyfile = /etc/rabbitmq-tls/tls.key
342+
management.ssl.port = 15671
343+
344+
ssl_options.cacertfile = /etc/rabbitmq-tls/ca.crt
298345
ssl_options.verify = verify_peer
346+
management.ssl.cacertfile = /etc/rabbitmq-tls/ca.crt
299347
`)
300348

301349
Expect(configMapBuilder.Update(configMap)).To(Succeed())
302350
Expect(configMap.Data).To(HaveKeyWithValue("rabbitmq.conf", expectedRabbitmqConf))
303351
})
352+
353+
When("Web MQTT and Web STOMP are enabled", func() {
354+
It("adds TLS config for the additional plugins", func() {
355+
instance = rabbitmqv1beta1.RabbitmqCluster{
356+
ObjectMeta: metav1.ObjectMeta{
357+
Name: "rabbit-tls",
358+
},
359+
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
360+
TLS: rabbitmqv1beta1.TLSSpec{
361+
SecretName: "tls-secret",
362+
CaSecretName: "tls-mutual-secret",
363+
},
364+
Rabbitmq: rabbitmqv1beta1.RabbitmqClusterConfigurationSpec{
365+
AdditionalPlugins: []rabbitmqv1beta1.Plugin{
366+
"rabbitmq_web_mqtt",
367+
"rabbitmq_web_stomp",
368+
},
369+
},
370+
},
371+
}
372+
373+
expectedRabbitmqConf := iniString(defaultRabbitmqConf(builder.Instance.Name) + `
374+
ssl_options.certfile = /etc/rabbitmq-tls/tls.crt
375+
ssl_options.keyfile = /etc/rabbitmq-tls/tls.key
376+
listeners.ssl.default = 5671
377+
378+
management.ssl.certfile = /etc/rabbitmq-tls/tls.crt
379+
management.ssl.keyfile = /etc/rabbitmq-tls/tls.key
380+
management.ssl.port = 15671
381+
382+
ssl_options.cacertfile = /etc/rabbitmq-tls/ca.crt
383+
ssl_options.verify = verify_peer
384+
management.ssl.cacertfile = /etc/rabbitmq-tls/ca.crt
385+
386+
web_mqtt.ssl.port = 15676
387+
web_mqtt.ssl.cacertfile = /etc/rabbitmq-tls/ca.crt
388+
web_mqtt.ssl.certfile = /etc/rabbitmq-tls/tls.crt
389+
web_mqtt.ssl.keyfile = /etc/rabbitmq-tls/tls.key
390+
391+
web_stomp.ssl.port = 15673
392+
web_stomp.ssl.cacertfile = /etc/rabbitmq-tls/ca.crt
393+
web_stomp.ssl.certfile = /etc/rabbitmq-tls/tls.crt
394+
web_stomp.ssl.keyfile = /etc/rabbitmq-tls/tls.key
395+
396+
`)
397+
398+
Expect(configMapBuilder.Update(configMap)).To(Succeed())
399+
Expect(configMap.Data).To(HaveKeyWithValue("rabbitmq.conf", expectedRabbitmqConf))
400+
})
401+
})
304402
})
305403

306404
Context("Memory Limits", func() {

‎internal/resource/service.go

+42-2
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,11 @@ func (builder *ServiceBuilder) updatePorts(servicePorts []corev1.ServicePort) []
117117
TargetPort: intstr.FromInt(5672),
118118
Name: "amqp",
119119
},
120-
"management": {
120+
"http": {
121121
Protocol: corev1.ProtocolTCP,
122122
Port: 15672,
123123
TargetPort: intstr.FromInt(15672),
124-
Name: "management",
124+
Name: "http",
125125
},
126126
}
127127
if builder.Instance.AdditionalPluginEnabled("rabbitmq_mqtt") {
@@ -163,6 +163,46 @@ func (builder *ServiceBuilder) updatePorts(servicePorts []corev1.ServicePort) []
163163
TargetPort: intstr.FromInt(5671),
164164
Name: "amqps",
165165
}
166+
servicePortsMap["management-tls"] = corev1.ServicePort{
167+
Protocol: corev1.ProtocolTCP,
168+
Port: 15671,
169+
TargetPort: intstr.FromInt(15671),
170+
Name: "management-tls",
171+
}
172+
if builder.Instance.AdditionalPluginEnabled("rabbitmq_stomp") {
173+
servicePortsMap["stomps"] = corev1.ServicePort{
174+
Protocol: corev1.ProtocolTCP,
175+
Port: 61614,
176+
Name: "stomps",
177+
TargetPort: intstr.FromInt(61614),
178+
}
179+
}
180+
if builder.Instance.AdditionalPluginEnabled("rabbitmq_mqtt") {
181+
servicePortsMap["mqtts"] = corev1.ServicePort{
182+
Protocol: corev1.ProtocolTCP,
183+
Port: 8883,
184+
Name: "mqtts",
185+
TargetPort: intstr.FromInt(8883),
186+
}
187+
}
188+
}
189+
if builder.Instance.MutualTLSEnabled() {
190+
if builder.Instance.AdditionalPluginEnabled("rabbitmq_web_stomp") {
191+
servicePortsMap["stomps"] = corev1.ServicePort{
192+
Protocol: corev1.ProtocolTCP,
193+
Port: 15673,
194+
Name: "web-stomp-tls",
195+
TargetPort: intstr.FromInt(15673),
196+
}
197+
}
198+
if builder.Instance.AdditionalPluginEnabled("rabbitmq_web_mqtt") {
199+
servicePortsMap["mqtts"] = corev1.ServicePort{
200+
Protocol: corev1.ProtocolTCP,
201+
Port: 15676,
202+
Name: "web-mqtt-tls",
203+
TargetPort: intstr.FromInt(15676),
204+
}
205+
}
166206
}
167207

168208
var updatedServicePorts []corev1.ServicePort

‎internal/resource/service_test.go

+109-14
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ var _ = Context("Services", func() {
7272
})
7373

7474
Context("TLS", func() {
75-
It("opens port 5671 on the service", func() {
75+
It("opens ports for amqps and management-tls on the service", func() {
7676
instance := &rabbitmqv1beta1.RabbitmqCluster{
7777
ObjectMeta: v1.ObjectMeta{
7878
Name: "foo",
@@ -94,13 +94,108 @@ var _ = Context("Services", func() {
9494
}
9595

9696
Expect(serviceBuilder.Update(svc)).To(Succeed())
97-
Expect(svc.Spec.Ports).Should(ContainElement(corev1.ServicePort{
98-
Name: "amqps",
99-
Protocol: "TCP",
100-
Port: 5671,
101-
TargetPort: intstr.FromInt(5671),
97+
Expect(svc.Spec.Ports).Should(ContainElements([]corev1.ServicePort{
98+
{
99+
Name: "amqps",
100+
Protocol: corev1.ProtocolTCP,
101+
Port: 5671,
102+
TargetPort: intstr.FromInt(5671),
103+
},
104+
{
105+
Name: "management-tls",
106+
Protocol: corev1.ProtocolTCP,
107+
Port: 15671,
108+
TargetPort: intstr.FromInt(15671),
109+
},
102110
}))
103111
})
112+
113+
When("mqtt and stomp are enabled", func() {
114+
It("opens ports for those plugins", func() {
115+
instance := &rabbitmqv1beta1.RabbitmqCluster{
116+
ObjectMeta: v1.ObjectMeta{
117+
Name: "foo",
118+
Namespace: "foo-namespace",
119+
},
120+
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
121+
TLS: rabbitmqv1beta1.TLSSpec{
122+
SecretName: "tls-secret",
123+
},
124+
Rabbitmq: rabbitmqv1beta1.RabbitmqClusterConfigurationSpec{
125+
AdditionalPlugins: []rabbitmqv1beta1.Plugin{"rabbitmq_mqtt", "rabbitmq_stomp"},
126+
},
127+
},
128+
}
129+
builder.Instance = instance
130+
serviceBuilder := builder.Service()
131+
svc := &corev1.Service{
132+
ObjectMeta: metav1.ObjectMeta{
133+
Name: "foo-service",
134+
Namespace: "foo-namespace",
135+
},
136+
}
137+
138+
Expect(serviceBuilder.Update(svc)).To(Succeed())
139+
Expect(svc.Spec.Ports).Should(ContainElements([]corev1.ServicePort{
140+
{
141+
Name: "mqtts",
142+
Protocol: corev1.ProtocolTCP,
143+
Port: 8883,
144+
TargetPort: intstr.FromInt(8883),
145+
},
146+
{
147+
Name: "stomps",
148+
Protocol: corev1.ProtocolTCP,
149+
Port: 61614,
150+
TargetPort: intstr.FromInt(61614),
151+
},
152+
}))
153+
})
154+
})
155+
156+
When("rabbitmq_web_mqtt and rabbitmq_web_stomp are enabled", func() {
157+
It("opens ports for those plugins", func() {
158+
instance := &rabbitmqv1beta1.RabbitmqCluster{
159+
ObjectMeta: v1.ObjectMeta{
160+
Name: "foo",
161+
Namespace: "foo-namespace",
162+
},
163+
Spec: rabbitmqv1beta1.RabbitmqClusterSpec{
164+
TLS: rabbitmqv1beta1.TLSSpec{
165+
SecretName: "tls-secret",
166+
CaSecretName: "ca-secret",
167+
},
168+
Rabbitmq: rabbitmqv1beta1.RabbitmqClusterConfigurationSpec{
169+
AdditionalPlugins: []rabbitmqv1beta1.Plugin{"rabbitmq_web_mqtt", "rabbitmq_web_stomp"},
170+
},
171+
},
172+
}
173+
builder.Instance = instance
174+
serviceBuilder := builder.Service()
175+
svc := &corev1.Service{
176+
ObjectMeta: metav1.ObjectMeta{
177+
Name: "foo-service",
178+
Namespace: "foo-namespace",
179+
},
180+
}
181+
182+
Expect(serviceBuilder.Update(svc)).To(Succeed())
183+
Expect(svc.Spec.Ports).Should(ContainElements([]corev1.ServicePort{
184+
{
185+
Name: "web-mqtt-tls",
186+
Protocol: corev1.ProtocolTCP,
187+
Port: 15676,
188+
TargetPort: intstr.FromInt(15676),
189+
},
190+
{
191+
Name: "web-stomp-tls",
192+
Protocol: corev1.ProtocolTCP,
193+
Port: 15673,
194+
TargetPort: intstr.FromInt(15673),
195+
},
196+
}))
197+
})
198+
})
104199
})
105200

106201
Context("Annotations", func() {
@@ -298,13 +393,13 @@ var _ = Context("Services", func() {
298393
TargetPort: intstr.FromInt(5672),
299394
Protocol: corev1.ProtocolTCP,
300395
}
301-
managementPort := corev1.ServicePort{
302-
Name: "management",
396+
httpPort := corev1.ServicePort{
397+
Name: "http",
303398
Port: 15672,
304399
TargetPort: intstr.FromInt(15672),
305400
Protocol: corev1.ProtocolTCP,
306401
}
307-
Expect(svc.Spec.Ports).To(ConsistOf(amqpPort, managementPort))
402+
Expect(svc.Spec.Ports).To(ConsistOf(amqpPort, httpPort))
308403
})
309404

310405
DescribeTable("plugins exposing ports",
@@ -349,8 +444,8 @@ var _ = Context("Services", func() {
349444
{
350445
Protocol: corev1.ProtocolTCP,
351446
Port: 15672,
447+
Name: "http",
352448
TargetPort: intstr.FromInt(15672),
353-
Name: "management",
354449
NodePort: 1234,
355450
},
356451
}
@@ -366,16 +461,16 @@ var _ = Context("Services", func() {
366461
TargetPort: intstr.FromInt(5672),
367462
NodePort: 12345,
368463
}
369-
expectedManagementServicePort := corev1.ServicePort{
464+
expectedHTTPServicePort := corev1.ServicePort{
370465
Protocol: corev1.ProtocolTCP,
371466
Port: 15672,
467+
Name: "http",
372468
TargetPort: intstr.FromInt(15672),
373-
Name: "management",
374469
NodePort: 1234,
375470
}
376471

377472
Expect(svc.Spec.Ports).To(ContainElement(expectedAmqpServicePort))
378-
Expect(svc.Spec.Ports).To(ContainElement(expectedManagementServicePort))
473+
Expect(svc.Spec.Ports).To(ContainElement(expectedHTTPServicePort))
379474
})
380475

381476
It("unsets nodePort after updating from NodePort to ClusterIP", func() {
@@ -519,7 +614,7 @@ var _ = Context("Services", func() {
519614
Protocol: corev1.ProtocolTCP,
520615
},
521616
corev1.ServicePort{
522-
Name: "management",
617+
Name: "http",
523618
Port: 15672,
524619
TargetPort: intstr.FromInt(15672),
525620
Protocol: corev1.ProtocolTCP,

‎internal/resource/statefulset.go

+39-9
Original file line numberDiff line numberDiff line change
@@ -437,12 +437,31 @@ func (builder *StatefulSetBuilder) podTemplateSpec(previousPodAnnotations map[st
437437
}
438438

439439
tlsSpec := builder.Instance.Spec.TLS
440-
if tlsSpec.SecretName != "" {
441-
// add tls port
440+
if builder.Instance.TLSEnabled() {
442441
ports = append(ports, corev1.ContainerPort{
443442
Name: "amqps",
444443
ContainerPort: 5671,
445-
})
444+
},
445+
corev1.ContainerPort{
446+
Name: "management-tls",
447+
ContainerPort: 15671,
448+
},
449+
)
450+
451+
// enable tls ports for plugins
452+
if builder.Instance.AdditionalPluginEnabled("rabbitmq_mqtt") {
453+
ports = append(ports, corev1.ContainerPort{
454+
Name: "mqtts",
455+
ContainerPort: 8883,
456+
})
457+
}
458+
459+
if builder.Instance.AdditionalPluginEnabled("rabbitmq_stomp") {
460+
ports = append(ports, corev1.ContainerPort{
461+
Name: "stomps",
462+
ContainerPort: 61614,
463+
})
464+
}
446465

447466
// add tls volume
448467
filePermissions := int32(400)
@@ -474,14 +493,12 @@ func (builder *StatefulSetBuilder) podTemplateSpec(previousPodAnnotations map[st
474493
})
475494

476495
if builder.Instance.MutualTLSEnabled() {
477-
caCertName := builder.Instance.Spec.TLS.CaCertName
478-
479496
if builder.Instance.SingleTLSSecret() {
480497
//Mount CaCert in TLS Secret
481498
rabbitmqContainerVolumeMounts = append(rabbitmqContainerVolumeMounts, corev1.VolumeMount{
482499
Name: "rabbitmq-tls",
483-
MountPath: fmt.Sprintf("/etc/rabbitmq-tls/%s", caCertName),
484-
SubPath: caCertName,
500+
MountPath: "/etc/rabbitmq-tls/ca.crt",
501+
SubPath: "ca.crt",
485502
ReadOnly: true,
486503
})
487504
} else {
@@ -501,11 +518,24 @@ func (builder *StatefulSetBuilder) podTemplateSpec(previousPodAnnotations map[st
501518
//Mount new volume to same path as TLS cert and key
502519
rabbitmqContainerVolumeMounts = append(rabbitmqContainerVolumeMounts, corev1.VolumeMount{
503520
Name: "rabbitmq-mutual-tls",
504-
MountPath: fmt.Sprintf("/etc/rabbitmq-tls/%s", caCertName),
505-
SubPath: caCertName,
521+
MountPath: "/etc/rabbitmq-tls/ca.crt",
522+
SubPath: "ca.crt",
506523
ReadOnly: true,
507524
})
508525
}
526+
if builder.Instance.AdditionalPluginEnabled("rabbitmq_web_mqtt") {
527+
ports = append(ports, corev1.ContainerPort{
528+
Name: "web-mqtt-tls",
529+
ContainerPort: 15676,
530+
})
531+
}
532+
533+
if builder.Instance.AdditionalPluginEnabled("rabbitmq_web_stomp") {
534+
ports = append(ports, corev1.ContainerPort{
535+
Name: "web-stomp-tls",
536+
ContainerPort: 15673,
537+
})
538+
}
509539
}
510540
}
511541

‎internal/resource/statefulset_test.go

+52-11
Original file line numberDiff line numberDiff line change
@@ -600,23 +600,46 @@ var _ = Describe("StatefulSet", func() {
600600
}))
601601
})
602602

603-
It("opens port 5671 on the rabbitmq container", func() {
603+
It("opens tls ports for amqps and management-tls on the rabbitmq container", func() {
604604
instance.Spec.TLS.SecretName = "tls-secret"
605605
Expect(stsBuilder.Update(statefulSet)).To(Succeed())
606606

607607
rabbitmqContainerSpec := extractContainer(statefulSet.Spec.Template.Spec.Containers, "rabbitmq")
608-
Expect(rabbitmqContainerSpec.Ports).To(ContainElement(corev1.ContainerPort{
609-
Name: "amqps",
610-
ContainerPort: 5671,
608+
Expect(rabbitmqContainerSpec.Ports).To(ContainElements([]corev1.ContainerPort{
609+
{
610+
Name: "amqps",
611+
ContainerPort: 5671,
612+
},
613+
{
614+
Name: "management-tls",
615+
ContainerPort: 15671,
616+
},
611617
}))
612618
})
613619

614-
Context("Mutual TLS (same secret)", func() {
620+
It("opens tls ports when mqtt and stomp are configured", func() {
621+
instance.Spec.TLS.SecretName = "tls-secret"
622+
instance.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{rabbitmqv1beta1.Plugin("rabbitmq_mqtt"), rabbitmqv1beta1.Plugin("rabbitmq_stomp")}
623+
Expect(stsBuilder.Update(statefulSet)).To(Succeed())
624+
625+
rabbitmqContainerSpec := extractContainer(statefulSet.Spec.Template.Spec.Containers, "rabbitmq")
615626

627+
Expect(rabbitmqContainerSpec.Ports).To(ContainElements([]corev1.ContainerPort{
628+
{
629+
Name: "mqtts",
630+
ContainerPort: 8883,
631+
},
632+
{
633+
Name: "stomps",
634+
ContainerPort: 61614,
635+
},
636+
}))
637+
})
638+
639+
Context("Mutual TLS (same secret)", func() {
616640
It("add a TLS CA cert volume mount to the rabbitmq container", func() {
617641
instance.Spec.TLS.SecretName = "tls-secret"
618642
instance.Spec.TLS.CaSecretName = "tls-secret"
619-
instance.Spec.TLS.CaCertName = "ca.crt"
620643
Expect(stsBuilder.Update(statefulSet)).To(Succeed())
621644

622645
rabbitmqContainerSpec := extractContainer(statefulSet.Spec.Template.Spec.Containers, "rabbitmq")
@@ -627,28 +650,46 @@ var _ = Describe("StatefulSet", func() {
627650
ReadOnly: true,
628651
}))
629652
})
653+
654+
It("opens tls ports when rabbitmq_web_mqtt and rabbitmq_web_stomp are configured", func() {
655+
instance.Spec.TLS.SecretName = "tls-secret"
656+
instance.Spec.TLS.CaSecretName = "tls-secret"
657+
instance.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{rabbitmqv1beta1.Plugin("rabbitmq_web_mqtt"), rabbitmqv1beta1.Plugin("rabbitmq_web_stomp")}
658+
Expect(stsBuilder.Update(statefulSet)).To(Succeed())
659+
660+
rabbitmqContainerSpec := extractContainer(statefulSet.Spec.Template.Spec.Containers, "rabbitmq")
661+
662+
Expect(rabbitmqContainerSpec.Ports).To(ContainElements([]corev1.ContainerPort{
663+
{
664+
Name: "web-mqtt-tls",
665+
ContainerPort: 15676,
666+
},
667+
{
668+
Name: "web-stomp-tls",
669+
ContainerPort: 15673,
670+
},
671+
}))
672+
})
630673
})
631674

632675
Context("Mutual TLS (different secret)", func() {
633-
634676
It("add a TLS CA cert volume mount to the rabbitmq container", func() {
635677
instance.Spec.TLS.SecretName = "tls-secret"
636678
instance.Spec.TLS.CaSecretName = "mutual-tls-secret"
637-
instance.Spec.TLS.CaCertName = "caCertificate"
638679
Expect(stsBuilder.Update(statefulSet)).To(Succeed())
639680

640681
rabbitmqContainerSpec := extractContainer(statefulSet.Spec.Template.Spec.Containers, "rabbitmq")
641682
Expect(rabbitmqContainerSpec.VolumeMounts).To(ContainElement(corev1.VolumeMount{
642683
Name: "rabbitmq-mutual-tls",
643-
MountPath: "/etc/rabbitmq-tls/caCertificate",
644-
SubPath: "caCertificate",
684+
MountPath: "/etc/rabbitmq-tls/ca.crt",
685+
SubPath: "ca.crt",
645686
ReadOnly: true,
646687
}))
647688
})
689+
648690
It("adds a mutual TLS volume to the pod template spec", func() {
649691
instance.Spec.TLS.SecretName = "tls-secret"
650692
instance.Spec.TLS.CaSecretName = "mutual-tls-secret"
651-
instance.Spec.TLS.CaCertName = "caCertificate"
652693
Expect(stsBuilder.Update(statefulSet)).To(Succeed())
653694

654695
filePermissions := int32(400)

‎system_tests/system_tests.go

+77-25
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ package system_tests
1111

1212
import (
1313
"context"
14+
"crypto/tls"
15+
"crypto/x509"
1416
"encoding/json"
17+
"io/ioutil"
1518

1619
. "github.com/onsi/ginkgo"
1720
. "github.com/onsi/gomega"
@@ -46,7 +49,7 @@ var _ = Describe("Operator", func() {
4649
waitForRabbitmqRunning(cluster)
4750

4851
hostname = kubernetesNodeIp(ctx, clientSet)
49-
port = rabbitmqNodePort(ctx, clientSet, cluster, "management")
52+
port = rabbitmqNodePort(ctx, clientSet, cluster, "http")
5053

5154
var err error
5255
username, password, err = getUsernameAndPassword(ctx, clientSet, cluster.Namespace, cluster.Name)
@@ -246,7 +249,7 @@ CONSOLE_LOG=new`
246249
waitForRabbitmqRunning(cluster)
247250

248251
hostname = kubernetesNodeIp(ctx, clientSet)
249-
port = rabbitmqNodePort(ctx, clientSet, cluster, "management")
252+
port = rabbitmqNodePort(ctx, clientSet, cluster, "http")
250253

251254
var err error
252255
username, password, err = getUsernameAndPassword(ctx, clientSet, cluster.Namespace, cluster.Name)
@@ -302,7 +305,7 @@ CONSOLE_LOG=new`
302305
It("works", func() {
303306
username, password, err := getUsernameAndPassword(ctx, clientSet, cluster.Namespace, cluster.Name)
304307
hostname := kubernetesNodeIp(ctx, clientSet)
305-
port := rabbitmqNodePort(ctx, clientSet, cluster, "management")
308+
port := rabbitmqNodePort(ctx, clientSet, cluster, "http")
306309
Expect(err).NotTo(HaveOccurred())
307310
assertHttpReady(hostname, port)
308311

@@ -316,49 +319,98 @@ CONSOLE_LOG=new`
316319
Context("TLS", func() {
317320
When("TLS is correctly configured", func() {
318321
var (
319-
cluster *rabbitmqv1beta1.RabbitmqCluster
320-
hostname string
321-
amqpsNodePort string
322-
username string
323-
password string
324-
caFilePath string
322+
cluster *rabbitmqv1beta1.RabbitmqCluster
323+
hostname string
324+
amqpsNodePort string
325+
managementTLSNodePort string
326+
username string
327+
password string
328+
caFilePath string
325329
)
326330

327331
BeforeEach(func() {
328332
cluster = newRabbitmqCluster(namespace, "tls-test-rabbit")
333+
// Enable additional plugins that can share TLS config.
334+
cluster.Spec.Rabbitmq.AdditionalPlugins = []rabbitmqv1beta1.Plugin{
335+
"rabbitmq_mqtt",
336+
"rabbitmq_stomp",
337+
}
329338
Expect(createRabbitmqCluster(ctx, rmqClusterClient, cluster)).To(Succeed())
330339
waitForRabbitmqRunning(cluster)
331340

332-
// Passing a single hostname for certificate creation works because
341+
// Passing a single hostname for certificate creation
333342
// the AMPQS client is connecting using the same hostname
334343
hostname = kubernetesNodeIp(ctx, clientSet)
335344
caFilePath = createTLSSecret("rabbitmq-tls-test-secret", namespace, hostname)
336345

337-
// Update CR with TLS secret name
346+
// Update RabbitmqCluster with TLS secret name
338347
Expect(updateRabbitmqCluster(ctx, rmqClusterClient, cluster.Name, cluster.Namespace, func(cluster *rabbitmqv1beta1.RabbitmqCluster) {
339348
cluster.Spec.TLS.SecretName = "rabbitmq-tls-test-secret"
340349
})).To(Succeed())
341350
waitForTLSUpdate(cluster)
342-
amqpsNodePort = rabbitmqNodePort(ctx, clientSet, cluster, "amqps")
343351
})
344352

345353
AfterEach(func() {
346354
Expect(rmqClusterClient.Delete(context.TODO(), cluster)).To(Succeed())
347355
Expect(k8sDeleteSecret("rabbitmq-tls-test-secret", namespace)).To(Succeed())
348356
})
349357

350-
It("talks amqps with RabbitMQ", func() {
351-
var err error
352-
username, password, err = getUsernameAndPassword(ctx, clientSet, "rabbitmq-system", "tls-test-rabbit")
353-
Expect(err).NotTo(HaveOccurred())
358+
It("RabbitMQ responds to requests over secured protocols", func() {
359+
By("talking AMQPS", func() {
360+
amqpsNodePort = rabbitmqNodePort(ctx, clientSet, cluster, "amqps")
361+
var err error
362+
username, password, err = getUsernameAndPassword(ctx, clientSet, "rabbitmq-system", "tls-test-rabbit")
363+
Expect(err).NotTo(HaveOccurred())
354364

355-
// try to publish and consume a message on a amqps url
356-
sentMessage := "Hello Rabbitmq!"
357-
Expect(publishToQueueAMQPS(sentMessage, username, password, hostname, amqpsNodePort, caFilePath)).To(Succeed())
365+
// try to publish and consume a message on a amqps url
366+
sentMessage := "Hello Rabbitmq!"
367+
Expect(publishToQueueAMQPS(sentMessage, username, password, hostname, amqpsNodePort, caFilePath)).To(Succeed())
358368

359-
recievedMessage, err := getMessageFromQueueAMQPS(username, password, hostname, amqpsNodePort, caFilePath)
360-
Expect(err).NotTo(HaveOccurred())
361-
Expect(recievedMessage).To(Equal(sentMessage))
369+
recievedMessage, err := getMessageFromQueueAMQPS(username, password, hostname, amqpsNodePort, caFilePath)
370+
Expect(err).NotTo(HaveOccurred())
371+
Expect(recievedMessage).To(Equal(sentMessage))
372+
})
373+
374+
By("connecting to management API over TLS", func() {
375+
var err error
376+
377+
managementTLSNodePort = rabbitmqNodePort(ctx, clientSet, cluster, "management-tls")
378+
379+
username, password, err = getUsernameAndPassword(ctx, clientSet, "rabbitmq-system", "tls-test-rabbit")
380+
Expect(err).NotTo(HaveOccurred())
381+
382+
err = connectHTTPS(username, password, hostname, managementTLSNodePort, caFilePath)
383+
Expect(err).NotTo(HaveOccurred())
384+
})
385+
386+
By("talking MQTTS", func() {
387+
var err error
388+
// TLSConfig()
389+
cfg := new(tls.Config)
390+
cfg.RootCAs = x509.NewCertPool()
391+
ca, err := ioutil.ReadFile(caFilePath)
392+
Expect(err).NotTo(HaveOccurred())
393+
394+
cfg.RootCAs.AppendCertsFromPEM(ca)
395+
396+
username, password, err = getUsernameAndPassword(ctx, clientSet, "rabbitmq-system", "tls-test-rabbit")
397+
Expect(err).NotTo(HaveOccurred())
398+
publishAndConsumeMQTTMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "mqtts"), username, password, false, cfg)
399+
})
400+
401+
By("talking STOMPS", func() {
402+
var err error
403+
cfg := new(tls.Config)
404+
cfg.RootCAs = x509.NewCertPool()
405+
ca, err := ioutil.ReadFile(caFilePath)
406+
Expect(err).NotTo(HaveOccurred())
407+
408+
cfg.RootCAs.AppendCertsFromPEM(ca)
409+
410+
username, password, err = getUsernameAndPassword(ctx, clientSet, "rabbitmq-system", "tls-test-rabbit")
411+
Expect(err).NotTo(HaveOccurred())
412+
publishAndConsumeSTOMPMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stomps"), username, password, cfg)
413+
})
362414
})
363415
})
364416

@@ -410,13 +462,13 @@ CONSOLE_LOG=new`
410462

411463
It("publishes and consumes a message", func() {
412464
By("MQTT")
413-
publishAndConsumeMQTTMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "mqtt"), username, password, false)
465+
publishAndConsumeMQTTMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "mqtt"), username, password, false, nil)
414466

415467
By("MQTT-over-WebSockets")
416-
publishAndConsumeMQTTMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "web-mqtt"), username, password, true)
468+
publishAndConsumeMQTTMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "web-mqtt"), username, password, true, nil)
417469

418470
By("STOMP")
419-
publishAndConsumeSTOMPMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stomp"), username, password)
471+
publishAndConsumeSTOMPMsg(hostname, rabbitmqNodePort(ctx, clientSet, cluster, "stomp"), username, password, nil)
420472

421473
// github.com/go-stomp/stomp does not support STOMP-over-WebSockets
422474
})

‎system_tests/utils.go

+68-14
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929

3030
k8sresource "k8s.io/apimachinery/pkg/api/resource"
3131

32+
mgmtApi "github.com/michaelklishin/rabbit-hole/v2"
3233
"sigs.k8s.io/controller-runtime/pkg/client"
3334

3435
"github.com/cloudflare/cfssl/csr"
@@ -209,6 +210,28 @@ func publishToQueue(rabbitmqHostName, rabbitmqPort, rabbitmqUsername, rabbitmqPa
209210
return nil
210211
}
211212

213+
func connectHTTPS(username, password, hostname, httpsNodePort, caFilePath string) (err error) {
214+
// create TLS config for https request
215+
cfg := new(tls.Config)
216+
cfg.RootCAs = x509.NewCertPool()
217+
ca, err := ioutil.ReadFile(caFilePath)
218+
if err != nil {
219+
return err
220+
}
221+
222+
cfg.RootCAs.AppendCertsFromPEM(ca)
223+
224+
transport := &http.Transport{TLSClientConfig: cfg}
225+
rmqc, err := mgmtApi.NewTLSClient(fmt.Sprintf("https://%v:%v", hostname, httpsNodePort), username, password, transport)
226+
if err != nil {
227+
return err
228+
}
229+
230+
_, err = rmqc.Overview()
231+
232+
return err
233+
}
234+
212235
func connectAMQPS(username, password, hostname, port, caFilePath string) (conn *amqp.Connection, err error) {
213236
// create TLS config for amqps request
214237
cfg := new(tls.Config)
@@ -729,7 +752,7 @@ func createCertificateChain(hostname string, caCertWriter, certWriter, keyWriter
729752
return nil
730753
}
731754

732-
func publishAndConsumeMQTTMsg(hostname, nodePort, username, password string, overWebSocket bool) {
755+
func publishAndConsumeMQTTMsg(hostname, nodePort, username, password string, overWebSocket bool, tlsConfig *tls.Config) {
733756
url := fmt.Sprintf("tcp://%s:%s", hostname, nodePort)
734757
if overWebSocket {
735758
url = fmt.Sprintf("ws://%s:%s/ws", hostname, nodePort)
@@ -741,6 +764,13 @@ func publishAndConsumeMQTTMsg(hostname, nodePort, username, password string, ove
741764
SetClientID("system tests MQTT plugin").
742765
SetProtocolVersion(4) // RabbitMQ MQTT plugin targets MQTT 3.1.1
743766

767+
if tlsConfig != nil {
768+
url = fmt.Sprintf("ssl://%s:%s", hostname, nodePort)
769+
opts = opts.
770+
AddBroker(url).
771+
SetTLSConfig(tlsConfig)
772+
}
773+
744774
c := mqtt.NewClient(opts)
745775

746776
var token mqtt.Token
@@ -786,24 +816,48 @@ func publishAndConsumeMQTTMsg(hostname, nodePort, username, password string, ove
786816
c.Disconnect(250)
787817
}
788818

789-
func publishAndConsumeSTOMPMsg(hostname, stompNodePort, username, password string) {
819+
func publishAndConsumeSTOMPMsg(hostname, stompNodePort, username, password string, tlsConfig *tls.Config) {
790820
var conn *stomp.Conn
791821
var err error
792-
for retry := 0; retry < 5; retry++ {
793-
fmt.Printf("Attempt #%d to connect using STOMP\n", retry)
794-
conn, err = stomp.Dial("tcp",
795-
fmt.Sprintf("%s:%s", hostname, stompNodePort),
796-
stomp.ConnOpt.Login(username, password),
797-
stomp.ConnOpt.AcceptVersion(stomp.V12), // RabbitMQ STOMP plugin supports STOMP versions 1.0 through 1.2
798-
stomp.ConnOpt.Host("/"), // default virtual host
799-
)
800822

801-
if err == nil {
802-
break
823+
// Create a secure tls.Conn and pass to stomp.Connect, otherwise use Stomp.Dial
824+
if tlsConfig != nil {
825+
secureConn, err := tls.Dial("tcp", fmt.Sprintf("%s:%s", hostname, stompNodePort), tlsConfig)
826+
Expect(err).NotTo(HaveOccurred())
827+
defer secureConn.Close()
828+
829+
for retry := 0; retry < 5; retry++ {
830+
fmt.Printf("Attempt #%d to connect using STOMPS\n", retry)
831+
conn, err = stomp.Connect(secureConn,
832+
stomp.ConnOpt.Login(username, password),
833+
stomp.ConnOpt.AcceptVersion(stomp.V12), // RabbitMQ STOMP plugin supports STOMP versions 1.0 through 1.2
834+
stomp.ConnOpt.Host("/"), // default virtual host
835+
)
836+
837+
if err == nil {
838+
break
839+
}
840+
841+
time.Sleep(2 * time.Second)
842+
}
843+
} else {
844+
for retry := 0; retry < 5; retry++ {
845+
fmt.Printf("Attempt #%d to connect using STOMP\n", retry)
846+
conn, err = stomp.Dial("tcp",
847+
fmt.Sprintf("%s:%s", hostname, stompNodePort),
848+
stomp.ConnOpt.Login(username, password),
849+
stomp.ConnOpt.AcceptVersion(stomp.V12),
850+
stomp.ConnOpt.Host("/"),
851+
)
852+
853+
if err == nil {
854+
break
855+
}
856+
857+
time.Sleep(2 * time.Second)
803858
}
804-
805-
time.Sleep(2 * time.Second)
806859
}
860+
807861
Expect(err).ToNot(HaveOccurred())
808862

809863
queue := "/queue/system-tests-stomp"

0 commit comments

Comments
 (0)
Please sign in to comment.