Skip to content

Commit 7aadb07

Browse files
authored
Merge pull request #903 from eduardopoleoflipp/multiple_topic_subscriptions
ISSUE-525 / 764: ruby-kafka does not support different topic subscriptions in the same consumer group
2 parents 65e4af5 + fa5e4ad commit 7aadb07

5 files changed

+323
-16
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ Changes and additions to the library will be listed here.
99
- Add support for `murmur2` based partitioning.
1010
- Add `resolve_seed_brokers` option to support seed brokers' hostname with multiple addresses (#877).
1111
- Handle SyncGroup responses with a non-zero error and no assignments (#896).
12+
- Add support for non-identical topic subscriptions within the same consumer group (#525 / #764).
1213

1314
## 1.3.0
1415

lib/kafka/consumer_group.rb

+6-1
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,14 @@ def synchronize
189189
if group_leader?
190190
@logger.info "Chosen as leader of group `#{@group_id}`"
191191

192+
topics = Set.new
193+
@members.each do |_member, metadata|
194+
metadata.topics.each { |t| topics.add(t) }
195+
end
196+
192197
group_assignment = @assignor.assign(
193198
members: @members,
194-
topics: @topics,
199+
topics: topics,
195200
)
196201
end
197202

lib/kafka/round_robin_assignment_strategy.rb

+28-7
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
# frozen_string_literal: true
2-
31
module Kafka
42

5-
# A consumer group partition assignment strategy that assigns partitions to
6-
# consumers in a round-robin fashion.
3+
# A round robin assignment strategy inpired on the
4+
# original java client round robin assignor. It's capable
5+
# of handling identical as well as different topic subscriptions
6+
# accross the same consumer group.
77
class RoundRobinAssignmentStrategy
88
def protocol_name
99
"roundrobin"
@@ -19,13 +19,34 @@ def protocol_name
1919
# @return [Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>] a hash
2020
# mapping member ids to partitions.
2121
def call(cluster:, members:, partitions:)
22-
member_ids = members.keys
2322
partitions_per_member = Hash.new {|h, k| h[k] = [] }
24-
partitions.each_with_index do |partition, index|
25-
partitions_per_member[member_ids[index % member_ids.count]] << partition
23+
relevant_partitions = valid_sorted_partitions(members, partitions)
24+
members_ids = members.keys
25+
iterator = (0...members.size).cycle
26+
idx = iterator.next
27+
28+
relevant_partitions.each do |partition|
29+
topic = partition.topic
30+
31+
while !members[members_ids[idx]].topics.include?(topic)
32+
idx = iterator.next
33+
end
34+
35+
partitions_per_member[members_ids[idx]] << partition
36+
idx = iterator.next
2637
end
2738

2839
partitions_per_member
2940
end
41+
42+
def valid_sorted_partitions(members, partitions)
43+
subscribed_topics = members.map do |id, metadata|
44+
metadata && metadata.topics
45+
end.flatten.compact
46+
47+
partitions
48+
.select { |partition| subscribed_topics.include?(partition.topic) }
49+
.sort_by { |partition| partition.topic }
50+
end
3051
end
3152
end

spec/functional/consumer_group_spec.rb

+62
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,68 @@ def call(cluster:, members:, partitions:)
518518
expect(received_messages.values.map(&:count)).to match_array [messages.count / 3, messages.count / 3 * 2]
519519
end
520520

521+
example "subscribing to different topics while in the same consumer group" do
522+
topic1 = create_random_topic(num_partitions: 1)
523+
topic2 = create_random_topic(num_partitions: 1)
524+
messages = (1..500).to_a
525+
526+
begin
527+
kafka = Kafka.new(kafka_brokers, client_id: "test")
528+
producer = kafka.producer
529+
530+
messages[0..249].each do |i|
531+
producer.produce(i.to_s, topic: topic1, partition: 0)
532+
end
533+
534+
messages[250..500].each do |i|
535+
producer.produce(i.to_s, topic: topic2, partition: 0)
536+
end
537+
538+
producer.deliver_messages
539+
end
540+
541+
group_id = "test#{rand(1000)}"
542+
543+
mutex = Mutex.new
544+
received_messages = []
545+
546+
assignment_strategy_class = Kafka::RoundRobinAssignmentStrategy
547+
548+
consumers = [topic1, topic2].map do |topic|
549+
assignment_strategy = assignment_strategy_class.new
550+
kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
551+
consumer = kafka.consumer(
552+
group_id: group_id,
553+
offset_retention_time: offset_retention_time,
554+
assignment_strategy: assignment_strategy
555+
)
556+
consumer.subscribe(topic)
557+
consumer
558+
end
559+
560+
threads = consumers.map do |consumer|
561+
t = Thread.new do
562+
consumer.each_message do |message|
563+
mutex.synchronize do
564+
received_messages << message
565+
566+
if received_messages.count == messages.count
567+
consumers.each(&:stop)
568+
end
569+
end
570+
end
571+
end
572+
573+
t.abort_on_exception = true
574+
575+
t
576+
end
577+
578+
threads.each(&:join)
579+
580+
expect(received_messages.map(&:value).map(&:to_i)).to match_array messages
581+
end
582+
521583
def wait_until(timeout:)
522584
Timeout.timeout(timeout) do
523585
sleep 0.5 until yield

0 commit comments

Comments
 (0)