diff --git a/consumer.c b/consumer.c index 44a219c..0f86588 100644 --- a/consumer.c +++ b/consumer.c @@ -40,18 +40,12 @@ #include "Zend/zend_exceptions.h" #include "consumer_arginfo.h" -typedef struct _object_intern { - rd_kafka_t *rk; - kafka_conf_callbacks cbs; - zend_object std; -} object_intern; - static zend_class_entry * ce; static zend_object_handlers handlers; static void kafka_consumer_free(zend_object *object) /* {{{ */ { - object_intern *intern = php_kafka_from_obj(object_intern, object); + kafka_object *intern = php_kafka_from_obj(kafka_object, object); rd_kafka_resp_err_t err; kafka_conf_callbacks_dtor(&intern->cbs); @@ -75,9 +69,9 @@ static void kafka_consumer_free(zend_object *object) /* {{{ */ static zend_object *kafka_consumer_new(zend_class_entry *class_type) /* {{{ */ { zend_object* retval; - object_intern *intern; + kafka_object *intern; - intern = ecalloc(1, sizeof(object_intern)+ zend_object_properties_size(class_type)); + intern = ecalloc(1, sizeof(kafka_object)+ zend_object_properties_size(class_type)); zend_object_std_init(&intern->std, class_type); object_properties_init(&intern->std, class_type); @@ -88,18 +82,6 @@ static zend_object *kafka_consumer_new(zend_class_entry *class_type) /* {{{ */ } /* }}} */ -static object_intern * get_object(zval *zconsumer) /* {{{ */ -{ - object_intern *oconsumer = Z_KAFKA_P(object_intern, zconsumer); - - if (!oconsumer->rk) { - zend_throw_exception_ex(NULL, 0, "SimpleKafkaClient\\Consumer::__construct() has not been called"); - return NULL; - } - - return oconsumer; -} /* }}} */ - static int has_group_id(rd_kafka_conf_t *conf) { /* {{{ */ size_t len; @@ -125,7 +107,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, __construct) zval *zconf; char errstr[512]; rd_kafka_t *rk; - object_intern *intern; + kafka_object *intern; kafka_conf_object *conf_intern; rd_kafka_conf_t *conf = NULL; @@ -133,7 +115,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, __construct) Z_PARAM_OBJECT_OF_CLASS(zconf, ce_kafka_conf) ZEND_PARSE_PARAMETERS_END(); - intern = Z_KAFKA_P(object_intern, getThis()); + intern = Z_KAFKA_P(kafka_object, getThis()); conf_intern = get_kafka_conf_object(zconf); if (conf_intern) { @@ -175,7 +157,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, assign) HashTable *htopars = NULL; rd_kafka_topic_partition_list_t *topics; rd_kafka_resp_err_t err; - object_intern *intern; + kafka_object *intern; if (zend_parse_parameters(ZEND_NUM_ARGS(), "|h!", &htopars) == FAILURE) { return; @@ -186,7 +168,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, assign) Z_PARAM_ARRAY_HT(htopars) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -219,12 +201,12 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getAssignment) { rd_kafka_resp_err_t err; rd_kafka_topic_partition_list_t *topics; - object_intern *intern; + kafka_object *intern; ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 0) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -247,7 +229,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, subscribe) { HashTable *htopics; HashPosition pos; - object_intern *intern; + kafka_object *intern; rd_kafka_topic_partition_list_t *topics; rd_kafka_resp_err_t err; zval *zv; @@ -256,7 +238,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, subscribe) Z_PARAM_ARRAY_HT(htopics) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -287,13 +269,13 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getSubscription) { rd_kafka_resp_err_t err; rd_kafka_topic_partition_list_t *topics; - object_intern *intern; + kafka_object *intern; int i; ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 0) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -319,13 +301,13 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getSubscription) Unsubscribe from the current subscription set */ ZEND_METHOD(SimpleKafkaClient_Consumer, unsubscribe) { - object_intern *intern; + kafka_object *intern; rd_kafka_resp_err_t err; ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 0) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -343,7 +325,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, unsubscribe) Consume message or get error event, triggers callbacks */ ZEND_METHOD(SimpleKafkaClient_Consumer, consume) { - object_intern *intern; + kafka_object *intern; zend_long timeout_ms; rd_kafka_message_t *rkmessage, rkmessage_tmp = {0}; @@ -351,7 +333,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, consume) Z_PARAM_LONG(timeout_ms) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -374,7 +356,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, consume) static void consumer_commit(int async, INTERNAL_FUNCTION_PARAMETERS) /* {{{ */ { zval *zarg = NULL; - object_intern *intern; + kafka_object *intern; rd_kafka_topic_partition_list_t *offsets = NULL; rd_kafka_resp_err_t err; @@ -383,7 +365,7 @@ static void consumer_commit(int async, INTERNAL_FUNCTION_PARAMETERS) /* {{{ */ Z_PARAM_ZVAL(zarg) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -476,12 +458,12 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, commitAsync) Close connection */ ZEND_METHOD(SimpleKafkaClient_Consumer, close) { - object_intern *intern; + kafka_object *intern; ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 0) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -499,7 +481,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getMetadata) zval *only_zrkt = NULL; zend_long timeout_ms; rd_kafka_resp_err_t err; - object_intern *intern; + kafka_object *intern; const rd_kafka_metadata_t *metadata; kafka_topic_object *only_orkt = NULL; @@ -510,7 +492,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getMetadata) Z_PARAM_OBJECT_OF_CLASS(only_zrkt, ce_kafka_topic) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -540,14 +522,14 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getTopicHandle) char *topic; size_t topic_len; rd_kafka_topic_t *rkt; - object_intern *intern; + kafka_object *intern; kafka_topic_object *topic_intern; ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1) Z_PARAM_STRING(topic, topic_len) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -577,7 +559,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getCommittedOffsets) { HashTable *htopars = NULL; zend_long timeout_ms; - object_intern *intern; + kafka_object *intern; rd_kafka_resp_err_t err; rd_kafka_topic_partition_list_t *topics; @@ -586,7 +568,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getCommittedOffsets) Z_PARAM_LONG(timeout_ms) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -615,7 +597,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getCommittedOffsets) ZEND_METHOD(SimpleKafkaClient_Consumer, getOffsetPositions) { HashTable *htopars = NULL; - object_intern *intern; + kafka_object *intern; rd_kafka_resp_err_t err; rd_kafka_topic_partition_list_t *topics; @@ -623,7 +605,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getOffsetPositions) Z_PARAM_ARRAY_HT(htopars) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -650,7 +632,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getOffsetPositions) ZEND_METHOD(SimpleKafkaClient_Consumer, offsetsForTimes) { HashTable *htopars = NULL; - object_intern *intern; + kafka_object *intern; rd_kafka_topic_partition_list_t *topicPartitions; zend_long timeout_ms; rd_kafka_resp_err_t err; @@ -660,7 +642,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, offsetsForTimes) Z_PARAM_LONG(timeout_ms) ZEND_PARSE_PARAMETERS_END(); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -686,7 +668,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, offsetsForTimes) Query broker for low (oldest/beginning) or high (newest/end) offsets for partition */ ZEND_METHOD(SimpleKafkaClient_Consumer, queryWatermarkOffsets) { - object_intern *intern; + kafka_object *intern; char *topic; size_t topic_length; long low, high; @@ -705,7 +687,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, queryWatermarkOffsets) ZVAL_DEREF(lowResult); ZVAL_DEREF(highResult); - intern = get_object(getThis()); + intern = get_kafka_object(getThis()); if (!intern) { return; } @@ -732,5 +714,5 @@ void kafka_consumer_init(INIT_FUNC_ARGS) /* {{{ */ handlers = kafka_default_object_handlers; handlers.free_obj = kafka_consumer_free; - handlers.offset = XtOffsetOf(object_intern, std); + handlers.offset = XtOffsetOf(kafka_object, std); } diff --git a/tests/produce_consume_transactional.phpt b/tests/produce_consume_transactional.phpt index f8c40b5..9ff2c47 100644 --- a/tests/produce_consume_transactional.phpt +++ b/tests/produce_consume_transactional.phpt @@ -37,7 +37,7 @@ $topicName = sprintf("test_kafka_%s", uniqid()); $topic = $producer->getTopicHandle($topicName); -if (!$producer->getMetadata(false, 2*1000, $topic)) { +if (!$producer->getMetadata(false, 5*1000, $topic)) { echo "Failed to get metadata, is broker down?\n"; }