diff --git a/configuration.c b/configuration.c index ca442cb..d0a0c82 100644 --- a/configuration.c +++ b/configuration.c @@ -68,6 +68,8 @@ void kafka_conf_callbacks_dtor(kafka_conf_callbacks *cbs) /* {{{ */ cbs->offset_commit = NULL; kafka_conf_callback_dtor(cbs->log); cbs->log = NULL; + kafka_conf_callback_dtor(cbs->oauthbearer_refresh); + cbs->oauthbearer_refresh = NULL; } /* }}} */ static void kafka_conf_callback_copy(kafka_conf_callback **to, kafka_conf_callback *from) /* {{{ */ @@ -87,6 +89,7 @@ void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *f kafka_conf_callback_copy(&to->stats, from->stats); kafka_conf_callback_copy(&to->offset_commit, from->offset_commit); kafka_conf_callback_copy(&to->log, from->log); + kafka_conf_callback_copy(&to->oauthbearer_refresh, from->oauthbearer_refresh); } /* }}} */ static void kafka_conf_free(zend_object *object) /* {{{ */ @@ -304,6 +307,33 @@ static void kafka_conf_log_cb(const rd_kafka_t *rk, int level, const char *facil zval_ptr_dtor(&args[3]); } +static void kafka_conf_oauthbearer_token_refresh_cb(rd_kafka_t *rk, const char *oauthbearer_config, void *opaque) +{ + kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque; + zval args[2]; + + if (!opaque) { + return; + } + + if (!cbs->oauthbearer_refresh) { + return; + } + + ZVAL_NULL(&args[0]); + ZVAL_NULL(&args[1]); + + ZVAL_ZVAL(&args[0], &cbs->zrk, 1, 0); + if (oauthbearer_config) { + ZVAL_STRING(&args[1], oauthbearer_config); + } + + kafka_call_function(&cbs->oauthbearer_refresh->fci, &cbs->oauthbearer_refresh->fcc, NULL, 2, args); + + zval_ptr_dtor(&args[0]); + zval_ptr_dtor(&args[1]); +} + /* {{{ proto SimpleKafkaClient\Configuration::__construct() */ ZEND_METHOD(SimpleKafkaClient_Configuration, __construct) { @@ -579,6 +609,38 @@ ZEND_METHOD(SimpleKafkaClient_Configuration, setLogCb) } /* }}} */ +/* {{{ proto void SimpleKafkaClient\Configuration::setOAuthBearerTokenRefreshCb(callable $callback) + Sets the OAuthBearer token refresh callback */ +ZEND_METHOD(SimpleKafkaClient_Configuration, setOAuthBearerTokenRefreshCb) +{ + zend_fcall_info fci; + zend_fcall_info_cache fcc; + kafka_conf_object *intern; + + ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1) + Z_PARAM_FUNC(fci, fcc) + ZEND_PARSE_PARAMETERS_END(); + + intern = get_kafka_conf_object(getThis()); + if (!intern) { + return; + } + + Z_ADDREF_P(&fci.function_name); + + if (intern->cbs.oauthbearer_refresh) { + zval_ptr_dtor(&intern->cbs.oauthbearer_refresh->fci.function_name); + } else { + intern->cbs.oauthbearer_refresh = ecalloc(1, sizeof(*intern->cbs.oauthbearer_refresh)); + } + + intern->cbs.oauthbearer_refresh->fci = fci; + intern->cbs.oauthbearer_refresh->fcc = fcc; + + rd_kafka_conf_set_oauthbearer_token_refresh_cb(intern->conf, kafka_conf_oauthbearer_token_refresh_cb); +} +/* }}} */ + void kafka_conf_init(INIT_FUNC_ARGS) { zend_class_entry tmpce; diff --git a/configuration.stub.php b/configuration.stub.php index e656858..c32f9e1 100644 --- a/configuration.stub.php +++ b/configuration.stub.php @@ -23,4 +23,6 @@ public function setRebalanceCb(callable $callback): void {} public function setOffsetCommitCb(callable $callback): void {} public function setLogCb(callable $callback): void {} + + public function setOAuthBearerTokenRefreshCb(callable $callback): void {} } diff --git a/configuration_arginfo.h b/configuration_arginfo.h index 2846d5f..9f766de 100644 --- a/configuration_arginfo.h +++ b/configuration_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 1790726e9dd0d0664baa412bb345663c4dab71b5 */ + * Stub hash: b372876d55f3b02bd30dd4c09d20f305b070718c */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_SimpleKafkaClient_Configuration___construct, 0, 0, 0) ZEND_END_ARG_INFO() @@ -26,6 +26,8 @@ ZEND_END_ARG_INFO() #define arginfo_class_SimpleKafkaClient_Configuration_setLogCb arginfo_class_SimpleKafkaClient_Configuration_setErrorCb +#define arginfo_class_SimpleKafkaClient_Configuration_setOAuthBearerTokenRefreshCb arginfo_class_SimpleKafkaClient_Configuration_setErrorCb + ZEND_METHOD(SimpleKafkaClient_Configuration, __construct); ZEND_METHOD(SimpleKafkaClient_Configuration, dump); @@ -36,6 +38,7 @@ ZEND_METHOD(SimpleKafkaClient_Configuration, setStatsCb); ZEND_METHOD(SimpleKafkaClient_Configuration, setRebalanceCb); ZEND_METHOD(SimpleKafkaClient_Configuration, setOffsetCommitCb); ZEND_METHOD(SimpleKafkaClient_Configuration, setLogCb); +ZEND_METHOD(SimpleKafkaClient_Configuration, setOAuthBearerTokenRefreshCb); static const zend_function_entry class_SimpleKafkaClient_Configuration_methods[] = { @@ -48,5 +51,6 @@ static const zend_function_entry class_SimpleKafkaClient_Configuration_methods[] ZEND_ME(SimpleKafkaClient_Configuration, setRebalanceCb, arginfo_class_SimpleKafkaClient_Configuration_setRebalanceCb, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Configuration, setOffsetCommitCb, arginfo_class_SimpleKafkaClient_Configuration_setOffsetCommitCb, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Configuration, setLogCb, arginfo_class_SimpleKafkaClient_Configuration_setLogCb, ZEND_ACC_PUBLIC) + ZEND_ME(SimpleKafkaClient_Configuration, setOAuthBearerTokenRefreshCb, arginfo_class_SimpleKafkaClient_Configuration_setOAuthBearerTokenRefreshCb, ZEND_ACC_PUBLIC) ZEND_FE_END }; diff --git a/php_simple_kafka_client_int.h b/php_simple_kafka_client_int.h index c68e314..d25d115 100644 --- a/php_simple_kafka_client_int.h +++ b/php_simple_kafka_client_int.h @@ -56,6 +56,7 @@ typedef struct _kafka_conf_callbacks { kafka_conf_callback *consume; kafka_conf_callback *offset_commit; kafka_conf_callback *log; + kafka_conf_callback *oauthbearer_refresh; } kafka_conf_callbacks; typedef struct _kafka_conf_object { diff --git a/tests/oauthbearer_cb.phpt b/tests/oauthbearer_cb.phpt new file mode 100644 index 0000000..0cfe3b5 --- /dev/null +++ b/tests/oauthbearer_cb.phpt @@ -0,0 +1,26 @@ +--TEST-- +Produce, consume +--SKIPIF-- +set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS')); +$conf->set('security.protocol', 'SASL_PLAINTEXT'); +$conf->set('sasl.mechanisms', 'OAUTHBEARER'); +$conf->set('sasl.oauthbearer.config', 'principalClaimName=azp'); +$conf->setOAuthBearerTokenRefreshCb(function($kafka, $oAuthBearerConfig) { + var_dump($oAuthBearerConfig); +}); + +$conf->setErrorCb(function($kafka, $errorCode, $errorString) { + var_dump($errorString); +}); + +$producer = new SimpleKafkaClient\Producer($conf); +$producer->poll(-1); +--EXPECT-- +string(22) "principalClaimName=azp"