
Commit 7ca71fe

Add pausePartitions(), resumePartitions() on RdKafka\Kafka, RdKafka\KafkaConsumer (#438)
1 parent ec302d2 commit 7ca71fe

7 files changed: +310 -6 lines changed
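
The new API surface, sketched as PHP signatures inferred from the arginfo declarations and proto comments in the diffs below. The interface name is hypothetical and purely illustrative; the real methods are implemented in C and registered on RdKafka\Kafka and RdKafka\KafkaConsumer.

<?php
// Illustrative stub only; the interface name is hypothetical.
interface PausesPartitions
{
    /**
     * @param  RdKafka\TopicPartition[] $topicPartitions
     * @return RdKafka\TopicPartition[] the same partitions, each carrying a per-partition err code
     * @throws RdKafka\Exception on a top-level librdkafka error
     */
    public function pausePartitions(array $topicPartitions);

    /**
     * @param  RdKafka\TopicPartition[] $topicPartitions
     * @return RdKafka\TopicPartition[] the same partitions, each carrying a per-partition err code
     * @throws RdKafka\Exception on a top-level librdkafka error
     */
    public function resumePartitions(array $topicPartitions);
}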

kafka_consumer.c (+82)
@@ -798,6 +798,86 @@ PHP_METHOD(RdKafka__KafkaConsumer, queryWatermarkOffsets)
 }
 /* }}} */
 
+/* {{{ proto RdKafka\TopicPartition[] RdKafka\KafkaConsumer::pausePartitions(RdKafka\TopicPartition[] $topicPartitions)
+   Pause consumption for the provided list of partitions. */
+ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_kafka_consumer_pause_partitions, 0, 0, 1)
+    ZEND_ARG_INFO(0, topic_partitions)
+ZEND_END_ARG_INFO()
+
+PHP_METHOD(RdKafka__KafkaConsumer, pausePartitions)
+{
+    HashTable *htopars;
+    rd_kafka_topic_partition_list_t *topars;
+    rd_kafka_resp_err_t err;
+    object_intern *intern;
+
+    if (zend_parse_parameters(ZEND_NUM_ARGS(), "h", &htopars) == FAILURE) {
+        return;
+    }
+
+    intern = get_object(getThis());
+    if (!intern) {
+        return;
+    }
+
+    topars = array_arg_to_kafka_topic_partition_list(1, htopars);
+    if (!topars) {
+        return;
+    }
+
+    err = rd_kafka_pause_partitions(intern->rk, topars);
+
+    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
+        rd_kafka_topic_partition_list_destroy(topars);
+        zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err);
+        return;
+    }
+
+    kafka_topic_partition_list_to_array(return_value, topars);
+    rd_kafka_topic_partition_list_destroy(topars);
+}
+/* }}} */
+
+/* {{{ proto RdKafka\TopicPartition[] RdKafka\KafkaConsumer::resumePartitions(RdKafka\TopicPartition[] $topicPartitions)
+   Resume consumption for the provided list of partitions. */
+ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_kafka_consumer_resume_partitions, 0, 0, 1)
+    ZEND_ARG_INFO(0, topic_partitions)
+ZEND_END_ARG_INFO()
+
+PHP_METHOD(RdKafka__KafkaConsumer, resumePartitions)
+{
+    HashTable *htopars;
+    rd_kafka_topic_partition_list_t *topars;
+    rd_kafka_resp_err_t err;
+    object_intern *intern;
+
+    if (zend_parse_parameters(ZEND_NUM_ARGS(), "h", &htopars) == FAILURE) {
+        return;
+    }
+
+    intern = get_object(getThis());
+    if (!intern) {
+        return;
+    }
+
+    topars = array_arg_to_kafka_topic_partition_list(1, htopars);
+    if (!topars) {
+        return;
+    }
+
+    err = rd_kafka_resume_partitions(intern->rk, topars);
+
+    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
+        rd_kafka_topic_partition_list_destroy(topars);
+        zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err);
+        return;
+    }
+
+    kafka_topic_partition_list_to_array(return_value, topars);
+    rd_kafka_topic_partition_list_destroy(topars);
+}
+/* }}} */
+
 static const zend_function_entry fe[] = { /* {{{ */
     PHP_ME(RdKafka__KafkaConsumer, __construct, arginfo_kafka_kafka_consumer___construct, ZEND_ACC_PUBLIC)
     PHP_ME(RdKafka__KafkaConsumer, assign, arginfo_kafka_kafka_consumer_assign, ZEND_ACC_PUBLIC)
@@ -815,6 +895,8 @@ static const zend_function_entry fe[] = { /* {{{ */
     PHP_ME(RdKafka__KafkaConsumer, getOffsetPositions, arginfo_kafka_kafka_consumer_get_offset_positions, ZEND_ACC_PUBLIC)
     PHP_ME(RdKafka__KafkaConsumer, queryWatermarkOffsets, arginfo_kafka_kafka_consumer_query_watermark_offsets, ZEND_ACC_PUBLIC)
     PHP_ME(RdKafka__KafkaConsumer, offsetsForTimes, arginfo_kafka_kafka_consumer_offsets_for_times, ZEND_ACC_PUBLIC)
+    PHP_ME(RdKafka__KafkaConsumer, pausePartitions, arginfo_kafka_kafka_consumer_pause_partitions, ZEND_ACC_PUBLIC)
+    PHP_ME(RdKafka__KafkaConsumer, resumePartitions, arginfo_kafka_kafka_consumer_resume_partitions, ZEND_ACC_PUBLIC)
     PHP_FE_END
 }; /* }}} */
 
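Beyond the integration test added below, a typical use of the consumer-side methods is simple backpressure: pause the assigned partitions while a batch is processed, then resume. The following is a usage sketch, not part of the commit; the broker address, topic name and the processBatch() helper are placeholders.

<?php
// Usage sketch only (not part of the commit).
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092'); // placeholder broker
$conf->set('group.id', 'example-group');              // placeholder group

$consumer   = new RdKafka\KafkaConsumer($conf);
$partitions = [new RdKafka\TopicPartition('example-topic', 0)];
$consumer->assign($partitions);

$batch = [];
while (true) {
    $message = $consumer->consume(1000);
    if ($message->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
        continue; // timeout, partition EOF, transient errors
    }
    $batch[] = $message->payload;

    if (count($batch) >= 100) {
        // Stop fetching while the backlog is handled...
        $consumer->pausePartitions($partitions);
        processBatch($batch); // hypothetical helper
        $batch = [];
        // ...then let consumption continue.
        $consumer->resumePartitions($partitions);
    }
}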

package.xml (+1)
@@ -84,6 +84,7 @@
 <file role="test" name="integration-tests-check.php"/>
 <file role="test" name="kafka_error_exception.phpt"/>
 <file role="test" name="message_headers.phpt"/>
+<file role="test" name="pause_resume.phpt"/>
 <file role="test" name="produce_consume.phpt"/>
 <file role="test" name="produce_consume_queue.phpt"/>
 <file role="test" name="produce_consume_transactional.phpt"/>

rdkafka.c (+82)
@@ -733,6 +733,86 @@ PHP_METHOD(RdKafka__Kafka, setLogger)
 }
 /* }}} */
 
+/* {{{ proto RdKafka\TopicPartition[] RdKafka\Kafka::pausePartitions(RdKafka\TopicPartition[] $topicPartitions)
+   Pause producing or consumption for the provided list of partitions. */
+ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_kafka_pause_partitions, 0, 0, 1)
+    ZEND_ARG_INFO(0, topic_partitions)
+ZEND_END_ARG_INFO()
+
+PHP_METHOD(RdKafka__Kafka, pausePartitions)
+{
+    HashTable *htopars;
+    rd_kafka_topic_partition_list_t *topars;
+    rd_kafka_resp_err_t err;
+    kafka_object *intern;
+
+    if (zend_parse_parameters(ZEND_NUM_ARGS(), "h", &htopars) == FAILURE) {
+        return;
+    }
+
+    intern = get_kafka_object(getThis());
+    if (!intern) {
+        return;
+    }
+
+    topars = array_arg_to_kafka_topic_partition_list(1, htopars);
+    if (!topars) {
+        return;
+    }
+
+    err = rd_kafka_pause_partitions(intern->rk, topars);
+
+    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
+        rd_kafka_topic_partition_list_destroy(topars);
+        zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err);
+        return;
+    }
+
+    kafka_topic_partition_list_to_array(return_value, topars);
+    rd_kafka_topic_partition_list_destroy(topars);
+}
+/* }}} */
+
+/* {{{ proto RdKafka\TopicPartition[] RdKafka\Kafka::resumePartitions(RdKafka\TopicPartition[] $topicPartitions)
+   Resume producing or consumption for the provided list of partitions. */
+ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_kafka_resume_partitions, 0, 0, 1)
+    ZEND_ARG_INFO(0, topic_partitions)
+ZEND_END_ARG_INFO()
+
+PHP_METHOD(RdKafka__Kafka, resumePartitions)
+{
+    HashTable *htopars;
+    rd_kafka_topic_partition_list_t *topars;
+    rd_kafka_resp_err_t err;
+    kafka_object *intern;
+
+    if (zend_parse_parameters(ZEND_NUM_ARGS(), "h", &htopars) == FAILURE) {
+        return;
+    }
+
+    intern = get_kafka_object(getThis());
+    if (!intern) {
+        return;
+    }
+
+    topars = array_arg_to_kafka_topic_partition_list(1, htopars);
+    if (!topars) {
+        return;
+    }
+
+    err = rd_kafka_resume_partitions(intern->rk, topars);
+
+    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
+        rd_kafka_topic_partition_list_destroy(topars);
+        zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err);
+        return;
+    }
+
+    kafka_topic_partition_list_to_array(return_value, topars);
+    rd_kafka_topic_partition_list_destroy(topars);
+}
+/* }}} */
+
 static const zend_function_entry kafka_fe[] = {
     PHP_ME(RdKafka__Kafka, addBrokers, arginfo_kafka_add_brokers, ZEND_ACC_PUBLIC)
     PHP_ME(RdKafka__Kafka, getMetadata, arginfo_kafka_get_metadata, ZEND_ACC_PUBLIC)
@@ -749,6 +829,8 @@ static const zend_function_entry kafka_fe[] = {
     PHP_ME(RdKafka__Kafka, setLogger, arginfo_kafka_set_logger, ZEND_ACC_PUBLIC | ZEND_ACC_DEPRECATED)
     PHP_ME(RdKafka__Kafka, queryWatermarkOffsets, arginfo_kafka_query_watermark_offsets, ZEND_ACC_PUBLIC)
     PHP_ME(RdKafka__Kafka, offsetsForTimes, arginfo_kafka_offsets_for_times, ZEND_ACC_PUBLIC)
+    PHP_ME(RdKafka__Kafka, pausePartitions, arginfo_kafka_kafka_pause_partitions, ZEND_ACC_PUBLIC)
+    PHP_ME(RdKafka__Kafka, resumePartitions, arginfo_kafka_kafka_resume_partitions, ZEND_ACC_PUBLIC)
     PHP_FE_END
 };
 
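Because the methods are registered on RdKafka\Kafka, they are inherited by RdKafka\Producer as well, wrapping librdkafka's rd_kafka_pause_partitions()/rd_kafka_resume_partitions(), which pause and resume transmission for the listed partitions. A minimal producer-side sketch, with placeholder broker and topic names:

<?php
// Usage sketch only (not part of the commit).
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092'); // placeholder broker

$producer = new RdKafka\Producer($conf);
$topic    = $producer->newTopic('example-topic');      // placeholder topic

$partitions = [new RdKafka\TopicPartition('example-topic', 0)];

// Hold back transmission for partition 0; messages produced meanwhile stay queued.
$producer->pausePartitions($partitions);

$topic->produce(0, 0, 'queued while paused');
$producer->poll(0); // serves callbacks; nothing is transmitted for the paused partition

// A top-level failure throws RdKafka\Exception (see the C code above);
// per-partition problems are reported via the err field of the returned objects.
$producer->resumePartitions($partitions);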

tests/pause_resume.phpt (+108)
@@ -0,0 +1,108 @@
+--TEST--
+Pause and resume partitions
+--SKIPIF--
+<?php
+require __DIR__ . '/integration-tests-check.php';
+--FILE--
+<?php
+require __DIR__ . '/integration-tests-check.php';
+
+$conf = new RdKafka\Conf();
+$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
+
+$producer = new RdKafka\Producer($conf);
+
+$topicName = sprintf("test_rdkafka_%s", uniqid());
+$topic = $producer->newTopic($topicName);
+
+var_dump($producer->pausePartitions([
+    new RdKafka\TopicPartition($topicName, 0),
+]));
+var_dump($producer->resumePartitions([
+    new RdKafka\TopicPartition($topicName, 0),
+]));
+
+$conf = new RdKafka\Conf();
+$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
+$conf->set('group.id', sprintf("test_rdkafka_group_%s", uniqid()));
+
+$consumer = new RdKafka\KafkaConsumer($conf);
+$consumer->assign([
+    new RdKafka\TopicPartition($topicName, 0),
+]);
+
+var_dump($consumer->pausePartitions([
+    new RdKafka\TopicPartition($topicName, 0),
+]));
+var_dump($consumer->resumePartitions([
+    new RdKafka\TopicPartition($topicName, 0),
+]));
+var_dump($consumer->resumePartitions([
+    new RdKafka\TopicPartition("", -1),
+]));
+--EXPECTF--
+array(1) {
+  [0]=>
+  object(RdKafka\TopicPartition)#5 (4) {
+    ["topic"]=>
+    string(26) "test_rdkafka_%s"
+    ["partition"]=>
+    int(0)
+    ["offset"]=>
+    int(0)
+    ["err"]=>
+    int(0)
+  }
+}
+array(1) {
+  [0]=>
+  object(RdKafka\TopicPartition)#4 (4) {
+    ["topic"]=>
+    string(26) "test_rdkafka_%s"
+    ["partition"]=>
+    int(0)
+    ["offset"]=>
+    int(0)
+    ["err"]=>
+    int(0)
+  }
+}
+array(1) {
+  [0]=>
+  object(RdKafka\TopicPartition)#6 (4) {
+    ["topic"]=>
+    string(26) "test_rdkafka_%s"
+    ["partition"]=>
+    int(0)
+    ["offset"]=>
+    int(0)
+    ["err"]=>
+    int(0)
+  }
+}
+array(1) {
+  [0]=>
+  object(RdKafka\TopicPartition)#5 (4) {
+    ["topic"]=>
+    string(26) "test_rdkafka_%s"
+    ["partition"]=>
+    int(0)
+    ["offset"]=>
+    int(0)
+    ["err"]=>
+    int(0)
+  }
+}
+array(1) {
+  [0]=>
+  object(RdKafka\TopicPartition)#6 (4) {
+    ["topic"]=>
+    string(0) ""
+    ["partition"]=>
+    int(-1)
+    ["offset"]=>
+    int(0)
+    ["err"]=>
+    int(-190)
+  }
+}
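
The last dump in the expected output shows the per-partition error reporting: resuming the bogus ("", -1) entry leaves err set to -190, librdkafka's RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION. Below is a sketch of how a caller might inspect the returned list; it assumes a getErr() accessor exists alongside the new err property shown above, and reuses the $consumer and placeholder topic from the earlier sketch.

<?php
// Sketch only: inspect the per-partition err codes returned by resumePartitions().
$result = $consumer->resumePartitions([
    new RdKafka\TopicPartition('example-topic', 0),
    new RdKafka\TopicPartition('', -1), // bogus entry, like the last test case
]);

foreach ($result as $tp) {
    if ($tp->getErr() !== RD_KAFKA_RESP_ERR_NO_ERROR) {
        printf(
            "resume failed for %s [%d]: %s\n",
            $tp->getTopic(),
            $tp->getPartition(),
            rd_kafka_err2str($tp->getErr()) // "Local: Unknown partition" for -190
        );
    }
}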

tests/topic_partition.phpt (+6 -2)
@@ -26,13 +26,15 @@ $topar
 
 var_dump($topar);
 --EXPECT--
-object(RdKafka\TopicPartition)#1 (3) {
+object(RdKafka\TopicPartition)#1 (4) {
   ["topic"]=>
   string(4) "test"
   ["partition"]=>
   int(-1)
   ["offset"]=>
   int(42)
+  ["err"]=>
+  int(0)
 }
 array(3) {
   ["topic"]=>
@@ -42,11 +44,13 @@ array(3) {
   ["offset"]=>
   int(42)
 }
-object(RdKafka\TopicPartition)#1 (3) {
+object(RdKafka\TopicPartition)#1 (4) {
   ["topic"]=>
   string(3) "foo"
   ["partition"]=>
   int(123)
   ["offset"]=>
   int(43)
+  ["err"]=>
+  int(0)
 }
