Skip to content

Commit a596680

Browse files
authored
[KIP-848] Added support for testing with new 'consumer' group protocol. (#1812)
1 parent 1c4dd96 commit a596680

34 files changed

+381
-207
lines changed

.semaphore/semaphore.yml

+13-5
Original file line numberDiff line numberDiff line change
@@ -186,12 +186,20 @@ blocks:
186186
commands:
187187
- '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
188188
jobs:
189-
- name: Build
189+
- name: Build and Tests with 'classic' group protocol
190+
commands:
191+
- sem-version python 3.9
192+
# use a virtualenv
193+
- python3 -m venv _venv && source _venv/bin/activate
194+
- chmod u+r+x tools/source-package-verification.sh
195+
- tools/source-package-verification.sh
196+
- name: Build and Tests with 'consumer' group protocol
190197
commands:
191-
- sem-version python 3.8
198+
- sem-version python 3.9
192199
# use a virtualenv
193200
- python3 -m venv _venv && source _venv/bin/activate
194201
- chmod u+r+x tools/source-package-verification.sh
202+
- export TEST_CONSUMER_GROUP_PROTOCOL=consumer
195203
- tools/source-package-verification.sh
196204
- name: "Source package verification with Python 3 (Linux arm64)"
197205
dependencies: []
@@ -207,7 +215,7 @@ blocks:
207215
jobs:
208216
- name: Build
209217
commands:
210-
- sem-version python 3.8
218+
- sem-version python 3.9
211219
# use a virtualenv
212220
- python3 -m venv _venv && source _venv/bin/activate
213221
- chmod u+r+x tools/source-package-verification.sh
@@ -226,7 +234,7 @@ blocks:
226234
jobs:
227235
- name: Build
228236
commands:
229-
- sem-version python 3.8
237+
- sem-version python 3.9
230238
# use a virtualenv
231239
- python3 -m venv _venv && source _venv/bin/activate
232240
- chmod u+r+x tools/source-package-verification.sh
@@ -245,7 +253,7 @@ blocks:
245253
jobs:
246254
- name: Build
247255
commands:
248-
- sem-version python 3.8
256+
- sem-version python 3.9
249257
# use a virtualenv
250258
- python3 -m venv _venv && source _venv/bin/activate
251259
- chmod u+r+x tools/source-package-verification.sh

requirements/requirements-tests-install.txt

+2-1
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@
22
-r requirements-schemaregistry.txt
33
-r requirements-avro.txt
44
-r requirements-protobuf.txt
5-
-r requirements-json.txt
5+
-r requirements-json.txt
6+
tests/trivup/trivup-0.12.7.tar.gz

requirements/requirements-tests.txt

+1-2
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,4 @@ urllib3 >= 2.0.0,<3; python_version > "3.7"
44
flake8
55
pytest
66
pytest-timeout
7-
requests-mock
8-
trivup>=0.8.3
7+
requests-mock

tests/README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ A python3 env suitable for running tests:
1818

1919
$ python3 -m venv venv_test
2020
$ source venv_test/bin/activate
21-
$ python3 -m pip install .[tests]
21+
$ python3 -m pip install -r requirements/requirements-tests-install.txt
22+
$ python3 -m pip install .
2223

2324
When you're finished with it:
2425

tests/common/__init__.py

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright 2024 Confluent Inc.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
import os
20+
from confluent_kafka import Consumer, DeserializingConsumer
21+
from confluent_kafka.avro import AvroConsumer
22+
23+
_GROUP_PROTOCOL_ENV = 'TEST_CONSUMER_GROUP_PROTOCOL'
24+
_TRIVUP_CLUSTER_TYPE_ENV = 'TEST_TRIVUP_CLUSTER_TYPE'
25+
26+
27+
def _update_conf_group_protocol(conf=None):
28+
if conf is not None and 'group.id' in conf and TestUtils.use_group_protocol_consumer():
29+
conf['group.protocol'] = 'consumer'
30+
31+
32+
def _trivup_cluster_type_kraft():
33+
return _TRIVUP_CLUSTER_TYPE_ENV in os.environ and os.environ[_TRIVUP_CLUSTER_TYPE_ENV] == 'kraft'
34+
35+
36+
class TestUtils:
37+
@staticmethod
38+
def use_kraft():
39+
return TestUtils.use_group_protocol_consumer() or _trivup_cluster_type_kraft()
40+
41+
@staticmethod
42+
def use_group_protocol_consumer():
43+
return _GROUP_PROTOCOL_ENV in os.environ and os.environ[_GROUP_PROTOCOL_ENV] == 'consumer'
44+
45+
46+
class TestConsumer(Consumer):
47+
def __init__(self, conf=None, **kwargs):
48+
_update_conf_group_protocol(conf)
49+
super(TestConsumer, self).__init__(conf, **kwargs)
50+
51+
52+
class TestDeserializingConsumer(DeserializingConsumer):
53+
def __init__(self, conf=None, **kwargs):
54+
_update_conf_group_protocol(conf)
55+
super(TestDeserializingConsumer, self).__init__(conf, **kwargs)
56+
57+
58+
class TestAvroConsumer(AvroConsumer):
59+
def __init__(self, conf=None, **kwargs):
60+
_update_conf_group_protocol(conf)
61+
super(TestAvroConsumer, self).__init__(conf, **kwargs)

tests/integration/admin/test_basic_operations.py

+20-11
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16-
import confluent_kafka
1716
import struct
1817
import time
19-
from confluent_kafka import ConsumerGroupTopicPartitions, TopicPartition, ConsumerGroupState
18+
19+
from confluent_kafka import ConsumerGroupTopicPartitions, TopicPartition, ConsumerGroupState, KafkaError
2020
from confluent_kafka.admin import (NewPartitions, ConfigResource,
2121
AclBinding, AclBindingFilter, ResourceType,
2222
ResourcePatternType, AclOperation, AclPermissionType)
@@ -58,6 +58,8 @@ def verify_admin_acls(admin_client,
5858
for acl_binding, f in fs.items():
5959
f.result() # trigger exception if there was an error
6060

61+
time.sleep(1)
62+
6163
acl_binding_filter1 = AclBindingFilter(ResourceType.ANY, None, ResourcePatternType.ANY,
6264
None, None, AclOperation.ANY, AclPermissionType.ANY)
6365
acl_binding_filter2 = AclBindingFilter(ResourceType.ANY, None, ResourcePatternType.PREFIXED,
@@ -83,6 +85,8 @@ def verify_admin_acls(admin_client,
8385
"Deleted ACL bindings don't match, actual {} expected {}".format(deleted_acl_bindings,
8486
expected_acl_bindings)
8587

88+
time.sleep(1)
89+
8690
#
8791
# Delete the ACLs with TOPIC and GROUP
8892
#
@@ -94,6 +98,9 @@ def verify_admin_acls(admin_client,
9498
assert deleted_acl_bindings == expected, \
9599
"Deleted ACL bindings don't match, actual {} expected {}".format(deleted_acl_bindings,
96100
expected)
101+
102+
time.sleep(1)
103+
97104
#
98105
# All the ACLs should have been deleted
99106
#
@@ -201,14 +208,14 @@ def test_basic_operations(kafka_cluster):
201208
# Second iteration: create topic.
202209
#
203210
for validate in (True, False):
204-
our_topic = kafka_cluster.create_topic(topic_prefix,
205-
{
206-
"num_partitions": num_partitions,
207-
"config": topic_config,
208-
"replication_factor": 1,
209-
},
210-
validate_only=validate
211-
)
211+
our_topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix,
212+
{
213+
"num_partitions": num_partitions,
214+
"config": topic_config,
215+
"replication_factor": 1,
216+
},
217+
validate_only=validate
218+
)
212219

213220
admin_client = kafka_cluster.admin()
214221

@@ -270,7 +277,7 @@ def consume_messages(group_id, num_messages=None):
270277
print('Read all the required messages: exiting')
271278
break
272279
except ConsumeError as e:
273-
if msg is not None and e.code == confluent_kafka.KafkaError._PARTITION_EOF:
280+
if msg is not None and e.code == KafkaError._PARTITION_EOF:
274281
print('Reached end of %s [%d] at offset %d' % (
275282
msg.topic(), msg.partition(), msg.offset()))
276283
eof_reached[(msg.topic(), msg.partition())] = True
@@ -345,6 +352,8 @@ def verify_config(expconfig, configs):
345352
fs = admin_client.alter_configs([resource])
346353
fs[resource].result() # will raise exception on failure
347354

355+
time.sleep(1)
356+
348357
#
349358
# Read the config back again and verify.
350359
#

tests/integration/admin/test_delete_records.py

+16-15
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ def test_delete_records(kafka_cluster):
2525
admin_client = kafka_cluster.admin()
2626

2727
# Create a topic with a single partition
28-
topic = kafka_cluster.create_topic("test-del-records",
29-
{
30-
"num_partitions": 1,
31-
"replication_factor": 1,
32-
})
28+
topic = kafka_cluster.create_topic_and_wait_propogation("test-del-records",
29+
{
30+
"num_partitions": 1,
31+
"replication_factor": 1,
32+
})
3333

3434
# Create Producer instance
3535
p = kafka_cluster.producer()
@@ -73,16 +73,17 @@ def test_delete_records_multiple_topics_and_partitions(kafka_cluster):
7373
admin_client = kafka_cluster.admin()
7474
num_partitions = 3
7575
# Create two topics with a single partition
76-
topic = kafka_cluster.create_topic("test-del-records",
77-
{
78-
"num_partitions": num_partitions,
79-
"replication_factor": 1,
80-
})
81-
topic2 = kafka_cluster.create_topic("test-del-records2",
82-
{
83-
"num_partitions": num_partitions,
84-
"replication_factor": 1,
85-
})
76+
topic = kafka_cluster.create_topic_and_wait_propogation("test-del-records",
77+
{
78+
"num_partitions": num_partitions,
79+
"replication_factor": 1,
80+
})
81+
topic2 = kafka_cluster.create_topic_and_wait_propogation("test-del-records2",
82+
{
83+
"num_partitions": num_partitions,
84+
"replication_factor": 1,
85+
})
86+
8687
topics = [topic, topic2]
8788
partitions = list(range(num_partitions))
8889
# Create Producer instance

tests/integration/admin/test_describe_operations.py

+21-9
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,16 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16+
import time
1617
import pytest
18+
1719
from confluent_kafka.admin import (AclBinding, AclBindingFilter, ResourceType,
1820
ResourcePatternType, AclOperation, AclPermissionType)
1921
from confluent_kafka.error import ConsumeError
2022
from confluent_kafka import ConsumerGroupState, TopicCollection
2123

24+
from tests.common import TestUtils
25+
2226
topic_prefix = "test-topic"
2327

2428

@@ -82,10 +86,12 @@ def perform_admin_operation_sync(operation, *arg, **kwargs):
8286

8387
def create_acls(admin_client, acl_bindings):
8488
perform_admin_operation_sync(admin_client.create_acls, acl_bindings)
89+
time.sleep(1)
8590

8691

8792
def delete_acls(admin_client, acl_binding_filters):
8893
perform_admin_operation_sync(admin_client.delete_acls, acl_binding_filters)
94+
time.sleep(1)
8995

9096

9197
def verify_provided_describe_for_authorized_operations(
@@ -115,6 +121,7 @@ def verify_provided_describe_for_authorized_operations(
115121
acl_binding = AclBinding(restype, resname, ResourcePatternType.LITERAL,
116122
"User:sasl_user", "*", operation_to_allow, AclPermissionType.ALLOW)
117123
create_acls(admin_client, [acl_binding])
124+
time.sleep(1)
118125

119126
# Check with updated authorized operations
120127
desc = perform_admin_operation_sync(describe_fn, *arg, **kwargs)
@@ -126,6 +133,7 @@ def verify_provided_describe_for_authorized_operations(
126133
acl_binding_filter = AclBindingFilter(restype, resname, ResourcePatternType.ANY,
127134
None, None, AclOperation.ANY, AclPermissionType.ANY)
128135
delete_acls(admin_client, [acl_binding_filter])
136+
time.sleep(1)
129137
return desc
130138

131139

@@ -196,20 +204,24 @@ def test_describe_operations(sasl_cluster):
196204

197205
# Create Topic
198206
topic_config = {"compression.type": "gzip"}
199-
our_topic = sasl_cluster.create_topic(topic_prefix,
200-
{
201-
"num_partitions": 1,
202-
"config": topic_config,
203-
"replication_factor": 1,
204-
},
205-
validate_only=False
206-
)
207+
our_topic = sasl_cluster.create_topic_and_wait_propogation(topic_prefix,
208+
{
209+
"num_partitions": 1,
210+
"config": topic_config,
211+
"replication_factor": 1,
212+
},
213+
validate_only=False
214+
)
207215

208216
# Verify Authorized Operations in Describe Topics
209217
verify_describe_topics(admin_client, our_topic)
210218

211219
# Verify Authorized Operations in Describe Groups
212-
verify_describe_groups(sasl_cluster, admin_client, our_topic)
220+
# Skip this test if using group protocol `consumer`
221+
# as there is new RPC for describe_groups() in
222+
# group protocol `consumer` case.
223+
if not TestUtils.use_group_protocol_consumer():
224+
verify_describe_groups(sasl_cluster, admin_client, our_topic)
213225

214226
# Delete Topic
215227
perform_admin_operation_sync(admin_client.delete_topics, [our_topic], operation_timeout=0, request_timeout=10)

0 commit comments

Comments
 (0)