Skip to content

Commit f23e9dd

Browse files
Merge pull request #2068 from syedriko/bz_1904380
Bug 1904380 - Forwarding logs to Kafka using Chained certificates fals with error "state=error: certificate verify failed (unable to get local issuer certificate)
2 parents 2f207cc + 9850289 commit f23e9dd

File tree

1 file changed

+155
-0
lines changed

1 file changed

+155
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
diff --git a/lib/fluent/plugin/in_kafka.rb b/lib/fluent/plugin/in_kafka.rb
2+
index 4cbf3f2b..1019a77f 100644
3+
--- a/lib/fluent/plugin/in_kafka.rb
4+
+++ b/lib/fluent/plugin/in_kafka.rb
5+
@@ -185,17 +185,17 @@ class Fluent::KafkaInput < Fluent::Input
6+
7+
logger = @get_kafka_client_log ? log : nil
8+
if @scram_mechanism != nil && @username != nil && @password != nil
9+
- @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
10+
+ @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_ca_certs(),
11+
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
12+
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
13+
sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl)
14+
elsif @username != nil && @password != nil
15+
- @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
16+
+ @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_ca_certs(),
17+
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
18+
ssl_ca_certs_from_system: @ssl_ca_certs_from_system,sasl_plain_username: @username, sasl_plain_password: @password,
19+
sasl_over_ssl: @sasl_over_ssl)
20+
else
21+
- @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
22+
+ @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_ca_certs(),
23+
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
24+
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab)
25+
end
26+
diff --git a/lib/fluent/plugin/in_kafka_group.rb b/lib/fluent/plugin/in_kafka_group.rb
27+
index 086783ea..c2c911fa 100644
28+
--- a/lib/fluent/plugin/in_kafka_group.rb
29+
+++ b/lib/fluent/plugin/in_kafka_group.rb
30+
@@ -165,17 +165,17 @@ class Fluent::KafkaGroupInput < Fluent::Input
31+
32+
logger = @get_kafka_client_log ? log : nil
33+
if @scram_mechanism != nil && @username != nil && @password != nil
34+
- @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
35+
+ @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_ca_certs(),
36+
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
37+
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
38+
sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
39+
elsif @username != nil && @password != nil
40+
- @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
41+
+ @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_ca_certs(),
42+
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
43+
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password,
44+
sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
45+
else
46+
- @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
47+
+ @kafka = Kafka.new(seed_brokers: @brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_ca_certs(),
48+
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key),
49+
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab,
50+
ssl_verify_hostname: @ssl_verify_hostname)
51+
diff --git a/lib/fluent/plugin/kafka_plugin_util.rb b/lib/fluent/plugin/kafka_plugin_util.rb
52+
index 16951119..aa187c3b 100644
53+
--- a/lib/fluent/plugin/kafka_plugin_util.rb
54+
+++ b/lib/fluent/plugin/kafka_plugin_util.rb
55+
@@ -32,8 +32,12 @@ module Fluent
56+
end
57+
end
58+
59+
+ def invalid_path?(path)
60+
+ path.nil? || path.respond_to?(:strip) && path.strip.empty?
61+
+ end
62+
+
63+
def read_ssl_file(path)
64+
- return nil if path.nil? || path.respond_to?(:strip) && path.strip.empty?
65+
+ return nil if invalid_path?(path)
66+
67+
if path.is_a?(Array)
68+
path.map { |fp| File.read(fp) }
69+
@@ -42,6 +46,13 @@ module Fluent
70+
end
71+
end
72+
73+
+ END_CERTIFICATE = "\n-----END CERTIFICATE-----\n"
74+
+
75+
+ def read_ssl_ca_certs
76+
+ return nil if @ssl_ca_cert.nil?
77+
+ @ssl_ca_cert.reject { |p| invalid_path?(p) }.flat_map { |c| File.read(c).split(END_CERTIFICATE).map { |c| c += END_CERTIFICATE} }
78+
+ end
79+
+
80+
def pickup_ssl_endpoint(node)
81+
ssl_endpoint = node['endpoints'].find {|e| e.start_with?('SSL')}
82+
raise 'no SSL endpoint found on Zookeeper' unless ssl_endpoint
83+
diff --git a/lib/fluent/plugin/out_kafka.rb b/lib/fluent/plugin/out_kafka.rb
84+
index c4710d51..2ce7704e 100644
85+
--- a/lib/fluent/plugin/out_kafka.rb
86+
+++ b/lib/fluent/plugin/out_kafka.rb
87+
@@ -106,16 +106,16 @@ DESC
88+
begin
89+
if @seed_brokers.length > 0
90+
if @scram_mechanism != nil && @username != nil && @password != nil
91+
- @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
92+
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_ca_certs(),
93+
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
94+
sasl_scram_username: @username, sasl_scram_password: @password, sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl,
95+
ssl_verify_hostname: @ssl_verify_hostname)
96+
elsif @username != nil && @password != nil
97+
- @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
98+
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_ca_certs(),
99+
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
100+
sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
101+
else
102+
- @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
103+
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, ssl_ca_cert: read_ssl_ca_certs(),
104+
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
105+
sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
106+
end
107+
diff --git a/lib/fluent/plugin/out_kafka2.rb b/lib/fluent/plugin/out_kafka2.rb
108+
index 8ed52db9..08dd18d9 100644
109+
--- a/lib/fluent/plugin/out_kafka2.rb
110+
+++ b/lib/fluent/plugin/out_kafka2.rb
111+
@@ -95,17 +95,17 @@ DESC
112+
begin
113+
logger = @get_kafka_client_log ? log : nil
114+
if @scram_mechanism != nil && @username != nil && @password != nil
115+
- @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
116+
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_ca_certs(),
117+
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
118+
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_scram_username: @username, sasl_scram_password: @password,
119+
sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
120+
elsif @username != nil && @password != nil
121+
- @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
122+
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_ca_certs(),
123+
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
124+
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl,
125+
ssl_verify_hostname: @ssl_verify_hostname)
126+
else
127+
- @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
128+
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, connect_timeout: @connect_timeout, socket_timeout: @socket_timeout, ssl_ca_cert: read_ssl_ca_certs(),
129+
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_client_cert_chain: read_ssl_file(@ssl_client_cert_chain),
130+
ssl_ca_certs_from_system: @ssl_ca_certs_from_system, sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab, sasl_over_ssl: @sasl_over_ssl,
131+
ssl_verify_hostname: @ssl_verify_hostname)
132+
diff --git a/lib/fluent/plugin/out_kafka_buffered.rb b/lib/fluent/plugin/out_kafka_buffered.rb
133+
index a26b0c3a..d6d0681b 100644
134+
--- a/lib/fluent/plugin/out_kafka_buffered.rb
135+
+++ b/lib/fluent/plugin/out_kafka_buffered.rb
136+
@@ -130,16 +130,16 @@ DESC
137+
if @seed_brokers.length > 0
138+
logger = @get_kafka_client_log ? log : nil
139+
if @scram_mechanism != nil && @username != nil && @password != nil
140+
- @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
141+
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_ca_certs(),
142+
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
143+
sasl_scram_username: @username, sasl_scram_password: @password, sasl_scram_mechanism: @scram_mechanism, sasl_over_ssl: @sasl_over_ssl,
144+
ssl_verify_hostname: @ssl_verify_hostname)
145+
elsif @username != nil && @password != nil
146+
- @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
147+
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_ca_certs(),
148+
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
149+
sasl_plain_username: @username, sasl_plain_password: @password, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
150+
else
151+
- @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_file(@ssl_ca_cert),
152+
+ @kafka = Kafka.new(seed_brokers: @seed_brokers, client_id: @client_id, logger: logger, ssl_ca_cert: read_ssl_ca_certs(),
153+
ssl_client_cert: read_ssl_file(@ssl_client_cert), ssl_client_cert_key: read_ssl_file(@ssl_client_cert_key), ssl_ca_certs_from_system: @ssl_ca_certs_from_system,
154+
sasl_gssapi_principal: @principal, sasl_gssapi_keytab: @keytab, sasl_over_ssl: @sasl_over_ssl, ssl_verify_hostname: @ssl_verify_hostname)
155+
end

0 commit comments

Comments
 (0)