diff --git a/simple_kafka_client.c b/simple_kafka_client.c index eaf7df2..997b2cc 100644 --- a/simple_kafka_client.c +++ b/simple_kafka_client.c @@ -261,6 +261,33 @@ ZEND_METHOD(SimpleKafkaClient_Kafka, offsetsForTimes) } /* }}} */ +/* {{{ proto void SimpleKafkaClient\Kafka::setOAuthBearerTokenFailure(string $errorString) + The token refresh callback or event handler should invoke this method upon failure. */ +ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerTokenFailure) +{ + char *error_string; + size_t error_string_len; + kafka_object *intern; + rd_kafka_resp_err_t err; + + ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1) + Z_PARAM_STRING(error_string, error_string_len) + ZEND_PARSE_PARAMETERS_END(); + + intern = get_kafka_object(getThis()); + if (!intern) { + return; + } + + err = rd_kafka_oauthbearer_set_token_failure(intern->rk, error_string); + + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err); + return; + } +} +/* }}} */ + #define COPY_CONSTANT(name) \ REGISTER_LONG_CONSTANT(#name, name, CONST_CS | CONST_PERSISTENT) diff --git a/simple_kafka_client.stub.php b/simple_kafka_client.stub.php index de742ab..0f9e3c4 100644 --- a/simple_kafka_client.stub.php +++ b/simple_kafka_client.stub.php @@ -13,4 +13,6 @@ public function getOutQLen(): int {} public function queryWatermarkOffsets(string $topic, int $partition, int &$low, int &$high, int $timeoutMs): void {} public function offsetsForTimes(array $topicPartitions, int $timeoutMs): array {} + + public function setOAuthBearerTokenFailure(string $errorString): void {} } diff --git a/simple_kafka_client_arginfo.h b/simple_kafka_client_arginfo.h index cc393b6..85c57b8 100644 --- a/simple_kafka_client_arginfo.h +++ b/simple_kafka_client_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: e61ec0821ea47152b2ce6b7116ec791c0c712a73 */ + * Stub hash: 54f0c76165212e21416f46325d0a52b0b7fce4a8 */ ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_getMetadata, 0, 3, SimpleKafkaClient\\Metadata, 0) ZEND_ARG_TYPE_INFO(0, allTopics, _IS_BOOL, 0) @@ -23,11 +23,16 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_of ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) ZEND_END_ARG_INFO() +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_setOAuthBearerTokenFailure, 0, 1, IS_VOID, 0) + ZEND_ARG_TYPE_INFO(0, errorString, IS_STRING, 0) +ZEND_END_ARG_INFO() + ZEND_METHOD(SimpleKafkaClient_Kafka, getMetadata); ZEND_METHOD(SimpleKafkaClient_Kafka, getOutQLen); ZEND_METHOD(SimpleKafkaClient_Kafka, queryWatermarkOffsets); ZEND_METHOD(SimpleKafkaClient_Kafka, offsetsForTimes); +ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerTokenFailure); static const zend_function_entry class_SimpleKafkaClient_Kafka_methods[] = { @@ -35,5 +40,6 @@ static const zend_function_entry class_SimpleKafkaClient_Kafka_methods[] = { ZEND_ME(SimpleKafkaClient_Kafka, getOutQLen, arginfo_class_SimpleKafkaClient_Kafka_getOutQLen, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Kafka, queryWatermarkOffsets, arginfo_class_SimpleKafkaClient_Kafka_queryWatermarkOffsets, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Kafka, offsetsForTimes, arginfo_class_SimpleKafkaClient_Kafka_offsetsForTimes, ZEND_ACC_PUBLIC) + ZEND_ME(SimpleKafkaClient_Kafka, setOAuthBearerTokenFailure, arginfo_class_SimpleKafkaClient_Kafka_setOAuthBearerTokenFailure, ZEND_ACC_PUBLIC) ZEND_FE_END }; diff --git a/tests/set_oauthbearer_failure.phpt b/tests/set_oauthbearer_failure.phpt new file mode 100644 index 0000000..fe8f6cc --- /dev/null +++ b/tests/set_oauthbearer_failure.phpt @@ -0,0 +1,23 @@ +--TEST-- +Produce, consume +--SKIPIF-- +set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS')); +$conf->set('security.protocol', 'SASL_PLAINTEXT'); +$conf->set('sasl.mechanisms', 'OAUTHBEARER'); + +$conf->setErrorCb(function($kafka, $errorCode, $errorString) { + var_dump($errorString); +}); + +$producer = new SimpleKafkaClient\Producer($conf); +$producer->setOAuthBearerTokenFailure('something'); +$producer->poll(-1); +--EXPECT-- +string(51) "Failed to acquire SASL OAUTHBEARER token: something"