
Commit d6c7c9c

Merge pull request #818 from lmduc/master
[Feat] Allow consumers to refresh the topic lists
2 parents da02c84 + 58ea79a commit d6c7c9c

4 files changed, +106 −10 lines

lib/kafka/client.rb (+6 −1)

@@ -336,6 +336,9 @@ def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size:
     # @param fetcher_max_queue_size [Integer] max number of items in the fetch queue that
     #   are stored for further processing. Note, that each item in the queue represents a
     #   response from a single broker.
+    # @param refresh_topic_interval [Integer] interval of refreshing the topic list.
+    #   If it is 0, the topic list won't be refreshed (default)
+    #   If it is n (n > 0), the topic list will be refreshed every n seconds
     # @return [Consumer]
     def consumer(
         group_id:,
@@ -345,7 +348,8 @@ def consumer(
         offset_commit_threshold: 0,
         heartbeat_interval: 10,
         offset_retention_time: nil,
-        fetcher_max_queue_size: 100
+        fetcher_max_queue_size: 100,
+        refresh_topic_interval: 0
     )
       cluster = initialize_cluster

@@ -399,6 +403,7 @@ def consumer(
         fetcher: fetcher,
         session_timeout: session_timeout,
         heartbeat: heartbeat,
+        refresh_topic_interval: refresh_topic_interval
       )
     end

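As a usage sketch (the broker address, client id, group name, and topic pattern below are illustrative, not taken from the commit), the new option is passed straight through Kafka::Client#consumer:

require "kafka"

# Illustrative broker list and client id.
kafka = Kafka.new(["kafka1:9092"], client_id: "my-app")

# refresh_topic_interval: 30 makes the consumer rescan the cluster's topic
# list every 30 seconds; the default of 0 keeps the old behaviour of
# resolving the subscription only once.
consumer = kafka.consumer(group_id: "my-group", refresh_topic_interval: 30)

# A regex subscription now also picks up topics created after this call.
consumer.subscribe(/events\..*/)

consumer.each_message do |message|
  puts "#{message.topic}/#{message.partition}@#{message.offset}: #{message.value}"
end
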
lib/kafka/consumer.rb (+53 −9)

@@ -44,7 +44,7 @@ module Kafka
   #
   class Consumer

-    def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, session_timeout:, heartbeat:)
+    def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, session_timeout:, heartbeat:, refresh_topic_interval: 0)
       @cluster = cluster
       @logger = TaggedLogger.new(logger)
       @instrumenter = instrumenter
@@ -53,6 +53,7 @@ def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manage
       @session_timeout = session_timeout
       @fetcher = fetcher
       @heartbeat = heartbeat
+      @refresh_topic_interval = refresh_topic_interval

       @pauses = Hash.new {|h, k|
         h[k] = Hash.new {|h2, k2|
@@ -73,6 +74,15 @@ def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manage
       # when user commits message other than last in a batch, this would make ruby-kafka refetch
       # some already consumed messages
       @current_offsets = Hash.new { |h, k| h[k] = {} }
+
+      # Map storing subscribed topics with their configuration
+      @subscribed_topics = Concurrent::Map.new
+
+      # Set storing topics that matched topics in @subscribed_topics
+      @matched_topics = Set.new
+
+      # Whether join_group must be executed again because new topics are added
+      @join_group_for_new_topics = false
     end

     # Subscribes the consumer to a topic.
@@ -97,13 +107,12 @@ def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manage
     def subscribe(topic_or_regex, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576)
       default_offset ||= start_from_beginning ? :earliest : :latest

-      if topic_or_regex.is_a?(Regexp)
-        cluster_topics.select { |topic| topic =~ topic_or_regex }.each do |topic|
-          subscribe_to_topic(topic, default_offset, start_from_beginning, max_bytes_per_partition)
-        end
-      else
-        subscribe_to_topic(topic_or_regex, default_offset, start_from_beginning, max_bytes_per_partition)
-      end
+      @subscribed_topics[topic_or_regex] = {
+        default_offset: default_offset,
+        start_from_beginning: start_from_beginning,
+        max_bytes_per_partition: max_bytes_per_partition
+      }
+      scan_for_subscribing

       nil
     end
@@ -402,6 +411,7 @@ def consumer_loop
       while running?
         begin
           @instrumenter.instrument("loop.consumer") do
+            refresh_topic_list_if_enabled
             yield
           end
         rescue HeartbeatError
@@ -453,6 +463,8 @@ def make_final_offsets_commit!(attempts = 3)
     end

     def join_group
+      @join_group_for_new_topics = false
+
       old_generation_id = @group.generation_id

       @group.join
@@ -514,11 +526,19 @@ def resume_paused_partitions!
       end
     end

+    def refresh_topic_list_if_enabled
+      return if @refresh_topic_interval <= 0
+      return if @refreshed_at && @refreshed_at + @refresh_topic_interval > Time.now
+
+      scan_for_subscribing
+      @refreshed_at = Time.now
+    end
+
     def fetch_batches
       # Return early if the consumer has been stopped.
       return [] if shutting_down?

-      join_group unless @group.member?
+      join_group if !@group.member? || @join_group_for_new_topics

       trigger_heartbeat

@@ -572,10 +592,34 @@ def clear_current_offsets(excluding: {})
       end
     end

+    def scan_for_subscribing
+      @subscribed_topics.each do |topic_or_regex, config|
+        default_offset = config.fetch(:default_offset)
+        start_from_beginning = config.fetch(:start_from_beginning)
+        max_bytes_per_partition = config.fetch(:max_bytes_per_partition)
+        if topic_or_regex.is_a?(Regexp)
+          subscribe_to_regex(topic_or_regex, default_offset, start_from_beginning, max_bytes_per_partition)
+        else
+          subscribe_to_topic(topic_or_regex, default_offset, start_from_beginning, max_bytes_per_partition)
+        end
+      end
+    end
+
+    def subscribe_to_regex(topic_regex, default_offset, start_from_beginning, max_bytes_per_partition)
+      cluster_topics.select { |topic| topic =~ topic_regex }.each do |topic|
+        subscribe_to_topic(topic, default_offset, start_from_beginning, max_bytes_per_partition)
+      end
+    end
+
     def subscribe_to_topic(topic, default_offset, start_from_beginning, max_bytes_per_partition)
+      return if @matched_topics.include?(topic)
+      @matched_topics.add(topic)
+      @join_group_for_new_topics = true
+
       @group.subscribe(topic)
       @offset_manager.set_default_offset(topic, default_offset)
       @fetcher.subscribe(topic, max_bytes_per_partition: max_bytes_per_partition)
+      @cluster.mark_as_stale!
     end

     def cluster_topics

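Two implementation details stand out in the diff above. First, the refresh is driven by a timestamp check inside the consumer loop rather than by a timer thread; second, discovering a new topic only sets @join_group_for_new_topics, so the actual group rejoin is deferred to the next fetch_batches call. Below is a minimal standalone sketch of the throttle pattern used by refresh_topic_list_if_enabled (PeriodicTask is an illustrative name, not a class in ruby-kafka):

# Runs a block at most once per interval, mirroring the timestamp check in
# refresh_topic_list_if_enabled. PeriodicTask is illustrative only.
class PeriodicTask
  def initialize(interval_seconds)
    @interval = interval_seconds
    @last_run_at = nil
  end

  # Yields on the first call and whenever the interval has elapsed since the
  # previous run; an interval of 0 or less disables the task, matching the
  # refresh_topic_interval: 0 default.
  def run_if_due
    return if @interval <= 0
    return if @last_run_at && @last_run_at + @interval > Time.now

    yield
    @last_run_at = Time.now
  end
end

task = PeriodicTask.new(2)
5.times do
  task.run_if_due { puts "rescanning topic list at #{Time.now}" }
  sleep 1
end

Because the check piggybacks on the consumer loop, no extra synchronization is needed, at the cost that a refresh can arrive late when a single loop iteration runs long.
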
spec/consumer_spec.rb (+6 −0)

@@ -137,12 +137,14 @@
     allow(cluster).to receive(:add_target_topics)
     allow(cluster).to receive(:disconnect)
     allow(cluster).to receive(:refresh_metadata_if_necessary!)
+    allow(cluster).to receive(:mark_as_stale!)

     allow(offset_manager).to receive(:commit_offsets)
     allow(offset_manager).to receive(:commit_offsets_if_necessary)
     allow(offset_manager).to receive(:set_default_offset)
     allow(offset_manager).to receive(:mark_as_processed)
     allow(offset_manager).to receive(:next_offset_for) { 42 }
+    allow(offset_manager).to receive(:clear_offsets)

     allow(group).to receive(:subscribe)
     allow(group).to receive(:group_id)
@@ -151,11 +153,15 @@
     allow(group).to receive(:subscribed_partitions) { assigned_partitions }
     allow(group).to receive(:assigned_to?) { false }
     allow(group).to receive(:assigned_to?).with('greetings', 0) { true }
+    allow(group).to receive(:generation_id) { 1 }
+    allow(group).to receive(:join)
+    allow(group).to receive(:assigned_partitions) { [] }

     allow(heartbeat).to receive(:trigger)

     allow(fetcher).to receive(:data?) { fetched_batches.any? }
     allow(fetcher).to receive(:poll) { [:batches, fetched_batches] }
+    allow(fetcher).to receive(:reset)

     consumer.subscribe("greetings")
   end

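The extra stubs mirror the new code paths the unit tests now hit: subscribe_to_topic calls @cluster.mark_as_stale!, and because subscribing sets @join_group_for_new_topics, the join/rebalance path (and with it group.generation_id, group.join, group.assigned_partitions, offset_manager.clear_offsets, and fetcher.reset) is presumably exercised where it previously was not.
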
spec/functional/consumer_group_spec.rb (+41 −0)

@@ -113,6 +113,47 @@
     expect(received_messages.map(&:value).map(&:to_i).sort).to match_array messages
   end

+  example "subscribing to multiple topics using regex and enable refreshing the topic list" do
+    topic_a = generate_topic_name
+    topic_b = generate_topic_name
+
+    messages_a = (1..500).to_a
+    messages_b = (501..1000).to_a
+    messages = messages_a + messages_b
+
+    producer = Kafka.new(kafka_brokers, client_id: "test").producer
+
+    messages_a.each { |i| producer.produce(i.to_s, topic: topic_a) }
+    producer.deliver_messages
+
+    group_id = "test#{rand(1000)}"
+
+    received_messages = []
+
+    kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
+    consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time, refresh_topic_interval: 1)
+    consumer.subscribe(/#{topic_a}|#{topic_b}/)
+
+    thread = Thread.new do
+      consumer.each_message do |message|
+        received_messages << message
+
+        if received_messages.count == messages.count
+          consumer.stop
+        end
+      end
+    end
+    thread.abort_on_exception = true
+
+    sleep 1
+    messages_b.each { |i| producer.produce(i.to_s, topic: topic_b) }
+    producer.deliver_messages
+
+    thread.join
+
+    expect(received_messages.map(&:value).map(&:to_i).sort).to match_array messages
+  end
+
   example "consuming messages from a topic that's being written to" do
     num_partitions = 3
     topic = create_random_topic(num_partitions: num_partitions)

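Note how this test exercises the refresh: only topic_a has messages when the consumer starts, and topic_b is first written to about a second later, so the assertion can only pass if the periodic rescan (refresh_topic_interval: 1) picks up topic_b and triggers a rejoin while the consumer is already running.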