Skip to content

[release-4.6] Bug 1904380: Forwarding logs to Kafka using Chained certificates fails with error "state=error: certificate verify failed (unable to get local issuer certificate) #2083

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 155 additions & 0 deletions fluentd/fluent-plugin-kafka.source0002.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
diff --git a/lib/fluent/plugin/in_kafka.rb b/lib/fluent/plugin/in_kafka.rb
index 4cbf3f2b..1019a77f 100644
--- a/lib/fluent/plugin/in_kafka.rb
+++ b/lib/fluent/plugin/in_kafka.rb
@@ -185,17 +185,17 @@ class Fluent::KafkaInput < Fluent::Input

logger = @get_kafka_client_log ? log : nil
if @scram_mechanism != nil && @username != nil && @password != nil
- @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_ca_certs(),
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl)
elsif @username != nil && @password != nil
- @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_ca_certs(),
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
ssl_ca_certs_from_system: @ssl_ca_certs_from_system,sasl_plain_username: @username, sasl_plain_password: @password,
sasl_over_ssl: @sasl_over_ssl)
else
- @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_ca_certs(),
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab)
end
diff --git a/lib/fluent/plugin/in_kafka_group.rb b/lib/fluent/plugin/in_kafka_group.rb
index 086783ea..c2c911fa 100644
--- a/lib/fluent/plugin/in_kafka_group.rb
+++ b/lib/fluent/plugin/in_kafka_group.rb
@@ -165,17 +165,17 @@ class Fluent::KafkaGroupInput < Fluent::Input

logger = @get_kafka_client_log ? log : nil
if @scram_mechanism != nil && @username != nil && @password != nil
- @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_ca_certs(),
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
elsif @username != nil && @password != nil
- @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_ca_certs(),
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password,
sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
else
- @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_ca_certs(),
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab,
ssl_verify_hostname: @ssl_verify_hostname)
diff --git a/lib/fluent/plugin/kafka_plugin_util.rb b/lib/fluent/plugin/kafka_plugin_util.rb
index 16951119..aa187c3b 100644
--- a/lib/fluent/plugin/kafka_plugin_util.rb
+++ b/lib/fluent/plugin/kafka_plugin_util.rb
@@ -32,8 +32,12 @@ module Fluent
end
end

+ def invalid_path?(path)
+ path.nil? || path.respond_to?(:strip) && path.strip.empty?
+ end
+
def read_ssl_file(path)
- return nil if path.nil? || path.respond_to?(:strip) && path.strip.empty?
+ return nil if invalid_path?(path)

if path.is_a?(Array)
path.map { |fp| File.read(fp) }
@@ -42,6 +46,13 @@ module Fluent
end
end

+ END_CERTIFICATE = "\n-----END CERTIFICATE-----\n"
+
+ def read_ssl_ca_certs
+ return nil if @ssl_ca_cert.nil?
+ @ssl_ca_cert.reject { |p| invalid_path?(p) }.flat_map { |c| File.read(c).split(END_CERTIFICATE).map { |c| c += END_CERTIFICATE} }
+ end
+
def pickup_ssl_endpoint(node)
ssl_endpoint = node['endpoints'].find {|e| e.start_with?('SSL')}
raise 'no SSL endpoint found on Zookeeper' unless ssl_endpoint
diff --git a/lib/fluent/plugin/out_kafka.rb b/lib/fluent/plugin/out_kafka.rb
index c4710d51..2ce7704e 100644
--- a/lib/fluent/plugin/out_kafka.rb
+++ b/lib/fluent/plugin/out_kafka.rb
@@ -106,16 +106,16 @@ DESC
begin
if @seed_brokers.length > 0
if @scram_mechanism != nil && @username != nil && @password != nil
- @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_ca_certs(),
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
sasl_scram_username: @username, sasl_scram_password: @password, sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl,
ssl_verify_hostname: @ssl_verify_hostname)
elsif @username != nil && @password != nil
- @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_ca_certs(),
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
else
- @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_ca_certs(),
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
end
diff --git a/lib/fluent/plugin/out_kafka2.rb b/lib/fluent/plugin/out_kafka2.rb
index 8ed52db9..08dd18d9 100644
--- a/lib/fluent/plugin/out_kafka2.rb
+++ b/lib/fluent/plugin/out_kafka2.rb
@@ -95,17 +95,17 @@ DESC
begin
logger = @get_kafka_client_log ? log : nil
if @scram_mechanism != nil && @username != nil && @password != nil
- @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_ca_certs(),
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
elsif @username != nil && @password != nil
- @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_ca_certs(),
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl,
ssl_verify_hostname: @ssl_verify_hostname)
else
- @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_ca_certs(),
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab, sasl_over_ssl: @sasl_over_ssl,
ssl_verify_hostname: @ssl_verify_hostname)
diff --git a/lib/fluent/plugin/out_kafka_buffered.rb b/lib/fluent/plugin/out_kafka_buffered.rb
index a26b0c3a..d6d0681b 100644
--- a/lib/fluent/plugin/out_kafka_buffered.rb
+++ b/lib/fluent/plugin/out_kafka_buffered.rb
@@ -130,16 +130,16 @@ DESC
if @seed_brokers.length > 0
logger = @get_kafka_client_log ? log : nil
if @scram_mechanism != nil && @username != nil && @password != nil
- @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_ca_certs(),
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
sasl_scram_username: @username, sasl_scram_password: @password, sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl,
ssl_verify_hostname: @ssl_verify_hostname)
elsif @username != nil && @password != nil
- @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_ca_certs(),
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
else
- @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_ca_certs(),
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
end