From 383de5212ba9d07d870dc25de83703b4ce4b1495 Mon Sep 17 00:00:00 2001 From: nick Date: Sat, 24 Apr 2021 21:19:06 +0200 Subject: [PATCH 1/6] extend properly, remove duplicate code --- consumer.c | 118 ------------------ consumer.stub.php | 6 - consumer_arginfo.h | 27 +--- kafka_arginfo.h | 45 ------- package.xml | 2 +- simple_kafka_client.c | 4 +- ...a.stub.php => simple_kafka_client.stub.php | 0 7 files changed, 4 insertions(+), 198 deletions(-) delete mode 100644 kafka_arginfo.h rename kafka.stub.php => simple_kafka_client.stub.php (100%) diff --git a/consumer.c b/consumer.c index 19a5b5f..8c4a6ca 100644 --- a/consumer.c +++ b/consumer.c @@ -433,48 +433,6 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, close) } /* }}} */ -/* {{{ proto Metadata SimpleKafkaClient\Consumer::getMetadata(bool all_topics, int timeout_ms, SimpleKafkaClient\Topic only_topic = null) - Request Metadata from broker */ -ZEND_METHOD(SimpleKafkaClient_Consumer, getMetadata) -{ - zend_bool all_topics; - zval *only_zrkt = NULL; - zend_long timeout_ms; - rd_kafka_resp_err_t err; - kafka_object *intern; - const rd_kafka_metadata_t *metadata; - kafka_topic_object *only_orkt = NULL; - - ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 3) - Z_PARAM_BOOL(all_topics) - Z_PARAM_LONG(timeout_ms) - Z_PARAM_OPTIONAL - Z_PARAM_OBJECT_OF_CLASS(only_zrkt, ce_kafka_topic) - ZEND_PARSE_PARAMETERS_END(); - - intern = get_kafka_object(getThis()); - if (!intern) { - return; - } - - if (only_zrkt) { - only_orkt = get_kafka_topic_object(only_zrkt); - if (!only_orkt) { - return; - } - } - - err = rd_kafka_metadata(intern->rk, all_topics, only_orkt ? only_orkt->rkt : NULL, &metadata, timeout_ms); - - if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { - zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err); - return; - } - - kafka_metadata_obj_init(return_value, metadata); -} -/* }}} */ - /* {{{ proto SimpleKafkaClient\ConsumerTopic SimpleKafkaClient\Consumer::getTopicHandle(string $topic) Returns a SimpleKafkaClient\ConsumerTopic object */ ZEND_METHOD(SimpleKafkaClient_Consumer, getTopicHandle) @@ -587,79 +545,3 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getOffsetPositions) } /* }}} */ -/* {{{ proto void SimpleKafkaClient\Consumer::offsetsForTimes(array $topicPartitions, int $timeout_ms) - Look up the offsets for the given partitions by timestamp. */ -ZEND_METHOD(SimpleKafkaClient_Consumer, offsetsForTimes) -{ - HashTable *htopars = NULL; - kafka_object *intern; - rd_kafka_topic_partition_list_t *topicPartitions; - zend_long timeout_ms; - rd_kafka_resp_err_t err; - - ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 2) - Z_PARAM_ARRAY_HT(htopars) - Z_PARAM_LONG(timeout_ms) - ZEND_PARSE_PARAMETERS_END(); - - intern = get_kafka_object(getThis()); - if (!intern) { - return; - } - - topicPartitions = array_arg_to_kafka_topic_partition_list(1, htopars); - if (!topicPartitions) { - return; - } - - err = rd_kafka_offsets_for_times(intern->rk, topicPartitions, timeout_ms); - - if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { - rd_kafka_topic_partition_list_destroy(topicPartitions); - zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err); - return; - } - kafka_topic_partition_list_to_array(return_value, topicPartitions); - rd_kafka_topic_partition_list_destroy(topicPartitions); -} -/* }}} */ - -/* {{{ proto void SimpleKafkaClient\Consumer::queryWatermarkOffsets(string $topic, int $partition, int &$low, int &$high, int $timeout_ms) - Query broker for low (oldest/beginning) or high (newest/end) offsets for partition */ -ZEND_METHOD(SimpleKafkaClient_Consumer, queryWatermarkOffsets) -{ - kafka_object *intern; - char *topic; - size_t topic_length; - long low, high; - zend_long partition, timeout_ms; - zval *lowResult, *highResult; - rd_kafka_resp_err_t err; - - ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 2) - Z_PARAM_STRING(topic, topic_length) - Z_PARAM_LONG(partition) - Z_PARAM_ZVAL(lowResult) - Z_PARAM_ZVAL(highResult) - Z_PARAM_LONG(timeout_ms) - ZEND_PARSE_PARAMETERS_END(); - - ZVAL_DEREF(lowResult); - ZVAL_DEREF(highResult); - - intern = get_kafka_object(getThis()); - if (!intern) { - return; - } - - err = rd_kafka_query_watermark_offsets(intern->rk, topic, partition, &low, &high, timeout_ms); - - if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { - zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err); - return; - } - - ZVAL_LONG(lowResult, low); - ZVAL_LONG(highResult, high); -} -/* }}} */ diff --git a/consumer.stub.php b/consumer.stub.php index 37e20f6..d7e56d1 100644 --- a/consumer.stub.php +++ b/consumer.stub.php @@ -28,15 +28,9 @@ public function commitAsync($messageOrOffsets): void {} public function close(): void {} - public function getMetadata(bool $allTopics, int $timeoutMs, ConsumerTopic $topic): Metadata {} - public function getTopicHandle(string $topic): ConsumerTopic {} public function getCommittedOffsets(array $topics, int $timeoutMs): array {} public function getOffsetPositions(array $topics): array {} - - public function offsetsForTimes(array $topicPartitions, int $timeoutMs): array {} - - public function queryWatermarkOffsets(string $topic, int $partition, int &$low, int &$high, int $timeoutMs): void {} } diff --git a/consumer_arginfo.h b/consumer_arginfo.h index 0691ebe..c33e529 100644 --- a/consumer_arginfo.h +++ b/consumer_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 091c6b60081bb08ec174ef87b9cc6d2b3fbba461 */ + * Stub hash: 378cc029a3673afe02572e7e17fde17e47b2aefd */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer___construct, 0, 0, 1) ZEND_ARG_OBJ_INFO(0, configuration, SimpleKafkaClient\\Configuration, 0) @@ -33,12 +33,6 @@ ZEND_END_ARG_INFO() #define arginfo_class_SimpleKafkaClient_Consumer_close arginfo_class_SimpleKafkaClient_Consumer_unsubscribe -ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_getMetadata, 0, 3, SimpleKafkaClient\\Metadata, 0) - ZEND_ARG_TYPE_INFO(0, allTopics, _IS_BOOL, 0) - ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) - ZEND_ARG_OBJ_INFO(0, topic, SimpleKafkaClient\\ConsumerTopic, 0) -ZEND_END_ARG_INFO() - ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_getTopicHandle, 0, 1, SimpleKafkaClient\\ConsumerTopic, 0) ZEND_ARG_TYPE_INFO(0, topic, IS_STRING, 0) ZEND_END_ARG_INFO() @@ -52,19 +46,6 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer ZEND_ARG_TYPE_INFO(0, topics, IS_ARRAY, 0) ZEND_END_ARG_INFO() -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_offsetsForTimes, 0, 2, IS_ARRAY, 0) - ZEND_ARG_TYPE_INFO(0, topicPartitions, IS_ARRAY, 0) - ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) -ZEND_END_ARG_INFO() - -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_queryWatermarkOffsets, 0, 5, IS_VOID, 0) - ZEND_ARG_TYPE_INFO(0, topic, IS_STRING, 0) - ZEND_ARG_TYPE_INFO(0, partition, IS_LONG, 0) - ZEND_ARG_TYPE_INFO(1, low, IS_LONG, 0) - ZEND_ARG_TYPE_INFO(1, high, IS_LONG, 0) - ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) -ZEND_END_ARG_INFO() - ZEND_METHOD(SimpleKafkaClient_Consumer, __construct); ZEND_METHOD(SimpleKafkaClient_Consumer, assign); @@ -76,12 +57,9 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, consume); ZEND_METHOD(SimpleKafkaClient_Consumer, commit); ZEND_METHOD(SimpleKafkaClient_Consumer, commitAsync); ZEND_METHOD(SimpleKafkaClient_Consumer, close); -ZEND_METHOD(SimpleKafkaClient_Consumer, getMetadata); ZEND_METHOD(SimpleKafkaClient_Consumer, getTopicHandle); ZEND_METHOD(SimpleKafkaClient_Consumer, getCommittedOffsets); ZEND_METHOD(SimpleKafkaClient_Consumer, getOffsetPositions); -ZEND_METHOD(SimpleKafkaClient_Consumer, offsetsForTimes); -ZEND_METHOD(SimpleKafkaClient_Consumer, queryWatermarkOffsets); static const zend_function_entry class_SimpleKafkaClient_Consumer_methods[] = { @@ -95,11 +73,8 @@ static const zend_function_entry class_SimpleKafkaClient_Consumer_methods[] = { ZEND_ME(SimpleKafkaClient_Consumer, commit, arginfo_class_SimpleKafkaClient_Consumer_commit, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Consumer, commitAsync, arginfo_class_SimpleKafkaClient_Consumer_commitAsync, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Consumer, close, arginfo_class_SimpleKafkaClient_Consumer_close, ZEND_ACC_PUBLIC) - ZEND_ME(SimpleKafkaClient_Consumer, getMetadata, arginfo_class_SimpleKafkaClient_Consumer_getMetadata, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Consumer, getTopicHandle, arginfo_class_SimpleKafkaClient_Consumer_getTopicHandle, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Consumer, getCommittedOffsets, arginfo_class_SimpleKafkaClient_Consumer_getCommittedOffsets, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Consumer, getOffsetPositions, arginfo_class_SimpleKafkaClient_Consumer_getOffsetPositions, ZEND_ACC_PUBLIC) - ZEND_ME(SimpleKafkaClient_Consumer, offsetsForTimes, arginfo_class_SimpleKafkaClient_Consumer_offsetsForTimes, ZEND_ACC_PUBLIC) - ZEND_ME(SimpleKafkaClient_Consumer, queryWatermarkOffsets, arginfo_class_SimpleKafkaClient_Consumer_queryWatermarkOffsets, ZEND_ACC_PUBLIC) ZEND_FE_END }; diff --git a/kafka_arginfo.h b/kafka_arginfo.h deleted file mode 100644 index 0b7cd60..0000000 --- a/kafka_arginfo.h +++ /dev/null @@ -1,45 +0,0 @@ -/* This is a generated file, edit the .stub.php file instead. - * Stub hash: 5620609ea29ca05a20736ac8412bee6e4cc39615 */ - -ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_getMetadata, 0, 3, SimpleKafkaClient\\Metadata, 0) - ZEND_ARG_TYPE_INFO(0, allTopics, _IS_BOOL, 0) - ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) - ZEND_ARG_OBJ_INFO(0, topic, SimpleKafkaClient\\Topic, 0) -ZEND_END_ARG_INFO() - -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_getOutQLen, 0, 0, IS_LONG, 0) -ZEND_END_ARG_INFO() - -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_poll, 0, 1, IS_LONG, 0) - ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) -ZEND_END_ARG_INFO() - -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_queryWatermarkOffsets, 0, 5, IS_VOID, 0) - ZEND_ARG_TYPE_INFO(0, topic, IS_STRING, 0) - ZEND_ARG_TYPE_INFO(0, partition, IS_LONG, 0) - ZEND_ARG_TYPE_INFO(1, low, IS_LONG, 0) - ZEND_ARG_TYPE_INFO(1, high, IS_LONG, 0) - ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) -ZEND_END_ARG_INFO() - -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_offsetsForTimes, 0, 2, IS_ARRAY, 0) - ZEND_ARG_TYPE_INFO(0, topicPartitions, IS_ARRAY, 0) - ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) -ZEND_END_ARG_INFO() - - -ZEND_METHOD(SimpleKafkaClient_Kafka, getMetadata); -ZEND_METHOD(SimpleKafkaClient_Kafka, getOutQLen); -ZEND_METHOD(SimpleKafkaClient_Kafka, poll); -ZEND_METHOD(SimpleKafkaClient_Kafka, queryWatermarkOffsets); -ZEND_METHOD(SimpleKafkaClient_Kafka, offsetsForTimes); - - -static const zend_function_entry class_SimpleKafkaClient_Kafka_methods[] = { - ZEND_ME(SimpleKafkaClient_Kafka, getMetadata, arginfo_class_SimpleKafkaClient_Kafka_getMetadata, ZEND_ACC_PUBLIC) - ZEND_ME(SimpleKafkaClient_Kafka, getOutQLen, arginfo_class_SimpleKafkaClient_Kafka_getOutQLen, ZEND_ACC_PUBLIC) - ZEND_ME(SimpleKafkaClient_Kafka, poll, arginfo_class_SimpleKafkaClient_Kafka_poll, ZEND_ACC_PUBLIC) - ZEND_ME(SimpleKafkaClient_Kafka, queryWatermarkOffsets, arginfo_class_SimpleKafkaClient_Kafka_queryWatermarkOffsets, ZEND_ACC_PUBLIC) - ZEND_ME(SimpleKafkaClient_Kafka, offsetsForTimes, arginfo_class_SimpleKafkaClient_Kafka_offsetsForTimes, ZEND_ACC_PUBLIC) - ZEND_FE_END -}; diff --git a/package.xml b/package.xml index 1a784f8..43365dc 100644 --- a/package.xml +++ b/package.xml @@ -42,7 +42,7 @@ - + diff --git a/simple_kafka_client.c b/simple_kafka_client.c index 5b77132..2896881 100644 --- a/simple_kafka_client.c +++ b/simple_kafka_client.c @@ -45,7 +45,7 @@ #include "consumer_arginfo.h" #include "functions_arginfo.h" #include "producer_arginfo.h" -#include "kafka_arginfo.h" +#include "simple_kafka_client_arginfo.h" enum { RD_KAFKA_LOG_PRINT = 100 @@ -352,7 +352,7 @@ PHP_MINIT_FUNCTION(simple_kafka_client) ce_kafka_producer = zend_register_internal_class_ex(&ce, ce_kafka); INIT_NS_CLASS_ENTRY(ce, "SimpleKafkaClient", "Consumer", class_SimpleKafkaClient_Consumer_methods); - ce_kafka_consumer = zend_register_internal_class(&ce); + ce_kafka_consumer = zend_register_internal_class_ex(&ce, ce_kafka); ce_kafka_consumer->create_object = kafka_new; kafka_conf_init(INIT_FUNC_ARGS_PASSTHRU); diff --git a/kafka.stub.php b/simple_kafka_client.stub.php similarity index 100% rename from kafka.stub.php rename to simple_kafka_client.stub.php From e658ed6389b404ec2da588cb8fea8102e18c3dcb Mon Sep 17 00:00:00 2001 From: nick Date: Sat, 24 Apr 2021 21:43:02 +0200 Subject: [PATCH 2/6] add new arginfo, add offsets test --- simple_kafka_client_arginfo.h | 45 +++++++++++++++++++++++++++++++++++ tests/offsets_for_times.phpt | 36 ++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+) create mode 100644 simple_kafka_client_arginfo.h create mode 100644 tests/offsets_for_times.phpt diff --git a/simple_kafka_client_arginfo.h b/simple_kafka_client_arginfo.h new file mode 100644 index 0000000..0b7cd60 --- /dev/null +++ b/simple_kafka_client_arginfo.h @@ -0,0 +1,45 @@ +/* This is a generated file, edit the .stub.php file instead. + * Stub hash: 5620609ea29ca05a20736ac8412bee6e4cc39615 */ + +ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_getMetadata, 0, 3, SimpleKafkaClient\\Metadata, 0) + ZEND_ARG_TYPE_INFO(0, allTopics, _IS_BOOL, 0) + ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) + ZEND_ARG_OBJ_INFO(0, topic, SimpleKafkaClient\\Topic, 0) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_getOutQLen, 0, 0, IS_LONG, 0) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_poll, 0, 1, IS_LONG, 0) + ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_queryWatermarkOffsets, 0, 5, IS_VOID, 0) + ZEND_ARG_TYPE_INFO(0, topic, IS_STRING, 0) + ZEND_ARG_TYPE_INFO(0, partition, IS_LONG, 0) + ZEND_ARG_TYPE_INFO(1, low, IS_LONG, 0) + ZEND_ARG_TYPE_INFO(1, high, IS_LONG, 0) + ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_offsetsForTimes, 0, 2, IS_ARRAY, 0) + ZEND_ARG_TYPE_INFO(0, topicPartitions, IS_ARRAY, 0) + ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) +ZEND_END_ARG_INFO() + + +ZEND_METHOD(SimpleKafkaClient_Kafka, getMetadata); +ZEND_METHOD(SimpleKafkaClient_Kafka, getOutQLen); +ZEND_METHOD(SimpleKafkaClient_Kafka, poll); +ZEND_METHOD(SimpleKafkaClient_Kafka, queryWatermarkOffsets); +ZEND_METHOD(SimpleKafkaClient_Kafka, offsetsForTimes); + + +static const zend_function_entry class_SimpleKafkaClient_Kafka_methods[] = { + ZEND_ME(SimpleKafkaClient_Kafka, getMetadata, arginfo_class_SimpleKafkaClient_Kafka_getMetadata, ZEND_ACC_PUBLIC) + ZEND_ME(SimpleKafkaClient_Kafka, getOutQLen, arginfo_class_SimpleKafkaClient_Kafka_getOutQLen, ZEND_ACC_PUBLIC) + ZEND_ME(SimpleKafkaClient_Kafka, poll, arginfo_class_SimpleKafkaClient_Kafka_poll, ZEND_ACC_PUBLIC) + ZEND_ME(SimpleKafkaClient_Kafka, queryWatermarkOffsets, arginfo_class_SimpleKafkaClient_Kafka_queryWatermarkOffsets, ZEND_ACC_PUBLIC) + ZEND_ME(SimpleKafkaClient_Kafka, offsetsForTimes, arginfo_class_SimpleKafkaClient_Kafka_offsetsForTimes, ZEND_ACC_PUBLIC) + ZEND_FE_END +}; diff --git a/tests/offsets_for_times.phpt b/tests/offsets_for_times.phpt new file mode 100644 index 0000000..990fa2b --- /dev/null +++ b/tests/offsets_for_times.phpt @@ -0,0 +1,36 @@ +--TEST-- +Produce, consume +--SKIPIF-- +set('client.id', 'pure-php-producer'); +$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS')); + +$producer = new Producer($conf); +$topic = $producer->getTopicHandle('pure-php-test-topic-offsets'); +$time = time(); +$topic->producev( + RD_KAFKA_PARTITION_UA, + RD_KAFKA_MSG_F_BLOCK, // will block produce if queue is full + 'special-message', + 'special-key', + [ + 'special-header' => 'awesome' + ] +); +$result = $producer->flush(20000); + +$topicPartition = new TopicPartition('pure-php-test-topic-offsets', 0, $time); +$result = $producer->offsetsForTimes([$topicPartition], 10000); +var_dump($result[0]->getTopicName()); +var_dump($result[0]->getPartition()); +var_dump($result[0]->getOffset()); +--EXPECT-- +string(27) "pure-php-test-topic-offsets" +int(0) +int(0) From 8cc74cdcee4238aba20f576c4acce7a70f2aa53e Mon Sep 17 00:00:00 2001 From: nick Date: Sat, 24 Apr 2021 22:06:41 +0200 Subject: [PATCH 3/6] fix test, move poll --- producer.c | 20 ++++++++++++++++++++ producer.stub.php | 2 ++ producer_arginfo.h | 6 +++++- simple_kafka_client.c | 20 -------------------- simple_kafka_client.stub.php | 2 -- simple_kafka_client_arginfo.h | 8 +------- tests/offsets_for_times.phpt | 6 +++--- 7 files changed, 31 insertions(+), 33 deletions(-) diff --git a/producer.c b/producer.c index 1c012fb..29c5ba4 100644 --- a/producer.c +++ b/producer.c @@ -120,6 +120,26 @@ ZEND_METHOD(SimpleKafkaClient_Producer, flush) } /* }}} */ +/* {{{ proto int SimpleKafkaClient\Producer::poll(int $timeoutMs) + Polls the provided kafka handle for events */ +ZEND_METHOD(SimpleKafkaClient_Producer, poll) +{ + kafka_object *intern; + zend_long timeout_ms; + + ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1) + Z_PARAM_LONG(timeout_ms) + ZEND_PARSE_PARAMETERS_END(); + + intern = get_kafka_object(getThis()); + if (!intern) { + return; + } + + RETURN_LONG(rd_kafka_poll(intern->rk, timeout_ms)); +} +/* }}} */ + /* {{{ proto int SimpleKafkaClient\Producer::purge(int $purge_flags) Purge messages that are in queue or in flight */ ZEND_METHOD(SimpleKafkaClient_Producer, purge) diff --git a/producer.stub.php b/producer.stub.php index f193737..f81610e 100644 --- a/producer.stub.php +++ b/producer.stub.php @@ -18,6 +18,8 @@ public function abortTransaction(int $timeoutMs): void {} public function flush(int $timeoutMs): int {} + public function poll(int $timeoutMs): int {} + public function purge(int $purgeFlags): int {} public function getTopicHandle(string $topic): ProducerTopic {} diff --git a/producer_arginfo.h b/producer_arginfo.h index 28b0435..8c4e343 100644 --- a/producer_arginfo.h +++ b/producer_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: ae03dd8127a9e4799e241bc490de200ff18a4178 */ + * Stub hash: 30c864ad8163b67989b699e8e94c4fe6539a5386 */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_SimpleKafkaClient_Producer___construct, 0, 0, 1) ZEND_ARG_OBJ_INFO(0, configuration, SimpleKafkaClient\\Configuration, 0) @@ -20,6 +20,8 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Producer ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) ZEND_END_ARG_INFO() +#define arginfo_class_SimpleKafkaClient_Producer_poll arginfo_class_SimpleKafkaClient_Producer_flush + ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Producer_purge, 0, 1, IS_LONG, 0) ZEND_ARG_TYPE_INFO(0, purgeFlags, IS_LONG, 0) ZEND_END_ARG_INFO() @@ -35,6 +37,7 @@ ZEND_METHOD(SimpleKafkaClient_Producer, beginTransaction); ZEND_METHOD(SimpleKafkaClient_Producer, commitTransaction); ZEND_METHOD(SimpleKafkaClient_Producer, abortTransaction); ZEND_METHOD(SimpleKafkaClient_Producer, flush); +ZEND_METHOD(SimpleKafkaClient_Producer, poll); ZEND_METHOD(SimpleKafkaClient_Producer, purge); ZEND_METHOD(SimpleKafkaClient_Producer, getTopicHandle); @@ -46,6 +49,7 @@ static const zend_function_entry class_SimpleKafkaClient_Producer_methods[] = { ZEND_ME(SimpleKafkaClient_Producer, commitTransaction, arginfo_class_SimpleKafkaClient_Producer_commitTransaction, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Producer, abortTransaction, arginfo_class_SimpleKafkaClient_Producer_abortTransaction, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Producer, flush, arginfo_class_SimpleKafkaClient_Producer_flush, ZEND_ACC_PUBLIC) + ZEND_ME(SimpleKafkaClient_Producer, poll, arginfo_class_SimpleKafkaClient_Producer_poll, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Producer, purge, arginfo_class_SimpleKafkaClient_Producer_purge, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Producer, getTopicHandle, arginfo_class_SimpleKafkaClient_Producer_getTopicHandle, ZEND_ACC_PUBLIC) ZEND_FE_END diff --git a/simple_kafka_client.c b/simple_kafka_client.c index 2896881..629336f 100644 --- a/simple_kafka_client.c +++ b/simple_kafka_client.c @@ -184,26 +184,6 @@ ZEND_METHOD(SimpleKafkaClient_Kafka, getOutQLen) } /* }}} */ -/* {{{ proto int SimpleKafkaClient\Kafka::poll(int $timeoutMs) - Polls the provided kafka handle for events */ -ZEND_METHOD(SimpleKafkaClient_Kafka, poll) -{ - kafka_object *intern; - zend_long timeout_ms; - - ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1) - Z_PARAM_LONG(timeout_ms) - ZEND_PARSE_PARAMETERS_END(); - - intern = get_kafka_object(getThis()); - if (!intern) { - return; - } - - RETURN_LONG(rd_kafka_poll(intern->rk, timeout_ms)); -} -/* }}} */ - /* {{{ proto void SimpleKafkaClient\Kafka::queryWatermarkOffsets(string $topic, int $partition, int &$low, int &$high, int $timeout_ms) Query broker for low (oldest/beginning) or high (newest/end) offsets for partition */ ZEND_METHOD(SimpleKafkaClient_Kafka, queryWatermarkOffsets) diff --git a/simple_kafka_client.stub.php b/simple_kafka_client.stub.php index d0fbe3f..de742ab 100644 --- a/simple_kafka_client.stub.php +++ b/simple_kafka_client.stub.php @@ -10,8 +10,6 @@ public function getMetadata(bool $allTopics, int $timeoutMs, Topic $topic): Meta public function getOutQLen(): int {} - public function poll(int $timeoutMs): int {} - public function queryWatermarkOffsets(string $topic, int $partition, int &$low, int &$high, int $timeoutMs): void {} public function offsetsForTimes(array $topicPartitions, int $timeoutMs): array {} diff --git a/simple_kafka_client_arginfo.h b/simple_kafka_client_arginfo.h index 0b7cd60..cc393b6 100644 --- a/simple_kafka_client_arginfo.h +++ b/simple_kafka_client_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 5620609ea29ca05a20736ac8412bee6e4cc39615 */ + * Stub hash: e61ec0821ea47152b2ce6b7116ec791c0c712a73 */ ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_getMetadata, 0, 3, SimpleKafkaClient\\Metadata, 0) ZEND_ARG_TYPE_INFO(0, allTopics, _IS_BOOL, 0) @@ -10,10 +10,6 @@ ZEND_END_ARG_INFO() ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_getOutQLen, 0, 0, IS_LONG, 0) ZEND_END_ARG_INFO() -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_poll, 0, 1, IS_LONG, 0) - ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) -ZEND_END_ARG_INFO() - ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_queryWatermarkOffsets, 0, 5, IS_VOID, 0) ZEND_ARG_TYPE_INFO(0, topic, IS_STRING, 0) ZEND_ARG_TYPE_INFO(0, partition, IS_LONG, 0) @@ -30,7 +26,6 @@ ZEND_END_ARG_INFO() ZEND_METHOD(SimpleKafkaClient_Kafka, getMetadata); ZEND_METHOD(SimpleKafkaClient_Kafka, getOutQLen); -ZEND_METHOD(SimpleKafkaClient_Kafka, poll); ZEND_METHOD(SimpleKafkaClient_Kafka, queryWatermarkOffsets); ZEND_METHOD(SimpleKafkaClient_Kafka, offsetsForTimes); @@ -38,7 +33,6 @@ ZEND_METHOD(SimpleKafkaClient_Kafka, offsetsForTimes); static const zend_function_entry class_SimpleKafkaClient_Kafka_methods[] = { ZEND_ME(SimpleKafkaClient_Kafka, getMetadata, arginfo_class_SimpleKafkaClient_Kafka_getMetadata, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Kafka, getOutQLen, arginfo_class_SimpleKafkaClient_Kafka_getOutQLen, ZEND_ACC_PUBLIC) - ZEND_ME(SimpleKafkaClient_Kafka, poll, arginfo_class_SimpleKafkaClient_Kafka_poll, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Kafka, queryWatermarkOffsets, arginfo_class_SimpleKafkaClient_Kafka_queryWatermarkOffsets, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Kafka, offsetsForTimes, arginfo_class_SimpleKafkaClient_Kafka_offsetsForTimes, ZEND_ACC_PUBLIC) ZEND_FE_END diff --git a/tests/offsets_for_times.phpt b/tests/offsets_for_times.phpt index 990fa2b..43e30e9 100644 --- a/tests/offsets_for_times.phpt +++ b/tests/offsets_for_times.phpt @@ -7,11 +7,11 @@ require __DIR__ . '/integration-tests-check.php'; set('client.id', 'pure-php-producer'); $conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS')); -$producer = new Producer($conf); +$producer = new SimpleKafkaClient\Producer($conf); $topic = $producer->getTopicHandle('pure-php-test-topic-offsets'); $time = time(); $topic->producev( @@ -25,7 +25,7 @@ $topic->producev( ); $result = $producer->flush(20000); -$topicPartition = new TopicPartition('pure-php-test-topic-offsets', 0, $time); +$topicPartition = new SimpleKafkaClient\TopicPartition('pure-php-test-topic-offsets', 0, $time); $result = $producer->offsetsForTimes([$topicPartition], 10000); var_dump($result[0]->getTopicName()); var_dump($result[0]->getPartition()); From f9d63398ae67109d9f9e4d6dc52c3b874d2a01e2 Mon Sep 17 00:00:00 2001 From: nick Date: Sat, 24 Apr 2021 22:20:39 +0200 Subject: [PATCH 4/6] fix func args and add test --- simple_kafka_client.c | 2 +- tests/query_watermark_offsets.phpt | 36 ++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) create mode 100644 tests/query_watermark_offsets.phpt diff --git a/simple_kafka_client.c b/simple_kafka_client.c index 629336f..eaf7df2 100644 --- a/simple_kafka_client.c +++ b/simple_kafka_client.c @@ -196,7 +196,7 @@ ZEND_METHOD(SimpleKafkaClient_Kafka, queryWatermarkOffsets) zval *lowResult, *highResult; rd_kafka_resp_err_t err; - ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 2) + ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 5, 5) Z_PARAM_STRING(topic, topic_length) Z_PARAM_LONG(partition) Z_PARAM_ZVAL(lowResult) diff --git a/tests/query_watermark_offsets.phpt b/tests/query_watermark_offsets.phpt new file mode 100644 index 0000000..39ca66e --- /dev/null +++ b/tests/query_watermark_offsets.phpt @@ -0,0 +1,36 @@ +--TEST-- +Produce, consume +--SKIPIF-- +set('client.id', 'pure-php-producer'); +$conf->set('metadata.broker.list', 'kafka:9096'); +$conf->set('compression.codec', 'snappy'); +$conf->set('message.timeout.ms', '5000'); + +$producer = new Producer($conf); +$topic = $producer->getTopicHandle('pure-php-test-topic-watermark'); +$time = time(); +$topic->producev( + RD_KAFKA_PARTITION_UA, + RD_KAFKA_MSG_F_BLOCK, // will block produce if queue is full + 'special-message', + 'special-key', + [ + 'special-header' => 'awesome' + ] +); +$result = $producer->flush(20000); +$high = 0; +$low = 0; +$result = $producer->queryWatermarkOffsets('pure-php-test-topic-watermark', 0,$low, $high, 10000); +var_dump($low); +var_dump($high); +--EXPECT-- +int(0) +int(1) From ba516015768819275dc854d50df3767a01c07aed Mon Sep 17 00:00:00 2001 From: nick Date: Sat, 24 Apr 2021 22:22:55 +0200 Subject: [PATCH 5/6] fix test --- tests/query_watermark_offsets.phpt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/query_watermark_offsets.phpt b/tests/query_watermark_offsets.phpt index 39ca66e..fc72446 100644 --- a/tests/query_watermark_offsets.phpt +++ b/tests/query_watermark_offsets.phpt @@ -7,13 +7,13 @@ require __DIR__ . '/integration-tests-check.php'; set('client.id', 'pure-php-producer'); $conf->set('metadata.broker.list', 'kafka:9096'); $conf->set('compression.codec', 'snappy'); $conf->set('message.timeout.ms', '5000'); -$producer = new Producer($conf); +$producer = new SimpleKafkaClient\Producer($conf); $topic = $producer->getTopicHandle('pure-php-test-topic-watermark'); $time = time(); $topic->producev( From e2f9345a121c441592c6e77e5bf4a6ad4cba6649 Mon Sep 17 00:00:00 2001 From: nick Date: Sat, 24 Apr 2021 22:26:13 +0200 Subject: [PATCH 6/6] fix test --- tests/query_watermark_offsets.phpt | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/query_watermark_offsets.phpt b/tests/query_watermark_offsets.phpt index fc72446..2cba0bb 100644 --- a/tests/query_watermark_offsets.phpt +++ b/tests/query_watermark_offsets.phpt @@ -9,13 +9,10 @@ require __DIR__ . '/integration-tests-check.php'; $conf = new SimpleKafkaClient\Configuration(); $conf->set('client.id', 'pure-php-producer'); -$conf->set('metadata.broker.list', 'kafka:9096'); -$conf->set('compression.codec', 'snappy'); -$conf->set('message.timeout.ms', '5000'); +$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS')); $producer = new SimpleKafkaClient\Producer($conf); $topic = $producer->getTopicHandle('pure-php-test-topic-watermark'); -$time = time(); $topic->producev( RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_BLOCK, // will block produce if queue is full