Skip to content

Commit cbab110

Browse files
committed
Introduce Kafka::ConsumerGroup::Assignor
The assignor is responsible for getting partition information and building an assignment from the result of its strategy. That simplifies assignment strategies.
1 parent 5bf2547 commit cbab110

File tree

8 files changed

+181
-118
lines changed

8 files changed

+181
-118
lines changed

Diff for: README.md

+10-11
Original file line numberDiff line numberDiff line change
@@ -727,11 +727,11 @@ end
727727
#### Customizing Partition Assignment Strategy
728728

729729
In some cases, you might want to assign more partitions to some consumers. For example, in applications inserting some records to a database, the consumers running on hosts nearby the database can process more messages than the consumers running on other hosts.
730-
You can implement a custom assignment strategy and use it by passing a proc object that create the strategy as the argument `assignment_strategy_builder` like below:
730+
You can implement a custom assignment strategy and use it by passing an object that implements `#protocol_name`, `#user_data`, and `#assign` as the argument `assignment_strategy` like below:
731731

732732
```ruby
733733
class CustomAssignmentStrategy
734-
def initialize(cluster, user_data)
734+
def initialize(user_data)
735735
@cluster = cluster
736736
@user_data = user_data
737737
end
@@ -748,20 +748,19 @@ class CustomAssignmentStrategy
748748

749749
# Assign the topic partitions to the group members.
750750
#
751-
# @param members [Hash<String, Protocol::JoinGroupResponse::Metadata>] a hash
751+
# @param members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash
752752
# mapping member ids to metadata
753-
# @param topics [Array<String>] topics
754-
# @return [Hash<String, Protocol::MemberAssignment>] a hash mapping member
755-
# ids to assignments.
756-
def assign(members:, topics:)
753+
# @param partitions [Array<Kafka::ConsumerGroup::Assignor::Partition>] a list of
754+
# partitions the consumer group processes
755+
# @return [Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>] a hash
756+
# mapping member ids to partitions.
757+
def assign(members:, partitions:)
757758
...
758759
end
759760
end
760761

761-
assignment_strategy_builder = ->(cluster) do
762-
CustomAssignmentStrategy.new(cluster, "some-host-information")
763-
end
764-
consumer = kafka.consumer(group_id: "some-group", assignment_strategy_builder: assignment_strategy_builder)
762+
strategy = CustomAssignmentStrategy.new(cluster, "some-host-information")
763+
consumer = kafka.consumer(group_id: "some-group", assignment_strategy: strategy)
765764
```
766765

767766
### Thread Safety

Diff for: lib/kafka/client.rb

+4-10
Original file line numberDiff line numberDiff line change
@@ -350,8 +350,8 @@ def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size:
350350
# If it is n (n > 0), the topic list will be refreshed every n seconds
351351
# @param interceptors [Array<Object>] a list of consumer interceptors that implement
352352
# `call(Kafka::FetchedBatch)`.
353-
# @param assignment_strategy_builder [Proc] a procedure that implements
354-
# `call(Kafka::Cluster)` and creates a assignment strategy.
353+
# @param assignment_strategy [Object] a partition assignment strategy that
354+
# implements `protocol_type()`, `user_data()`, and `assign(members:, partitions:)`
355355
# @return [Consumer]
356356
def consumer(
357357
group_id:,
@@ -364,7 +364,7 @@ def consumer(
364364
fetcher_max_queue_size: 100,
365365
refresh_topic_interval: 0,
366366
interceptors: [],
367-
assignment_strategy_builder: nil
367+
assignment_strategy: nil
368368
)
369369
cluster = initialize_cluster
370370

@@ -375,12 +375,6 @@ def consumer(
375375
# The Kafka protocol expects the retention time to be in ms.
376376
retention_time = (offset_retention_time && offset_retention_time * 1_000) || -1
377377

378-
if assignment_strategy_builder
379-
assignment_strategy = assignment_strategy_builder.call(cluster)
380-
else
381-
assignment_strategy = RoundRobinAssignmentStrategy.new(cluster: cluster)
382-
end
383-
384378
group = ConsumerGroup.new(
385379
cluster: cluster,
386380
logger: @logger,
@@ -389,7 +383,7 @@ def consumer(
389383
rebalance_timeout: rebalance_timeout,
390384
retention_time: retention_time,
391385
instrumenter: instrumenter,
392-
assignment_strategy: assignment_strategy,
386+
assignment_strategy: assignment_strategy
393387
)
394388

395389
fetcher = Fetcher.new(

Diff for: lib/kafka/consumer_group.rb

+8-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# frozen_string_literal: true
22

33
require "set"
4+
require "kafka/consumer_group/assignor"
45
require "kafka/round_robin_assignment_strategy"
56

67
module Kafka
@@ -19,7 +20,10 @@ def initialize(cluster:, logger:, group_id:, session_timeout:, rebalance_timeout
1920
@members = {}
2021
@topics = Set.new
2122
@assigned_partitions = {}
22-
@assignment_strategy = assignment_strategy
23+
@assignor = Assignor.new(
24+
cluster: cluster,
25+
strategy: assignment_strategy || RoundRobinAssignmentStrategy.new
26+
)
2327
@retention_time = retention_time
2428
end
2529

@@ -144,8 +148,8 @@ def join_group
144148
rebalance_timeout: @rebalance_timeout,
145149
member_id: @member_id,
146150
topics: @topics,
147-
protocol_name: @assignment_strategy.protocol_name,
148-
user_data: @assignment_strategy.user_data,
151+
protocol_name: @assignor.protocol_name,
152+
user_data: @assignor.user_data,
149153
)
150154

151155
Protocol.handle_error(response.error_code)
@@ -182,7 +186,7 @@ def synchronize
182186
if group_leader?
183187
@logger.info "Chosen as leader of group `#{@group_id}`"
184188

185-
group_assignment = @assignment_strategy.assign(
189+
group_assignment = @assignor.assign(
186190
members: @members,
187191
topics: @topics,
188192
)

Diff for: lib/kafka/consumer_group/assignor.rb

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# frozen_string_literal: true
2+
3+
require "kafka/protocol/member_assignment"
4+
5+
module Kafka
6+
class ConsumerGroup
7+
8+
# A consumer group partition assignor
9+
class Assignor
10+
Partition = Struct.new(:topic, :partition_id)
11+
12+
# @param cluster [Kafka::Cluster]
13+
# @param strategy [Object] an object that implements #protocol_type,
14+
# #user_data, and #assign.
15+
def initialize(cluster:, strategy:)
16+
@cluster = cluster
17+
@strategy = strategy
18+
end
19+
20+
def protocol_name
21+
@strategy.protocol_name
22+
end
23+
24+
def user_data
25+
@strategy.user_data
26+
end
27+
28+
# Assign the topic partitions to the group members.
29+
#
30+
# @param members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash
31+
# mapping member ids to metadata.
32+
# @param topics [Array<String>] topics
33+
# @return [Hash<String, Kafka::Protocol::MemberAssignment>] a hash mapping member
34+
# ids to assignments.
35+
def assign(members:, topics:)
36+
topic_partitions = topics.flat_map do |topic|
37+
begin
38+
partition_ids = @cluster.partitions_for(topic).map(&:partition_id)
39+
rescue UnknownTopicOrPartition
40+
raise UnknownTopicOrPartition, "unknown topic #{topic}"
41+
end
42+
partition_ids.map {|partition_id| Partition.new(topic, partition_id) }
43+
end
44+
45+
group_assignment = {}
46+
47+
members.each_key do |member_id|
48+
group_assignment[member_id] = Protocol::MemberAssignment.new
49+
end
50+
@strategy.assign(members: members, partitions: topic_partitions).each do |member_id, partitions|
51+
Array(partitions).each do |partition|
52+
group_assignment[member_id].assign(partition.topic, [partition.partition_id])
53+
end
54+
end
55+
56+
group_assignment
57+
rescue Kafka::LeaderNotAvailable
58+
sleep 1
59+
retry
60+
end
61+
end
62+
end
63+
end

Diff for: lib/kafka/round_robin_assignment_strategy.rb

+11-40
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,10 @@
11
# frozen_string_literal: true
22

3-
require "kafka/protocol/member_assignment"
4-
53
module Kafka
64

75
# A consumer group partition assignment strategy that assigns partitions to
86
# consumers in a round-robin fashion.
97
class RoundRobinAssignmentStrategy
10-
def initialize(cluster:)
11-
@cluster = cluster
12-
end
13-
148
def protocol_name
159
"roundrobin"
1610
end
@@ -21,43 +15,20 @@ def user_data
2115

2216
# Assign the topic partitions to the group members.
2317
#
24-
# @param members [Hash<String, Protocol::JoinGroupResponse::Metadata>] a hash
18+
# @param members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash
2519
# mapping member ids to metadata
26-
# @param topics [Array<String>] topics
27-
# @return [Hash<String, Protocol::MemberAssignment>] a hash mapping member
28-
# ids to assignments.
29-
def assign(members:, topics:)
30-
group_assignment = {}
31-
32-
members.each_key do |member_id|
33-
group_assignment[member_id] = Protocol::MemberAssignment.new
34-
end
35-
36-
topic_partitions = topics.flat_map do |topic|
37-
begin
38-
partitions = @cluster.partitions_for(topic).map(&:partition_id)
39-
rescue UnknownTopicOrPartition
40-
raise UnknownTopicOrPartition, "unknown topic #{topic}"
41-
end
42-
Array.new(partitions.count) { topic }.zip(partitions)
43-
end
44-
45-
partitions_per_member = topic_partitions.group_by.with_index do |_, index|
46-
index % members.count
47-
end.values
48-
49-
members.keys.zip(partitions_per_member).each do |member_id, member_partitions|
50-
unless member_partitions.nil?
51-
member_partitions.each do |topic, partition|
52-
group_assignment[member_id].assign(topic, [partition])
53-
end
54-
end
20+
# @param partitions [Array<Kafka::ConsumerGroup::Assignor::Partition>] a list of
21+
# partitions the consumer group processes
22+
# @return [Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>] a hash
23+
# mapping member ids to partitions.
24+
def assign(members:, partitions:)
25+
member_ids = members.keys
26+
partitions_per_member = Hash.new{ |h, k| h[k] = [] }
27+
partitions.each_with_index do |partition, index|
28+
partitions_per_member[member_ids[index % member_ids.count]] << partition
5529
end
5630

57-
group_assignment
58-
rescue Kafka::LeaderNotAvailable
59-
sleep 1
60-
retry
31+
partitions_per_member
6132
end
6233
end
6334
end

Diff for: spec/consumer_group/assignor_spec.rb

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# frozen_string_literal: true
2+
3+
describe Kafka::ConsumerGroup::Assignor do
4+
let(:cluster) { double(:cluster) }
5+
let(:assignor) { described_class.new(cluster: cluster, strategy: strategy) }
6+
let(:strategy) do
7+
klass = Class.new do
8+
def protocol_name
9+
"test"
10+
end
11+
12+
def user_data
13+
nil
14+
end
15+
16+
def assign(members:, partitions:)
17+
assignment = {}
18+
partition_count_per_member = (partitions.count.to_f / members.count).ceil
19+
partitions.each_slice(partition_count_per_member).with_index do |chunk, index|
20+
assignment[members.keys[index]] = chunk
21+
end
22+
23+
assignment
24+
end
25+
end
26+
klass.new
27+
end
28+
29+
it "assigns all partitions" do
30+
members = Hash[(0...10).map {|i| ["member#{i}", nil] }]
31+
topics = ["greetings"]
32+
partitions = (0...30).map {|i| double(:"partition#{i}", partition_id: i) }
33+
34+
allow(cluster).to receive(:partitions_for) { partitions }
35+
36+
assignments = assignor.assign(members: members, topics: topics)
37+
38+
partitions.each do |partition|
39+
member = assignments.values.find {|assignment|
40+
assignment.topics.find {|topic, partitions|
41+
partitions.include?(partition.partition_id)
42+
}
43+
}
44+
45+
expect(member).to_not be_nil
46+
end
47+
end
48+
end

Diff for: spec/functional/consumer_group_spec.rb

+8-19
Original file line numberDiff line numberDiff line change
@@ -456,8 +456,7 @@
456456
received_messages = []
457457

458458
assignment_strategy_class = Class.new do
459-
def initialize(cluster, weight)
460-
@cluster = cluster
459+
def initialize(weight)
461460
@weight = weight
462461
end
463462

@@ -469,32 +468,22 @@ def user_data
469468
@weight.to_s
470469
end
471470

472-
def assign(members:, topics:)
473-
group_assignment = members.each_key.with_object({}) do |member_id, assignment|
474-
assignment[member_id] = Kafka::Protocol::MemberAssignment.new
475-
end
476-
471+
def assign(members:, partitions:)
477472
member_ids = members.flat_map { |id, metadata| [id] * metadata.user_data.to_i }
478-
partition_index = 0
479-
topics.each_with_index do |topic, i|
480-
@cluster.partitions_for(topic).each_with_index do |partition, j|
481-
member_id = member_ids[partition_index % member_ids.size]
482-
group_assignment[member_id].assign(topic, [partition.partition_id])
483-
partition_index += 1
484-
end
473+
partitions_per_member = Hash.new{ |h, k| h[k] = [] }
474+
partitions.each_with_index do |partition, index|
475+
partitions_per_member[member_ids[index % member_ids.count]] << partition
485476
end
486477

487-
group_assignment
478+
partitions_per_member
488479
end
489480
end
490481

491482
consumers = 2.times.map do |i|
492-
assignment_strategy_builder = ->(cluster) {
493-
assignment_strategy_class.new(cluster, i + 1)
494-
}
483+
assignment_strategy = assignment_strategy_class.new(i + 1)
495484

496485
kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
497-
consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time, assignment_strategy_builder: assignment_strategy_builder)
486+
consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time, assignment_strategy: assignment_strategy)
498487
consumer.subscribe(topic)
499488
consumer
500489
end

0 commit comments

Comments
 (0)