Skip to content

Commit 992839c

Browse files
committed
Support custom assignment strategy
This commit adds the feature that allows users to specify custom assignment strategy. In some cases, you might want to assign more partitions to some consumers. For example, in applications inserting some records to a database, the consumers running on hosts nearby the database can process more messages than the consumers running on other hosts.
1 parent 0a07531 commit 992839c

8 files changed

+172
-18
lines changed

Diff for: README.md

+39
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ A Ruby client library for [Apache Kafka](http://kafka.apache.org/), a distribute
2626
4. [Shutting Down a Consumer](#shutting-down-a-consumer)
2727
5. [Consuming Messages in Batches](#consuming-messages-in-batches)
2828
6. [Balancing Throughput and Latency](#balancing-throughput-and-latency)
29+
7. [Customizing Partition Assignment Strategy](#customizing-partition-assignment-strategy)
2930
4. [Thread Safety](#thread-safety)
3031
5. [Logging](#logging)
3132
6. [Instrumentation](#instrumentation)
@@ -723,6 +724,44 @@ consumer.each_message do |message|
723724
end
724725
```
725726

727+
#### Customizing Partition Assignment Strategy
728+
729+
In some cases, you might want to assign more partitions to some consumers. For example, in applications inserting some records to a database, the consumers running on hosts nearby the database can process more messages than the consumers running on other hosts.
730+
You can implement a custom assignment strategy and use it by passing a proc object that create the strategy as the argument `assignment_strategy_builder` like below:
731+
732+
```ruby
733+
class CustomAssignmentStrategy
734+
def initialize(cluster, user_data)
735+
@cluster = cluster
736+
@user_data = user_data
737+
end
738+
739+
# @return [String]
740+
def protocol_name
741+
...
742+
end
743+
744+
# @return [String, nil]
745+
def user_data
746+
@user_data
747+
end
748+
749+
# Assign the topic partitions to the group members.
750+
#
751+
# @param members [Hash<String, String>] a hash mapping member ids to metadata
752+
# @param topics [Array<String>] topics
753+
# @return [Hash<String, Protocol::MemberAssignment>] a hash mapping member
754+
# ids to assignments.
755+
def assign(members:, topics:)
756+
...
757+
end
758+
end
759+
760+
assignment_strategy_builder = ->(cluster) do
761+
CustomAssignmentStrategy.new(cluster, "some-host-information")
762+
end
763+
consumer = kafka.consumer(group_id: "some-group", assignment_strategy_builder: assignment_strategy_builder)
764+
```
726765

727766
### Thread Safety
728767

Diff for: lib/kafka/client.rb

+11-1
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,8 @@ def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size:
350350
# If it is n (n > 0), the topic list will be refreshed every n seconds
351351
# @param interceptors [Array<Object>] a list of consumer interceptors that implement
352352
# `call(Kafka::FetchedBatch)`.
353+
# @param assignment_strategy_builder [Proc] a procedure that implements
354+
# `call(Kafka::Cluster)` and creates a assignment strategy.
353355
# @return [Consumer]
354356
def consumer(
355357
group_id:,
@@ -361,7 +363,8 @@ def consumer(
361363
offset_retention_time: nil,
362364
fetcher_max_queue_size: 100,
363365
refresh_topic_interval: 0,
364-
interceptors: []
366+
interceptors: [],
367+
assignment_strategy_builder: nil
365368
)
366369
cluster = initialize_cluster
367370

@@ -372,6 +375,12 @@ def consumer(
372375
# The Kafka protocol expects the retention time to be in ms.
373376
retention_time = (offset_retention_time && offset_retention_time * 1_000) || -1
374377

378+
if assignment_strategy_builder
379+
assignment_strategy = assignment_strategy_builder.call(cluster)
380+
else
381+
assignment_strategy = RoundRobinAssignmentStrategy.new(cluster: cluster)
382+
end
383+
375384
group = ConsumerGroup.new(
376385
cluster: cluster,
377386
logger: @logger,
@@ -380,6 +389,7 @@ def consumer(
380389
rebalance_timeout: rebalance_timeout,
381390
retention_time: retention_time,
382391
instrumenter: instrumenter,
392+
assignment_strategy: assignment_strategy,
383393
)
384394

385395
fetcher = Fetcher.new(

Diff for: lib/kafka/consumer_group.rb

+5-3
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ module Kafka
77
class ConsumerGroup
88
attr_reader :assigned_partitions, :generation_id, :group_id
99

10-
def initialize(cluster:, logger:, group_id:, session_timeout:, rebalance_timeout:, retention_time:, instrumenter:)
10+
def initialize(cluster:, logger:, group_id:, session_timeout:, rebalance_timeout:, retention_time:, instrumenter:, assignment_strategy:)
1111
@cluster = cluster
1212
@logger = TaggedLogger.new(logger)
1313
@group_id = group_id
@@ -19,7 +19,7 @@ def initialize(cluster:, logger:, group_id:, session_timeout:, rebalance_timeout
1919
@members = {}
2020
@topics = Set.new
2121
@assigned_partitions = {}
22-
@assignment_strategy = RoundRobinAssignmentStrategy.new(cluster: @cluster)
22+
@assignment_strategy = assignment_strategy
2323
@retention_time = retention_time
2424
end
2525

@@ -144,6 +144,8 @@ def join_group
144144
rebalance_timeout: @rebalance_timeout,
145145
member_id: @member_id,
146146
topics: @topics,
147+
protocol_name: @assignment_strategy.protocol_name,
148+
user_data: @assignment_strategy.user_data,
147149
)
148150

149151
Protocol.handle_error(response.error_code)
@@ -181,7 +183,7 @@ def synchronize
181183
@logger.info "Chosen as leader of group `#{@group_id}`"
182184

183185
group_assignment = @assignment_strategy.assign(
184-
members: @members.keys,
186+
members: @members,
185187
topics: @topics,
186188
)
187189
end

Diff for: lib/kafka/protocol/join_group_request.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@ module Protocol
77
class JoinGroupRequest
88
PROTOCOL_TYPE = "consumer"
99

10-
def initialize(group_id:, session_timeout:, rebalance_timeout:, member_id:, topics: [])
10+
def initialize(group_id:, session_timeout:, rebalance_timeout:, member_id:, topics: [], protocol_name:, user_data: nil)
1111
@group_id = group_id
1212
@session_timeout = session_timeout * 1000 # Kafka wants ms.
1313
@rebalance_timeout = rebalance_timeout * 1000 # Kafka wants ms.
1414
@member_id = member_id || ""
1515
@protocol_type = PROTOCOL_TYPE
1616
@group_protocols = {
17-
"roundrobin" => ConsumerGroupProtocol.new(topics: topics),
17+
protocol_name => ConsumerGroupProtocol.new(topics: topics, user_data: user_data),
1818
}
1919
end
2020

Diff for: lib/kafka/protocol/join_group_response.rb

+9-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
module Kafka
44
module Protocol
55
class JoinGroupResponse
6+
Metadata = Struct.new(:version, :topics, :user_data)
7+
68
attr_reader :error_code
79

810
attr_reader :generation_id, :group_protocol
@@ -25,7 +27,13 @@ def self.decode(decoder)
2527
group_protocol: decoder.string,
2628
leader_id: decoder.string,
2729
member_id: decoder.string,
28-
members: Hash[decoder.array { [decoder.string, decoder.bytes] }],
30+
members: Hash[
31+
decoder.array do
32+
member_id = decoder.string
33+
d = Decoder.from_string(decoder.bytes)
34+
[member_id, Metadata.new(d.int16, d.array { d.string }, d.bytes)]
35+
end
36+
],
2937
)
3038
end
3139
end

Diff for: lib/kafka/round_robin_assignment_strategy.rb

+11-3
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,24 @@ def initialize(cluster:)
1111
@cluster = cluster
1212
end
1313

14+
def protocol_name
15+
"roundrobin"
16+
end
17+
18+
def user_data
19+
nil
20+
end
21+
1422
# Assign the topic partitions to the group members.
1523
#
16-
# @param members [Array<String>] member ids
24+
# @param members [Hash<String, String>] a hash mapping member ids to metadata
1725
# @param topics [Array<String>] topics
1826
# @return [Hash<String, Protocol::MemberAssignment>] a hash mapping member
1927
# ids to assignments.
2028
def assign(members:, topics:)
2129
group_assignment = {}
2230

23-
members.each do |member_id|
31+
members.each_key do |member_id|
2432
group_assignment[member_id] = Protocol::MemberAssignment.new
2533
end
2634

@@ -37,7 +45,7 @@ def assign(members:, topics:)
3745
index % members.count
3846
end.values
3947

40-
members.zip(partitions_per_member).each do |member_id, member_partitions|
48+
members.keys.zip(partitions_per_member).each do |member_id, member_partitions|
4149
unless member_partitions.nil?
4250
member_partitions.each do |topic, partition|
4351
group_assignment[member_id].assign(topic, [partition])

Diff for: spec/functional/consumer_group_spec.rb

+87
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,93 @@
435435
end
436436
end
437437

438+
example "consuming messages with a custom assignment strategy" do
439+
topic = create_random_topic(num_partitions: 1)
440+
messages = (1..1000).to_a
441+
442+
begin
443+
kafka = Kafka.new(kafka_brokers, client_id: "test")
444+
producer = kafka.producer
445+
446+
messages.each do |i|
447+
producer.produce(i.to_s, topic: topic, partition: 0)
448+
end
449+
450+
producer.deliver_messages
451+
end
452+
453+
group_id = "test#{rand(1000)}"
454+
455+
mutex = Mutex.new
456+
received_messages = []
457+
458+
assignment_strategy_class = Class.new do
459+
def initialize(cluster, weight)
460+
@cluster = cluster
461+
@weight = weight
462+
end
463+
464+
def protocol_name
465+
"custom"
466+
end
467+
468+
def user_data
469+
@weight.to_s
470+
end
471+
472+
def assign(members:, topics:)
473+
group_assignment = members.each_key.with_object({}) do |member_id, assignment|
474+
assignment[member_id] = Kafka::Protocol::MemberAssignment.new
475+
end
476+
477+
member_ids = members.flat_map { |id, metadata| [id] * metadata.user_data.to_i }
478+
partition_index = 0
479+
topics.each_with_index do |topic, i|
480+
@cluster.partitions_for(topic).each_with_index do |partition, j|
481+
member_id = member_ids[partition_index % member_ids.size]
482+
group_assignment[member_id].assign(topic, [partition.partition_id])
483+
partition_index += 1
484+
end
485+
end
486+
487+
group_assignment
488+
end
489+
end
490+
491+
consumers = 2.times.map do |i|
492+
assignment_strategy_builder = ->(cluster) {
493+
assignment_strategy_class.new(cluster, i + 1)
494+
}
495+
496+
kafka = Kafka.new(kafka_brokers, client_id: "test", logger: logger)
497+
consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time, assignment_strategy_builder: assignment_strategy_builder)
498+
consumer.subscribe(topic)
499+
consumer
500+
end
501+
502+
threads = consumers.map do |consumer|
503+
t = Thread.new do
504+
consumer.each_message do |message|
505+
mutex.synchronize do
506+
received_messages << message
507+
508+
if received_messages.count == messages.count
509+
consumers.each(&:stop)
510+
end
511+
end
512+
end
513+
end
514+
515+
t.abort_on_exception = true
516+
517+
t
518+
end
519+
520+
threads.each(&:join)
521+
522+
expect(received_messages.map(&:value).map(&:to_i)).to match_array messages
523+
end
524+
438525
def wait_until(timeout:)
439526
Timeout.timeout(timeout) do
440527
sleep 0.5 until yield

Diff for: spec/round_robin_assignment_strategy_spec.rb

+8-8
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
let(:strategy) { described_class.new(cluster: cluster) }
66

77
it "assigns all partitions" do
8-
members = (0...10).map {|i| "member#{i}" }
8+
members = Hash[(0...10).map {|i| ["member#{i}", nil] }]
99
topics = ["greetings"]
1010
partitions = (0...30).map {|i| double(:"partition#{i}", partition_id: i) }
1111

@@ -28,7 +28,7 @@
2828
cluster = double(:cluster)
2929
strategy = described_class.new(cluster: cluster)
3030

31-
members = (0...10).map {|i| "member#{i}" }
31+
members = Hash[(0...10).map {|i| ["member#{i}", nil] }]
3232
topics = ["topic1", "topic2"]
3333
partitions = (0...5).map {|i| double(:"partition#{i}", partition_id: i) }
3434

@@ -57,32 +57,32 @@
5757
{
5858
name: "uneven topics",
5959
topics: { "topic1" => [0], "topic2" => (0..50).to_a },
60-
members: ["member1", "member2"],
60+
members: { "member1" => nil, "member2" => nil },
6161
},
6262
{
6363
name: "only one partition",
6464
topics: { "topic1" => [0] },
65-
members: ["member1", "member2"],
65+
members: { "member1" => nil, "member2" => nil },
6666
},
6767
{
6868
name: "lots of partitions",
6969
topics: { "topic1" => (0..100).to_a },
70-
members: ["member1"]
70+
members: { "member1" => nil },
7171
},
7272
{
7373
name: "lots of members",
7474
topics: { "topic1" => (0..10).to_a, "topic2" => (0..10).to_a },
75-
members: (0..50).map { |i| "member#{i}" }
75+
members: Hash[(0..50).map { |i| ["member#{i}", nil] }]
7676
},
7777
{
7878
name: "odd number of partitions",
7979
topics: { "topic1" => (0..14).to_a },
80-
members: ["member1", "member2"]
80+
members: { "member1" => nil, "member2" => nil },
8181
},
8282
{
8383
name: "five topics, 10 partitions, 3 consumers",
8484
topics: { "topic1" => [0, 1], "topic2" => [0, 1], "topic3" => [0, 1], "topic4" => [0, 1], "topic5" => [0, 1] },
85-
members: ["member1", "member2", "member3"]
85+
members: { "member1" => nil, "member2" => nil, "member3" => nil },
8686
}
8787
].each do |name:, topics:, members:|
8888
it name do

0 commit comments

Comments
 (0)