Skip to content

Commit 3b50e1e

Browse files
Added rd_kafka_sasl_set_credentials() (#4033)
Co-authored-by: Jos Visser <[email protected]> Co-authored-by: edenhill <[email protected]> Co-authored-by: edenhill <[email protected]>
1 parent 42e530c commit 3b50e1e

14 files changed

+288
-16
lines changed

CHANGELOG.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ configuration property.
4242
* Bundled zlib upgraded to version 1.2.13.
4343
* Added `on_broker_state_change()` interceptor
4444
* The C++ API no longer returns strings by const value, which enables better move optimization in callers.
45+
* Added `rd_kafka_sasl_set_credentials()` API to update SASL credentials.
4546
* Setting `allow.auto.create.topics` will no longer give a warning if used by a producer, since that is an expected use case.
4647
Improvement in documentation for this property.
4748

@@ -65,7 +66,6 @@ configuration property.
6566

6667
* Back-off and retry JoinGroup request if coordinator load is in progress.
6768

68-
6969
# librdkafka v1.9.2
7070

7171
librdkafka v1.9.2 is a maintenance release:

src-cpp/rdkafkacpp.h

+17
Original file line numberDiff line numberDiff line change
@@ -1899,6 +1899,23 @@ class RD_EXPORT Handle {
18991899
* that explicitly mention using this function for freeing.
19001900
*/
19011901
virtual void mem_free(void *ptr) = 0;
1902+
1903+
/**
1904+
* @brief Sets SASL credentials used for SASL PLAIN and SCRAM mechanisms by
1905+
* this Kafka client.
1906+
*
1907+
* This function sets or resets the SASL username and password credentials
1908+
* used by this Kafka client. The new credentials will be used the next time
1909+
* this client needs to authenticate to a broker.
1910+
* will not disconnect existing connections that might have been made using
1911+
* the old credentials.
1912+
*
1913+
* @remark This function only applies to the SASL PLAIN and SCRAM mechanisms.
1914+
*
1915+
* @returns NULL on success or an error object on error.
1916+
*/
1917+
virtual Error *sasl_set_credentials(const std::string &username,
1918+
const std::string &password) = 0;
19021919
};
19031920

19041921

src-cpp/rdkafkacpp_int.h

+11
Original file line numberDiff line numberDiff line change
@@ -1144,6 +1144,17 @@ class HandleImpl : virtual public Handle {
11441144
return NULL;
11451145
}
11461146

1147+
Error *sasl_set_credentials(const std::string &username,
1148+
const std::string &password) {
1149+
rd_kafka_error_t *c_error =
1150+
rd_kafka_sasl_set_credentials(rk_, username.c_str(), password.c_str());
1151+
1152+
if (c_error)
1153+
return new ErrorImpl(c_error);
1154+
1155+
return NULL;
1156+
};
1157+
11471158
void *mem_malloc(size_t size) {
11481159
return rd_kafka_mem_malloc(rk_, size);
11491160
}

src/rdkafka.c

+2
Original file line numberDiff line numberDiff line change
@@ -995,6 +995,7 @@ void rd_kafka_destroy_final(rd_kafka_t *rk) {
995995
rd_kafka_anyconf_destroy(_RK_GLOBAL, &rk->rk_conf);
996996
rd_list_destroy(&rk->rk_broker_by_id);
997997

998+
mtx_destroy(&rk->rk_conf.sasl.lock);
998999
rwlock_destroy(&rk->rk_lock);
9991000

10001001
rd_free(rk);
@@ -2205,6 +2206,7 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
22052206
rd_kafka_interceptors_on_new(rk, &rk->rk_conf);
22062207

22072208
rwlock_init(&rk->rk_lock);
2209+
mtx_init(&rk->rk_conf.sasl.lock, mtx_plain);
22082210
mtx_init(&rk->rk_internal_rkb_lock, mtx_plain);
22092211

22102212
cnd_init(&rk->rk_broker_state_change_cnd);

src/rdkafka.h

+19
Original file line numberDiff line numberDiff line change
@@ -3331,6 +3331,25 @@ RD_EXPORT
33313331
rd_kafka_error_t *rd_kafka_sasl_background_callbacks_enable(rd_kafka_t *rk);
33323332

33333333

3334+
/**
3335+
* @brief Sets SASL credentials used for SASL PLAIN and SCRAM mechanisms by
3336+
* this Kafka client.
3337+
*
3338+
* This function sets or resets the SASL username and password credentials
3339+
* used by this Kafka client. The new credentials will be used the next time
3340+
* this client needs to authenticate to a broker. This function
3341+
* will not disconnect existing connections that might have been made using
3342+
* the old credentials.
3343+
*
3344+
* @remark This function only applies to the SASL PLAIN and SCRAM mechanisms.
3345+
*
3346+
* @returns NULL on success or an error object on error.
3347+
*/
3348+
RD_EXPORT
3349+
rd_kafka_error_t *rd_kafka_sasl_set_credentials(rd_kafka_t *rk,
3350+
const char *username,
3351+
const char *password);
3352+
33343353
/**
33353354
* @returns a reference to the librdkafka consumer queue.
33363355
* This is the queue served by rd_kafka_consumer_poll().

src/rdkafka_conf.h

+4-1
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ typedef enum {
159159

160160
/* Increase in steps of 64 as needed.
161161
* This must be larger than sizeof(rd_kafka_[topic_]conf_t) */
162-
#define RD_KAFKA_CONF_PROPS_IDX_MAX (64 * 31)
162+
#define RD_KAFKA_CONF_PROPS_IDX_MAX (64 * 33)
163163

164164
/**
165165
* @struct rd_kafka_anyconf_t
@@ -276,6 +276,9 @@ struct rd_kafka_conf_s {
276276
char *kinit_cmd;
277277
char *keytab;
278278
int relogin_min_time;
279+
/** Protects .username and .password access after client
280+
* instance has been created (see sasl_set_credentials()). */
281+
mtx_t lock;
279282
char *username;
280283
char *password;
281284
#if WITH_SASL_SCRAM

src/rdkafka_sasl.c

+32
Original file line numberDiff line numberDiff line change
@@ -488,3 +488,35 @@ int rd_kafka_sasl_global_init(void) {
488488
return 0;
489489
#endif
490490
}
491+
492+
/**
493+
* Sets or resets the SASL (PLAIN or SCRAM) credentials used by this
494+
* client when making new connections to brokers.
495+
*
496+
* @returns NULL on success or an error object on error.
497+
*/
498+
rd_kafka_error_t *rd_kafka_sasl_set_credentials(rd_kafka_t *rk,
499+
const char *username,
500+
const char *password) {
501+
502+
if (!username || !password)
503+
return rd_kafka_error_new(RD_KAFKA_RESP_ERR__INVALID_ARG,
504+
"Username and password are required");
505+
506+
mtx_lock(&rk->rk_conf.sasl.lock);
507+
508+
if (rk->rk_conf.sasl.username)
509+
rd_free(rk->rk_conf.sasl.username);
510+
rk->rk_conf.sasl.username = rd_strdup(username);
511+
512+
if (rk->rk_conf.sasl.password)
513+
rd_free(rk->rk_conf.sasl.password);
514+
rk->rk_conf.sasl.password = rd_strdup(password);
515+
516+
mtx_unlock(&rk->rk_conf.sasl.lock);
517+
518+
rd_kafka_all_brokers_wakeup(rk, RD_KAFKA_BROKER_STATE_INIT,
519+
"SASL credentials updated");
520+
521+
return NULL;
522+
}

src/rdkafka_sasl_cyrus.c

+19-1
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,10 @@ static int rd_kafka_sasl_cyrus_recv(struct rd_kafka_transport_s *rktrans,
9191
const char *out;
9292
unsigned int outlen;
9393

94+
mtx_lock(&rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.lock);
9495
r = sasl_client_step(state->conn, size > 0 ? buf : NULL, size,
9596
&interact, &out, &outlen);
97+
mtx_unlock(&rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.lock);
9698

9799
if (r >= 0) {
98100
/* Note: outlen may be 0 here for an empty response */
@@ -148,9 +150,11 @@ static int rd_kafka_sasl_cyrus_recv(struct rd_kafka_transport_s *rktrans,
148150
RD_KAFKA_DBG_SECURITY) {
149151
const char *user, *mech, *authsrc;
150152

153+
mtx_lock(&rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.lock);
151154
if (sasl_getprop(state->conn, SASL_USERNAME,
152155
(const void **)&user) != SASL_OK)
153156
user = "(unknown)";
157+
mtx_unlock(&rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.lock);
154158

155159
if (sasl_getprop(state->conn, SASL_MECHNAME,
156160
(const void **)&mech) != SASL_OK)
@@ -356,6 +360,12 @@ static int rd_kafka_sasl_cyrus_cb_getsimple(void *context,
356360
switch (id) {
357361
case SASL_CB_USER:
358362
case SASL_CB_AUTHNAME:
363+
/* Since cyrus expects the returned pointer to be stable
364+
* and not have its content changed, but the username
365+
* and password may be updated at anytime by the application
366+
* calling sasl_set_credentials(), we need to lock
367+
* rk_conf.sasl.lock before each call into cyrus-sasl.
368+
* So when we get here the lock is already held. */
359369
*result = rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.username;
360370
break;
361371

@@ -381,6 +391,7 @@ static int rd_kafka_sasl_cyrus_cb_getsecret(sasl_conn_t *conn,
381391
rd_kafka_transport_t *rktrans = context;
382392
const char *password;
383393

394+
/* rk_conf.sasl.lock is already locked */
384395
password = rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.password;
385396

386397
if (!password) {
@@ -472,8 +483,11 @@ static void rd_kafka_sasl_cyrus_close(struct rd_kafka_transport_s *rktrans) {
472483
if (!state)
473484
return;
474485

475-
if (state->conn)
486+
if (state->conn) {
487+
mtx_lock(&rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.lock);
476488
sasl_dispose(&state->conn);
489+
mtx_unlock(&rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.lock);
490+
}
477491
rd_free(state);
478492
}
479493

@@ -528,9 +542,11 @@ static int rd_kafka_sasl_cyrus_client_new(rd_kafka_transport_t *rktrans,
528542

529543
memcpy(state->callbacks, callbacks, sizeof(callbacks));
530544

545+
mtx_lock(&rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.lock);
531546
r = sasl_client_new(rk->rk_conf.sasl.service_name, hostname, NULL,
532547
NULL, /* no local & remote IP checks */
533548
state->callbacks, 0, &state->conn);
549+
mtx_unlock(&rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.lock);
534550
if (r != SASL_OK) {
535551
rd_snprintf(errstr, errstr_size, "%s",
536552
sasl_errstring(r, NULL, NULL));
@@ -550,8 +566,10 @@ static int rd_kafka_sasl_cyrus_client_new(rd_kafka_transport_t *rktrans,
550566
unsigned int outlen;
551567
const char *mech = NULL;
552568

569+
mtx_lock(&rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.lock);
553570
r = sasl_client_start(state->conn, rk->rk_conf.sasl.mechanisms,
554571
NULL, &out, &outlen, &mech);
572+
mtx_unlock(&rktrans->rktrans_rkb->rkb_rk->rk_conf.sasl.lock);
555573

556574
if (r >= 0)
557575
if (rd_kafka_sasl_send(rktrans, out, outlen, errstr,

src/rdkafka_sasl_plain.c

+17-7
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,16 @@ int rd_kafka_sasl_plain_client_new(rd_kafka_transport_t *rktrans,
7474
char *buf;
7575
int of = 0;
7676
int zidlen = 0;
77-
int cidlen = rk->rk_conf.sasl.username
78-
? (int)strlen(rk->rk_conf.sasl.username)
79-
: 0;
80-
int pwlen = rk->rk_conf.sasl.password
81-
? (int)strlen(rk->rk_conf.sasl.password)
82-
: 0;
77+
int cidlen, pwlen;
8378

79+
mtx_lock(&rk->rk_conf.sasl.lock);
80+
81+
cidlen = rk->rk_conf.sasl.username
82+
? (int)strlen(rk->rk_conf.sasl.username)
83+
: 0;
84+
pwlen = rk->rk_conf.sasl.password
85+
? (int)strlen(rk->rk_conf.sasl.password)
86+
: 0;
8487

8588
buf = rd_alloca(zidlen + 1 + cidlen + 1 + pwlen + 1);
8689

@@ -95,6 +98,7 @@ int rd_kafka_sasl_plain_client_new(rd_kafka_transport_t *rktrans,
9598
/* passwd */
9699
memcpy(&buf[of], rk->rk_conf.sasl.password, pwlen);
97100
of += pwlen;
101+
mtx_unlock(&rk->rk_conf.sasl.lock);
98102

99103
rd_rkb_dbg(rkb, SECURITY, "SASLPLAIN",
100104
"Sending SASL PLAIN (builtin) authentication token");
@@ -115,7 +119,13 @@ int rd_kafka_sasl_plain_client_new(rd_kafka_transport_t *rktrans,
115119
static int rd_kafka_sasl_plain_conf_validate(rd_kafka_t *rk,
116120
char *errstr,
117121
size_t errstr_size) {
118-
if (!rk->rk_conf.sasl.username || !rk->rk_conf.sasl.password) {
122+
rd_bool_t both_set;
123+
124+
mtx_lock(&rk->rk_conf.sasl.lock);
125+
both_set = rk->rk_conf.sasl.username && rk->rk_conf.sasl.password;
126+
mtx_unlock(&rk->rk_conf.sasl.lock);
127+
128+
if (!both_set) {
119129
rd_snprintf(errstr, errstr_size,
120130
"sasl.username and sasl.password must be set");
121131
return -1;

src/rdkafka_sasl_scram.c

+19-6
Original file line numberDiff line numberDiff line change
@@ -397,9 +397,8 @@ static int rd_kafka_sasl_scram_build_client_final_message(
397397
int itcnt,
398398
rd_chariov_t *out) {
399399
struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state;
400-
const rd_kafka_conf_t *conf = &rktrans->rktrans_rkb->rkb_rk->rk_conf;
401-
rd_chariov_t SaslPassword = {.ptr = conf->sasl.password,
402-
.size = strlen(conf->sasl.password)};
400+
rd_kafka_conf_t *conf = &rktrans->rktrans_rkb->rkb_rk->rk_conf;
401+
rd_chariov_t SaslPassword = RD_ZERO_INIT;
403402
rd_chariov_t SaltedPassword = {.ptr = rd_alloca(EVP_MAX_MD_SIZE)};
404403
rd_chariov_t ClientKey = {.ptr = rd_alloca(EVP_MAX_MD_SIZE)};
405404
rd_chariov_t ServerKey = {.ptr = rd_alloca(EVP_MAX_MD_SIZE)};
@@ -416,6 +415,11 @@ static int rd_kafka_sasl_scram_build_client_final_message(
416415
char *ClientProofB64;
417416
int i;
418417

418+
mtx_lock(&conf->sasl.lock);
419+
rd_strdupa(&SaslPassword.ptr, conf->sasl.password);
420+
mtx_unlock(&conf->sasl.lock);
421+
SaslPassword.size = strlen(SaslPassword.ptr);
422+
419423
/* Constructing the ClientProof attribute (p):
420424
*
421425
* p = Base64-encoded ClientProof
@@ -664,7 +668,7 @@ rd_kafka_sasl_scram_handle_server_final_message(rd_kafka_transport_t *rktrans,
664668
} else if ((attr_v = rd_kafka_sasl_scram_get_attr(
665669
in, 'v', "verifier in server-final-message", errstr,
666670
errstr_size))) {
667-
const rd_kafka_conf_t *conf;
671+
rd_kafka_conf_t *conf;
668672

669673
/* Authentication succesful on server,
670674
* but we need to verify the ServerSignature too. */
@@ -686,9 +690,11 @@ rd_kafka_sasl_scram_handle_server_final_message(rd_kafka_transport_t *rktrans,
686690

687691
conf = &rktrans->rktrans_rkb->rkb_rk->rk_conf;
688692

693+
mtx_lock(&conf->sasl.lock);
689694
rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY | RD_KAFKA_DBG_BROKER,
690695
"SCRAMAUTH", "Authenticated as %s using %s",
691696
conf->sasl.username, conf->sasl.mechanisms);
697+
mtx_unlock(&conf->sasl.lock);
692698

693699
rd_kafka_sasl_auth_done(rktrans);
694700
return 0;
@@ -711,11 +717,13 @@ rd_kafka_sasl_scram_build_client_first_message(rd_kafka_transport_t *rktrans,
711717
rd_chariov_t *out) {
712718
char *sasl_username;
713719
struct rd_kafka_sasl_scram_state *state = rktrans->rktrans_sasl.state;
714-
const rd_kafka_conf_t *conf = &rktrans->rktrans_rkb->rkb_rk->rk_conf;
720+
rd_kafka_conf_t *conf = &rktrans->rktrans_rkb->rkb_rk->rk_conf;
715721

716722
rd_kafka_sasl_scram_generate_nonce(&state->cnonce);
717723

724+
mtx_lock(&conf->sasl.lock);
718725
sasl_username = rd_kafka_sasl_safe_string(conf->sasl.username);
726+
mtx_unlock(&conf->sasl.lock);
719727

720728
out->size =
721729
strlen("n,,n=,r=") + strlen(sasl_username) + state->cnonce.size;
@@ -842,8 +850,13 @@ static int rd_kafka_sasl_scram_conf_validate(rd_kafka_t *rk,
842850
char *errstr,
843851
size_t errstr_size) {
844852
const char *mech = rk->rk_conf.sasl.mechanisms;
853+
rd_bool_t both_set;
854+
855+
mtx_lock(&rk->rk_conf.sasl.lock);
856+
both_set = rk->rk_conf.sasl.username && rk->rk_conf.sasl.password;
857+
mtx_unlock(&rk->rk_conf.sasl.lock);
845858

846-
if (!rk->rk_conf.sasl.username || !rk->rk_conf.sasl.password) {
859+
if (!both_set) {
847860
rd_snprintf(errstr, errstr_size,
848861
"sasl.username and sasl.password must be set");
849862
return -1;

0 commit comments

Comments
 (0)