[Feat] Allow consumers to refresh the topic lists #818

Merged · 10 commits · Apr 20, 2020
7 changes: 6 additions & 1 deletion lib/kafka/client.rb
@@ -336,6 +336,9 @@ def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size:
# @param fetcher_max_queue_size [Integer] max number of items in the fetch queue that
# are stored for further processing. Note, that each item in the queue represents a
# response from a single broker.
# @param refresh_topic_interval [Integer] interval, in seconds, at which to refresh the topic list.
# If it is 0 (the default), the topic list won't be refreshed.
# If it is n (n > 0), the topic list will be refreshed every n seconds.
# @return [Consumer]
def consumer(
group_id:,
@@ -345,7 +348,8 @@ def consumer(
offset_commit_threshold: 0,
heartbeat_interval: 10,
offset_retention_time: nil,
fetcher_max_queue_size: 100
fetcher_max_queue_size: 100,
refresh_topic_interval: 0
)
cluster = initialize_cluster

@@ -399,6 +403,7 @@ def consumer(
fetcher: fetcher,
session_timeout: session_timeout,
heartbeat: heartbeat,
refresh_topic_interval: refresh_topic_interval
)
end

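For illustration, a minimal usage sketch of the new option (the broker address, client id, group id, topic pattern, and 30-second interval below are hypothetical, chosen only for the example):

require "kafka"

# Hypothetical broker/client/group names, for illustration only.
kafka = Kafka.new(["localhost:9092"], client_id: "my-app")

# refresh_topic_interval: 0 (the default) disables refreshing;
# any n > 0 re-scans the topic list every n seconds.
consumer = kafka.consumer(group_id: "my-group", refresh_topic_interval: 30)

# With a Regexp subscription, topics created later that match the
# pattern are picked up on a subsequent refresh.
consumer.subscribe(/^events\./)

consumer.each_message do |message|
  puts "#{message.topic}/#{message.partition}@#{message.offset}: #{message.value}"
end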
63 changes: 54 additions & 9 deletions lib/kafka/consumer.rb
@@ -44,7 +44,7 @@ module Kafka
#
class Consumer

def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, session_timeout:, heartbeat:)
def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, session_timeout:, heartbeat:, refresh_topic_interval: 0)
@cluster = cluster
@logger = TaggedLogger.new(logger)
@instrumenter = instrumenter
@@ -53,6 +53,7 @@ def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manage
@session_timeout = session_timeout
@fetcher = fetcher
@heartbeat = heartbeat
@refresh_topic_interval = refresh_topic_interval

@pauses = Hash.new {|h, k|
h[k] = Hash.new {|h2, k2|
@@ -73,6 +74,15 @@ def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manage
# when user commits message other than last in a batch, this would make ruby-kafka refetch
# some already consumed messages
@current_offsets = Hash.new { |h, k| h[k] = {} }

# Map storing subscribed topics with their configuration
@subscribed_topics = Concurrent::Map.new

# Set storing topics that matched the subscriptions in @subscribed_topics
@matched_topics = Set.new

# Whether join_group must be executed again because new topics were added
@join_group_for_new_topics = false
end

# Subscribes the consumer to a topic.
@@ -97,13 +107,12 @@ def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manage
def subscribe(topic_or_regex, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576)
default_offset ||= start_from_beginning ? :earliest : :latest

if topic_or_regex.is_a?(Regexp)
cluster_topics.select { |topic| topic =~ topic_or_regex }.each do |topic|
subscribe_to_topic(topic, default_offset, start_from_beginning, max_bytes_per_partition)
end
else
subscribe_to_topic(topic_or_regex, default_offset, start_from_beginning, max_bytes_per_partition)
end
@subscribed_topics[topic_or_regex] = {
default_offset: default_offset,
start_from_beginning: start_from_beginning,
max_bytes_per_partition: max_bytes_per_partition
}
scan_for_subscribing

nil
end
@@ -402,6 +411,7 @@ def consumer_loop
while running?
begin
@instrumenter.instrument("loop.consumer") do
refresh_topic_list_if_enabled
yield
end
rescue HeartbeatError
@@ -453,6 +463,8 @@ def make_final_offsets_commit!(attempts = 3)
end

def join_group
@join_group_for_new_topics = false

old_generation_id = @group.generation_id

@group.join
@@ -514,11 +526,19 @@ def resume_paused_partitions!
end
end

def refresh_topic_list_if_enabled
return if @refresh_topic_interval <= 0
return if @refreshed_at && @refreshed_at + @refresh_topic_interval > Time.now

scan_for_subscribing
@refreshed_at = Time.now
end

def fetch_batches
# Return early if the consumer has been stopped.
return [] if shutting_down?

join_group unless @group.member?
join_group if !@group.member? || @join_group_for_new_topics

trigger_heartbeat

@@ -572,10 +592,35 @@ def clear_current_offsets(excluding: {})
end
end

def scan_for_subscribing
@subscribed_topics.keys.each do |topic_or_regex|
default_offset = @subscribed_topics[topic_or_regex][:default_offset]
start_from_beginning = @subscribed_topics[topic_or_regex][:start_from_beginning]
max_bytes_per_partition = @subscribed_topics[topic_or_regex][:max_bytes_per_partition]
Contributor:

Would it not make sense to use each here instead, e.g.

@subscribed_topics.each do |topic_or_regex, topic_config|
  default_offset = topic_config.fetch(:default_offset) # also I prefer `fetch` because it crashes explicitly
  ...
end

Contributor Author:

Done


if topic_or_regex.is_a?(Regexp)
subscribe_to_regex(topic_or_regex, default_offset, start_from_beginning, max_bytes_per_partition)
else
subscribe_to_topic(topic_or_regex, default_offset, start_from_beginning, max_bytes_per_partition)
end
end
end
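Following the reviewer's suggestion above (each with fetch), the merged loop presumably reads roughly like this; a sketch for readability, not the verbatim final commit:

def scan_for_subscribing
  @subscribed_topics.each do |topic_or_regex, config|
    default_offset = config.fetch(:default_offset)
    start_from_beginning = config.fetch(:start_from_beginning)
    max_bytes_per_partition = config.fetch(:max_bytes_per_partition)

    if topic_or_regex.is_a?(Regexp)
      subscribe_to_regex(topic_or_regex, default_offset, start_from_beginning, max_bytes_per_partition)
    else
      subscribe_to_topic(topic_or_regex, default_offset, start_from_beginning, max_bytes_per_partition)
    end
  end
end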

def subscribe_to_regex(topic_regex, default_offset, start_from_beginning, max_bytes_per_partition)
cluster_topics.select { |topic| topic =~ topic_regex }.each do |topic|
subscribe_to_topic(topic, default_offset, start_from_beginning, max_bytes_per_partition)
end
end

def subscribe_to_topic(topic, default_offset, start_from_beginning, max_bytes_per_partition)
return if @matched_topics.include?(topic)
@matched_topics.add(topic)
@join_group_for_new_topics = true

@group.subscribe(topic)
@offset_manager.set_default_offset(topic, default_offset)
@fetcher.subscribe(topic, max_bytes_per_partition: max_bytes_per_partition)
@cluster.mark_as_stale!
end

def cluster_topics
6 changes: 6 additions & 0 deletions spec/consumer_spec.rb
@@ -137,12 +137,14 @@
allow(cluster).to receive(:add_target_topics)
allow(cluster).to receive(:disconnect)
allow(cluster).to receive(:refresh_metadata_if_necessary!)
allow(cluster).to receive(:mark_as_stale!)

allow(offset_manager).to receive(:commit_offsets)
allow(offset_manager).to receive(:commit_offsets_if_necessary)
allow(offset_manager).to receive(:set_default_offset)
allow(offset_manager).to receive(:mark_as_processed)
allow(offset_manager).to receive(:next_offset_for) { 42 }
allow(offset_manager).to receive(:clear_offsets)

allow(group).to receive(:subscribe)
allow(group).to receive(:group_id)
@@ -151,11 +153,15 @@
allow(group).to receive(:subscribed_partitions) { assigned_partitions }
allow(group).to receive(:assigned_to?) { false }
allow(group).to receive(:assigned_to?).with('greetings', 0) { true }
allow(group).to receive(:generation_id) { 1 }
allow(group).to receive(:join)
allow(group).to receive(:assigned_partitions) { [] }

allow(heartbeat).to receive(:trigger)

allow(fetcher).to receive(:data?) { fetched_batches.any? }
allow(fetcher).to receive(:poll) { [:batches, fetched_batches] }
allow(fetcher).to receive(:reset)

consumer.subscribe("greetings")
end
41 changes: 41 additions & 0 deletions spec/functional/consumer_group_spec.rb
@@ -113,6 +113,47 @@
expect(received_messages.map(&:value).map(&:to_i).sort).to match_array messages
end

example "subscribing to multiple topics using regex and enable refreshing the topic list" do
topic_a = generate_topic_name
topic_b = generate_topic_name

messages_a = (1..500).to_a
messages_b = (501..1000).to_a
messages = messages_a + messages_b

producer = Kafka.new(kafka_brokers, client_id: "test").producer

messages_a.each { |i| producer.produce(i.to_s, topic: topic_a) }
producer.deliver_messages

group_id = "test#{rand(1000)}"

received_messages = []

kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time, refresh_topic_interval: 1)
consumer.subscribe(/#{topic_a}|#{topic_b}/)

thread = Thread.new do
consumer.each_message do |message|
received_messages << message

if received_messages.count == messages.count
consumer.stop
end
end
end
thread.abort_on_exception = true

sleep 1
messages_b.each { |i| producer.produce(i.to_s, topic: topic_b) }
producer.deliver_messages

thread.join

expect(received_messages.map(&:value).map(&:to_i).sort).to match_array messages
end

example "consuming messages from a topic that's being written to" do
num_partitions = 3
topic = create_random_topic(num_partitions: num_partitions)