-
Notifications
You must be signed in to change notification settings - Fork 339
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ISSUE-525 / 764: ruby-kafka does not support different topic subscriptions in the same consumer group #903
ISSUE-525 / 764: ruby-kafka does not support different topic subscriptions in the same consumer group #903
Conversation
4c68314
to
e3b681c
Compare
Can't wait!! |
e3b681c
to
66cd8f4
Compare
Would there be a reason to not always use the new assignment strategy? I'd prefer to have fewer options to confuse people. |
lib/kafka/multi_subscription_round_robin_assignment_strategy.rb
Outdated
Show resolved
Hide resolved
Looks good! I'd like to see if we could perhaps merge this with the existing strategy rather than adding a new one, if this is purely additional in terms of functionality and reliability. |
I just wanted to be cautious since I'm not all too familiarized with the project but yes, I can merge this with the original strategy. It should be easy enough since this already passes all the original strategy unit tests. Thanks for taking time to review. |
c8b2266
to
d82f4b7
Compare
Hey @dasch ! I incorporated your suggestions. There are a few failing tests but I have a strong suspicious that these are flaky tests. There are all of the type of:
I've pushed already 3 times and I'm always getting similar errors but for different specs. Also I find it hard to believe that my changes would break the connection to the broker. |
Yeah, unfortunately the test suite is really flake – I would love to have someone improve that :D |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good! Can you add an entry to the changelog?
The existing assignment strategy assumes identical subscriptions among consumers within the same consumer group. We need a more general implementation that accounts for different topic subscriptions to be able to perform correct assignments.The new implementation presented here is heavily inspired by the Kafka java client RoundRobinAssignor https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
…iption information The leader consumer thread needs to have knowledge about all siblings consumer subscriptions in order to be perform the correct partition assignments. For consumers with different subscription this is not possible through the local `@topics`. A way to solve this is by obtaining this info from the cluster itself through the `@members` var.
d82f4b7
to
9ddebb0
Compare
9ddebb0
to
fa5e4ad
Compare
Great! just added the entry to the changelog. Thank you so much! |
Awesome! I'd like to cut an alpha release and have you test that out in production, if that's okay? |
Great yes of course, we will! |
@dasch I'll ping you on the dev slack channel once we've ascertained that everything is looking good on our system with the new changes. |
Great! |
There are 2 issues that currently prevent supporting this feature:
RoundRobinAssignmentStrategy
assumes all subscriptions are equalWe need a more generalized algorithm to perform assignments now that we have different subscriptions across the consumer group. This PR introduces a new additional strategy heavily inspired in the java kafka client RoundRobinAssignor that properly handle this case.
If different consumers were to have different topic subscription we can't rely on the in-memory
@topics
variable to produce correct assignments. Instead, we can leverage the subscription information provided by the cluster, and stored in@members
, to determine the correct consumer subscriptions for a particular consumer group. Once we have the correct list of topics the downstream code in the assignor can then fetch the corresponding partitions from the cluster in preparation for the assignments.