-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Introduce share consumer factories for Kafka Queues (Early Access) #3923
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Introduce share consumer factories for Kafka Queues (Early Access) #3923
Conversation
- Preliminary set of changes to support Kafka queueus introduced via KIP-932 (Kafka Queue) for early access in Apache Kafka 4.0.0. See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka - Add ShareConsumerFactory interface and DefaultShareConsumerFactory implementation as the initial building blocks for supporting Kafka Queues (KIP-932) in Spring for Apache Kafka 4.0.x. This factory and the implementation provide a flexible API for creating share consumers, and are designed as the foundation for further queue integration. - Tests to verify the share consumer behavior Related to spring-projects#3875 spring-projects#3875 Signed-off-by: Soby Chacko <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very cool!
Just a couple nit-picks.
Thanks
* @return the share consumer. | ||
*/ | ||
ShareConsumer<K, V> createShareConsumer(@Nullable String groupId, @Nullable String clientIdPrefix, | ||
@Nullable String clientIdSuffix); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have some explanation in JavaDocs why we just cannot use clientId
?
I know this is something existing from the ConsumerFactory
, but still not clear from there why append
or override
with prefix and suffix concatenation instead of just simple clientId
parameter.
Thanks
* Return whether deserializers are configured automatically. | ||
* @return true if deserializers are configured automatically | ||
*/ | ||
public boolean isConfigureDeserializers() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this method to be exposed?
@Override | ||
public void addListener(int index, Listener<K, V> listener) { | ||
Assert.notNull(listener, "'listener' cannot be null"); | ||
if (index >= this.listeners.size()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's explain this logic in the JavaDocs!
Some one might complain why we don't have removeListener(int index)
contract if we have adding to the specific position.
topics = {"embedded-share-test"}, partitions = 1, | ||
brokerProperties = { | ||
"unstable.api.versions.enable=true", | ||
"group.coordinator.rebalance.protocols=classic,consumer,share", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to enable all the protocols if we talk in this test suite only about share
?
"share.coordinator.state.topic.min.isr=1" | ||
}) | ||
@SpringJUnitConfig | ||
@DirtiesContext |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need Spring environment if there is no @Configuration
for this test suite?
I even surprised that it works at all without @Configuration
😄
consumerProps.put("key.deserializer", StringDeserializer.class); | ||
consumerProps.put("value.deserializer", StringDeserializer.class); | ||
consumerProps.put("group.id", groupId); | ||
consumerProps.put("share.auto.offset.reset", "earliest"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, you have just explained that we have to use an Admin API to reset share group.
Then why do we have this property over here for the consumer config?
Preliminary set of changes to support Kafka queueus introduced via KIP-932 (Kafka Queue) for early access in Apache Kafka 4.0.0. See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
Add ShareConsumerFactory interface and DefaultShareConsumerFactory implementation as the initial building blocks for supporting Kafka Queues (KIP-932) in Spring for Apache Kafka 4.0.x. This factory and the implementation provide a flexible API for creating share consumers, and are designed as the foundation for further queue integration.
Tests to verify the share consumer behavior
Related to #3875
#3875