diff --git a/simple_kafka_client.c b/simple_kafka_client.c index 997b2cc..c704fee 100644 --- a/simple_kafka_client.c +++ b/simple_kafka_client.c @@ -288,6 +288,59 @@ ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerTokenFailure) } /* }}} */ +/* {{{ proto void SimpleKafkaClient\Kafka::setOAuthBearerToken(string $token, int $lifetimeMs, string $principalName, ?array $extensions = null) + Set SASL/OAUTHBEARER token and metadata. */ +ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerToken) +{ + zend_long lifetime_ms; + const char **extensions = NULL; + char *header_key, *header_value, *token, *principal_name, *errstr = NULL; + size_t token_len, principal_name_len, errstr_size = 0, extension_size = 0; + kafka_object *intern; + rd_kafka_resp_err_t err; + HashTable *ht_extensions = NULL; + HashPosition extensionsPos; + zval *z_header_value; + + ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 3, 4) + Z_PARAM_STRING(token, token_len) + Z_PARAM_LONG(lifetime_ms) + Z_PARAM_STRING(principal_name, principal_name_len) + Z_PARAM_OPTIONAL + Z_PARAM_ARRAY_HT_OR_NULL(ht_extensions) + ZEND_PARSE_PARAMETERS_END(); + + intern = get_kafka_object(getThis()); + if (!intern) { + return; + } + + if (ht_extensions) { + for (zend_hash_internal_pointer_reset_ex(ht_extensions, &extensionsPos); + (z_header_value = zend_hash_get_current_data_ex(ht_extensions, &extensionsPos)) != NULL && + (header_key = kafka_hash_get_current_key_ex(ht_extensions, &extensionsPos)) != NULL; + zend_hash_move_forward_ex(ht_extensions, &extensionsPos)) { + convert_to_string_ex(z_header_value); + extensions = realloc(extensions, (extension_size + 1) * sizeof (header_key)); + extensions[extension_size] = header_key; + header_value = Z_STRVAL_P(z_header_value); + extensions = realloc(extensions, (extension_size + 2) * sizeof (header_value)); + extensions[extension_size+1] = Z_STRVAL_P(z_header_value); + extension_size+=2; + } + } + + err = rd_kafka_oauthbearer_set_token(intern->rk, token, lifetime_ms, principal_name, extensions, extension_size, errstr, errstr_size); + + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err); + return; + } + + free(extensions); +} +/* }}} */ + #define COPY_CONSTANT(name) \ REGISTER_LONG_CONSTANT(#name, name, CONST_CS | CONST_PERSISTENT) diff --git a/simple_kafka_client.stub.php b/simple_kafka_client.stub.php index 0f9e3c4..e222788 100644 --- a/simple_kafka_client.stub.php +++ b/simple_kafka_client.stub.php @@ -15,4 +15,6 @@ public function queryWatermarkOffsets(string $topic, int $partition, int &$low, public function offsetsForTimes(array $topicPartitions, int $timeoutMs): array {} public function setOAuthBearerTokenFailure(string $errorString): void {} + + public function setOAuthBearerToken(string $token, int $lifetimeMs, string $principalName, ?array $extensions = null): void {} } diff --git a/simple_kafka_client_arginfo.h b/simple_kafka_client_arginfo.h index 85c57b8..26c0671 100644 --- a/simple_kafka_client_arginfo.h +++ b/simple_kafka_client_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 54f0c76165212e21416f46325d0a52b0b7fce4a8 */ + * Stub hash: 54165f3ef5d3833ee646b825574c959323fd612b */ ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_getMetadata, 0, 3, SimpleKafkaClient\\Metadata, 0) ZEND_ARG_TYPE_INFO(0, allTopics, _IS_BOOL, 0) @@ -27,12 +27,20 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_se ZEND_ARG_TYPE_INFO(0, errorString, IS_STRING, 0) ZEND_END_ARG_INFO() +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_setOAuthBearerToken, 0, 3, IS_VOID, 0) + ZEND_ARG_TYPE_INFO(0, token, IS_STRING, 0) + ZEND_ARG_TYPE_INFO(0, lifetimeMs, IS_LONG, 0) + ZEND_ARG_TYPE_INFO(0, principalName, IS_STRING, 0) + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, extensions, IS_ARRAY, 1, "null") +ZEND_END_ARG_INFO() + ZEND_METHOD(SimpleKafkaClient_Kafka, getMetadata); ZEND_METHOD(SimpleKafkaClient_Kafka, getOutQLen); ZEND_METHOD(SimpleKafkaClient_Kafka, queryWatermarkOffsets); ZEND_METHOD(SimpleKafkaClient_Kafka, offsetsForTimes); ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerTokenFailure); +ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerToken); static const zend_function_entry class_SimpleKafkaClient_Kafka_methods[] = { @@ -41,5 +49,6 @@ static const zend_function_entry class_SimpleKafkaClient_Kafka_methods[] = { ZEND_ME(SimpleKafkaClient_Kafka, queryWatermarkOffsets, arginfo_class_SimpleKafkaClient_Kafka_queryWatermarkOffsets, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Kafka, offsetsForTimes, arginfo_class_SimpleKafkaClient_Kafka_offsetsForTimes, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Kafka, setOAuthBearerTokenFailure, arginfo_class_SimpleKafkaClient_Kafka_setOAuthBearerTokenFailure, ZEND_ACC_PUBLIC) + ZEND_ME(SimpleKafkaClient_Kafka, setOAuthBearerToken, arginfo_class_SimpleKafkaClient_Kafka_setOAuthBearerToken, ZEND_ACC_PUBLIC) ZEND_FE_END }; diff --git a/tests/set_oauthbearer_token.phpt b/tests/set_oauthbearer_token.phpt new file mode 100644 index 0000000..683ebbc --- /dev/null +++ b/tests/set_oauthbearer_token.phpt @@ -0,0 +1,24 @@ +--TEST-- +Produce, consume +--SKIPIF-- +set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS')); +$conf->set('security.protocol', 'SASL_PLAINTEXT'); +$conf->set('sasl.mechanisms', 'OAUTHBEARER'); + +$conf->setErrorCb(function($kafka, $errorCode, $errorString) { + var_dump($errorString); +}); + +$producer = new SimpleKafkaClient\Producer($conf); +$producer->setOAuthBearerToken('token', 100000 + time() * 1000, 'principal', ['test'=>'key']); +$producer->poll(-1); +echo 'Done'; +--EXPECT-- +Done