diff --git a/consumer.c b/consumer.c index edc2f68..44a219c 100644 --- a/consumer.c +++ b/consumer.c @@ -339,7 +339,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, unsubscribe) } /* }}} */ -/* {{{ proto Message SimpleKafkaClient\Consumer::consume() +/* {{{ proto Message SimpleKafkaClient\Consumer::consume(int $timeoutMs) Consume message or get error event, triggers callbacks */ ZEND_METHOD(SimpleKafkaClient_Consumer, consume) { diff --git a/consumer.stub.php b/consumer.stub.php index 83ee8d7..d4d3bea 100644 --- a/consumer.stub.php +++ b/consumer.stub.php @@ -8,7 +8,7 @@ class Consumer { public function __construct(Configuration $configuration) {} - public function assign(array $topics): void {} + public function assign(?array $topics): void {} public function getAssignment(): array {} diff --git a/consumer_arginfo.h b/consumer_arginfo.h index 6ce7cfe..dbcb3a8 100644 --- a/consumer_arginfo.h +++ b/consumer_arginfo.h @@ -1,18 +1,20 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: b42d1bb68767786f0b655714b388a3361a45551d */ + * Stub hash: ba3bc0a741bc6eab7a23a15ca6d83c24e99b23de */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer___construct, 0, 0, 1) ZEND_ARG_OBJ_INFO(0, configuration, SimpleKafkaClient\\Configuration, 0) ZEND_END_ARG_INFO() ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_assign, 0, 1, IS_VOID, 0) - ZEND_ARG_TYPE_INFO(0, topics, IS_ARRAY, 0) + ZEND_ARG_TYPE_INFO(0, topics, IS_ARRAY, 1) ZEND_END_ARG_INFO() ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_getAssignment, 0, 0, IS_ARRAY, 0) ZEND_END_ARG_INFO() -#define arginfo_class_SimpleKafkaClient_Consumer_subscribe arginfo_class_SimpleKafkaClient_Consumer_assign +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_subscribe, 0, 1, IS_VOID, 0) + ZEND_ARG_TYPE_INFO(0, topics, IS_ARRAY, 0) +ZEND_END_ARG_INFO() #define arginfo_class_SimpleKafkaClient_Consumer_getSubscription arginfo_class_SimpleKafkaClient_Consumer_getAssignment diff --git a/kafka.stub.php b/kafka.stub.php index 8de4102..d0fbe3f 100644 --- a/kafka.stub.php +++ b/kafka.stub.php @@ -8,8 +8,6 @@ abstract class Kafka { public function getMetadata(bool $allTopics, int $timeoutMs, Topic $topic): Metadata {} - public function getTopicHandle(string $topic): Topic {} - public function getOutQLen(): int {} public function poll(int $timeoutMs): int {} diff --git a/kafka_arginfo.h b/kafka_arginfo.h index 5a0e399..d4c9478 100644 --- a/kafka_arginfo.h +++ b/kafka_arginfo.h @@ -7,10 +7,6 @@ ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKaf ZEND_ARG_OBJ_INFO(0, topic, SimpleKafkaClient\\Topic, 0) ZEND_END_ARG_INFO() -ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKafkaClient_getTopicHandle, 0, 1, SimpleKafkaClient\\Topic, 0) - ZEND_ARG_TYPE_INFO(0, topic, IS_STRING, 0) -ZEND_END_ARG_INFO() - ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKafkaClient_getOutQLen, 0, 0, IS_LONG, 0) ZEND_END_ARG_INFO() @@ -33,7 +29,6 @@ ZEND_END_ARG_INFO() ZEND_METHOD(SimpleKafkaClient_Kafka, getMetadata); -ZEND_METHOD(SimpleKafkaClient_Kafka, getTopicHandle); ZEND_METHOD(SimpleKafkaClient_Kafka, getOutQLen); ZEND_METHOD(SimpleKafkaClient_Kafka, poll); ZEND_METHOD(SimpleKafkaClient_Kafka, queryWatermarkOffsets); @@ -42,7 +37,6 @@ ZEND_METHOD(SimpleKafkaClient_Kafka, offsetsForTimes); static const zend_function_entry class_SimpleKafkaClient_SimpleKafkaClient_methods[] = { ZEND_ME(SimpleKafkaClient_Kafka, getMetadata, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_getMetadata, ZEND_ACC_PUBLIC) - ZEND_ME(SimpleKafkaClient_Kafka, getTopicHandle, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_getTopicHandle, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Kafka, getOutQLen, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_getOutQLen, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Kafka, poll, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_poll, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Kafka, queryWatermarkOffsets, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_queryWatermarkOffsets, ZEND_ACC_PUBLIC) diff --git a/producer.c b/producer.c index a518810..1c012fb 100644 --- a/producer.c +++ b/producer.c @@ -140,6 +140,50 @@ ZEND_METHOD(SimpleKafkaClient_Producer, purge) } /* }}} */ + +/* {{{ proto SimpleKafkaClient\ProducerTopic SimpleKafkaClient\Producer::getTopicHandle(string $topic) + Returns an SimpleKafkaClient\ProducerTopic object */ +ZEND_METHOD(SimpleKafkaClient_Producer, getTopicHandle) +{ + char *topic; + size_t topic_len; + rd_kafka_topic_t *rkt; + kafka_object *intern; + kafka_topic_object *topic_intern; + + ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1) + Z_PARAM_STRING(topic, topic_len) + ZEND_PARSE_PARAMETERS_END(); + + intern = get_kafka_object(getThis()); + if (!intern) { + return; + } + + rkt = rd_kafka_topic_new(intern->rk, topic, NULL); + + if (!rkt) { + return; + } + + if (object_init_ex(return_value, ce_kafka_producer_topic) != SUCCESS) { + return; + } + + topic_intern = Z_KAFKA_P(kafka_topic_object, return_value); + if (!topic_intern) { + return; + } + + topic_intern->rkt = rkt; + topic_intern->zrk = *getThis(); + + Z_ADDREF_P(&topic_intern->zrk); + + zend_hash_index_add_ptr(&intern->topics, (zend_ulong)topic_intern, topic_intern); +} +/* }}} */ + /* {{{ proto int SimpleKafkaClient\Producer::initTransactions(int timeout_ms) Initializes transactions, needs to be done before producing and starting a transaction */ ZEND_METHOD(SimpleKafkaClient_Producer, initTransactions) diff --git a/producer.stub.php b/producer.stub.php index 8997695..f193737 100644 --- a/producer.stub.php +++ b/producer.stub.php @@ -19,4 +19,6 @@ public function abortTransaction(int $timeoutMs): void {} public function flush(int $timeoutMs): int {} public function purge(int $purgeFlags): int {} + + public function getTopicHandle(string $topic): ProducerTopic {} } diff --git a/producer_arginfo.h b/producer_arginfo.h index be10f45..28b0435 100644 --- a/producer_arginfo.h +++ b/producer_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: b31742490463035fecff0a1cee01effed6e3e1d6 */ + * Stub hash: ae03dd8127a9e4799e241bc490de200ff18a4178 */ ZEND_BEGIN_ARG_INFO_EX(arginfo_class_SimpleKafkaClient_Producer___construct, 0, 0, 1) ZEND_ARG_OBJ_INFO(0, configuration, SimpleKafkaClient\\Configuration, 0) @@ -24,6 +24,10 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Producer ZEND_ARG_TYPE_INFO(0, purgeFlags, IS_LONG, 0) ZEND_END_ARG_INFO() +ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_Producer_getTopicHandle, 0, 1, SimpleKafkaClient\\ProducerTopic, 0) + ZEND_ARG_TYPE_INFO(0, topic, IS_STRING, 0) +ZEND_END_ARG_INFO() + ZEND_METHOD(SimpleKafkaClient_Producer, __construct); ZEND_METHOD(SimpleKafkaClient_Producer, initTransactions); @@ -32,6 +36,7 @@ ZEND_METHOD(SimpleKafkaClient_Producer, commitTransaction); ZEND_METHOD(SimpleKafkaClient_Producer, abortTransaction); ZEND_METHOD(SimpleKafkaClient_Producer, flush); ZEND_METHOD(SimpleKafkaClient_Producer, purge); +ZEND_METHOD(SimpleKafkaClient_Producer, getTopicHandle); static const zend_function_entry class_SimpleKafkaClient_Producer_methods[] = { @@ -42,5 +47,6 @@ static const zend_function_entry class_SimpleKafkaClient_Producer_methods[] = { ZEND_ME(SimpleKafkaClient_Producer, abortTransaction, arginfo_class_SimpleKafkaClient_Producer_abortTransaction, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Producer, flush, arginfo_class_SimpleKafkaClient_Producer_flush, ZEND_ACC_PUBLIC) ZEND_ME(SimpleKafkaClient_Producer, purge, arginfo_class_SimpleKafkaClient_Producer_purge, ZEND_ACC_PUBLIC) + ZEND_ME(SimpleKafkaClient_Producer, getTopicHandle, arginfo_class_SimpleKafkaClient_Producer_getTopicHandle, ZEND_ACC_PUBLIC) ZEND_FE_END }; diff --git a/simple_kafka_client.c b/simple_kafka_client.c index 3f35e3a..2511300 100644 --- a/simple_kafka_client.c +++ b/simple_kafka_client.c @@ -155,49 +155,6 @@ ZEND_METHOD(SimpleKafkaClient_Kafka, getMetadata) } /* }}} */ -/* {{{ proto SimpleKafkaClient\Topic SimpleKafkaClient\Kafka::getTopicHandle(string $topic) - Returns an SimpleKafkaClient\Topic object */ -ZEND_METHOD(SimpleKafkaClient_Kafka, getTopicHandle) -{ - char *topic; - size_t topic_len; - rd_kafka_topic_t *rkt; - kafka_object *intern; - kafka_topic_object *topic_intern; - - ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1) - Z_PARAM_STRING(topic, topic_len) - ZEND_PARSE_PARAMETERS_END(); - - intern = get_kafka_object(getThis()); - if (!intern) { - return; - } - - rkt = rd_kafka_topic_new(intern->rk, topic, NULL); - - if (!rkt) { - return; - } - - if (object_init_ex(return_value, ce_kafka_producer_topic) != SUCCESS) { - return; - } - - topic_intern = Z_KAFKA_P(kafka_topic_object, return_value); - if (!topic_intern) { - return; - } - - topic_intern->rkt = rkt; - topic_intern->zrk = *getThis(); - - Z_ADDREF_P(&topic_intern->zrk); - - zend_hash_index_add_ptr(&intern->topics, (zend_ulong)topic_intern, topic_intern); -} -/* }}} */ - /* {{{ proto int SimpleKafkaClient\Kafka::getOutQLen() Returns the current out queue length */ ZEND_METHOD(SimpleKafkaClient_Kafka, getOutQLen)