Skip to content

Commit f8830a2

Browse files
authored
Read all certificates from ssl.ca.pem, not just the first one (#4049)
* Read all certificates from ssl.ca.pem, not just the first one Reported in confluentinc/confluent-kafka-go#827 * Fix memory leak when reading DER-encoded certificates
1 parent 3ded8ee commit f8830a2

File tree

5 files changed

+82
-28
lines changed

5 files changed

+82
-28
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ configuration property.
5050

5151
* Windows: couldn't read a PKCS#12 keystore correctly because binary mode wasn't explicitly set and Windows defaults to text mode.
5252
* Fixed memory leak when loading SSL certificates (@Mekk, #3930)
53+
* Load all CA certificates from `ssl.ca.pem`, not just the first one.
5354

5455
### Transactional producer fixes
5556

src/rdkafka_cert.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,8 @@ static rd_kafka_cert_t *rd_kafka_cert_new(const rd_kafka_conf_t *conf,
253253
X509_free(x509);
254254
goto fail;
255255
}
256+
257+
X509_free(x509);
256258
} break;
257259

258260
case RD_KAFKA_CERT_ENC_PEM: {

src/rdkafka_ssl.c

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1019,32 +1019,55 @@ static int rd_kafka_ssl_set_certs(rd_kafka_t *rk,
10191019
/* CA as PEM string */
10201020
X509 *x509;
10211021
X509_STORE *store;
1022+
BIO *bio;
1023+
int cnt = 0;
10221024

10231025
/* Get the OpenSSL trust store */
10241026
store = SSL_CTX_get_cert_store(ctx);
10251027
rd_assert(store != NULL);
10261028

10271029
rd_kafka_dbg(rk, SECURITY, "SSL",
1028-
"Loading CA certificate from string");
1030+
"Loading CA certificate(s) from string");
1031+
1032+
bio =
1033+
BIO_new_mem_buf((void *)rk->rk_conf.ssl.ca_pem, -1);
1034+
rd_assert(bio != NULL);
1035+
1036+
/* Add all certificates to cert store */
1037+
while ((x509 = PEM_read_bio_X509(
1038+
bio, NULL, rd_kafka_transport_ssl_passwd_cb,
1039+
rk))) {
1040+
if (!X509_STORE_add_cert(store, x509)) {
1041+
rd_snprintf(errstr, errstr_size,
1042+
"failed to add ssl.ca.pem "
1043+
"certificate "
1044+
"#%d to CA cert store: ",
1045+
cnt);
1046+
X509_free(x509);
1047+
BIO_free(bio);
1048+
return -1;
1049+
}
10291050

1030-
x509 = rd_kafka_ssl_X509_from_string(
1031-
rk, rk->rk_conf.ssl.ca_pem);
1032-
if (!x509) {
1033-
rd_snprintf(errstr, errstr_size,
1034-
"ssl.ca.pem failed: "
1035-
"not in PEM format?: ");
1036-
return -1;
1051+
X509_free(x509);
1052+
cnt++;
10371053
}
10381054

1039-
if (!X509_STORE_add_cert(store, x509)) {
1055+
if (!BIO_eof(bio) || !cnt) {
10401056
rd_snprintf(errstr, errstr_size,
1041-
"failed to add ssl.ca.pem to "
1042-
"CA cert store: ");
1043-
X509_free(x509);
1057+
"failed to read certificate #%d "
1058+
"from ssl.ca.pem: "
1059+
"not in PEM format?: ",
1060+
cnt);
1061+
BIO_free(bio);
10441062
return -1;
10451063
}
10461064

1047-
X509_free(x509);
1065+
BIO_free(bio);
1066+
1067+
rd_kafka_dbg(rk, SECURITY, "SSL",
1068+
"Loaded %d CA certificate(s) from string",
1069+
cnt);
1070+
10481071

10491072
ca_probe = rd_false;
10501073
}

tests/0097-ssl_verify.cpp

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ static const std::string envname[RdKafka::CERT__CNT][RdKafka::CERT_ENC__CNT] = {
5151
{
5252
"SSL_pkcs",
5353
"SSL_ca_der",
54-
"SSL_ca_pem",
54+
"SSL_all_cas_pem" /* Contains multiple CA certs */,
5555
}};
5656

5757

@@ -118,26 +118,45 @@ class TestVerifyCb : public RdKafka::SslCertificateVerifyCb {
118118
};
119119

120120

121+
/**
122+
* @brief Set SSL PEM cert/key using configuration property.
123+
*
124+
* The cert/key is loadded from environment variables set up by trivup.
125+
*
126+
* @param loc_prop ssl.X.location property that will be cleared.
127+
* @param pem_prop ssl.X.pem property that will be set.
128+
* @param cert_type Certificate type.
129+
*/
121130
static void conf_location_to_pem(RdKafka::Conf *conf,
122131
std::string loc_prop,
123-
std::string pem_prop) {
132+
std::string pem_prop,
133+
RdKafka::CertificateType cert_type) {
124134
std::string loc;
125135

126-
127-
if (conf->get(loc_prop, loc) != RdKafka::Conf::CONF_OK)
128-
Test::Fail("Failed to get " + loc_prop);
129-
130136
std::string errstr;
131137
if (conf->set(loc_prop, "", errstr) != RdKafka::Conf::CONF_OK)
132138
Test::Fail("Failed to reset " + loc_prop + ": " + errstr);
133139

140+
const char *p;
141+
p = test_getenv(envname[cert_type][RdKafka::CERT_ENC_PEM].c_str(), NULL);
142+
if (!p)
143+
Test::Fail(
144+
"Invalid test environment: "
145+
"Missing " +
146+
envname[cert_type][RdKafka::CERT_ENC_PEM] +
147+
" env variable: make sure trivup is up to date");
148+
149+
loc = p;
150+
151+
134152
/* Read file */
135153
std::ifstream ifs(loc.c_str());
136154
std::string pem((std::istreambuf_iterator<char>(ifs)),
137155
std::istreambuf_iterator<char>());
138156

139-
Test::Say("Read " + loc_prop + "=" + loc +
140-
" from disk and changed to in-memory " + pem_prop + "\n");
157+
Test::Say("Read env " + envname[cert_type][RdKafka::CERT_ENC_PEM] + "=" +
158+
loc + " from disk and changed to in-memory " + pem_prop +
159+
" string\n");
141160

142161
if (conf->set(pem_prop, pem, errstr) != RdKafka::Conf::CONF_OK)
143162
Test::Fail("Failed to set " + pem_prop + ": " + errstr);
@@ -178,7 +197,8 @@ static void conf_location_to_setter(RdKafka::Conf *conf,
178197
loc = p;
179198

180199
Test::Say(tostr() << "Reading " << loc_prop << " file " << loc << " as "
181-
<< encnames[encoding] << "\n");
200+
<< encnames[encoding] << " from env "
201+
<< envname[cert_type][encoding] << "\n");
182202

183203
/* Read file */
184204
std::ifstream ifs(loc.c_str(), std::ios::binary | std::ios::ate);
@@ -200,8 +220,8 @@ static void conf_location_to_setter(RdKafka::Conf *conf,
200220

201221

202222
typedef enum {
203-
USE_LOCATION, /* use ssl.key.location */
204-
USE_CONF, /* use ssl.key.pem */
223+
USE_LOCATION, /* use ssl.X.location */
224+
USE_CONF, /* use ssl.X.pem */
205225
USE_SETTER, /* use conf->set_ssl_cert(), this supports multiple formats */
206226
} cert_load_t;
207227

@@ -245,20 +265,22 @@ static void do_test_verify(const int line,
245265
/* Get ssl.key.location, read its contents, and replace with
246266
* ssl.key.pem. Same with ssl.certificate.location -> ssl.certificate.pem. */
247267
if (load_key == USE_CONF)
248-
conf_location_to_pem(conf, "ssl.key.location", "ssl.key.pem");
268+
conf_location_to_pem(conf, "ssl.key.location", "ssl.key.pem",
269+
RdKafka::CERT_PRIVATE_KEY);
249270
else if (load_key == USE_SETTER)
250271
conf_location_to_setter(conf, "ssl.key.location", RdKafka::CERT_PRIVATE_KEY,
251272
key_enc);
252273

253274
if (load_pub == USE_CONF)
254275
conf_location_to_pem(conf, "ssl.certificate.location",
255-
"ssl.certificate.pem");
276+
"ssl.certificate.pem", RdKafka::CERT_PUBLIC_KEY);
256277
else if (load_pub == USE_SETTER)
257278
conf_location_to_setter(conf, "ssl.certificate.location",
258279
RdKafka::CERT_PUBLIC_KEY, pub_enc);
259280

260281
if (load_ca == USE_CONF)
261-
conf_location_to_pem(conf, "ssl.ca.location", "ssl.ca.pem");
282+
conf_location_to_pem(conf, "ssl.ca.location", "ssl.ca.pem",
283+
RdKafka::CERT_CA);
262284
else if (load_ca == USE_SETTER)
263285
conf_location_to_setter(conf, "ssl.ca.location", RdKafka::CERT_CA, ca_enc);
264286

@@ -391,6 +413,12 @@ int main_0097_ssl_verify(int argc, char **argv) {
391413
do_test_verify(__LINE__, true, USE_LOCATION, RdKafka::CERT_ENC_PEM,
392414
USE_SETTER, RdKafka::CERT_ENC_DER, USE_SETTER,
393415
RdKafka::CERT_ENC_DER);
416+
do_test_verify(__LINE__, true, USE_LOCATION, RdKafka::CERT_ENC_PEM,
417+
USE_SETTER, RdKafka::CERT_ENC_DER, USE_SETTER,
418+
RdKafka::CERT_ENC_PEM); /* env: SSL_all_cas_pem */
419+
do_test_verify(__LINE__, true, USE_LOCATION, RdKafka::CERT_ENC_PEM,
420+
USE_SETTER, RdKafka::CERT_ENC_DER, USE_CONF,
421+
RdKafka::CERT_ENC_PEM); /* env: SSL_all_cas_pem */
394422
do_test_verify(__LINE__, true, USE_SETTER, RdKafka::CERT_ENC_PKCS12,
395423
USE_SETTER, RdKafka::CERT_ENC_PKCS12, USE_SETTER,
396424
RdKafka::CERT_ENC_PKCS12);

tests/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
trivup >= 0.11.0
1+
trivup >= 0.12.0
22
jsoncomment

0 commit comments

Comments
 (0)