Skip to content

Commit 0f7a9e9

Browse files
committed
Implement Kafka::ConsumerGroup::Assignor.register_strategy
This method prevents two strategies from being registered under the same protocol name and makes it easy to register custom strategies.
1 parent fbd121d commit 0f7a9e9

File tree

6 files changed

+169
-77
lines changed

6 files changed

+169
-77
lines changed

Diff for: README.md

+14-7
Original file line numberDiff line numberDiff line change
@@ -727,41 +727,48 @@ end
727727
#### Customizing Partition Assignment Strategy
728728

729729
In some cases, you might want to assign more partitions to some consumers. For example, in applications that insert records into a database, the consumers running on hosts near the database can process more messages than the consumers running on other hosts.
730-
You can implement a custom assignment strategy and use it by passing an object that implements `#protocol_name`, `#user_data`, and `#assign` as the argument `assignment_strategy` like below:
730+
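You can implement a custom assignment strategy by defining a class that implements `#assign` (and, optionally, `#user_data`), registering it under a protocol name with `Kafka::ConsumerGroup::Assignor.register_strategy`, and passing an instance of it as the `assignment_strategy` argument, like below: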
731731

732732
```ruby
733733
class CustomAssignmentStrategy
734734
def initialize(user_data)
735735
@user_data = user_data
736736
end
737737

738-
# @return [String]
739-
def protocol_name
740-
...
741-
end
742-
743738
# @return [String, nil]
744739
def user_data
745740
@user_data
746741
end
747742

748743
# Assign the topic partitions to the group members.
749744
#
745+
# @param cluster [Kafka::Cluster]
750746
# @param members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash
751747
# mapping member ids to metadata
752748
# @param partitions [Array<Kafka::ConsumerGroup::Assignor::Partition>] a list of
753749
# partitions the consumer group processes
754750
# @return [Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>>] a hash
755751
# mapping member ids to partitions.
756-
def assign(members:, partitions:)
752+
def assign(cluster:, members:, partitions:)
757753
...
758754
end
759755
end
756+
Kafka::ConsumerGroup::Assignor.register_strategy(:custom, CustomAssignmentStrategy)
760757

761758
strategy = CustomAssignmentStrategy.new("some-host-information")
762759
consumer = kafka.consumer(group_id: "some-group", assignment_strategy: strategy)
763760
```
764761

762+
If the strategy needs neither user data nor constructor arguments, you don't need to define a class; you can register a block instead and refer to the strategy by its name:
763+
764+
```ruby
765+
Kafka::ConsumerGroup::Assignor.register_strategy(:another_custom) do |cluster:, members:, partitions:|
766+
# strategy goes here
767+
end
768+
consumer = kafka.consumer(group_id: "some-group", assignment_strategy: :another_custom)
769+
```
770+
771+
765772
### Thread Safety
766773

767774
You typically don't want to share a Kafka client object between threads, since the network communication is not synchronized. Furthermore, you should avoid using threads in a consumer unless you're very careful about waiting for all work to complete before returning from the `#each_message` or `#each_batch` block. This is because _checkpointing_ assumes that returning from the block means that the messages that have been yielded have been successfully processed.

Diff for: lib/kafka/consumer_group/assignor.rb

+44-8
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,56 @@ class ConsumerGroup
99
class Assignor
1010
Partition = Struct.new(:topic, :partition_id)
1111

12+
def self.register_strategy(name, strategy_class = nil, &block)
13+
if strategy_classes[name.to_s]
14+
raise ArgumentError, "The strategy '#{name}' is already registered."
15+
end
16+
17+
unless strategy_class.nil? ^ !block_given?
18+
raise "Either strategy class or block but not both must be specified."
19+
end
20+
21+
if block_given?
22+
strategy_class = Class.new do
23+
define_method(:assign) do |cluster:, members:, partitions:|
24+
block.call(cluster: cluster, members: members, partitions: partitions)
25+
end
26+
end
27+
end
28+
29+
strategy_classes[name.to_s] = strategy_class
30+
end
31+
32+
def self.strategy_classes
33+
@strategy_classes ||= {}
34+
end
35+
36+
attr_reader :protocol_name
37+
1238
# @param cluster [Kafka::Cluster]
13-
# @param strategy [Object] an object that implements #protocol_type,
14-
# #user_data, and #assign.
39+
# @param strategy [String, Symbol, Object] the name of a registered strategy (a string or a symbol), or an instance of a registered strategy class that implements
40+
# #user_data and #assign.
1541
def initialize(cluster:, strategy:)
1642
@cluster = cluster
17-
@strategy = strategy
18-
end
1943

20-
def protocol_name
21-
@strategy.protocol_name
44+
case strategy
45+
when String, Symbol
46+
@protocol_name = strategy.to_s
47+
klass = self.class.strategy_classes.fetch(@protocol_name) do
48+
raise ArgumentError, "The strategy '#{@protocol_name}' is not registered."
49+
end
50+
@strategy = klass.new
51+
else
52+
@protocol_name, _ = self.class.strategy_classes.find {|name, klass| strategy.is_a?(klass) }
53+
if @protocol_name.nil?
54+
raise ArgumentError, "The strategy class '#{strategy.class}' is not registered."
55+
end
56+
@strategy = strategy
57+
end
2258
end
2359

2460
def user_data
25-
@strategy.user_data
61+
@strategy.user_data if @strategy.respond_to?(:user_data)
2662
end
2763

2864
# Assign the topic partitions to the group members.
@@ -47,7 +83,7 @@ def assign(members:, topics:)
4783
members.each_key do |member_id|
4884
group_assignment[member_id] = Protocol::MemberAssignment.new
4985
end
50-
@strategy.assign(members: members, partitions: topic_partitions).each do |member_id, partitions|
86+
@strategy.assign(cluster: @cluster, members: members, partitions: topic_partitions).each do |member_id, partitions|
5187
Array(partitions).each do |partition|
5288
group_assignment[member_id].assign(partition.topic, [partition.partition_id])
5389
end

Diff for: lib/kafka/round_robin_assignment_strategy.rb

+5-5
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,20 @@ module Kafka
55
# A consumer group partition assignment strategy that assigns partitions to
66
# consumers in a round-robin fashion.
77
class RoundRobinAssignmentStrategy
8-
def protocol_name
9-
"roundrobin"
10-
end
11-
128
def user_data
139
nil
1410
end
1511

1612
# Assign the topic partitions to the group members.
1713
#
14+
# @param cluster [Kafka::Cluster]
1815
# @param members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash
1916
# mapping member ids to metadata
2017
# @param partitions [Array<Kafka::ConsumerGroup::Assignor::Partition>] a list of
2118
# partitions the consumer group processes
2219
# @return [Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>>] a hash
2320
# mapping member ids to partitions.
24-
def assign(members:, partitions:)
21+
def assign(cluster:, members:, partitions:)
2522
member_ids = members.keys
2623
partitions_per_member = Hash.new {|h, k| h[k] = [] }
2724
partitions.each_with_index do |partition, index|
@@ -32,3 +29,6 @@ def assign(members:, partitions:)
3229
end
3330
end
3431
end
32+
33+
require "kafka/consumer_group/assignor"
34+
Kafka::ConsumerGroup::Assignor.register_strategy(:roundrobin, Kafka::RoundRobinAssignmentStrategy)

Diff for: spec/consumer_group/assignor_spec.rb

+77-29
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,96 @@
11
# frozen_string_literal: true
22

33
describe Kafka::ConsumerGroup::Assignor do
4-
let(:cluster) { double(:cluster) }
5-
let(:assignor) { described_class.new(cluster: cluster, strategy: strategy) }
6-
let(:strategy) do
7-
klass = Class.new do
8-
def protocol_name
9-
"test"
4+
describe ".register_strategy" do
5+
context "when another strategy is already registered with the same name" do
6+
around do |example|
7+
described_class.register_strategy(:custom) { |**kwargs| [] }
8+
example.run
9+
described_class.strategy_classes.delete("custom")
1010
end
1111

12-
def user_data
13-
nil
12+
it do
13+
expect { described_class.register_strategy(:custom) { |**kwargs| [] } }.to raise_error(ArgumentError)
1414
end
15+
end
16+
end
17+
18+
describe "#assign" do
19+
let(:cluster) { double(:cluster) }
20+
let(:assignor) { described_class.new(cluster: cluster, strategy: strategy) }
21+
22+
let(:members) { Hash[(0...10).map {|i| ["member#{i}", nil] }] }
23+
let(:topics) { ["greetings"] }
24+
let(:partitions) { (0...30).map {|i| double(:"partition#{i}", partition_id: i) } }
25+
26+
before do
27+
allow(cluster).to receive(:partitions_for) { partitions }
28+
end
29+
after do
30+
described_class.strategy_classes.delete("custom")
31+
end
32+
33+
context "when the strategy is an object" do
34+
let(:strategy) do
35+
klass = Class.new do
36+
def assign(cluster:, members:, partitions:)
37+
assignment = {}
38+
partition_count_per_member = (partitions.count.to_f / members.count).ceil
39+
partitions.each_slice(partition_count_per_member).with_index do |chunk, index|
40+
assignment[members.keys[index]] = chunk
41+
end
1542

16-
def assign(members:, partitions:)
17-
assignment = {}
18-
partition_count_per_member = (partitions.count.to_f / members.count).ceil
19-
partitions.each_slice(partition_count_per_member).with_index do |chunk, index|
20-
assignment[members.keys[index]] = chunk
43+
assignment
44+
end
2145
end
46+
described_class.register_strategy(:custom, klass)
2247

23-
assignment
48+
klass.new
49+
end
50+
51+
it "assigns all partitions" do
52+
assignments = assignor.assign(members: members, topics: topics)
53+
54+
partitions.each do |partition|
55+
member = assignments.values.find {|assignment|
56+
assignment.topics.find {|topic, partitions|
57+
partitions.include?(partition.partition_id)
58+
}
59+
}
60+
61+
expect(member).to_not be_nil
62+
end
2463
end
2564
end
26-
klass.new
27-
end
2865

29-
it "assigns all partitions" do
30-
members = Hash[(0...10).map {|i| ["member#{i}", nil] }]
31-
topics = ["greetings"]
32-
partitions = (0...30).map {|i| double(:"partition#{i}", partition_id: i) }
66+
context "when the strategy is a string" do
67+
let(:strategy) { :custom }
3368

34-
allow(cluster).to receive(:partitions_for) { partitions }
69+
before do
70+
described_class.register_strategy(:custom) do |cluster:, members:, partitions:|
71+
assignment = {}
72+
partition_count_per_member = (partitions.count.to_f / members.count).ceil
73+
partitions.each_slice(partition_count_per_member).with_index do |chunk, index|
74+
assignment[members.keys[index]] = chunk
75+
end
3576

36-
assignments = assignor.assign(members: members, topics: topics)
77+
assignment
78+
end
79+
end
80+
81+
it "assigns all partitions" do
82+
assignments = assignor.assign(members: members, topics: topics)
3783

38-
partitions.each do |partition|
39-
member = assignments.values.find {|assignment|
40-
assignment.topics.find {|topic, partitions|
41-
partitions.include?(partition.partition_id)
42-
}
43-
}
84+
partitions.each do |partition|
85+
member = assignments.values.find {|assignment|
86+
assignment.topics.find {|topic, partitions|
87+
partitions.include?(partition.partition_id)
88+
}
89+
}
4490

45-
expect(member).to_not be_nil
91+
expect(member).to_not be_nil
92+
end
93+
end
4694
end
4795
end
4896
end

Diff for: spec/functional/consumer_group_spec.rb

+26-25
Original file line numberDiff line numberDiff line change
@@ -460,15 +460,11 @@ def initialize(weight)
460460
@weight = weight
461461
end
462462

463-
def protocol_name
464-
"custom"
465-
end
466-
467463
def user_data
468464
@weight.to_s
469465
end
470466

471-
def assign(members:, partitions:)
467+
def assign(cluster:, members:, partitions:)
472468
member_ids = members.flat_map {|id, metadata| [id] * metadata.user_data.to_i }
473469
partitions_per_member = Hash.new {|h, k| h[k] = [] }
474470
partitions.each_with_index do |partition, index|
@@ -479,36 +475,41 @@ def assign(members:, partitions:)
479475
end
480476
end
481477

482-
consumers = 2.times.map do |i|
483-
assignment_strategy = assignment_strategy_class.new(i + 1)
478+
Kafka::ConsumerGroup::Assignor.register_strategy(:custom, assignment_strategy_class)
479+
begin
480+
consumers = 2.times.map do |i|
481+
assignment_strategy = assignment_strategy_class.new(i + 1)
484482

485-
kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
486-
consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time, assignment_strategy: assignment_strategy)
487-
consumer.subscribe(topic)
488-
consumer
489-
end
483+
kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
484+
consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time, assignment_strategy: assignment_strategy)
485+
consumer.subscribe(topic)
486+
consumer
487+
end
490488

491-
threads = consumers.map do |consumer|
492-
t = Thread.new do
493-
consumer.each_message do |message|
494-
mutex.synchronize do
495-
received_messages << message
489+
threads = consumers.map do |consumer|
490+
t = Thread.new do
491+
consumer.each_message do |message|
492+
mutex.synchronize do
493+
received_messages << message
496494

497-
if received_messages.count == messages.count
498-
consumers.each(&:stop)
495+
if received_messages.count == messages.count
496+
consumers.each(&:stop)
497+
end
499498
end
500499
end
501500
end
502-
end
503501

504-
t.abort_on_exception = true
502+
t.abort_on_exception = true
505503

506-
t
507-
end
504+
t
505+
end
508506

509-
threads.each(&:join)
507+
threads.each(&:join)
510508

511-
expect(received_messages.map(&:value).map(&:to_i)).to match_array messages
509+
expect(received_messages.map(&:value).map(&:to_i)).to match_array messages
510+
ensure
511+
Kafka::ConsumerGroup::Assignor.strategy_classes.delete("custom")
512+
end
512513
end
513514

514515
def wait_until(timeout:)

Diff for: spec/round_robin_assignment_strategy_spec.rb

+3-3
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
members = Hash[(0...10).map {|i| ["member#{i}", nil] }]
88
partitions = (0...30).map {|i| double(:"partition#{i}", topic: "greetings", partition_id: i) }
99

10-
assignments = strategy.assign(members: members, partitions: partitions)
10+
assignments = strategy.assign(cluster: nil, members: members, partitions: partitions)
1111

1212
partitions.each do |partition|
1313
member = assignments.values.find {|assigned_partitions|
@@ -27,7 +27,7 @@
2727
double(:"partition#{i}", topic: topic, partition_id: i)
2828
}
2929

30-
assignments = strategy.assign(members: members, partitions: partitions)
30+
assignments = strategy.assign(cluster: nil, members: members, partitions: partitions)
3131

3232
partitions.each do |partition|
3333
member = assignments.values.find {|assigned_partitions|
@@ -85,7 +85,7 @@
8585
}
8686
}
8787

88-
assignments = strategy.assign(members: members, partitions: partitions)
88+
assignments = strategy.assign(cluster: nil, members: members, partitions: partitions)
8989

9090
expect_all_partitions_assigned(topics, assignments)
9191
expect_even_assignments(topics, assignments)

0 commit comments

Comments
 (0)