Skip to content

Commit 66cd8f4

Browse files
author
Eduardo Poleo
committed
ISSUE-525/764: Local @topics variable does not contain all the subscription information
The leader consumer thread needs to have knowledge about all siblings consumer subscriptions in order to be perform the correct partition assignments. For consumers with different subscription this is not possible through the local `@topics`. A way to solve this is by obtaining this info from the cluster itself through the `@members` var.
1 parent 9373774 commit 66cd8f4

File tree

2 files changed

+68
-1
lines changed

2 files changed

+68
-1
lines changed

lib/kafka/consumer_group.rb

+6-1
Original file line numberDiff line numberDiff line change
@@ -190,9 +190,14 @@ def synchronize
190190
if group_leader?
191191
@logger.info "Chosen as leader of group `#{@group_id}`"
192192

193+
topics = Set.new
194+
@members.each do |_member, metadata|
195+
metadata.topics.each { |t| topics.add(t) }
196+
end
197+
193198
group_assignment = @assignor.assign(
194199
members: @members,
195-
topics: @topics,
200+
topics: topics,
196201
)
197202
end
198203

spec/functional/consumer_group_spec.rb

+62
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,68 @@ def call(cluster:, members:, partitions:)
518518
expect(received_messages.values.map(&:count)).to match_array [messages.count / 3, messages.count / 3 * 2]
519519
end
520520

521+
example "subscribing to different topics while in the same consumer group" do
522+
topic1 = create_random_topic(num_partitions: 1)
523+
topic2 = create_random_topic(num_partitions: 1)
524+
messages = (1..500).to_a
525+
526+
begin
527+
kafka = Kafka.new(kafka_brokers, client_id: "test")
528+
producer = kafka.producer
529+
530+
messages[0..249].each do |i|
531+
producer.produce(i.to_s, topic: topic1, partition: 0)
532+
end
533+
534+
messages[250..500].each do |i|
535+
producer.produce(i.to_s, topic: topic2, partition: 0)
536+
end
537+
538+
producer.deliver_messages
539+
end
540+
541+
group_id = "test#{rand(1000)}"
542+
543+
mutex = Mutex.new
544+
received_messages = []
545+
546+
assignment_strategy_class = Kafka::MultiSubscriptionRoundRobinAssignmentStrategy
547+
548+
consumers = [topic1, topic2].map do |topic|
549+
assignment_strategy = assignment_strategy_class.new
550+
kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
551+
consumer = kafka.consumer(
552+
group_id: group_id,
553+
offset_retention_time: offset_retention_time,
554+
assignment_strategy: assignment_strategy
555+
)
556+
consumer.subscribe(topic)
557+
consumer
558+
end
559+
560+
threads = consumers.map do |consumer|
561+
t = Thread.new do
562+
consumer.each_message do |message|
563+
mutex.synchronize do
564+
received_messages << message
565+
566+
if received_messages.count == messages.count
567+
consumers.each(&:stop)
568+
end
569+
end
570+
end
571+
end
572+
573+
t.abort_on_exception = true
574+
575+
t
576+
end
577+
578+
threads.each(&:join)
579+
580+
expect(received_messages.map(&:value).map(&:to_i)).to match_array messages
581+
end
582+
521583
def wait_until(timeout:)
522584
Timeout.timeout(timeout) do
523585
sleep 0.5 until yield

0 commit comments

Comments
 (0)