Skip to content

Commit d6f4d16

Browse files
authored
Add opaque on produce(), producev() (#424)
1 parent c7e8ffe commit d6f4d16

16 files changed

+416
-36
lines changed

conf.c

+15-16
Original file line numberDiff line numberDiff line change
@@ -161,29 +161,28 @@ static void kafka_conf_error_cb(rd_kafka_t *rk, int err, const char *reason, voi
161161
zval_ptr_dtor(&args[2]);
162162
}
163163

164-
static void kafka_conf_dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque)
164+
void kafka_conf_dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque)
165165
{
166166
kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque;
167+
zend_string *msg_opaque = msg->_private;
167168
zval args[2];
168169

169-
if (!opaque) {
170-
return;
171-
}
170+
if (cbs != NULL && cbs->dr_msg) {
171+
ZVAL_NULL(&args[0]);
172+
ZVAL_NULL(&args[1]);
172173

173-
if (!cbs->dr_msg) {
174-
return;
175-
}
174+
ZVAL_ZVAL(&args[0], &cbs->zrk, 1, 0);
175+
kafka_message_new(&args[1], msg, msg_opaque);
176176

177-
ZVAL_NULL(&args[0]);
178-
ZVAL_NULL(&args[1]);
177+
rdkafka_call_function(&cbs->dr_msg->fci, &cbs->dr_msg->fcc, NULL, 2, args);
179178

180-
ZVAL_ZVAL(&args[0], &cbs->zrk, 1, 0);
181-
kafka_message_new(&args[1], msg);
182-
183-
rdkafka_call_function(&cbs->dr_msg->fci, &cbs->dr_msg->fcc, NULL, 2, args);
179+
zval_ptr_dtor(&args[0]);
180+
zval_ptr_dtor(&args[1]);
181+
}
184182

185-
zval_ptr_dtor(&args[0]);
186-
zval_ptr_dtor(&args[1]);
183+
if (msg_opaque != NULL) {
184+
zend_string_release(msg_opaque);
185+
}
187186
}
188187

189188
static int kafka_conf_stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
@@ -267,7 +266,7 @@ static void kafka_conf_consume_cb(rd_kafka_message_t *msg, void *opaque)
267266
ZVAL_NULL(&args[0]);
268267
ZVAL_NULL(&args[1]);
269268

270-
kafka_message_new(&args[0], msg);
269+
kafka_message_new(&args[0], msg, NULL);
271270
ZVAL_ZVAL(&args[1], &cbs->zrk, 1, 0);
272271

273272

conf.h

+2
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ void kafka_conf_minit(INIT_FUNC_ARGS);
6464
void kafka_conf_callbacks_dtor(kafka_conf_callbacks *cbs);
6565
void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *from);
6666

67+
void kafka_conf_dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque);
68+
6769
extern zend_class_entry * ce_kafka_conf;
6870
extern zend_class_entry * ce_kafka_topic_conf;
6971

kafka_consumer.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,7 @@ PHP_METHOD(RdKafka__KafkaConsumer, consume)
392392
rkmessage = &rkmessage_tmp;
393393
}
394394

395-
kafka_message_new(return_value, rkmessage);
395+
kafka_message_new(return_value, rkmessage, NULL);
396396

397397
if (rkmessage != &rkmessage_tmp) {
398398
rd_kafka_message_destroy(rkmessage);

message.c

+7-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232

3333
zend_class_entry * ce_kafka_message;
3434

35-
void kafka_message_new(zval *return_value, const rd_kafka_message_t *message)
35+
void kafka_message_new(zval *return_value, const rd_kafka_message_t *message, zend_string *msg_opaque)
3636
{
3737
object_init_ex(return_value, ce_kafka_message);
3838

@@ -84,6 +84,10 @@ void kafka_message_new(zval *return_value, const rd_kafka_message_t *message)
8484
}
8585
}
8686
#endif
87+
88+
if (msg_opaque != NULL) {
89+
zend_update_property_str(NULL, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("opaque"), msg_opaque);
90+
}
8791
}
8892

8993
void kafka_message_list_to_array(zval *return_value, rd_kafka_message_t **messages, long size) /* {{{ */
@@ -97,7 +101,7 @@ void kafka_message_list_to_array(zval *return_value, rd_kafka_message_t **messag
97101
for (i = 0; i < size; i++) {
98102
msg = messages[i];
99103
ZVAL_NULL(&zmsg);
100-
kafka_message_new(&zmsg, msg);
104+
kafka_message_new(&zmsg, msg, NULL);
101105
add_next_index_zval(return_value, &zmsg);
102106
}
103107
} /* }}} */
@@ -161,4 +165,5 @@ void kafka_message_minit(INIT_FUNC_ARGS) { /* {{{ */
161165
#ifdef HAVE_RD_KAFKA_MESSAGE_HEADERS
162166
zend_declare_property_null(ce_kafka_message, ZEND_STRL("headers"), ZEND_ACC_PUBLIC);
163167
#endif
168+
zend_declare_property_null(ce_kafka_message, ZEND_STRL("opaque"), ZEND_ACC_PUBLIC);
164169
} /* }}} */

message.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818

1919
void kafka_message_minit(INIT_FUNC_ARGS);
20-
void kafka_message_new(zval *return_value, const rd_kafka_message_t *message);
20+
void kafka_message_new(zval *return_value, const rd_kafka_message_t *message, zend_string *msg_opaque);
2121
void kafka_message_list_to_array(zval *return_value, rd_kafka_message_t **messages, long size);
2222

2323
extern zend_class_entry * ce_kafka_message;

php_rdkafka_priv.h

+20
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,26 @@
1919
#ifndef PHP_RDKAFKA_PRIV_H
2020
#define PHP_RDKAFKA_PRIV_H
2121

22+
#ifndef Z_PARAM_STRING_OR_NULL
23+
#define Z_PARAM_STRING_OR_NULL(dest, dest_len) \
24+
Z_PARAM_STRING_EX(dest, dest_len, 1, 0)
25+
#endif
26+
27+
#ifndef Z_PARAM_STR_OR_NULL
28+
#define Z_PARAM_STR_OR_NULL(dest) \
29+
Z_PARAM_STR_EX(dest, 1, 0)
30+
#endif
31+
32+
#ifndef Z_PARAM_ARRAY_HT_OR_NULL
33+
#define Z_PARAM_ARRAY_HT_OR_NULL(dest) \
34+
Z_PARAM_ARRAY_HT_EX(dest, 1, 0)
35+
#endif
36+
37+
#ifndef Z_PARAM_LONG_OR_NULL
38+
#define Z_PARAM_LONG_OR_NULL(dest, is_null) \
39+
Z_PARAM_LONG_EX(dest, is_null, 1, 0)
40+
#endif
41+
2242
#if PHP_MAJOR_VERSION >= 8
2343

2444
#define Z_RDKAFKA_OBJ zend_object

queue.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ PHP_METHOD(RdKafka__Queue, consume)
112112
return;
113113
}
114114

115-
kafka_message_new(return_value, message);
115+
kafka_message_new(return_value, message, NULL);
116116

117117
rd_kafka_message_destroy(message);
118118
}

rdkafka.c

+17-4
Original file line numberDiff line numberDiff line change
@@ -78,20 +78,27 @@ static void kafka_free(zend_object *object) /* {{{ */
7878
{
7979
kafka_object *intern = php_kafka_from_obj(kafka_object, object);
8080

81+
kafka_conf_callbacks_dtor(&intern->cbs);
82+
8183
if (intern->rk) {
8284
if (intern->type == RD_KAFKA_CONSUMER) {
8385
stop_consuming(intern);
8486
zend_hash_destroy(&intern->consuming);
8587
zend_hash_destroy(&intern->queues);
88+
} else if (intern->type == RD_KAFKA_PRODUCER) {
89+
#ifdef HAS_RD_KAFKA_PURGE
90+
// Force internal delivery callbacks for queued messages, as we rely
91+
// on these to free msg_opaques
92+
rd_kafka_purge(intern->rk, RD_KAFKA_PURGE_F_QUEUE | RD_KAFKA_PURGE_F_INFLIGHT);
93+
rd_kafka_flush(intern->rk, 0);
94+
#endif
8695
}
8796
zend_hash_destroy(&intern->topics);
8897

8998
rd_kafka_destroy(intern->rk);
9099
intern->rk = NULL;
91100
}
92101

93-
kafka_conf_callbacks_dtor(&intern->cbs);
94-
95102
zend_object_std_dtor(&intern->std);
96103
}
97104
/* }}} */
@@ -130,11 +137,17 @@ static void kafka_init(zval *this_ptr, rd_kafka_type_t type, zval *zconf) /* {{{
130137
if (conf_intern) {
131138
conf = rd_kafka_conf_dup(conf_intern->u.conf);
132139
kafka_conf_callbacks_copy(&intern->cbs, &conf_intern->cbs);
133-
intern->cbs.zrk = *this_ptr;
134-
rd_kafka_conf_set_opaque(conf, &intern->cbs);
135140
}
136141
}
137142

143+
if (conf == NULL) {
144+
conf = rd_kafka_conf_new();
145+
}
146+
147+
intern->cbs.zrk = *this_ptr;
148+
rd_kafka_conf_set_opaque(conf, &intern->cbs);
149+
rd_kafka_conf_set_dr_msg_cb(conf, kafka_conf_dr_msg_cb);
150+
138151
rk = rd_kafka_new(type, conf, errstr, sizeof(errstr));
139152

140153
if (rk == NULL) {

tests/produce_opaque.phpt

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
--TEST--
2+
Produce with opaque
3+
--SKIPIF--
4+
<?php
5+
require __DIR__ . '/integration-tests-check.php';
6+
RD_KAFKA_BUILD_VERSION < 0x1000000 && die("skip librdkafka < 1.0.0");
7+
--FILE--
8+
<?php
9+
require __DIR__ . '/integration-tests-check.php';
10+
11+
$conf = new RdKafka\Conf();
12+
if (RD_KAFKA_VERSION >= 0x090000 && false !== getenv('TEST_KAFKA_BROKER_VERSION')) {
13+
$conf->set('broker.version.fallback', getenv('TEST_KAFKA_BROKER_VERSION'));
14+
}
15+
$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
16+
17+
$opaques = [];
18+
$conf->setDrMsgCb(function ($producer, $msg) use (&$opaques) {
19+
$opaques[] = $msg->opaque;
20+
});
21+
22+
$producer = new RdKafka\Producer($conf);
23+
24+
$topicName = sprintf("test_rdkafka_%s", uniqid());
25+
26+
$topic = $producer->newTopic($topicName);
27+
28+
if (!$producer->getMetadata(false, $topic, 2*1000)) {
29+
echo "Failed to get metadata, is broker down?\n";
30+
}
31+
32+
for ($i = 0; $i < 10; $i++) {
33+
$topic->produce(0, 0, "message $i", null, "opaque $i");
34+
}
35+
36+
$producer->flush(10*1000);
37+
38+
var_dump($opaques);
39+
--EXPECT--
40+
array(10) {
41+
[0]=>
42+
string(8) "opaque 0"
43+
[1]=>
44+
string(8) "opaque 1"
45+
[2]=>
46+
string(8) "opaque 2"
47+
[3]=>
48+
string(8) "opaque 3"
49+
[4]=>
50+
string(8) "opaque 4"
51+
[5]=>
52+
string(8) "opaque 5"
53+
[6]=>
54+
string(8) "opaque 6"
55+
[7]=>
56+
string(8) "opaque 7"
57+
[8]=>
58+
string(8) "opaque 8"
59+
[9]=>
60+
string(8) "opaque 9"
61+
}

tests/produce_opaque_noconf.phpt

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
--TEST--
2+
Produce with opaque, no conf
3+
--SKIPIF--
4+
<?php
5+
require __DIR__ . '/integration-tests-check.php';
6+
RD_KAFKA_BUILD_VERSION < 0x1000000 && die("skip librdkafka < 1.0.0");
7+
RD_KAFKA_BUILD_VERSION >= 0x1050000 && die("skip librdkafka >= 1.5.0");
8+
--FILE--
9+
<?php
10+
require __DIR__ . '/integration-tests-check.php';
11+
12+
$producer = new RdKafka\Producer();
13+
var_dump($producer->addBrokers(getenv('TEST_KAFKA_BROKERS')));
14+
15+
$topicName = sprintf("test_rdkafka_%s", uniqid());
16+
17+
$topic = $producer->newTopic($topicName);
18+
19+
if (!$producer->getMetadata(false, $topic, 2*1000)) {
20+
echo "Failed to get metadata, is broker down?\n";
21+
}
22+
23+
for ($i = 0; $i < 10; $i++) {
24+
$topic->produce(0, 0, "message $i", null, "opaque $i");
25+
}
26+
27+
echo "Expect no leaks\n";
28+
--EXPECT--
29+
int(1)
30+
Expect no leaks

tests/produce_opaque_noflush.phpt

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
--TEST--
2+
Produce with opaque, no flush
3+
--SKIPIF--
4+
<?php
5+
require __DIR__ . '/integration-tests-check.php';
6+
RD_KAFKA_BUILD_VERSION < 0x1000000 && die("skip librdkafka < 1.0.0");
7+
--FILE--
8+
<?php
9+
require __DIR__ . '/integration-tests-check.php';
10+
11+
$conf = new RdKafka\Conf();
12+
if (RD_KAFKA_VERSION >= 0x090000 && false !== getenv('TEST_KAFKA_BROKER_VERSION')) {
13+
$conf->set('broker.version.fallback', getenv('TEST_KAFKA_BROKER_VERSION'));
14+
}
15+
$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
16+
17+
$producer = new RdKafka\Producer($conf);
18+
19+
$topicName = sprintf("test_rdkafka_%s", uniqid());
20+
21+
$topic = $producer->newTopic($topicName);
22+
23+
if (!$producer->getMetadata(false, $topic, 2*1000)) {
24+
echo "Failed to get metadata, is broker down?\n";
25+
}
26+
27+
for ($i = 0; $i < 10; $i++) {
28+
$topic->produce(0, 0, "message $i", null, "opaque $i");
29+
}
30+
31+
echo "Expect no leaks\n";
32+
--EXPECT--
33+
Expect no leaks
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
--TEST--
2+
Produce with opaque, no flush, with delivery callback
3+
--SKIPIF--
4+
<?php
5+
require __DIR__ . '/integration-tests-check.php';
6+
RD_KAFKA_BUILD_VERSION < 0x1000000 && die("skip librdkafka < 1.0.0");
7+
--FILE--
8+
<?php
9+
require __DIR__ . '/integration-tests-check.php';
10+
11+
$conf = new RdKafka\Conf();
12+
if (RD_KAFKA_VERSION >= 0x090000 && false !== getenv('TEST_KAFKA_BROKER_VERSION')) {
13+
$conf->set('broker.version.fallback', getenv('TEST_KAFKA_BROKER_VERSION'));
14+
}
15+
$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
16+
17+
$conf->setDrMsgCb(function ($rdkafka, $msg) {
18+
var_dump($rdkafka, $msg);
19+
});
20+
21+
$producer = new RdKafka\Producer($conf);
22+
23+
$topicName = sprintf("test_rdkafka_%s", uniqid());
24+
25+
$topic = $producer->newTopic($topicName);
26+
27+
if (!$producer->getMetadata(false, $topic, 2*1000)) {
28+
echo "Failed to get metadata, is broker down?\n";
29+
}
30+
31+
for ($i = 0; $i < 10; $i++) {
32+
$topic->produce(0, 0, "message $i", null, "opaque $i");
33+
}
34+
35+
echo "Expect no leaks\n";
36+
--EXPECT--
37+
Expect no leaks

0 commit comments

Comments
 (0)