diff --git a/consumer.c b/consumer.c index 0f86588..19a5b5f 100644 --- a/consumer.c +++ b/consumer.c @@ -40,47 +40,7 @@ #include "Zend/zend_exceptions.h" #include "consumer_arginfo.h" -static zend_class_entry * ce; -static zend_object_handlers handlers; - -static void kafka_consumer_free(zend_object *object) /* {{{ */ -{ - kafka_object *intern = php_kafka_from_obj(kafka_object, object); - rd_kafka_resp_err_t err; - kafka_conf_callbacks_dtor(&intern->cbs); - - if (intern->rk) { - err = rd_kafka_consumer_close(intern->rk); - - if (err) { - php_error(E_WARNING, "rd_kafka_consumer_close failed: %s", rd_kafka_err2str(err)); - } - - rd_kafka_destroy(intern->rk); - intern->rk = NULL; - } - - kafka_conf_callbacks_dtor(&intern->cbs); - - zend_object_std_dtor(&intern->std); -} -/* }}} */ - -static zend_object *kafka_consumer_new(zend_class_entry *class_type) /* {{{ */ -{ - zend_object* retval; - kafka_object *intern; - - intern = ecalloc(1, sizeof(kafka_object)+ zend_object_properties_size(class_type)); - zend_object_std_init(&intern->std, class_type); - object_properties_init(&intern->std, class_type); - - retval = &intern->std; - retval->handlers = &handlers; - - return retval; -} -/* }}} */ +zend_class_entry * ce_kafka_consumer; static int has_group_id(rd_kafka_conf_t *conf) { /* {{{ */ @@ -165,7 +125,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, assign) ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 1) Z_PARAM_OPTIONAL - Z_PARAM_ARRAY_HT(htopars) + Z_PARAM_ARRAY_HT_OR_NULL(htopars) ZEND_PARSE_PARAMETERS_END(); intern = get_kafka_object(getThis()); @@ -703,16 +663,3 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, queryWatermarkOffsets) ZVAL_LONG(highResult, high); } /* }}} */ - -void kafka_consumer_init(INIT_FUNC_ARGS) /* {{{ */ -{ - zend_class_entry tmpce; - - INIT_NS_CLASS_ENTRY(tmpce, "SimpleKafkaClient", "Consumer", class_SimpleKafkaClient_Consumer_methods); - ce = zend_register_internal_class(&tmpce); - ce->create_object = kafka_consumer_new; - - handlers = kafka_default_object_handlers; - handlers.free_obj = kafka_consumer_free; - handlers.offset = XtOffsetOf(kafka_object, std); -} diff --git a/consumer.stub.php b/consumer.stub.php index d4d3bea..37e20f6 100644 --- a/consumer.stub.php +++ b/consumer.stub.php @@ -8,7 +8,7 @@ class Consumer { public function __construct(Configuration $configuration) {} - public function assign(?array $topics): void {} + public function assign(?array $topics = null): void {} public function getAssignment(): array {} diff --git a/consumer_arginfo.h b/consumer_arginfo.h index dbcb3a8..0691ebe 100644 --- a/consumer_arginfo.h +++ b/consumer_arginfo.h @@ -1,12 +1,12 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: ba3bc0a741bc6eab7a23a15ca6d83c24e99b23de */ + * Stub hash: 091c6b60081bb08ec174ef87b9cc6d2b3fbba461 */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer___construct, 0, 0, 1) ZEND_ARG_OBJ_INFO(0, configuration, SimpleKafkaClient\\Configuration, 0) ZEND_END_ARG_INFO() -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_assign, 0, 1, IS_VOID, 0) - ZEND_ARG_TYPE_INFO(0, topics, IS_ARRAY, 1) +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_assign, 0, 0, IS_VOID, 0) + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, topics, IS_ARRAY, 1, "null") ZEND_END_ARG_INFO() ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_getAssignment, 0, 0, IS_ARRAY, 0) diff --git a/kafka_arginfo.h b/kafka_arginfo.h index d4c9478..0b7cd60 100644 --- a/kafka_arginfo.h +++ b/kafka_arginfo.h @@ -1,20 +1,20 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: aac20095e4ad448dfdc0f3a25d87cbb17f9f1581 */ + * Stub hash: 5620609ea29ca05a20736ac8412bee6e4cc39615 */ -ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKafkaClient_getMetadata, 0, 3, SimpleKafkaClient\\Metadata, 0) +ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_getMetadata, 0, 3, SimpleKafkaClient\\Metadata, 0) ZEND_ARG_TYPE_INFO(0, allTopics, _IS_BOOL, 0) ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) ZEND_ARG_OBJ_INFO(0, topic, SimpleKafkaClient\\Topic, 0) ZEND_END_ARG_INFO() -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKafkaClient_getOutQLen, 0, 0, IS_LONG, 0) +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_getOutQLen, 0, 0, IS_LONG, 0) ZEND_END_ARG_INFO() -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKafkaClient_poll, 0, 1, IS_LONG, 0) +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_poll, 0, 1, IS_LONG, 0) ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) ZEND_END_ARG_INFO() -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKafkaClient_queryWatermarkOffsets, 0, 5, IS_VOID, 0) +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_queryWatermarkOffsets, 0, 5, IS_VOID, 0) ZEND_ARG_TYPE_INFO(0, topic, IS_STRING, 0) ZEND_ARG_TYPE_INFO(0, partition, IS_LONG, 0) ZEND_ARG_TYPE_INFO(1, low, IS_LONG, 0) @@ -22,7 +22,7 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKa ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) ZEND_END_ARG_INFO() -ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKafkaClient_offsetsForTimes, 0, 2, IS_ARRAY, 0) +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_offsetsForTimes, 0, 2, IS_ARRAY, 0) ZEND_ARG_TYPE_INFO(0, topicPartitions, IS_ARRAY, 0) ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0) ZEND_END_ARG_INFO() @@ -35,11 +35,11 @@ ZEND_METHOD(SimpleKafkaClient_Kafka, queryWatermarkOffsets); ZEND_METHOD(SimpleKafkaClient_Kafka, offsetsForTimes); -static const zend_function_entry class_SimpleKafkaClient_SimpleKafkaClient_methods[] = { - ZEND_ME(SimpleKafkaClient_Kafka, getMetadata, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_getMetadata, ZEND_ACC_PUBLIC) - ZEND_ME(SimpleKafkaClient_Kafka, getOutQLen, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_getOutQLen, ZEND_ACC_PUBLIC) - ZEND_ME(SimpleKafkaClient_Kafka, poll, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_poll, ZEND_ACC_PUBLIC) - ZEND_ME(SimpleKafkaClient_Kafka, queryWatermarkOffsets, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_queryWatermarkOffsets, ZEND_ACC_PUBLIC) - ZEND_ME(SimpleKafkaClient_Kafka, offsetsForTimes, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_offsetsForTimes, ZEND_ACC_PUBLIC) +static const zend_function_entry class_SimpleKafkaClient_Kafka_methods[] = { + ZEND_ME(SimpleKafkaClient_Kafka, getMetadata, arginfo_class_SimpleKafkaClient_Kafka_getMetadata, ZEND_ACC_PUBLIC) + ZEND_ME(SimpleKafkaClient_Kafka, getOutQLen, arginfo_class_SimpleKafkaClient_Kafka_getOutQLen, ZEND_ACC_PUBLIC) + ZEND_ME(SimpleKafkaClient_Kafka, poll, arginfo_class_SimpleKafkaClient_Kafka_poll, ZEND_ACC_PUBLIC) + ZEND_ME(SimpleKafkaClient_Kafka, queryWatermarkOffsets, arginfo_class_SimpleKafkaClient_Kafka_queryWatermarkOffsets, ZEND_ACC_PUBLIC) + ZEND_ME(SimpleKafkaClient_Kafka, offsetsForTimes, arginfo_class_SimpleKafkaClient_Kafka_offsetsForTimes, ZEND_ACC_PUBLIC) ZEND_FE_END }; diff --git a/metadata_arginfo.h b/metadata_arginfo.h index d8bbf75..5fde003 100644 --- a/metadata_arginfo.h +++ b/metadata_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: da83d0319c899361606dfa0ccf0fd439aeeabfbb */ + * Stub hash: cbb5ab5aee4d07e0673bef67dcc2d045303ebfbd */ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Metadata_getOrigBrokerId, 0, 0, IS_LONG, 0) ZEND_END_ARG_INFO() diff --git a/metadata_partition_arginfo.h b/metadata_partition_arginfo.h index 1eadd7f..53d7639 100644 --- a/metadata_partition_arginfo.h +++ b/metadata_partition_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 934cef11a377e54b4d5f8cea75e6d590ec071d50 */ + * Stub hash: 207c49cb01d8b564c1419d2c24d332cc321420f5 */ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Metadata_Partition_getId, 0, 0, IS_LONG, 0) ZEND_END_ARG_INFO() diff --git a/metadata_topic_arginfo.h b/metadata_topic_arginfo.h index 08a5fb0..2a77930 100644 --- a/metadata_topic_arginfo.h +++ b/metadata_topic_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 9d73f729b3dca2b6ac7fd5fdc39ba23d768ca792 */ + * Stub hash: db8552307bc3c0d4d6035ff10c00b7e2a39a152a */ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Metadata_Topic_getName, 0, 0, IS_STRING, 0) ZEND_END_ARG_INFO() diff --git a/php_simple_kafka_client_int.h b/php_simple_kafka_client_int.h index 39e26d6..c68e314 100644 --- a/php_simple_kafka_client_int.h +++ b/php_simple_kafka_client_int.h @@ -123,6 +123,7 @@ typedef void (*kafka_metadata_collection_ctor_t)(zval *renurn_value, zval *zmeta #endif extern zend_class_entry * ce_kafka_conf; +extern zend_class_entry * ce_kafka_consumer; extern zend_class_entry * ce_kafka_error_exception; extern zend_class_entry * ce_kafka_exception; extern zend_class_entry * ce_kafka_producer; @@ -189,7 +190,6 @@ static inline char *kafka_hash_get_current_key_ex(HashTable *ht, HashPosition *p void kafka_error_init(); void create_kafka_error(zval *return_value, const rd_kafka_error_t *error); void kafka_conf_init(INIT_FUNC_ARGS); -void kafka_consumer_init(INIT_FUNC_ARGS); void kafka_conf_callbacks_dtor(kafka_conf_callbacks *cbs); void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *from); void kafka_message_init(INIT_FUNC_ARGS); diff --git a/simple_kafka_client.c b/simple_kafka_client.c index 98e116b..5b77132 100644 --- a/simple_kafka_client.c +++ b/simple_kafka_client.c @@ -42,6 +42,7 @@ #include "ext/standard/info.h" #include "php_simple_kafka_client_int.h" #include "Zend/zend_exceptions.h" +#include "consumer_arginfo.h" #include "functions_arginfo.h" #include "producer_arginfo.h" #include "kafka_arginfo.h" @@ -66,7 +67,17 @@ static void kafka_free(zend_object *object) /* {{{ */ kafka_object *intern = php_kafka_from_obj(kafka_object, object); if (intern->rk) { - zend_hash_destroy(&intern->topics); + if (RD_KAFKA_CONSUMER == intern->type) { + rd_kafka_resp_err_t err; + + err = rd_kafka_consumer_close(intern->rk); + + if (err) { + php_error(E_WARNING, "rd_kafka_consumer_close failed: %s", rd_kafka_err2str(err)); + } + } else if (RD_KAFKA_PRODUCER == intern->type) { + zend_hash_destroy(&intern->topics); + } rd_kafka_destroy(intern->rk); intern->rk = NULL; @@ -332,7 +343,7 @@ PHP_MINIT_FUNCTION(simple_kafka_client) kafka_object_handlers.free_obj = kafka_free; kafka_object_handlers.offset = XtOffsetOf(kafka_object, std); - INIT_CLASS_ENTRY(ce, "SimpleKafkaClient", class_SimpleKafkaClient_SimpleKafkaClient_methods); + INIT_CLASS_ENTRY(ce, "SimpleKafkaClient", class_SimpleKafkaClient_Kafka_methods); ce_kafka = zend_register_internal_class(&ce); ce_kafka->ce_flags |= ZEND_ACC_EXPLICIT_ABSTRACT_CLASS; ce_kafka->create_object = kafka_new; @@ -340,9 +351,12 @@ PHP_MINIT_FUNCTION(simple_kafka_client) INIT_NS_CLASS_ENTRY(ce, "SimpleKafkaClient", "Producer", class_SimpleKafkaClient_Producer_methods); ce_kafka_producer = zend_register_internal_class_ex(&ce, ce_kafka); + INIT_NS_CLASS_ENTRY(ce, "SimpleKafkaClient", "Consumer", class_SimpleKafkaClient_Consumer_methods); + ce_kafka_consumer = zend_register_internal_class(&ce); + ce_kafka_consumer->create_object = kafka_new; + kafka_conf_init(INIT_FUNC_ARGS_PASSTHRU); kafka_error_init(); - kafka_consumer_init(INIT_FUNC_ARGS_PASSTHRU); kafka_message_init(INIT_FUNC_ARGS_PASSTHRU); kafka_metadata_init(INIT_FUNC_ARGS_PASSTHRU); kafka_metadata_topic_partition_init(INIT_FUNC_ARGS_PASSTHRU); diff --git a/tests/conf_callbacks_integration.phpt b/tests/conf_callbacks_integration.phpt index d909066..d349ed6 100644 --- a/tests/conf_callbacks_integration.phpt +++ b/tests/conf_callbacks_integration.phpt @@ -42,8 +42,9 @@ $conf->set('statistics.interval.ms', 10); $conf->set('log_level', (string) LOG_DEBUG); $conf->set('debug', 'all'); -$conf->setOffsetCommitCb(function ($consumer, $error, $topicPartitions) { - echo "Offset " . $topicPartitions[0]->getOffset() . " committed.\n"; +$offsetCommitCount = 0; +$conf->setOffsetCommitCb(function ($consumer, $error, $topicPartitions) use (&$offsetCommitCount) { + ++$offsetCommitCount; }); $statsCbCalled = false; @@ -102,22 +103,14 @@ while (true) { $consumer->commit($msg); } +var_dump($offsetCommitCount); var_dump($statsCbCalled); var_dump($logCbCalled); var_dump($topicsAssigned); var_dump($delivered); --EXPECT-- -Offset 1 committed. -Offset 2 committed. -Offset 3 committed. -Offset 4 committed. -Offset 5 committed. -Offset 6 committed. -Offset 7 committed. -Offset 8 committed. -Offset 9 committed. -Offset 10 committed. +int(10) bool(true) bool(true) bool(true) diff --git a/topic_partition_arginfo.h b/topic_partition_arginfo.h index 5b6b556..334f3e3 100644 --- a/topic_partition_arginfo.h +++ b/topic_partition_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 72b2c9a25e8751ae022cc233f4b7a0e382be72f8 */ + * Stub hash: 95f09c698079d00927dd2d02910325d6aff76157 */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_SimpleKafkaClient_TopicPartition___construct, 0, 0, 2) ZEND_ARG_TYPE_INFO(0, topicName, IS_STRING, 0)