diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb
index 6c4d298c9..4ea7bcfb2 100644
--- a/lib/kafka/client.rb
+++ b/lib/kafka/client.rb
@@ -336,6 +336,9 @@ def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size:
     # @param fetcher_max_queue_size [Integer] max number of items in the fetch queue that
     #   are stored for further processing. Note, that each item in the queue represents a
     #   response from a single broker.
+    # @param refresh_topic_interval [Integer] interval, in seconds, at which to refresh
+    #   the topic list. If it is 0 (the default), the topic list won't be refreshed;
+    #   if it is n (n > 0), the topic list will be refreshed every n seconds.
     # @return [Consumer]
     def consumer(
         group_id:,
@@ -345,7 +348,8 @@ def consumer(
         offset_commit_threshold: 0,
         heartbeat_interval: 10,
         offset_retention_time: nil,
-        fetcher_max_queue_size: 100
+        fetcher_max_queue_size: 100,
+        refresh_topic_interval: 0
     )
       cluster = initialize_cluster
@@ -399,6 +403,7 @@ def consumer(
         fetcher: fetcher,
         session_timeout: session_timeout,
         heartbeat: heartbeat,
+        refresh_topic_interval: refresh_topic_interval
       )
     end
diff --git a/lib/kafka/consumer.rb b/lib/kafka/consumer.rb
index 6ddbaf798..9e02f627c 100644
--- a/lib/kafka/consumer.rb
+++ b/lib/kafka/consumer.rb
@@ -44,7 +44,7 @@ module Kafka
   #
   class Consumer

-    def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, session_timeout:, heartbeat:)
+    def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, session_timeout:, heartbeat:, refresh_topic_interval: 0)
       @cluster = cluster
       @logger = TaggedLogger.new(logger)
       @instrumenter = instrumenter
@@ -53,6 +53,7 @@ def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manage
       @session_timeout = session_timeout
       @fetcher = fetcher
       @heartbeat = heartbeat
+      @refresh_topic_interval = refresh_topic_interval

       @pauses = Hash.new {|h, k|
         h[k] = Hash.new {|h2, k2|
@@ -73,6 +74,15 @@ def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manage
       # when user commits message other than last in a batch, this would make ruby-kafka refetch
       # some already consumed messages
       @current_offsets = Hash.new { |h, k| h[k] = {} }
+
+      # Map storing subscribed topics with their configuration
+      @subscribed_topics = Concurrent::Map.new
+
+      # Set storing topics that matched topics in @subscribed_topics
+      @matched_topics = Set.new
+
+      # Whether join_group must be executed again because new topics are added
+      @join_group_for_new_topics = false
     end

     # Subscribes the consumer to a topic.
@@ -97,13 +107,12 @@ def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manage
     def subscribe(topic_or_regex, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576)
       default_offset ||= start_from_beginning ? :earliest : :latest

-      if topic_or_regex.is_a?(Regexp)
-        cluster_topics.select { |topic| topic =~ topic_or_regex }.each do |topic|
-          subscribe_to_topic(topic, default_offset, start_from_beginning, max_bytes_per_partition)
-        end
-      else
-        subscribe_to_topic(topic_or_regex, default_offset, start_from_beginning, max_bytes_per_partition)
-      end
+      @subscribed_topics[topic_or_regex] = {
+        default_offset: default_offset,
+        start_from_beginning: start_from_beginning,
+        max_bytes_per_partition: max_bytes_per_partition
+      }
+      scan_for_subscribing

       nil
     end
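With this change, `subscribe` no longer resolves a regex against the cluster's topic list once at subscription time; it records the subscription options so that `scan_for_subscribing` can re-evaluate them on every refresh. A minimal sketch of the resulting bookkeeping (the topic pattern here is a hypothetical example, not from the patch):

```ruby
consumer.subscribe(/events\..*/, start_from_beginning: false)
# @subscribed_topics now holds:
#   { /events\..*/ => { default_offset: :latest,
#                       start_from_beginning: false,
#                       max_bytes_per_partition: 1048576 } }
# scan_for_subscribing matches the regex against the cluster's topic list
# and calls subscribe_to_topic for each match it hasn't seen before.
```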
@@ -402,6 +411,7 @@ def consumer_loop
       while running?
         begin
           @instrumenter.instrument("loop.consumer") do
+            refresh_topic_list_if_enabled
             yield
           end
         rescue HeartbeatError
@@ -453,6 +463,8 @@ def make_final_offsets_commit!(attempts = 3)
     end

     def join_group
+      @join_group_for_new_topics = false
+
       old_generation_id = @group.generation_id

       @group.join
@@ -514,11 +526,19 @@ def resume_paused_partitions!
       end
     end

+    def refresh_topic_list_if_enabled
+      return if @refresh_topic_interval <= 0
+      return if @refreshed_at && @refreshed_at + @refresh_topic_interval > Time.now
+
+      scan_for_subscribing
+      @refreshed_at = Time.now
+    end
+
     def fetch_batches
       # Return early if the consumer has been stopped.
       return [] if shutting_down?

-      join_group unless @group.member?
+      join_group if !@group.member? || @join_group_for_new_topics

       trigger_heartbeat
@@ -572,10 +592,34 @@ def clear_current_offsets(excluding: {})
       end
     end

+    def scan_for_subscribing
+      @subscribed_topics.each do |topic_or_regex, config|
+        default_offset = config.fetch(:default_offset)
+        start_from_beginning = config.fetch(:start_from_beginning)
+        max_bytes_per_partition = config.fetch(:max_bytes_per_partition)
+        if topic_or_regex.is_a?(Regexp)
+          subscribe_to_regex(topic_or_regex, default_offset, start_from_beginning, max_bytes_per_partition)
+        else
+          subscribe_to_topic(topic_or_regex, default_offset, start_from_beginning, max_bytes_per_partition)
+        end
+      end
+    end
+
+    def subscribe_to_regex(topic_regex, default_offset, start_from_beginning, max_bytes_per_partition)
+      cluster_topics.select { |topic| topic =~ topic_regex }.each do |topic|
+        subscribe_to_topic(topic, default_offset, start_from_beginning, max_bytes_per_partition)
+      end
+    end
+
     def subscribe_to_topic(topic, default_offset, start_from_beginning, max_bytes_per_partition)
+      return if @matched_topics.include?(topic)
+      @matched_topics.add(topic)
+      @join_group_for_new_topics = true
+
       @group.subscribe(topic)
       @offset_manager.set_default_offset(topic, default_offset)
       @fetcher.subscribe(topic, max_bytes_per_partition: max_bytes_per_partition)
+      @cluster.mark_as_stale!
     end

     def cluster_topics
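Note that the refresh is throttled by a timestamp comparison rather than a timer thread, so each pass through the consumer loop is a cheap no-op unless the interval has elapsed, and `@join_group_for_new_topics` forces a rejoin only when `subscribe_to_topic` actually saw a new topic. A standalone sketch of the same gating pattern (the `PeriodicTask` class is illustrative, not part of the patch):

```ruby
# Runs a block at most once per `interval` seconds, mirroring the shape of
# refresh_topic_list_if_enabled: an interval <= 0 disables it entirely.
class PeriodicTask
  def initialize(interval)
    @interval = interval
    @last_run_at = nil
  end

  def maybe_run
    return if @interval <= 0
    return if @last_run_at && @last_run_at + @interval > Time.now

    yield
    @last_run_at = Time.now
  end
end

task = PeriodicTask.new(1)
3.times do
  task.maybe_run { puts "refreshing topic list" }
  sleep 0.6
end
# Prints twice: once on the first call, once after the interval elapses.
```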
diff --git a/spec/consumer_spec.rb b/spec/consumer_spec.rb
index b7475aa2a..6dc780239 100644
--- a/spec/consumer_spec.rb
+++ b/spec/consumer_spec.rb
@@ -137,12 +137,14 @@
       allow(cluster).to receive(:add_target_topics)
       allow(cluster).to receive(:disconnect)
       allow(cluster).to receive(:refresh_metadata_if_necessary!)
+      allow(cluster).to receive(:mark_as_stale!)

       allow(offset_manager).to receive(:commit_offsets)
       allow(offset_manager).to receive(:commit_offsets_if_necessary)
       allow(offset_manager).to receive(:set_default_offset)
       allow(offset_manager).to receive(:mark_as_processed)
       allow(offset_manager).to receive(:next_offset_for) { 42 }
+      allow(offset_manager).to receive(:clear_offsets)

       allow(group).to receive(:subscribe)
       allow(group).to receive(:group_id)
@@ -151,11 +153,15 @@
       allow(group).to receive(:subscribed_partitions) { assigned_partitions }
       allow(group).to receive(:assigned_to?) { false }
       allow(group).to receive(:assigned_to?).with('greetings', 0) { true }
+      allow(group).to receive(:generation_id) { 1 }
+      allow(group).to receive(:join)
+      allow(group).to receive(:assigned_partitions) { [] }

       allow(heartbeat).to receive(:trigger)

       allow(fetcher).to receive(:data?) { fetched_batches.any? }
       allow(fetcher).to receive(:poll) { [:batches, fetched_batches] }
+      allow(fetcher).to receive(:reset)

       consumer.subscribe("greetings")
     end
diff --git a/spec/functional/consumer_group_spec.rb b/spec/functional/consumer_group_spec.rb
index ad92ebf81..8462bed15 100644
--- a/spec/functional/consumer_group_spec.rb
+++ b/spec/functional/consumer_group_spec.rb
@@ -113,6 +113,47 @@
     expect(received_messages.map(&:value).map(&:to_i).sort).to match_array messages
   end

+  example "subscribing to multiple topics using a regex with topic list refreshing enabled" do
+    topic_a = generate_topic_name
+    topic_b = generate_topic_name
+
+    messages_a = (1..500).to_a
+    messages_b = (501..1000).to_a
+    messages = messages_a + messages_b
+
+    producer = Kafka.new(kafka_brokers, client_id: "test").producer
+
+    messages_a.each { |i| producer.produce(i.to_s, topic: topic_a) }
+    producer.deliver_messages
+
+    group_id = "test#{rand(1000)}"
+
+    received_messages = []
+
+    kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
+    consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time, refresh_topic_interval: 1)
+    consumer.subscribe(/#{topic_a}|#{topic_b}/)
+
+    thread = Thread.new do
+      consumer.each_message do |message|
+        received_messages << message
+
+        if received_messages.count == messages.count
+          consumer.stop
+        end
+      end
+    end
+    thread.abort_on_exception = true
+
+    sleep 1
+    messages_b.each { |i| producer.produce(i.to_s, topic: topic_b) }
+    producer.deliver_messages
+
+    thread.join
+
+    expect(received_messages.map(&:value).map(&:to_i).sort).to match_array messages
+  end
+
   example "consuming messages from a topic that's being written to" do
     num_partitions = 3
     topic = create_random_topic(num_partitions: num_partitions)
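End to end, the feature surfaces as a single opt-in consumer option. A minimal usage sketch, assuming placeholder broker addresses, group id, and topic prefix (none of these names come from the patch):

```ruby
require "kafka"

kafka = Kafka.new(["kafka1:9092"], client_id: "my-app")

# Re-scan the cluster's topic list every 30 seconds; topics created after
# startup that match the regex are picked up and trigger a group rejoin.
consumer = kafka.consumer(group_id: "my-group", refresh_topic_interval: 30)
consumer.subscribe(/^events\./, start_from_beginning: false)

consumer.each_message do |message|
  puts "#{message.topic}/#{message.partition}@#{message.offset}: #{message.value}"
end
```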