Skip to content

Commit 174671a

Browse files
committed
Bug 1904380 - Forwarding logs to Kafka using Chained certificates fails with error "state=error: certificate verify failed (unable to get local issuer certificate)
- Upgraded Kafka to 2.7.0 and pushed images to quay.io - Switched the kafka logforwarding test to run over TLS - The Kafka broker certificate chain is Server - Intermediate CA - Root CA - Added Java Key Store generation functions - Removed Kafka consumer output to stdio to avoid recursive logs
1 parent f6ea670 commit 174671a

29 files changed

+1659
-35
lines changed

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ require (
2323
github.com/openshift/api v0.0.0-20200602204738-768b7001fe69
2424
github.com/openshift/elasticsearch-operator v0.0.0-20200722044541-14fae5dcddfd
2525
github.com/operator-framework/operator-sdk v0.18.1
26+
github.com/pavel-v-chernykh/keystore-go/v4 v4.1.0
2627
github.com/prometheus/procfs v0.0.10 // indirect
2728
github.com/rogpeppe/go-internal v1.5.2 // indirect
2829
github.com/spf13/pflag v1.0.5

go.sum

+5
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,10 @@ github.com/otiai10/mint v1.3.1 h1:BCmzIS3n71sGfHB5NMNDB3lHYPz8fWSkCAErHed//qc=
758758
github.com/otiai10/mint v1.3.1/go.mod h1:/yxELlJQ0ufhjUwhshSj+wFjZ78CnZ48/1wtmBH1OTc=
759759
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
760760
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
761+
github.com/pavel-v-chernykh/keystore-go v1.0.0 h1:q7+IYDgDRi7uutqeqndr838MAw5t8Gp1i5GpXwqMTxs=
762+
github.com/pavel-v-chernykh/keystore-go v2.1.0+incompatible h1:Jd6xfriVlJ6hWPvYOE0Ni0QWcNTLRehfGPFxr3eSL80=
763+
github.com/pavel-v-chernykh/keystore-go/v4 v4.1.0 h1:xKxUVGoB9VJU+lgQLPN0KURjw+XCVVSpHfQEeyxk3zo=
764+
github.com/pavel-v-chernykh/keystore-go/v4 v4.1.0/go.mod h1:2ejgys4qY+iNVW1IittZhyRYA6MNv8TgM6VHqojbB9g=
761765
github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g=
762766
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
763767
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
@@ -1005,6 +1009,7 @@ golang.org/x/crypto v0.0.0-20191112222119-e1110fd1c708/go.mod h1:LzIPMQfyMNhhGPh
10051009
golang.org/x/crypto v0.0.0-20191202143827-86a70503ff7e/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
10061010
golang.org/x/crypto v0.0.0-20200128174031-69ecbb4d6d5d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
10071011
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
1012+
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
10081013
golang.org/x/crypto v0.0.0-20200414173820-0848c9571904/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
10091014
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE=
10101015
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=

test/e2e/logforwarding/fluent/fluent_secure_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ var _ = Describe("[ClusterLogForwarder]", func() {
3131
f = NewFixture(c.NS.Name, secureMessage)
3232

3333
// Receiver acts as TLS server.
34-
privateCA = certificate.NewCA(nil)
35-
serverCert = certificate.NewCert(privateCA, f.Receiver.Host()) // Receiver is server.
36-
clientCert = certificate.NewCert(privateCA)
34+
privateCA = certificate.NewCA(nil, "Root CA")
35+
serverCert = certificate.NewCert(privateCA, "Server", f.Receiver.Host()) // Receiver is server.
36+
clientCert = certificate.NewCert(privateCA, "Client")
3737
sharedKey = "top-secret"
3838
// The Receiver Sources act as TLS servers.
3939
for i, s := range []*fluentd.Source{

test/e2e/logforwarding/kafka/forward_to_kafka_test.go

+15-4
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010

1111
"github.com/ViaQ/logerr/log"
1212
loggingv1 "github.com/openshift/cluster-logging-operator/pkg/apis/logging/v1"
13-
v1 "github.com/openshift/cluster-logging-operator/pkg/apis/logging/v1"
1413
"github.com/openshift/cluster-logging-operator/test/helpers"
1514
"github.com/openshift/cluster-logging-operator/test/helpers/kafka"
1615
apps "k8s.io/api/apps/v1"
@@ -63,6 +62,9 @@ var _ = Describe("[ClusterLogForwarder] Forwards logs", func() {
6362
e2e.LogStores[app.Name].ClusterLocalEndpoint(),
6463
kafka.DefaultTopic,
6564
),
65+
Secret: &loggingv1.OutputSecretSpec{
66+
Name: kafka.DeploymentName,
67+
},
6668
},
6769
},
6870
Pipelines: []loggingv1.PipelineSpec{
@@ -152,30 +154,39 @@ var _ = Describe("[ClusterLogForwarder] Forwards logs", func() {
152154
Type: loggingv1.OutputTypeKafka,
153155
URL: fmt.Sprintf("tls://%s", e2e.LogStores[app.Name].ClusterLocalEndpoint()),
154156
OutputTypeSpec: loggingv1.OutputTypeSpec{
155-
Kafka: &v1.Kafka{
157+
Kafka: &loggingv1.Kafka{
156158
Topic: kafka.AppLogsTopic,
157159
},
158160
},
161+
Secret: &loggingv1.OutputSecretSpec{
162+
Name: kafka.DeploymentName,
163+
},
159164
},
160165
{
161166
Name: fmt.Sprintf("%s-audit-out", kafka.DeploymentName),
162167
Type: loggingv1.OutputTypeKafka,
163168
URL: fmt.Sprintf("tls://%s", e2e.LogStores[app.Name].ClusterLocalEndpoint()),
164169
OutputTypeSpec: loggingv1.OutputTypeSpec{
165-
Kafka: &v1.Kafka{
170+
Kafka: &loggingv1.Kafka{
166171
Topic: kafka.AuditLogsTopic,
167172
},
168173
},
174+
Secret: &loggingv1.OutputSecretSpec{
175+
Name: kafka.DeploymentName,
176+
},
169177
},
170178
{
171179
Name: fmt.Sprintf("%s-infra-out", kafka.DeploymentName),
172180
Type: loggingv1.OutputTypeKafka,
173181
URL: fmt.Sprintf("tls://%s", e2e.LogStores[app.Name].ClusterLocalEndpoint()),
174182
OutputTypeSpec: loggingv1.OutputTypeSpec{
175-
Kafka: &v1.Kafka{
183+
Kafka: &loggingv1.Kafka{
176184
Topic: kafka.InfraLogsTopic,
177185
},
178186
},
187+
Secret: &loggingv1.OutputSecretSpec{
188+
Name: kafka.DeploymentName,
189+
},
179190
},
180191
},
181192
Pipelines: []loggingv1.PipelineSpec{

test/helpers/certificate/certificate.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,11 @@ func (ck *CertKey) StartServer(handler http.Handler, clientCA *CertKey) *httptes
9595
}
9696

9797
// NewCA creates a new dummy CA cert signed by signer, or self-signed if signer is nil.
98-
func NewCA(signer *CertKey) *CertKey {
98+
func NewCA(signer *CertKey, org string) *CertKey {
9999
return New(&x509.Certificate{
100100
SerialNumber: big.NewInt(1234),
101101
Subject: pkix.Name{
102-
Organization: []string{"Testing, INC."},
102+
Organization: []string{org},
103103
// Country: []string{"US"},
104104
// Province: []string{""},
105105
// Locality: []string{"San Francisco"},
@@ -118,7 +118,7 @@ func NewCA(signer *CertKey) *CertKey {
118118
// NewCert creates a dummy server cert signed by signer, or self-signed if signer is nil.
119119
// The addrs list can contain strings (DNS names) or net.IP addresses, if addrs
120120
// is empty will use "localhost", v4 and v6 loopback
121-
func NewCert(signer *CertKey, addrs ...interface{}) *CertKey {
121+
func NewCert(signer *CertKey, org string, addrs ...interface{}) *CertKey {
122122
var (
123123
dns []string
124124
ips []net.IP
@@ -138,7 +138,7 @@ func NewCert(signer *CertKey, addrs ...interface{}) *CertKey {
138138
}
139139
return New(&x509.Certificate{
140140
SerialNumber: big.NewInt(1234),
141-
Subject: pkix.Name{Organization: []string{"Testing, INC."}},
141+
Subject: pkix.Name{Organization: []string{org}},
142142
NotBefore: time.Now(),
143143
NotAfter: time.Now().AddDate(10, 0, 0),
144144
SubjectKeyId: []byte{1, 2, 3, 4, 6},
@@ -151,7 +151,7 @@ func NewCert(signer *CertKey, addrs ...interface{}) *CertKey {
151151
}
152152

153153
// NewServer creates a dummy client cert signed by signer, or self-signed if signer is nil.
154-
func NewClient(signer *CertKey) *CertKey {
154+
func NewClient(signer *CertKey, org string) *CertKey {
155155
// Its the same as a server cert but with no DNS/IP addrs.
156-
return NewCert(signer)
156+
return NewCert(signer, org)
157157
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package certificate
2+
3+
import (
4+
"bytes"
5+
"crypto/x509"
6+
"strconv"
7+
"time"
8+
9+
"github.com/openshift/cluster-logging-operator/test"
10+
"github.com/pavel-v-chernykh/keystore-go/v4"
11+
)
12+
13+
func JKSKeyStore(certKey *CertKey, password string) []byte {
14+
ks := keystore.New()
15+
keyPKCS8Bytes, err := x509.MarshalPKCS8PrivateKey(certKey.PrivateKey)
16+
test.Must(err)
17+
pke := keystore.PrivateKeyEntry{
18+
CreationTime: time.Now(),
19+
PrivateKey: keyPKCS8Bytes,
20+
CertificateChain: []keystore.Certificate{
21+
{
22+
Type: "X509",
23+
Content: certKey.DERBytes,
24+
},
25+
},
26+
}
27+
err = ks.SetPrivateKeyEntry("alias", pke, []byte(password))
28+
test.Must(err)
29+
var buf bytes.Buffer
30+
err = ks.Store(&buf, []byte(password))
31+
test.Must(err)
32+
return buf.Bytes()
33+
}
34+
35+
func JKSTrustStore(certKeys []*CertKey, password string) []byte {
36+
ks := keystore.New()
37+
for i := range certKeys {
38+
err := ks.SetTrustedCertificateEntry(
39+
strconv.Itoa(i),
40+
keystore.TrustedCertificateEntry{
41+
CreationTime: time.Now(),
42+
Certificate: keystore.Certificate{
43+
Type: "X509",
44+
Content: certKeys[i].DERBytes,
45+
},
46+
},
47+
)
48+
test.Must(err)
49+
}
50+
var buf bytes.Buffer
51+
err := ks.Store(&buf, []byte(password))
52+
test.Must(err)
53+
return buf.Bytes()
54+
}

test/helpers/certificate/certificate_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ import (
1616
var _ = Describe("Certificate", func() {
1717

1818
It("enables client-server TLS connection with local CA", func() {
19-
ca := certificate.NewCA(nil) // Self-signed CA
20-
cert := certificate.NewCert(ca, "localhost", net.IPv4(127, 0, 0, 1), net.IPv6loopback)
19+
ca := certificate.NewCA(nil, "Root CA") // Self-signed CA
20+
cert := certificate.NewCert(ca, "Server", "localhost", net.IPv4(127, 0, 0, 1), net.IPv6loopback)
2121
server := cert.StartServer(http.HandlerFunc(
2222
func(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, "success!") }), nil)
2323
defer server.Close()
@@ -34,13 +34,13 @@ var _ = Describe("Certificate", func() {
3434
})
3535

3636
It("enables TLS mutual authentication", func() {
37-
ca := certificate.NewCA(nil) // Self-signed CA
38-
serverCert := certificate.NewCert(ca)
37+
ca := certificate.NewCA(nil, "Root CA") // Self-signed CA
38+
serverCert := certificate.NewCert(ca, "Server")
3939
server := serverCert.StartServer(http.HandlerFunc(
4040
func(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, "success!") }), ca)
4141
defer server.Close()
4242

43-
clientCert := certificate.NewClient(ca)
43+
clientCert := certificate.NewClient(ca, "Client")
4444
client := ca.Client(clientCert)
4545
resp, err := client.Get(server.URL)
4646
ExpectOK(err)

test/helpers/kafka.go

+23
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,10 @@ func (tc *E2ETestFramework) createKafkaBroker() (*apps.StatefulSet, error) {
176176
return nil, err
177177
}
178178

179+
if err := tc.createKafkaBrokerSecret(); err != nil {
180+
return nil, err
181+
}
182+
179183
if err := tc.createKafkaBrokerService(); err != nil {
180184
return nil, err
181185
}
@@ -357,6 +361,25 @@ func (tc *E2ETestFramework) createKafkaBrokerConfigMap() error {
357361
return nil
358362
}
359363

364+
func (tc *E2ETestFramework) createKafkaBrokerSecret() error {
365+
s := kafka.NewBrokerSecret(OpenshiftLoggingNS)
366+
367+
tc.AddCleanup(func() error {
368+
var zerograce int64
369+
opts := metav1.DeleteOptions{
370+
GracePeriodSeconds: &zerograce,
371+
}
372+
return tc.KubeClient.CoreV1().Secrets(OpenshiftLoggingNS).Delete(context.TODO(), s.GetName(), opts)
373+
})
374+
375+
opts := metav1.CreateOptions{}
376+
if _, err := tc.KubeClient.CoreV1().Secrets(OpenshiftLoggingNS).Create(context.TODO(), s, opts); err != nil {
377+
return err
378+
}
379+
380+
return nil
381+
}
382+
360383
func (tc *E2ETestFramework) createZookeeperConfigMap() error {
361384
cm := kafka.NewZookeeperConfigMap(OpenshiftLoggingNS)
362385

test/helpers/kafka/broker.go

+46-7
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package kafka
22

33
import (
4+
"bytes"
45
"fmt"
6+
"strconv"
57

68
"github.com/openshift/cluster-logging-operator/pkg/factory"
79
"github.com/openshift/cluster-logging-operator/pkg/k8shandler"
10+
"github.com/openshift/cluster-logging-operator/test/helpers/certificate"
811
apps "k8s.io/api/apps/v1"
912
v1 "k8s.io/api/core/v1"
1013
rbacv1 "k8s.io/api/rbac/v1"
@@ -20,7 +23,7 @@ const (
2023
kafkaBrokerProvider = "openshift"
2124
kafkaNodeReader = "kafka-node-reader"
2225
kafkaNodeReaderBinding = "kafka-node-reader-binding"
23-
kafkaInsidePort = 9092
26+
kafkaInsidePort = 9093
2427
kafkaOutsidePort = 9094
2528
kafkaJMXPort = 5555
2629
)
@@ -84,7 +87,7 @@ func NewBrokerStatefuleSet(namespace string) *apps.StatefulSet {
8487
InitContainers: []v1.Container{
8588
{
8689
Name: "init-config",
87-
Image: "solsson/kafka-initutils@sha256:f6d9850c6c3ad5ecc35e717308fddb47daffbde18eb93e98e031128fe8b899ef",
90+
Image: KafkaInitUtilsImage,
8891
Env: []v1.EnvVar{
8992
{
9093
Name: "NODE_NAME",
@@ -138,7 +141,7 @@ func NewBrokerStatefuleSet(namespace string) *apps.StatefulSet {
138141
Containers: []v1.Container{
139142
{
140143
Name: kafkaBrokerContainerName,
141-
Image: "solsson/kafka:2.4.1",
144+
Image: KafkaImage,
142145
Env: []v1.EnvVar{
143146
{
144147
Name: "CLASSPATH",
@@ -150,7 +153,7 @@ func NewBrokerStatefuleSet(namespace string) *apps.StatefulSet {
150153
},
151154
{
152155
Name: "JMX_PORT",
153-
Value: "5555",
156+
Value: strconv.Itoa(kafkaJMXPort),
154157
},
155158
},
156159
Ports: []v1.ContainerPort{
@@ -159,7 +162,7 @@ func NewBrokerStatefuleSet(namespace string) *apps.StatefulSet {
159162
ContainerPort: kafkaInsidePort,
160163
},
161164
{
162-
Name: "outide",
165+
Name: "outside",
163166
ContainerPort: kafkaOutsidePort,
164167
},
165168
{
@@ -204,6 +207,10 @@ func NewBrokerStatefuleSet(namespace string) *apps.StatefulSet {
204207
Name: "brokerconfig",
205208
MountPath: "/etc/kafka-configmap",
206209
},
210+
{
211+
Name: "brokercerts",
212+
MountPath: "/etc/kafka-certs",
213+
},
207214
{
208215
Name: "config",
209216
MountPath: "/etc/kafka",
@@ -234,6 +241,14 @@ func NewBrokerStatefuleSet(namespace string) *apps.StatefulSet {
234241
},
235242
},
236243
},
244+
{
245+
Name: "brokercerts",
246+
VolumeSource: v1.VolumeSource{
247+
Secret: &v1.SecretVolumeSource{
248+
SecretName: DeploymentName,
249+
},
250+
},
251+
},
237252
{
238253
Name: "brokerlogs",
239254
VolumeSource: v1.VolumeSource{
@@ -262,8 +277,12 @@ func NewBrokerStatefuleSet(namespace string) *apps.StatefulSet {
262277
func NewBrokerService(namespace string) *v1.Service {
263278
ports := []v1.ServicePort{
264279
{
265-
Name: "server",
266-
Port: kafkaInsidePort,
280+
Name: "plaintext",
281+
Port: 9092,
282+
},
283+
{
284+
Name: "ssl",
285+
Port: 9093,
267286
},
268287
}
269288
return factory.NewService(DeploymentName, namespace, kafkaBrokerComponent, ports)
@@ -313,7 +332,27 @@ func NewBrokerConfigMap(namespace string) *v1.ConfigMap {
313332
data := map[string]string{
314333
"init.sh": initKafkaScript,
315334
"server.properties": serverProperties,
335+
"client.properties": clientProperties,
316336
"log4j.properties": log4jProperties,
317337
}
318338
return k8shandler.NewConfigMap(DeploymentName, namespace, data)
319339
}
340+
341+
func NewBrokerSecret(namespace string) *v1.Secret {
342+
rootCA := certificate.NewCA(nil, "Root CA")
343+
intermediateCA := certificate.NewCA(rootCA, "Intermediate CA")
344+
serverCert := certificate.NewCert(intermediateCA, "Server", fmt.Sprintf("%s.%s.svc.cluster.local", DeploymentName, namespace))
345+
346+
data := map[string][]byte{
347+
"server.jks": certificate.JKSKeyStore(serverCert, "server"),
348+
"ca-bundle.jks": certificate.JKSTrustStore([]*certificate.CertKey{rootCA, intermediateCA}, "ca-bundle"),
349+
"ca-bundle.crt": bytes.Join([][]byte{rootCA.CertificatePEM(), intermediateCA.CertificatePEM()}, []byte{}),
350+
}
351+
352+
secret := k8shandler.NewSecret(
353+
DeploymentName,
354+
namespace,
355+
data,
356+
)
357+
return secret
358+
}

0 commit comments

Comments
 (0)