diff --git a/CHANGELOG.md b/CHANGELOG.md index 84d7e152..2ef9a3f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ Changes and additions to the library will be listed here. - Add support for `murmur2` based partitioning. - Add `resolve_seed_brokers` option to support seed brokers' hostname with multiple addresses (#877). - Handle SyncGroup responses with a non-zero error and no assignments (#896). +- Add support for non-identical topic subscriptions within the same consumer group (#525 / #764). ## 1.3.0 diff --git a/lib/kafka/consumer_group.rb b/lib/kafka/consumer_group.rb index 1ad1855f..ac25626f 100644 --- a/lib/kafka/consumer_group.rb +++ b/lib/kafka/consumer_group.rb @@ -189,9 +189,14 @@ def synchronize if group_leader? @logger.info "Chosen as leader of group `#{@group_id}`" + topics = Set.new + @members.each do |_member, metadata| + metadata.topics.each { |t| topics.add(t) } + end + group_assignment = @assignor.assign( members: @members, - topics: @topics, + topics: topics, ) end diff --git a/lib/kafka/round_robin_assignment_strategy.rb b/lib/kafka/round_robin_assignment_strategy.rb index 490929da..2d46ad82 100644 --- a/lib/kafka/round_robin_assignment_strategy.rb +++ b/lib/kafka/round_robin_assignment_strategy.rb @@ -1,9 +1,9 @@ -# frozen_string_literal: true - module Kafka - # A consumer group partition assignment strategy that assigns partitions to - # consumers in a round-robin fashion. + # A round robin assignment strategy inpired on the + # original java client round robin assignor. It's capable + # of handling identical as well as different topic subscriptions + # accross the same consumer group. class RoundRobinAssignmentStrategy def protocol_name "roundrobin" @@ -19,13 +19,34 @@ def protocol_name # @return [Hash] a hash # mapping member ids to partitions. def call(cluster:, members:, partitions:) - member_ids = members.keys partitions_per_member = Hash.new {|h, k| h[k] = [] } - partitions.each_with_index do |partition, index| - partitions_per_member[member_ids[index % member_ids.count]] << partition + relevant_partitions = valid_sorted_partitions(members, partitions) + members_ids = members.keys + iterator = (0...members.size).cycle + idx = iterator.next + + relevant_partitions.each do |partition| + topic = partition.topic + + while !members[members_ids[idx]].topics.include?(topic) + idx = iterator.next + end + + partitions_per_member[members_ids[idx]] << partition + idx = iterator.next end partitions_per_member end + + def valid_sorted_partitions(members, partitions) + subscribed_topics = members.map do |id, metadata| + metadata && metadata.topics + end.flatten.compact + + partitions + .select { |partition| subscribed_topics.include?(partition.topic) } + .sort_by { |partition| partition.topic } + end end end diff --git a/spec/functional/consumer_group_spec.rb b/spec/functional/consumer_group_spec.rb index 322ab9e6..c8e35af3 100644 --- a/spec/functional/consumer_group_spec.rb +++ b/spec/functional/consumer_group_spec.rb @@ -518,6 +518,68 @@ def call(cluster:, members:, partitions:) expect(received_messages.values.map(&:count)).to match_array [messages.count / 3, messages.count / 3 * 2] end + example "subscribing to different topics while in the same consumer group" do + topic1 = create_random_topic(num_partitions: 1) + topic2 = create_random_topic(num_partitions: 1) + messages = (1..500).to_a + + begin + kafka = Kafka.new(kafka_brokers, client_id: "test") + producer = kafka.producer + + messages[0..249].each do |i| + producer.produce(i.to_s, topic: topic1, partition: 0) + end + + messages[250..500].each do |i| + producer.produce(i.to_s, topic: topic2, partition: 0) + end + + producer.deliver_messages + end + + group_id = "test#{rand(1000)}" + + mutex = Mutex.new + received_messages = [] + + assignment_strategy_class = Kafka::RoundRobinAssignmentStrategy + + consumers = [topic1, topic2].map do |topic| + assignment_strategy = assignment_strategy_class.new + kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger) + consumer = kafka.consumer( + group_id: group_id, + offset_retention_time: offset_retention_time, + assignment_strategy: assignment_strategy + ) + consumer.subscribe(topic) + consumer + end + + threads = consumers.map do |consumer| + t = Thread.new do + consumer.each_message do |message| + mutex.synchronize do + received_messages << message + + if received_messages.count == messages.count + consumers.each(&:stop) + end + end + end + end + + t.abort_on_exception = true + + t + end + + threads.each(&:join) + + expect(received_messages.map(&:value).map(&:to_i)).to match_array messages + end + def wait_until(timeout:) Timeout.timeout(timeout) do sleep 0.5 until yield diff --git a/spec/round_robin_assignment_strategy_spec.rb b/spec/round_robin_assignment_strategy_spec.rb index c76b9c05..af4aaf4e 100644 --- a/spec/round_robin_assignment_strategy_spec.rb +++ b/spec/round_robin_assignment_strategy_spec.rb @@ -4,7 +4,7 @@ let(:strategy) { described_class.new } it "assigns all partitions" do - members = Hash[(0...10).map {|i| ["member#{i}", nil] }] + members = Hash[(0...10).map {|i| ["member#{i}", double(topics: ['greetings'])] }] partitions = (0...30).map {|i| double(:"partition#{i}", topic: "greetings", partition_id: i) } assignments = strategy.call(cluster: nil, members: members, partitions: partitions) @@ -21,8 +21,8 @@ end it "spreads all partitions between members" do - members = Hash[(0...10).map {|i| ["member#{i}", nil] }] topics = ["topic1", "topic2"] + members = Hash[(0...10).map {|i| ["member#{i}", double(topics: topics)] }] partitions = topics.product((0...5).to_a).map {|topic, i| double(:"partition#{i}", topic: topic, partition_id: i) } @@ -46,36 +46,50 @@ expect(num_partitions_assigned).to all eq(1) end + Metadata = Struct.new(:topics) [ { name: "uneven topics", topics: { "topic1" => [0], "topic2" => (0..50).to_a }, - members: { "member1" => nil, "member2" => nil }, + members: { + "member1" => Metadata.new(["topic1", "topic2"]), + "member2" => Metadata.new(["topic1", "topic2"]) + }, }, { name: "only one partition", topics: { "topic1" => [0] }, - members: { "member1" => nil, "member2" => nil }, + members: { + "member1" => Metadata.new(["topic1"]), + "member2" => Metadata.new(["topic1"]) + }, }, { name: "lots of partitions", topics: { "topic1" => (0..100).to_a }, - members: { "member1" => nil }, + members: { "member1" => Metadata.new(["topic1"]) }, }, { name: "lots of members", topics: { "topic1" => (0..10).to_a, "topic2" => (0..10).to_a }, - members: Hash[(0..50).map { |i| ["member#{i}", nil] }] + members: Hash[(0..50).map { |i| ["member#{i}", Metadata.new(["topic1", "topic2"])] }] }, { name: "odd number of partitions", topics: { "topic1" => (0..14).to_a }, - members: { "member1" => nil, "member2" => nil }, + members: { + "member1" => Metadata.new(["topic1"]), + "member2" => Metadata.new(["topic1"]) + }, }, { name: "five topics, 10 partitions, 3 consumers", topics: { "topic1" => [0, 1], "topic2" => [0, 1], "topic3" => [0, 1], "topic4" => [0, 1], "topic5" => [0, 1] }, - members: { "member1" => nil, "member2" => nil, "member3" => nil }, + members: { + "member1" => Metadata.new(["topic1", "topic2", "topic3", "topic4", "topic5"]), + "member2" => Metadata.new(["topic1", "topic2", "topic3", "topic4", "topic5"]), + "member3" => Metadata.new(["topic1", "topic2", "topic3", "topic4", "topic5"]) + }, } ].each do |options| name, topics, members = options[:name], options[:topics], options[:members] @@ -113,4 +127,208 @@ def expect_even_assignments(topics, assignments) expect(num_assigned).to be_within(1).of(num_partitions.to_f / assignments.count) end end + + context 'one consumer no subscriptions or topics / partitions' do + it 'returns empty assignments' do + members = { 'member1' => nil } + partitions = [] + + assignments = strategy.call(cluster: nil, members: members, partitions: partitions) + + expect(assignments).to eq({}) + end + end + + context 'one consumer with subscription but no matching topic partition' do + it 'returns empty assignments' do + members = { 'member1' => double(topics: ['topic1']) } + partitions = [] + + assignments = strategy.call(cluster: nil, members: members, partitions: partitions) + + expect(assignments).to eq({}) + end + end + + context 'one consumer subscribed to one topic with one partition' do + it 'assigns the partition to the consumer' do + members = { 'member1' => double(topics: ['topic1']) } + partitions = [ + t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0), + ] + + assignments = strategy.call(cluster: nil, members: members, partitions: partitions) + + expect(assignments).to eq({ + 'member1' => [t1p0] + }) + end + end + + context 'one consumer subscribed to one topic with multiple partitions' do + it 'assigns all partitions to the consumer' do + members = { 'member1' => double(topics: ['topic1']) } + partitions = [ + t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0), + t1p1 = double(:"t1p1", topic: "topic1", partition_id: 1), + ] + + assignments = strategy.call(cluster: nil, members: members, partitions: partitions) + + expect(assignments).to eq({ + 'member1' => [t1p0, t1p1] + }) + end + end + + context 'one consumer subscribed to one topic but with multiple different topic partitions' do + it 'only assigns partitions for the subscribed topic' do + members = { 'member1' => double(topics: ['topic1']) } + partitions = [ + t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0), + t1p1 = double(:"t1p1", topic: "topic1", partition_id: 1), + t2p0 = double(:"t2p0", topic: "topic2", partition_id: 0), + ] + + assignments = strategy.call(cluster: nil, members: members, partitions: partitions) + + expect(assignments).to eq({ + 'member1' => [t1p0, t1p1] + }) + end + end + + context 'one consumer subscribed to multiple topics' do + it 'assigns all the topics partitions to the consumer' do + members = { 'member1' => double(topics: ['topic1', 'topic2']) } + + partitions = [ + t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0), + t1p1 = double(:"t1p1", topic: "topic1", partition_id: 1), + t2p0 = double(:"t2p0", topic: "topic2", partition_id: 0), + ] + + assignments = strategy.call(cluster: nil, members: members, partitions: partitions) + + expect(assignments).to eq({ + 'member1' => [t1p0, t1p1, t2p0] + }) + end + end + + context 'two consumers with one topic and only one partition' do + it 'only assigns the partition to one consumer' do + members = { + 'member1' => double(topics: ['topic1']), + 'member2' => double(topics: ['topic1']) + } + partitions = [ + t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0), + ] + + assignments = strategy.call(cluster: nil, members: members, partitions: partitions) + + expect(assignments).to eq({ + 'member1' => [t1p0] + }) + end + end + + context 'two consumers subscribed to one topic with two partitions' do + it 'assigns a partition to each consumer' do + members = { + 'member1' => double(topics: ['topic1']), + 'member2' => double(topics: ['topic1']) + } + partitions = [ + t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0), + t1p1 = double(:"t1p1", topic: "topic1", partition_id: 1), + ] + + assignments = strategy.call(cluster: nil, members: members, partitions: partitions) + + expect(assignments).to eq({ + 'member1' => [t1p0], + 'member2' => [t1p1] + }) + end + end + + context 'multiple consumers with mixed topics subscriptions' do + it 'creates a balanced assignment' do + members = { + 'member1' => double(topics: ['topic1']), + 'member2' => double(topics: ['topic1', 'topic2']), + 'member3' => double(topics: ['topic1']) + } + partitions = [ + t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0), + t1p1 = double(:"t1p1", topic: "topic1", partition_id: 1), + t1p2 = double(:"t1p2", topic: "topic1", partition_id: 2), + t2p0 = double(:"t2p0", topic: "topic2", partition_id: 0), + t2p1 = double(:"t2p1", topic: "topic2", partition_id: 1), + ] + + assignments = strategy.call(cluster: nil, members: members, partitions: partitions) + + expect(assignments).to eq({ + 'member1' => [t1p0], + 'member2' => [t1p1, t2p0, t2p1], + 'member3' => [t1p2] + }) + end + end + + context 'two consumers subscribed to two topics with three partitions each' do + it 'creates a balanced assignment' do + members = { + 'member1' => double(topics: ['topic1', 'topic2']), + 'member2' => double(topics: ['topic1', 'topic2']) + } + partitions = [ + t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0), + t1p1 = double(:"t1p1", topic: "topic1", partition_id: 1), + t1p2 = double(:"t1p2", topic: "topic1", partition_id: 2), + t2p0 = double(:"t2p0", topic: "topic2", partition_id: 0), + t2p1 = double(:"t2p1", topic: "topic2", partition_id: 1), + t2p2 = double(:"t2p2", topic: "topic2", partition_id: 2), + ] + + assignments = strategy.call(cluster: nil, members: members, partitions: partitions) + + expect(assignments).to eq({ + 'member1' => [t1p0, t1p2, t2p1], + 'member2' => [t1p1, t2p0, t2p2] + }) + end + end + + context 'many consumers subscribed to one topic with partitions given out of order' do + it 'produces balanced assignments' do + members = { + 'member1' => double(topics: ['topic1']), + 'member2' => double(topics: ['topic1']), + 'member3' => double(topics: ['topic2']), + } + + partitions = [ + t2p0 = double(:"t2p0", topic: "topic2", partition_id: 0), + t1p0 = double(:"t1p0", topic: "topic1", partition_id: 0), + t2p1 = double(:"t2p1", topic: "topic2", partition_id: 1), + t1p1 = double(:"t1p1", topic: "topic1", partition_id: 1), + ] + + assignments = strategy.call(cluster: nil, members: members, partitions: partitions) + + # Without sorting the partitions by topic this input would produce a non balanced assignment: + # member1 => [t1p0, t1p1] + # member2 => [] + # member3 => [t2p0, t2p1] + expect(assignments).to eq({ + 'member1' => [t1p0], + 'member2' => [t1p1], + 'member3' => [t2p0, t2p1] + }) + end + end end