
Commit b304004

[Chore] join_group needs to be executed again when a new topic is added
1 parent 5891055

2 files changed: +8 -3

lib/kafka/consumer.rb (+6 -1)
@@ -77,6 +77,9 @@ def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manage
       # Hash storing topics that are already being subscribed
       # When subscribing to a new topic, if it's already being subscribed before, skip it
       @subscribed_topics = Set.new
+
+      # Whether join_group must be executed again because new topics are added
+      @join_group_for_new_topics = false
     end
 
     # Subscribes the consumer to a topic.
@@ -489,6 +492,7 @@ def join_group
       end
 
       @fetcher.reset
+      @join_group_for_new_topics = false
 
       @group.assigned_partitions.each do |topic, partitions|
         partitions.each do |partition|
@@ -534,7 +538,7 @@ def fetch_batches
       # Return early if the consumer has been stopped.
       return [] if shutting_down?
 
-      join_group unless @group.member?
+      join_group if !@group.member? || @join_group_for_new_topics
 
       trigger_heartbeat
 
@@ -597,6 +601,7 @@ def subscribe_to_regex(topic_regex, default_offset, start_from_beginning, max_by
     def subscribe_to_topic(topic, default_offset, start_from_beginning, max_bytes_per_partition)
       return if @subscribed_topics.include?(topic)
       @subscribed_topics.add(topic)
+      @join_group_for_new_topics = true
 
       @group.subscribe(topic)
       @offset_manager.set_default_offset(topic, default_offset)
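
The mechanics reduce to a dirty flag on group membership: subscribing to a previously unseen topic marks the membership stale, the next fetch cycle re-joins the group before pulling batches, and join_group clears the flag again. A minimal standalone sketch of the pattern; ConsumerLoop and its group/fetcher collaborators are hypothetical stand-ins, not the real ruby-kafka classes:

require "set"

# Hypothetical sketch of the dirty-flag pattern introduced in this commit.
class ConsumerLoop
  def initialize(group:, fetcher:)
    @group = group
    @fetcher = fetcher
    @subscribed_topics = Set.new
    # Set when a new topic is added; cleared by the next join_group.
    @join_group_for_new_topics = false
  end

  def subscribe(topic)
    return if @subscribed_topics.include?(topic)
    @subscribed_topics.add(topic)
    # Mark group membership as stale so the fetch loop re-joins.
    @join_group_for_new_topics = true
    @group.subscribe(topic)
  end

  def fetch_batches
    # Re-join when we were never a member OR when new topics arrived
    # since the last join; before this commit only the first condition
    # was checked.
    join_group if !@group.member? || @join_group_for_new_topics
    @fetcher.fetch
  end

  private

  def join_group
    @group.join
    @fetcher.reset
    # The pending re-join has been served; clear the flag.
    @join_group_for_new_topics = false
  end
end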

spec/functional/consumer_group_spec.rb (+2 -2)
@@ -117,8 +117,8 @@
     topic_a = generate_topic_name
     topic_b = generate_topic_name
 
-    messages_a = (1..5).to_a
-    messages_b = (6..10).to_a
+    messages_a = (1..500).to_a
+    messages_b = (501..1000).to_a
     messages = messages_a + messages_b
 
     producer = Kafka.new(kafka_brokers, client_id: "test").producer
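
The spec change bumps the message volumes from 5/10 to 500/1000, presumably so that consumption of the first topic is still in flight when the second topic is subscribed, which is exactly the path that now forces a re-join. The fix also matters for regex subscriptions, where subscribe_to_regex (visible in the hunk header above) can discover topics created after the group already joined. A hedged usage sketch; the broker address, group id, and topic pattern are placeholders:

require "kafka"

kafka = Kafka.new(["localhost:9092"], client_id: "test")
consumer = kafka.consumer(group_id: "regex-group")

# Topics created later that match the pattern are picked up by
# subscribe_to_regex; with this commit that also flips
# @join_group_for_new_topics, so the next fetch re-joins the group
# and the new topic's partitions actually get assigned.
consumer.subscribe(/^events\./)

consumer.each_message do |message|
  puts "#{message.topic}/#{message.partition} @ #{message.offset}"
end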
