Skip to content

Commit 253cc4c

Browse files
committed
Support only block call fashion strategy
cf. #846 (comment)
1 parent 76f6f23 commit 253cc4c

File tree

6 files changed

+98
-153
lines changed

6 files changed

+98
-153
lines changed

Diff for: README.md

+27-28
Original file line numberDiff line numberDiff line change
@@ -747,47 +747,46 @@ end
747747
#### Customizing Partition Assignment Strategy
748748

749749
In some cases, you might want to assign more partitions to some consumers. For example, in applications inserting records into a database, the consumers running on hosts near the database can process more messages than the consumers running on other hosts.
750-
You can implement a custom assignment strategy and use it by passing an object that implements `#user_data` and `#assign` as the argument `assignment_strategy` like below:
750+
You can use a custom assignment strategy like below:
751751

752752
```ruby
753-
class CustomAssignmentStrategy
754-
def initialize(user_data)
755-
@user_data = user_data
756-
end
753+
Kafka::ConsumerGroup::Assignor.register_strategy(:custom) do |cluster:, members:, partitions:|
754+
# strategy goes here
755+
end
756+
consumer = kafka.consumer(group_id: "some-group", assignment_strategy: :custom)
757+
```
757758

758-
# @return [String, nil]
759-
def user_data
760-
@user_data
761-
end
759+
`members` is a hash mapping member ids to metadata, and `partitions` is a list of partitions the consumer group processes. The block should return a hash mapping member ids to partitions.
760+
For example, the following strategy assigns partitions randomly:
762761

763-
# Assign the topic partitions to the group members.
764-
#
765-
# @param cluster [Kafka::Cluster]
766-
# @param members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash
767-
# mapping member ids to metadata
768-
# @param partitions [Array<Kafka::ConsumerGroup::Assignor::Partition>] a list of
769-
# partitions the consumer group processes
770-
# @return [Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>] a hash
771-
# mapping member ids to partitions.
772-
def assign(cluster:, members:, partitions:)
773-
...
762+
```ruby
763+
Kafka::ConsumerGroup::Assignor.register_strategy(:custom) do |cluster:, members:, partitions:|
764+
member_ids = members.keys
765+
partitions.each_with_object(Hash.new {|h, k| h[k] = [] }) do |partition, partitions_per_member|
766+
partitions_per_member[member_ids[rand(member_ids.count)]] << partition
774767
end
775768
end
776-
Kafka::ConsumerGroup::Assignor.register_strategy(:custom, CustomAssignmentStrategy)
777-
778-
strategy = CustomAssignmentStrategy.new("some-host-information")
779-
consumer = kafka.consumer(group_id: "some-group", assignment_strategy: strategy)
780769
```
781770

782-
If the strategy doesn't need user data and constructor arguments, you don't need to define a class:
771+
If the strategy needs user data, you can pass the option `user_data`, a proc that returns user data on each consumer:
783772

784-
```
785-
Kafka::ConsumerGroup::Assignor.register_strategy(:another_custom) do |cluster:, members:, partitions:|
773+
```ruby
774+
Kafka::ConsumerGroup::Assignor.register_strategy(:custom, user_data: ->{ ... }) do |cluster:, members:, partitions:|
786775
# strategy goes here
787776
end
788-
consumer = kafka.consumer(group_id: "some-group", assignment_strategy: :another_custom)
789777
```
790778

779+
For example, the following strategy displays the IP address of each consumer:
780+
781+
```ruby
782+
user_data_proc = ->{ Socket.ip_address_list.find(&:ipv4_private?).ip_address }
783+
Kafka::ConsumerGroup::Assignor.register_strategy(:custom, user_data: user_data_proc) do |cluster:, members:, partitions:|
784+
members.each do |id, metadata|
785+
puts "#{id}: #{metadata.user_data}"
786+
end
787+
...
788+
end
789+
```
791790

792791
### Thread Safety
793792

Diff for: lib/kafka/consumer_group.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def initialize(cluster:, logger:, group_id:, session_timeout:, rebalance_timeout
2222
@assigned_partitions = {}
2323
@assignor = Assignor.new(
2424
cluster: cluster,
25-
strategy: assignment_strategy || RoundRobinAssignmentStrategy.new
25+
strategy: assignment_strategy || :roundrobin
2626
)
2727
@retention_time = retention_time
2828
end

Diff for: lib/kafka/consumer_group/assignor.rb

+24-31
Original file line numberDiff line numberDiff line change
@@ -7,30 +7,33 @@ class ConsumerGroup
77

88
# A consumer group partition assignor
99
class Assignor
10+
Strategy = Struct.new(:assign, :user_data)
1011
Partition = Struct.new(:topic, :partition_id)
1112

12-
def self.register_strategy(name, strategy_class = nil, &block)
13-
if strategy_classes[name.to_s]
13+
# @param name [String, Symbol]
14+
# @param user_data [Proc, nil] a proc that returns user data as a string
15+
# @yield [cluster:, members:, partitions:]
16+
# @yieldparam cluster [Kafka::Cluster]
17+
# @yieldparam members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>]
18+
# a hash mapping member ids to metadata
19+
# @yieldparam partitions [Array<Kafka::ConsumerGroup::Assignor::Partition>]
20+
# a list of partitions the consumer group processes
21+
# @yieldreturn [Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>>]
22+
# a hash mapping member ids to partitions.
23+
def self.register_strategy(name, user_data: nil, &block)
24+
if strategies[name.to_s]
1425
raise ArgumentError, "The strategy '#{name}' is already registered."
1526
end
1627

17-
unless strategy_class.nil? ^ !block_given?
18-
raise "Either strategy class or block but not both must be specified."
28+
unless block_given?
29+
raise ArgumentError, "The block must be given."
1930
end
2031

21-
if block_given?
22-
strategy_class = Class.new do
23-
define_method(:assign) do |cluster:, members:, partitions:|
24-
block.call(cluster: cluster, members: members, partitions: partitions)
25-
end
26-
end
27-
end
28-
29-
strategy_classes[name.to_s] = strategy_class
32+
strategies[name.to_s] = Strategy.new(block, user_data)
3033
end
3134

32-
def self.strategy_classes
33-
@strategy_classes ||= {}
35+
def self.strategies
36+
@strategies ||= {}
3437
end
3538

3639
attr_reader :protocol_name
@@ -40,25 +43,14 @@ def self.strategy_classes
4043
# #user_data and #assign.
4144
def initialize(cluster:, strategy:)
4245
@cluster = cluster
43-
44-
case strategy
45-
when String, Symbol
46-
@protocol_name = strategy.to_s
47-
klass = self.class.strategy_classes.fetch(@protocol_name) do
48-
raise ArgumentError, "The strategy '#{@protocol_name}' is not registered."
49-
end
50-
@strategy = klass.new
51-
else
52-
@protocol_name, _ = self.class.strategy_classes.find {|name, klass| strategy.is_a?(klass) }
53-
if @protocol_name.nil?
54-
raise ArgumentError, "The strategy class '#{strategy.class}' is not registered."
55-
end
56-
@strategy = strategy
46+
@protocol_name = strategy.to_s
47+
@strategy = self.class.strategies.fetch(@protocol_name) do
48+
raise ArgumentError, "The strategy '#{@protocol_name}' is not registered."
5749
end
5850
end
5951

6052
def user_data
61-
@strategy.user_data if @strategy.respond_to?(:user_data)
53+
@strategy.user_data.call if @strategy.user_data
6254
end
6355

6456
# Assign the topic partitions to the group members.
@@ -83,7 +75,8 @@ def assign(members:, topics:)
8375
members.each_key do |member_id|
8476
group_assignment[member_id] = Protocol::MemberAssignment.new
8577
end
86-
@strategy.assign(cluster: @cluster, members: members, partitions: topic_partitions).each do |member_id, partitions|
78+
79+
@strategy.assign.call(cluster: @cluster, members: members, partitions: topic_partitions).each do |member_id, partitions|
8780
Array(partitions).each do |partition|
8881
group_assignment[member_id].assign(partition.topic, [partition.partition_id])
8982
end

Diff for: lib/kafka/round_robin_assignment_strategy.rb

+4-5
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,6 @@ module Kafka
55
# A consumer group partition assignment strategy that assigns partitions to
66
# consumers in a round-robin fashion.
77
class RoundRobinAssignmentStrategy
8-
def user_data
9-
nil
10-
end
11-
128
# Assign the topic partitions to the group members.
139
#
1410
# @param cluster [Kafka::Cluster]
@@ -31,4 +27,7 @@ def assign(cluster:, members:, partitions:)
3127
end
3228

3329
require "kafka/consumer_group/assignor"
34-
Kafka::ConsumerGroup::Assignor.register_strategy(:roundrobin, Kafka::RoundRobinAssignmentStrategy)
30+
strategy = Kafka::RoundRobinAssignmentStrategy.new
31+
Kafka::ConsumerGroup::Assignor.register_strategy(:roundrobin) do |cluster:, members:, partitions:|
32+
strategy.assign(cluster: cluster, members: members, partitions: partitions)
33+
end

Diff for: spec/consumer_group/assignor_spec.rb

+19-58
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
around do |example|
77
described_class.register_strategy(:custom) { |**kwargs| [] }
88
example.run
9-
described_class.strategy_classes.delete("custom")
9+
described_class.strategies.delete("custom")
1010
end
1111

1212
it do
@@ -17,79 +17,40 @@
1717

1818
describe "#assign" do
1919
let(:cluster) { double(:cluster) }
20-
let(:assignor) { described_class.new(cluster: cluster, strategy: strategy) }
20+
let(:assignor) { described_class.new(cluster: cluster, strategy: :custom) }
2121

2222
let(:members) { Hash[(0...10).map {|i| ["member#{i}", nil] }] }
2323
let(:topics) { ["greetings"] }
2424
let(:partitions) { (0...30).map {|i| double(:"partition#{i}", partition_id: i) } }
2525

2626
before do
2727
allow(cluster).to receive(:partitions_for) { partitions }
28-
end
29-
after do
30-
described_class.strategy_classes.delete("custom")
31-
end
3228

33-
context "when the strategy is an object" do
34-
let(:strategy) do
35-
klass = Class.new do
36-
def assign(cluster:, members:, partitions:)
37-
assignment = {}
38-
partition_count_per_member = (partitions.count.to_f / members.count).ceil
39-
partitions.each_slice(partition_count_per_member).with_index do |chunk, index|
40-
assignment[members.keys[index]] = chunk
41-
end
42-
43-
assignment
44-
end
29+
described_class.register_strategy(:custom) do |cluster:, members:, partitions:|
30+
assignment = {}
31+
partition_count_per_member = (partitions.count.to_f / members.count).ceil
32+
partitions.each_slice(partition_count_per_member).with_index do |chunk, index|
33+
assignment[members.keys[index]] = chunk
4534
end
46-
described_class.register_strategy(:custom, klass)
47-
48-
klass.new
49-
end
50-
51-
it "assigns all partitions" do
52-
assignments = assignor.assign(members: members, topics: topics)
53-
54-
partitions.each do |partition|
55-
member = assignments.values.find {|assignment|
56-
assignment.topics.find {|topic, partitions|
57-
partitions.include?(partition.partition_id)
58-
}
59-
}
6035

61-
expect(member).to_not be_nil
62-
end
36+
assignment
6337
end
6438
end
39+
after do
40+
described_class.strategies.delete("custom")
41+
end
6542

66-
context "when the strategy is a string" do
67-
let(:strategy) { :custom }
68-
69-
before do
70-
described_class.register_strategy(:custom) do |cluster:, members:, partitions:|
71-
assignment = {}
72-
partition_count_per_member = (partitions.count.to_f / members.count).ceil
73-
partitions.each_slice(partition_count_per_member).with_index do |chunk, index|
74-
assignment[members.keys[index]] = chunk
75-
end
76-
77-
assignment
78-
end
79-
end
80-
81-
it "assigns all partitions" do
82-
assignments = assignor.assign(members: members, topics: topics)
43+
it "assigns all partitions" do
44+
assignments = assignor.assign(members: members, topics: topics)
8345

84-
partitions.each do |partition|
85-
member = assignments.values.find {|assignment|
86-
assignment.topics.find {|topic, partitions|
87-
partitions.include?(partition.partition_id)
88-
}
46+
partitions.each do |partition|
47+
member = assignments.values.find {|assignment|
48+
assignment.topics.find {|topic, partitions|
49+
partitions.include?(partition.partition_id)
8950
}
51+
}
9052

91-
expect(member).to_not be_nil
92-
end
53+
expect(member).to_not be_nil
9354
end
9455
end
9556
end

Diff for: spec/functional/consumer_group_spec.rb

+23-30
Original file line numberDiff line numberDiff line change
@@ -436,15 +436,16 @@
436436
end
437437

438438
example "consuming messages with a custom assignment strategy" do
439-
topic = create_random_topic(num_partitions: 1)
440-
messages = (1..1000).to_a
439+
num_partitions = 3
440+
topic = create_random_topic(num_partitions: num_partitions)
441+
messages = (1..100 * num_partitions).to_a
441442

442443
begin
443444
kafka = Kafka.new(kafka_brokers, client_id: "test")
444445
producer = kafka.producer
445446

446447
messages.each do |i|
447-
producer.produce(i.to_s, topic: topic, partition: 0)
448+
producer.produce(i.to_s, topic: topic, partition: i % num_partitions)
448449
end
449450

450451
producer.deliver_messages
@@ -453,35 +454,26 @@
453454
group_id = "test#{rand(1000)}"
454455

455456
mutex = Mutex.new
456-
received_messages = []
457-
458-
assignment_strategy_class = Class.new do
459-
def initialize(weight)
460-
@weight = weight
461-
end
457+
received_messages = Hash.new {|h, k| h[k] = [] }
462458

463-
def user_data
464-
@weight.to_s
459+
consumer_weight = 0
460+
user_data_proc = -> do
461+
consumer_weight += 1
462+
consumer_weight.to_s
463+
end
464+
Kafka::ConsumerGroup::Assignor.register_strategy(:custom, user_data: user_data_proc) do |cluster:, members:, partitions:|
465+
member_ids = members.flat_map {|id, metadata| [id] * metadata.user_data.to_i }
466+
partitions_per_member = Hash.new {|h, k| h[k] = [] }
467+
partitions.each_with_index do |partition, index|
468+
partitions_per_member[member_ids[index % member_ids.count]] << partition
465469
end
466470

467-
def assign(cluster:, members:, partitions:)
468-
member_ids = members.flat_map {|id, metadata| [id] * metadata.user_data.to_i }
469-
partitions_per_member = Hash.new {|h, k| h[k] = [] }
470-
partitions.each_with_index do |partition, index|
471-
partitions_per_member[member_ids[index % member_ids.count]] << partition
472-
end
473-
474-
partitions_per_member
475-
end
471+
partitions_per_member
476472
end
477-
478-
Kafka::ConsumerGroup::Assignor.register_strategy(:custom, assignment_strategy_class)
479473
begin
480-
consumers = 2.times.map do |i|
481-
assignment_strategy = assignment_strategy_class.new(i + 1)
482-
474+
consumers = 2.times.map do
483475
kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
484-
consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time, assignment_strategy: assignment_strategy)
476+
consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time, assignment_strategy: :custom)
485477
consumer.subscribe(topic)
486478
consumer
487479
end
@@ -490,9 +482,9 @@ def assign(cluster:, members:, partitions:)
490482
t = Thread.new do
491483
consumer.each_message do |message|
492484
mutex.synchronize do
493-
received_messages << message
485+
received_messages[consumer] << message
494486

495-
if received_messages.count == messages.count
487+
if received_messages.values.flatten.count == messages.count
496488
consumers.each(&:stop)
497489
end
498490
end
@@ -506,9 +498,10 @@ def assign(cluster:, members:, partitions:)
506498

507499
threads.each(&:join)
508500

509-
expect(received_messages.map(&:value).map(&:to_i)).to match_array messages
501+
expect(received_messages.values.flatten.map {|v| v.value.to_i }).to match_array messages
502+
expect(received_messages[consumers[1]].count).to eq received_messages[consumers[0]].count * 2
510503
ensure
511-
Kafka::ConsumerGroup::Assignor.strategy_classes.delete("custom")
504+
Kafka::ConsumerGroup::Assignor.strategies.delete("custom")
512505
end
513506
end
514507

0 commit comments

Comments
 (0)