From 2d9c03dbdf8182946f5a0515dee09aaa920407dc Mon Sep 17 00:00:00 2001
From: nick <nickjobszh@gmail.com>
Date: Sat, 24 Apr 2021 18:43:23 +0200
Subject: [PATCH 1/2] refactor object usage

---
 consumer.c | 86 +++++++++++++++++++++---------------------------------
 1 file changed, 34 insertions(+), 52 deletions(-)

diff --git a/consumer.c b/consumer.c
index 44a219c..0f86588 100644
--- a/consumer.c
+++ b/consumer.c
@@ -40,18 +40,12 @@
 #include "Zend/zend_exceptions.h"
 #include "consumer_arginfo.h"
 
-typedef struct _object_intern {
-    rd_kafka_t              *rk;
-    kafka_conf_callbacks    cbs;
-    zend_object             std;
-} object_intern;
-
 static zend_class_entry * ce;
 static zend_object_handlers handlers;
 
 static void kafka_consumer_free(zend_object *object) /* {{{ */
 {
-    object_intern *intern = php_kafka_from_obj(object_intern, object);
+    kafka_object *intern = php_kafka_from_obj(kafka_object, object);
     rd_kafka_resp_err_t err;
     kafka_conf_callbacks_dtor(&intern->cbs);
 
@@ -75,9 +69,9 @@ static void kafka_consumer_free(zend_object *object) /* {{{ */
 static zend_object *kafka_consumer_new(zend_class_entry *class_type) /* {{{ */
 {
     zend_object* retval;
-    object_intern *intern;
+    kafka_object *intern;
 
-    intern = ecalloc(1, sizeof(object_intern)+ zend_object_properties_size(class_type));
+    intern = ecalloc(1, sizeof(kafka_object)+ zend_object_properties_size(class_type));
     zend_object_std_init(&intern->std, class_type);
     object_properties_init(&intern->std, class_type);
 
@@ -88,18 +82,6 @@ static zend_object *kafka_consumer_new(zend_class_entry *class_type) /* {{{ */
 }
 /* }}} */
 
-static object_intern * get_object(zval *zconsumer) /* {{{ */
-{
-    object_intern *oconsumer = Z_KAFKA_P(object_intern, zconsumer);
-
-    if (!oconsumer->rk) {
-        zend_throw_exception_ex(NULL, 0, "SimpleKafkaClient\\Consumer::__construct() has not been called");
-        return NULL;
-    }
-
-    return oconsumer;
-} /* }}} */
-
 static int has_group_id(rd_kafka_conf_t *conf) { /* {{{ */
 
     size_t len;
@@ -125,7 +107,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, __construct)
     zval *zconf;
     char errstr[512];
     rd_kafka_t *rk;
-    object_intern *intern;
+    kafka_object *intern;
     kafka_conf_object *conf_intern;
     rd_kafka_conf_t *conf = NULL;
 
@@ -133,7 +115,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, __construct)
         Z_PARAM_OBJECT_OF_CLASS(zconf, ce_kafka_conf)
     ZEND_PARSE_PARAMETERS_END();
 
-    intern = Z_KAFKA_P(object_intern, getThis());
+    intern = Z_KAFKA_P(kafka_object, getThis());
 
     conf_intern = get_kafka_conf_object(zconf);
     if (conf_intern) {
@@ -175,7 +157,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, assign)
     HashTable *htopars = NULL;
     rd_kafka_topic_partition_list_t *topics;
     rd_kafka_resp_err_t err;
-    object_intern *intern;
+    kafka_object *intern;
 
     if (zend_parse_parameters(ZEND_NUM_ARGS(), "|h!", &htopars) == FAILURE) {
         return;
@@ -186,7 +168,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, assign)
         Z_PARAM_ARRAY_HT(htopars)
     ZEND_PARSE_PARAMETERS_END();
 
-    intern = get_object(getThis());
+    intern = get_kafka_object(getThis());
     if (!intern) {
         return;
     }
@@ -219,12 +201,12 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getAssignment)
 {
     rd_kafka_resp_err_t err;
     rd_kafka_topic_partition_list_t *topics;
-    object_intern *intern;
+    kafka_object *intern;
 
 	ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 0)
 	ZEND_PARSE_PARAMETERS_END();
 
-    intern = get_object(getThis());
+    intern = get_kafka_object(getThis());
     if (!intern) {
         return;
     }
@@ -247,7 +229,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, subscribe)
 {
     HashTable *htopics;
     HashPosition pos;
-    object_intern *intern;
+    kafka_object *intern;
     rd_kafka_topic_partition_list_t *topics;
     rd_kafka_resp_err_t err;
     zval *zv;
@@ -256,7 +238,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, subscribe)
         Z_PARAM_ARRAY_HT(htopics)
     ZEND_PARSE_PARAMETERS_END();
 
-    intern = get_object(getThis());
+    intern = get_kafka_object(getThis());
     if (!intern) {
         return;
     }
@@ -287,13 +269,13 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getSubscription)
 {
     rd_kafka_resp_err_t err;
     rd_kafka_topic_partition_list_t *topics;
-    object_intern *intern;
+    kafka_object *intern;
     int i;
 
 	ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 0)
 	ZEND_PARSE_PARAMETERS_END();
 
-    intern = get_object(getThis());
+    intern = get_kafka_object(getThis());
     if (!intern) {
         return;
     }
@@ -319,13 +301,13 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getSubscription)
     Unsubscribe from the current subscription set */
 ZEND_METHOD(SimpleKafkaClient_Consumer, unsubscribe)
 {
-    object_intern *intern;
+    kafka_object *intern;
     rd_kafka_resp_err_t err;
 
 	ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 0)
 	ZEND_PARSE_PARAMETERS_END();
 
-    intern = get_object(getThis());
+    intern = get_kafka_object(getThis());
     if (!intern) {
         return;
     }
@@ -343,7 +325,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, unsubscribe)
    Consume message or get error event, triggers callbacks */
 ZEND_METHOD(SimpleKafkaClient_Consumer, consume)
 {
-    object_intern *intern;
+    kafka_object *intern;
     zend_long timeout_ms;
     rd_kafka_message_t *rkmessage, rkmessage_tmp = {0};
 
@@ -351,7 +333,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, consume)
 		Z_PARAM_LONG(timeout_ms)
 	ZEND_PARSE_PARAMETERS_END();
 
-    intern = get_object(getThis());
+    intern = get_kafka_object(getThis());
     if (!intern) {
         return;
     }
@@ -374,7 +356,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, consume)
 static void consumer_commit(int async, INTERNAL_FUNCTION_PARAMETERS) /* {{{ */
 {
     zval *zarg = NULL;
-    object_intern *intern;
+    kafka_object *intern;
     rd_kafka_topic_partition_list_t *offsets = NULL;
     rd_kafka_resp_err_t err;
 
@@ -383,7 +365,7 @@ static void consumer_commit(int async, INTERNAL_FUNCTION_PARAMETERS) /* {{{ */
 		Z_PARAM_ZVAL(zarg)
 	ZEND_PARSE_PARAMETERS_END();
 
-    intern = get_object(getThis());
+    intern = get_kafka_object(getThis());
     if (!intern) {
         return;
     }
@@ -476,12 +458,12 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, commitAsync)
    Close connection */
 ZEND_METHOD(SimpleKafkaClient_Consumer, close)
 {
-    object_intern *intern;
+    kafka_object *intern;
 
     ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 0)
     ZEND_PARSE_PARAMETERS_END();
 
-    intern = get_object(getThis());
+    intern = get_kafka_object(getThis());
     if (!intern) {
         return;
     }
@@ -499,7 +481,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getMetadata)
     zval *only_zrkt = NULL;
     zend_long timeout_ms;
     rd_kafka_resp_err_t err;
-    object_intern *intern;
+    kafka_object *intern;
     const rd_kafka_metadata_t *metadata;
     kafka_topic_object *only_orkt = NULL;
 
@@ -510,7 +492,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getMetadata)
         Z_PARAM_OBJECT_OF_CLASS(only_zrkt, ce_kafka_topic)
     ZEND_PARSE_PARAMETERS_END();
 
-    intern = get_object(getThis());
+    intern = get_kafka_object(getThis());
     if (!intern) {
         return;
     }
@@ -540,14 +522,14 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getTopicHandle)
     char *topic;
     size_t topic_len;
     rd_kafka_topic_t *rkt;
-    object_intern *intern;
+    kafka_object *intern;
     kafka_topic_object *topic_intern;
 
     ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1)
         Z_PARAM_STRING(topic, topic_len)
     ZEND_PARSE_PARAMETERS_END();
 
-    intern = get_object(getThis());
+    intern = get_kafka_object(getThis());
     if (!intern) {
         return;
     }
@@ -577,7 +559,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getCommittedOffsets)
 {
     HashTable *htopars = NULL;
     zend_long timeout_ms;
-    object_intern *intern;
+    kafka_object *intern;
     rd_kafka_resp_err_t err;
     rd_kafka_topic_partition_list_t *topics;
 
@@ -586,7 +568,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getCommittedOffsets)
         Z_PARAM_LONG(timeout_ms)
     ZEND_PARSE_PARAMETERS_END();
 
-    intern = get_object(getThis());
+    intern = get_kafka_object(getThis());
     if (!intern) {
         return;
     }
@@ -615,7 +597,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getCommittedOffsets)
 ZEND_METHOD(SimpleKafkaClient_Consumer, getOffsetPositions)
 {
     HashTable *htopars = NULL;
-    object_intern *intern;
+    kafka_object *intern;
     rd_kafka_resp_err_t err;
     rd_kafka_topic_partition_list_t *topics;
 
@@ -623,7 +605,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getOffsetPositions)
         Z_PARAM_ARRAY_HT(htopars)
     ZEND_PARSE_PARAMETERS_END();
 
-    intern = get_object(getThis());
+    intern = get_kafka_object(getThis());
     if (!intern) {
         return;
     }
@@ -650,7 +632,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getOffsetPositions)
 ZEND_METHOD(SimpleKafkaClient_Consumer, offsetsForTimes)
 {
     HashTable *htopars = NULL;
-    object_intern *intern;
+    kafka_object *intern;
     rd_kafka_topic_partition_list_t *topicPartitions;
     zend_long timeout_ms;
     rd_kafka_resp_err_t err;
@@ -660,7 +642,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, offsetsForTimes)
         Z_PARAM_LONG(timeout_ms)
     ZEND_PARSE_PARAMETERS_END();
 
-    intern = get_object(getThis());
+    intern = get_kafka_object(getThis());
     if (!intern) {
         return;
     }
@@ -686,7 +668,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, offsetsForTimes)
    Query broker for low (oldest/beginning) or high (newest/end) offsets for partition */
 ZEND_METHOD(SimpleKafkaClient_Consumer, queryWatermarkOffsets)
 {
-    object_intern *intern;
+    kafka_object *intern;
     char *topic;
     size_t topic_length;
     long low, high;
@@ -705,7 +687,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, queryWatermarkOffsets)
     ZVAL_DEREF(lowResult);
     ZVAL_DEREF(highResult);
 
-    intern = get_object(getThis());
+    intern = get_kafka_object(getThis());
     if (!intern) {
         return;
     }
@@ -732,5 +714,5 @@ void kafka_consumer_init(INIT_FUNC_ARGS) /* {{{ */
 
     handlers = kafka_default_object_handlers;
     handlers.free_obj = kafka_consumer_free;
-    handlers.offset = XtOffsetOf(object_intern, std);
+    handlers.offset = XtOffsetOf(kafka_object, std);
 }

From bf6969b3d9ece2add64e2ead05edb9fb595f1b6a Mon Sep 17 00:00:00 2001
From: nick <nickjobszh@gmail.com>
Date: Sat, 24 Apr 2021 18:47:05 +0200
Subject: [PATCH 2/2] increase timeout

---
 tests/produce_consume_transactional.phpt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/produce_consume_transactional.phpt b/tests/produce_consume_transactional.phpt
index f8c40b5..9ff2c47 100644
--- a/tests/produce_consume_transactional.phpt
+++ b/tests/produce_consume_transactional.phpt
@@ -37,7 +37,7 @@ $topicName = sprintf("test_kafka_%s", uniqid());
 
 $topic = $producer->getTopicHandle($topicName);
 
-if (!$producer->getMetadata(false, 2*1000, $topic)) {
+if (!$producer->getMetadata(false, 5*1000, $topic)) {
     echo "Failed to get metadata, is broker down?\n";
 }