From c80fddd58b355cde248773e474031947372441ea Mon Sep 17 00:00:00 2001 From: nick <nickjobszh@gmail.com> Date: Sun, 25 Apr 2021 00:41:43 +0200 Subject: [PATCH 1/5] save work --- simple_kafka_client.c | 51 +++++++++++++++++++++++++++++++++++ simple_kafka_client.stub.php | 2 ++ simple_kafka_client_arginfo.h | 11 +++++++- 3 files changed, 63 insertions(+), 1 deletion(-) diff --git a/simple_kafka_client.c b/simple_kafka_client.c index 997b2cc..ab07079 100644 --- a/simple_kafka_client.c +++ b/simple_kafka_client.c @@ -288,6 +288,57 @@ ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerTokenFailure) } /* }}} */ +/* {{{ proto void SimpleKafkaClient\Kafka::setOAuthBearerToken(string $token, int $lifetimeMs, string $principalName, ?array $extensions = null) + Set SASL/OAUTHBEARER token and metadata. */ +ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerToken) +{ + zend_long lifetime_ms; + const char **extensions = NULL; + char *header_key, *header_value, *token, *principal_name, *errstr; + size_t token_len, principal_name_len, errstr_size, extension_size = 0; + kafka_object *intern; + rd_kafka_resp_err_t err; + HashTable *ht_extensions = NULL; + HashPosition extensionsPos; + zval *z_header_value; + + ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 3, 4) + Z_PARAM_STRING(token, token_len) + Z_PARAM_LONG(lifetime_ms) + Z_PARAM_STRING(principal_name, principal_name_len) + Z_PARAM_OPTIONAL + Z_PARAM_ARRAY_HT_OR_NULL(ht_extensions) + ZEND_PARSE_PARAMETERS_END(); + + intern = get_kafka_object(getThis()); + if (!intern) { + return; + } + + if (ht_extensions) { + for (zend_hash_internal_pointer_reset_ex(ht_extensions, &extensionsPos); + (z_header_value = zend_hash_get_current_data_ex(ht_extensions, &extensionsPos)) != NULL && + (header_key = kafka_hash_get_current_key_ex(ht_extensions, &extensionsPos)) != NULL; + zend_hash_move_forward_ex(ht_extensions, &extensionsPos)) { + convert_to_string_ex(z_header_value); + extensions = realloc(extensions, (extension_size + 1) * sizeof (header_key)); + extensions[extension_size] = header_key; + header_value = Z_STRVAL_P(z_header_value); + extensions = realloc(extensions, (extension_size + 2) * sizeof (header_value)); + extensions[extension_size+1] = Z_STRVAL_P(z_header_value); + extension_size+=2; + } + } + + err = rd_kafka_oauthbearer_set_token(intern->rk, token, lifetime_ms, principal_name, extensions, extension_size, errstr, errstr_size); + + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { + zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err); + return; + } +} +/* }}} */ + #define COPY_CONSTANT(name) \ REGISTER_LONG_CONSTANT(#name, name, CONST_CS | CONST_PERSISTENT) diff --git a/simple_kafka_client.stub.php b/simple_kafka_client.stub.php index 0f9e3c4..e222788 100644 --- a/simple_kafka_client.stub.php +++ b/simple_kafka_client.stub.php @@ -15,4 +15,6 @@ public function queryWatermarkOffsets(string $topic, int $partition, int &$low, public function offsetsForTimes(array $topicPartitions, int $timeoutMs): array {} public function setOAuthBearerTokenFailure(string $errorString): void {} + + public function setOAuthBearerToken(string $token, int $lifetimeMs, string $principalName, ?array $extensions = null): void {} } diff --git a/simple_kafka_client_arginfo.h b/simple_kafka_client_arginfo.h index 85c57b8..26c0671 100644 --- a/simple_kafka_client_arginfo.h +++ b/simple_kafka_client_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 54f0c76165212e21416f46325d0a52b0b7fce4a8 */ + * Stub hash: 54165f3ef5d3833ee646b825574c959323fd612b */ ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_getMetadata, 0, 3, SimpleKafkaClient\\Metadata, 0) ZEND_ARG_TYPE_INFO(0, allTopics, _IS_BOOL, 0) @@ -27,12 +27,20 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_se ZEND_ARG_TYPE_INFO(0, errorString, IS_STRING, 0) ZEND_END_ARG_INFO() +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_setOAuthBearerToken, 0, 3, IS_VOID, 0) + ZEND_ARG_TYPE_INFO(0, token, IS_STRING, 0) + ZEND_ARG_TYPE_INFO(0, lifetimeMs, IS_LONG, 0) + ZEND_ARG_TYPE_INFO(0, principalName, IS_STRING, 0) + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, extensions, IS_ARRAY, 1, "null") +ZEND_END_ARG_INFO() + ZEND_METHOD(SimpleKafkaClient_Kafka, getMetadata); ZEND_METHOD(SimpleKafkaClient_Kafka, getOutQLen); ZEND_METHOD(SimpleKafkaClient_Kafka, queryWatermarkOffsets); ZEND_METHOD(SimpleKafkaClient_Kafka, offsetsForTimes); ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerTokenFailure); +ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerToken); static const zend_function_entry class_SimpleKafkaClient_Kafka_methods[] = { @@ -41,5 +49,6 @@ static const zend_function_entry class_SimpleKafkaClient_Kafka_methods[] = { ZEND_ME(SimpleKafkaClient_Kafka, queryWatermarkOffsets, arginfo_class_SimpleKafkaClient_Kafka_queryWatermarkOffsets, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Kafka, offsetsForTimes, arginfo_class_SimpleKafkaClient_Kafka_offsetsForTimes, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Kafka, setOAuthBearerTokenFailure, arginfo_class_SimpleKafkaClient_Kafka_setOAuthBearerTokenFailure, ZEND_ACC_PUBLIC) + ZEND_ME(SimpleKafkaClient_Kafka, setOAuthBearerToken, arginfo_class_SimpleKafkaClient_Kafka_setOAuthBearerToken, ZEND_ACC_PUBLIC) ZEND_FE_END }; From b4f5bd7291f3c05cab844af0647a59f1a7c0d870 Mon Sep 17 00:00:00 2001 From: nick <nickjobszh@gmail.com> Date: Sun, 25 Apr 2021 01:02:46 +0200 Subject: [PATCH 2/5] add test --- tests/set_oauthbearer_token.phpt | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 tests/set_oauthbearer_token.phpt diff --git a/tests/set_oauthbearer_token.phpt b/tests/set_oauthbearer_token.phpt new file mode 100644 index 0000000..efc39c5 --- /dev/null +++ b/tests/set_oauthbearer_token.phpt @@ -0,0 +1,24 @@ +--TEST-- +Produce, consume +--SKIPIF-- +<?php +require __DIR__ . '/integration-tests-check.php'; +--FILE-- +<?php +require __DIR__ . '/integration-tests-check.php'; + +$conf = new SimpleKafkaClient\Configuration(); +$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS')); +$conf->set('security.protocol', 'SASL_PLAINTEXT'); +$conf->set('sasl.mechanisms', 'OAUTHBEARER'); + +$conf->setErrorCb(function($kafka, $errorCode, $errorString) { + var_dump($errorString); +}); + +$producer = new SimpleKafkaClient\Producer($conf); +$producer->setOAuthBearerToken('token', 100000 + microtime() * 1000, 'principal', ['test'=>'key']); +$producer->poll(-1); +echo 'Done'; +--EXPECT-- +Done From 44c9ba139b15ea7129c4de1ae4557635b6c934f2 Mon Sep 17 00:00:00 2001 From: nick <nickjobszh@gmail.com> Date: Sun, 25 Apr 2021 01:06:46 +0200 Subject: [PATCH 3/5] init vars --- simple_kafka_client.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/simple_kafka_client.c b/simple_kafka_client.c index ab07079..c23b4d2 100644 --- a/simple_kafka_client.c +++ b/simple_kafka_client.c @@ -294,8 +294,8 @@ ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerToken) { zend_long lifetime_ms; const char **extensions = NULL; - char *header_key, *header_value, *token, *principal_name, *errstr; - size_t token_len, principal_name_len, errstr_size, extension_size = 0; + char *header_key, *header_value, *token, *principal_name, *errstr = NULL; + size_t token_len, principal_name_len, errstr_size = 0, extension_size = 0; kafka_object *intern; rd_kafka_resp_err_t err; HashTable *ht_extensions = NULL; From bf8878015e626940744418a94f7ed07266128cfa Mon Sep 17 00:00:00 2001 From: nick <nickjobszh@gmail.com> Date: Sun, 25 Apr 2021 01:10:58 +0200 Subject: [PATCH 4/5] fix test --- tests/set_oauthbearer_token.phpt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/set_oauthbearer_token.phpt b/tests/set_oauthbearer_token.phpt index efc39c5..683ebbc 100644 --- a/tests/set_oauthbearer_token.phpt +++ b/tests/set_oauthbearer_token.phpt @@ -17,7 +17,7 @@ $conf->setErrorCb(function($kafka, $errorCode, $errorString) { }); $producer = new SimpleKafkaClient\Producer($conf); -$producer->setOAuthBearerToken('token', 100000 + microtime() * 1000, 'principal', ['test'=>'key']); +$producer->setOAuthBearerToken('token', 100000 + time() * 1000, 'principal', ['test'=>'key']); $producer->poll(-1); echo 'Done'; --EXPECT-- From b414b40febdc9d924721d8c88a85e7d6579eff40 Mon Sep 17 00:00:00 2001 From: nick <nickjobszh@gmail.com> Date: Sun, 25 Apr 2021 01:40:30 +0200 Subject: [PATCH 5/5] add free --- simple_kafka_client.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/simple_kafka_client.c b/simple_kafka_client.c index c23b4d2..c704fee 100644 --- a/simple_kafka_client.c +++ b/simple_kafka_client.c @@ -336,6 +336,8 @@ ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerToken) zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err); return; } + + free(extensions); } /* }}} */