Skip to content

Bug 1904380 - Forwarding logs to Kafka using Chained certificates fails with error "state=error: certificate verify failed (unable to get local issuer certificate) #936

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/openshift/api v0.0.0-20200602204738-768b7001fe69
github.com/openshift/elasticsearch-operator v0.0.0-20200722044541-14fae5dcddfd
github.com/operator-framework/operator-sdk v0.18.1
github.com/pavel-v-chernykh/keystore-go/v4 v4.1.0
github.com/prometheus/procfs v0.0.10 // indirect
github.com/rogpeppe/go-internal v1.5.2 // indirect
github.com/spf13/pflag v1.0.5
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,10 @@ github.com/otiai10/mint v1.3.1 h1:BCmzIS3n71sGfHB5NMNDB3lHYPz8fWSkCAErHed//qc=
github.com/otiai10/mint v1.3.1/go.mod h1:/yxELlJQ0ufhjUwhshSj+wFjZ78CnZ48/1wtmBH1OTc=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pavel-v-chernykh/keystore-go v1.0.0 h1:q7+IYDgDRi7uutqeqndr838MAw5t8Gp1i5GpXwqMTxs=
github.com/pavel-v-chernykh/keystore-go v2.1.0+incompatible h1:Jd6xfriVlJ6hWPvYOE0Ni0QWcNTLRehfGPFxr3eSL80=
github.com/pavel-v-chernykh/keystore-go/v4 v4.1.0 h1:xKxUVGoB9VJU+lgQLPN0KURjw+XCVVSpHfQEeyxk3zo=
github.com/pavel-v-chernykh/keystore-go/v4 v4.1.0/go.mod h1:2ejgys4qY+iNVW1IittZhyRYA6MNv8TgM6VHqojbB9g=
github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
Expand Down Expand Up @@ -1005,6 +1009,7 @@ golang.org/x/crypto v0.0.0-20191112222119-e1110fd1c708/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20191202143827-86a70503ff7e/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200128174031-69ecbb4d6d5d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200220183623-bac4c82f6975/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200414173820-0848c9571904/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/logforwarding/fluent/fluent_secure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ var _ = Describe("[ClusterLogForwarder]", func() {
f = NewFixture(c.NS.Name, secureMessage)

// Receiver acts as TLS server.
privateCA = certificate.NewCA(nil)
serverCert = certificate.NewCert(privateCA, f.Receiver.Host()) // Receiver is server.
clientCert = certificate.NewCert(privateCA)
privateCA = certificate.NewCA(nil, "Root CA")
serverCert = certificate.NewCert(privateCA, "Server", f.Receiver.Host()) // Receiver is server.
clientCert = certificate.NewCert(privateCA, "Client")
sharedKey = "top-secret"
// The Receiver Sources act as TLS servers.
for i, s := range []*fluentd.Source{
Expand Down
19 changes: 15 additions & 4 deletions test/e2e/logforwarding/kafka/forward_to_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/ViaQ/logerr/log"
loggingv1 "github.com/openshift/cluster-logging-operator/pkg/apis/logging/v1"
v1 "github.com/openshift/cluster-logging-operator/pkg/apis/logging/v1"
"github.com/openshift/cluster-logging-operator/test/helpers"
"github.com/openshift/cluster-logging-operator/test/helpers/kafka"
apps "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -63,6 +62,9 @@ var _ = Describe("[ClusterLogForwarder] Forwards logs", func() {
e2e.LogStores[app.Name].ClusterLocalEndpoint(),
kafka.DefaultTopic,
),
Secret: &loggingv1.OutputSecretSpec{
Name: kafka.DeploymentName,
},
},
},
Pipelines: []loggingv1.PipelineSpec{
Expand Down Expand Up @@ -152,30 +154,39 @@ var _ = Describe("[ClusterLogForwarder] Forwards logs", func() {
Type: loggingv1.OutputTypeKafka,
URL: fmt.Sprintf("tls://%s", e2e.LogStores[app.Name].ClusterLocalEndpoint()),
OutputTypeSpec: loggingv1.OutputTypeSpec{
Kafka: &v1.Kafka{
Kafka: &loggingv1.Kafka{
Topic: kafka.AppLogsTopic,
},
},
Secret: &loggingv1.OutputSecretSpec{
Name: kafka.DeploymentName,
},
},
{
Name: fmt.Sprintf("%s-audit-out", kafka.DeploymentName),
Type: loggingv1.OutputTypeKafka,
URL: fmt.Sprintf("tls://%s", e2e.LogStores[app.Name].ClusterLocalEndpoint()),
OutputTypeSpec: loggingv1.OutputTypeSpec{
Kafka: &v1.Kafka{
Kafka: &loggingv1.Kafka{
Topic: kafka.AuditLogsTopic,
},
},
Secret: &loggingv1.OutputSecretSpec{
Name: kafka.DeploymentName,
},
},
{
Name: fmt.Sprintf("%s-infra-out", kafka.DeploymentName),
Type: loggingv1.OutputTypeKafka,
URL: fmt.Sprintf("tls://%s", e2e.LogStores[app.Name].ClusterLocalEndpoint()),
OutputTypeSpec: loggingv1.OutputTypeSpec{
Kafka: &v1.Kafka{
Kafka: &loggingv1.Kafka{
Topic: kafka.InfraLogsTopic,
},
},
Secret: &loggingv1.OutputSecretSpec{
Name: kafka.DeploymentName,
},
},
},
Pipelines: []loggingv1.PipelineSpec{
Expand Down
12 changes: 6 additions & 6 deletions test/helpers/certificate/certificate.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ func (ck *CertKey) StartServer(handler http.Handler, clientCA *CertKey) *httptes
}

// NewCA creates a new dummy CA cert signed by signer, or self-signed if signer is nil.
func NewCA(signer *CertKey) *CertKey {
func NewCA(signer *CertKey, org string) *CertKey {
return New(&x509.Certificate{
SerialNumber: big.NewInt(1234),
Subject: pkix.Name{
Organization: []string{"Testing, INC."},
Organization: []string{org},
// Country: []string{"US"},
// Province: []string{""},
// Locality: []string{"San Francisco"},
Expand All @@ -118,7 +118,7 @@ func NewCA(signer *CertKey) *CertKey {
// NewCert creates a dummy server cert signed by signer, or self-signed if signer is nil.
// The addrs list can contain strings (DNS names) or net.IP addresses, if addrs
// is empty will use "localhost", v4 and v6 loopback
func NewCert(signer *CertKey, addrs ...interface{}) *CertKey {
func NewCert(signer *CertKey, org string, addrs ...interface{}) *CertKey {
var (
dns []string
ips []net.IP
Expand All @@ -138,7 +138,7 @@ func NewCert(signer *CertKey, addrs ...interface{}) *CertKey {
}
return New(&x509.Certificate{
SerialNumber: big.NewInt(1234),
Subject: pkix.Name{Organization: []string{"Testing, INC."}},
Subject: pkix.Name{Organization: []string{org}},
NotBefore: time.Now(),
NotAfter: time.Now().AddDate(10, 0, 0),
SubjectKeyId: []byte{1, 2, 3, 4, 6},
Expand All @@ -151,7 +151,7 @@ func NewCert(signer *CertKey, addrs ...interface{}) *CertKey {
}

// NewServer creates a dummy client cert signed by signer, or self-signed if signer is nil.
func NewClient(signer *CertKey) *CertKey {
func NewClient(signer *CertKey, org string) *CertKey {
// Its the same as a server cert but with no DNS/IP addrs.
return NewCert(signer)
return NewCert(signer, org)
}
54 changes: 54 additions & 0 deletions test/helpers/certificate/certificate_jks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package certificate

import (
"bytes"
"crypto/x509"
"strconv"
"time"

"github.com/openshift/cluster-logging-operator/test"
"github.com/pavel-v-chernykh/keystore-go/v4"
)

func JKSKeyStore(certKey *CertKey, password string) []byte {
ks := keystore.New()
keyPKCS8Bytes, err := x509.MarshalPKCS8PrivateKey(certKey.PrivateKey)
test.Must(err)
pke := keystore.PrivateKeyEntry{
CreationTime: time.Now(),
PrivateKey: keyPKCS8Bytes,
CertificateChain: []keystore.Certificate{
{
Type: "X509",
Content: certKey.DERBytes,
},
},
}
err = ks.SetPrivateKeyEntry("alias", pke, []byte(password))
test.Must(err)
var buf bytes.Buffer
err = ks.Store(&buf, []byte(password))
test.Must(err)
return buf.Bytes()
}

func JKSTrustStore(certKeys []*CertKey, password string) []byte {
ks := keystore.New()
for i := range certKeys {
err := ks.SetTrustedCertificateEntry(
strconv.Itoa(i),
keystore.TrustedCertificateEntry{
CreationTime: time.Now(),
Certificate: keystore.Certificate{
Type: "X509",
Content: certKeys[i].DERBytes,
},
},
)
test.Must(err)
}
var buf bytes.Buffer
err := ks.Store(&buf, []byte(password))
test.Must(err)
return buf.Bytes()
}
10 changes: 5 additions & 5 deletions test/helpers/certificate/certificate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
var _ = Describe("Certificate", func() {

It("enables client-server TLS connection with local CA", func() {
ca := certificate.NewCA(nil) // Self-signed CA
cert := certificate.NewCert(ca, "localhost", net.IPv4(127, 0, 0, 1), net.IPv6loopback)
ca := certificate.NewCA(nil, "Root CA") // Self-signed CA
cert := certificate.NewCert(ca, "Server", "localhost", net.IPv4(127, 0, 0, 1), net.IPv6loopback)
server := cert.StartServer(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, "success!") }), nil)
defer server.Close()
Expand All @@ -34,13 +34,13 @@ var _ = Describe("Certificate", func() {
})

It("enables TLS mutual authentication", func() {
ca := certificate.NewCA(nil) // Self-signed CA
serverCert := certificate.NewCert(ca)
ca := certificate.NewCA(nil, "Root CA") // Self-signed CA
serverCert := certificate.NewCert(ca, "Server")
server := serverCert.StartServer(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, "success!") }), ca)
defer server.Close()

clientCert := certificate.NewClient(ca)
clientCert := certificate.NewClient(ca, "Client")
client := ca.Client(clientCert)
resp, err := client.Get(server.URL)
ExpectOK(err)
Expand Down
23 changes: 23 additions & 0 deletions test/helpers/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ func (tc *E2ETestFramework) createKafkaBroker() (*apps.StatefulSet, error) {
return nil, err
}

if err := tc.createKafkaBrokerSecret(); err != nil {
return nil, err
}

if err := tc.createKafkaBrokerService(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -357,6 +361,25 @@ func (tc *E2ETestFramework) createKafkaBrokerConfigMap() error {
return nil
}

func (tc *E2ETestFramework) createKafkaBrokerSecret() error {
s := kafka.NewBrokerSecret(OpenshiftLoggingNS)

tc.AddCleanup(func() error {
var zerograce int64
opts := metav1.DeleteOptions{
GracePeriodSeconds: &zerograce,
}
return tc.KubeClient.CoreV1().Secrets(OpenshiftLoggingNS).Delete(context.TODO(), s.GetName(), opts)
})

opts := metav1.CreateOptions{}
if _, err := tc.KubeClient.CoreV1().Secrets(OpenshiftLoggingNS).Create(context.TODO(), s, opts); err != nil {
return err
}

return nil
}

func (tc *E2ETestFramework) createZookeeperConfigMap() error {
cm := kafka.NewZookeeperConfigMap(OpenshiftLoggingNS)

Expand Down
53 changes: 46 additions & 7 deletions test/helpers/kafka/broker.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package kafka

import (
"bytes"
"fmt"
"strconv"

"github.com/openshift/cluster-logging-operator/pkg/factory"
"github.com/openshift/cluster-logging-operator/pkg/k8shandler"
"github.com/openshift/cluster-logging-operator/test/helpers/certificate"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
Expand All @@ -20,7 +23,7 @@ const (
kafkaBrokerProvider = "openshift"
kafkaNodeReader = "kafka-node-reader"
kafkaNodeReaderBinding = "kafka-node-reader-binding"
kafkaInsidePort = 9092
kafkaInsidePort = 9093
kafkaOutsidePort = 9094
kafkaJMXPort = 5555
)
Expand Down Expand Up @@ -84,7 +87,7 @@ func NewBrokerStatefuleSet(namespace string) *apps.StatefulSet {
InitContainers: []v1.Container{
{
Name: "init-config",
Image: "solsson/kafka-initutils@sha256:f6d9850c6c3ad5ecc35e717308fddb47daffbde18eb93e98e031128fe8b899ef",
Image: KafkaInitUtilsImage,
Env: []v1.EnvVar{
{
Name: "NODE_NAME",
Expand Down Expand Up @@ -138,7 +141,7 @@ func NewBrokerStatefuleSet(namespace string) *apps.StatefulSet {
Containers: []v1.Container{
{
Name: kafkaBrokerContainerName,
Image: "solsson/kafka:2.4.1",
Image: KafkaImage,
Env: []v1.EnvVar{
{
Name: "CLASSPATH",
Expand All @@ -150,7 +153,7 @@ func NewBrokerStatefuleSet(namespace string) *apps.StatefulSet {
},
{
Name: "JMX_PORT",
Value: "5555",
Value: strconv.Itoa(kafkaJMXPort),
},
},
Ports: []v1.ContainerPort{
Expand All @@ -159,7 +162,7 @@ func NewBrokerStatefuleSet(namespace string) *apps.StatefulSet {
ContainerPort: kafkaInsidePort,
},
{
Name: "outide",
Name: "outside",
ContainerPort: kafkaOutsidePort,
},
{
Expand Down Expand Up @@ -204,6 +207,10 @@ func NewBrokerStatefuleSet(namespace string) *apps.StatefulSet {
Name: "brokerconfig",
MountPath: "/etc/kafka-configmap",
},
{
Name: "brokercerts",
MountPath: "/etc/kafka-certs",
},
{
Name: "config",
MountPath: "/etc/kafka",
Expand Down Expand Up @@ -234,6 +241,14 @@ func NewBrokerStatefuleSet(namespace string) *apps.StatefulSet {
},
},
},
{
Name: "brokercerts",
VolumeSource: v1.VolumeSource{
Secret: &v1.SecretVolumeSource{
SecretName: DeploymentName,
},
},
},
{
Name: "brokerlogs",
VolumeSource: v1.VolumeSource{
Expand Down Expand Up @@ -262,8 +277,12 @@ func NewBrokerStatefuleSet(namespace string) *apps.StatefulSet {
func NewBrokerService(namespace string) *v1.Service {
ports := []v1.ServicePort{
{
Name: "server",
Port: kafkaInsidePort,
Name: "plaintext",
Port: 9092,
},
{
Name: "ssl",
Port: 9093,
},
}
return factory.NewService(DeploymentName, namespace, kafkaBrokerComponent, ports)
Expand Down Expand Up @@ -313,7 +332,27 @@ func NewBrokerConfigMap(namespace string) *v1.ConfigMap {
data := map[string]string{
"init.sh": initKafkaScript,
"server.properties": serverProperties,
"client.properties": clientProperties,
"log4j.properties": log4jProperties,
}
return k8shandler.NewConfigMap(DeploymentName, namespace, data)
}

func NewBrokerSecret(namespace string) *v1.Secret {
rootCA := certificate.NewCA(nil, "Root CA")
intermediateCA := certificate.NewCA(rootCA, "Intermediate CA")
serverCert := certificate.NewCert(intermediateCA, "Server", fmt.Sprintf("%s.%s.svc.cluster.local", DeploymentName, namespace))

data := map[string][]byte{
"server.jks": certificate.JKSKeyStore(serverCert, "server"),
"ca-bundle.jks": certificate.JKSTrustStore([]*certificate.CertKey{rootCA, intermediateCA}, "ca-bundle"),
"ca-bundle.crt": bytes.Join([][]byte{rootCA.CertificatePEM(), intermediateCA.CertificatePEM()}, []byte{}),
}

secret := k8shandler.NewSecret(
DeploymentName,
namespace,
data,
)
return secret
}
Loading