Commit c3e90bc

Merge pull request #846 from abicky/support-custom-assignment-strategy
Support custom assignment strategy
2 parents 5a360ca + 1b10aed commit c3e90bc

11 files changed

+383
-88
lines changed

CHANGELOG.md

+2
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,8 @@ Changes and additions to the library will be listed here.
44

55
## Unreleased
66

7+
- Support custom assignment strategy (#846).
8+
79
## 1.2.0
810

911
- Add producer consumer interceptors (#837).

README.md

+83
@@ -26,6 +26,7 @@ A Ruby client library for [Apache Kafka](http://kafka.apache.org/), a distribute
2626
4. [Shutting Down a Consumer](#shutting-down-a-consumer)
2727
5. [Consuming Messages in Batches](#consuming-messages-in-batches)
2828
6. [Balancing Throughput and Latency](#balancing-throughput-and-latency)
29+
7. [Customizing Partition Assignment Strategy](#customizing-partition-assignment-strategy)
2930
4. [Thread Safety](#thread-safety)
3031
5. [Logging](#logging)
3132
6. [Instrumentation](#instrumentation)
@@ -743,6 +744,88 @@ consumer.each_message do |message|
743744
end
744745
```
745746

747+
#### Customizing Partition Assignment Strategy
748+
749+
In some cases, you might want to assign more partitions to some consumers than to others. For example, in applications that insert records into a database, consumers running on hosts near the database can process more messages than consumers running on other hosts.
750+
You can use a custom assignment strategy by passing an object that implements `#call` as the `assignment_strategy` argument:
751+
752+
```ruby
753+
class CustomAssignmentStrategy
754+
def initialize(user_data)
755+
@user_data = user_data
756+
end
757+
758+
# Assign the topic partitions to the group members.
759+
#
760+
# @param cluster [Kafka::Cluster]
761+
# @param members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash
762+
# mapping member ids to metadata
763+
# @param partitions [Array<Kafka::ConsumerGroup::Assignor::Partition>] a list of
764+
# partitions the consumer group processes
765+
# @return [Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>>] a hash
766+
# mapping member ids to partitions.
767+
def call(cluster:, members:, partitions:)
768+
...
769+
end
770+
end
771+
772+
strategy = CustomAssignmentStrategy.new("some-host-information")
773+
consumer = kafka.consumer(group_id: "some-group", assignment_strategy: strategy)
774+
```
775+
776+
`members` is a hash mapping member IDs to metadata, and `partitions` is a list of the partitions the consumer group processes. The method `call` must return a hash mapping member IDs to partitions. For example, the following strategy assigns partitions randomly:
777+
778+
```ruby
779+
class RandomAssignmentStrategy
780+
def call(cluster:, members:, partitions:)
781+
member_ids = members.keys
782+
partitions.each_with_object(Hash.new {|h, k| h[k] = [] }) do |partition, partitions_per_member|
783+
partitions_per_member[member_ids[rand(member_ids.count)]] << partition
784+
end
785+
end
786+
end
787+
```
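
As a further illustration of the `call` contract, here is a minimal sketch of a strategy that gives some members more partitions than others, matching the motivation at the top of this section. The class name `WeightedAssignmentStrategy` and its `weights` argument are hypothetical, and `Partition` is a local stand-in for `Kafka::ConsumerGroup::Assignor::Partition` so the sketch runs without a broker. Weights are assumed to be positive integers.

```ruby
# Stand-in for Kafka::ConsumerGroup::Assignor::Partition (assumption: only
# `topic` and `partition_id` are needed here).
Partition = Struct.new(:topic, :partition_id)

# Hypothetical strategy: members listed in `weights` receive proportionally
# more partitions; members that are not listed get a weight of 1.
class WeightedAssignmentStrategy
  def initialize(weights)
    @weights = weights
  end

  def call(cluster:, members:, partitions:)
    # Repeat each member ID according to its weight, then deal the
    # partitions round-robin over the expanded list.
    slots = members.keys.sort.flat_map { |id| [id] * @weights.fetch(id, 1) }
    assignment = Hash.new { |h, k| h[k] = [] }
    partitions.each_with_index do |partition, index|
      assignment[slots[index % slots.size]] << partition
    end
    assignment
  end
end

partitions = Array.new(6) { |i| Partition.new("events", i) }
members = { "near-db" => nil, "far" => nil }
strategy = WeightedAssignmentStrategy.new("near-db" => 2)
assignment = strategy.call(cluster: nil, members: members, partitions: partitions)
assignment.each { |id, ps| puts "#{id}: #{ps.map(&:partition_id).inspect}" }
```

With a weight of 2, `near-db` ends up with four of the six partitions and `far` with two. Sorting the member IDs keeps the result deterministic regardless of the order in which members joined.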
788+
789+
If the strategy needs user data, define the method `user_data`, which returns the user data of the consumer it runs on. For example, the following strategy uses the consumers' IP addresses as user data:
790+
791+
```ruby
792+
class NetworkTopologyAssignmentStrategy
793+
def user_data
794+
Socket.ip_address_list.find(&:ipv4_private?).ip_address
795+
end
796+
797+
def call(cluster:, members:, partitions:)
798+
# Display the pair of the member ID and IP address
799+
members.each do |id, metadata|
800+
puts "#{id}: #{metadata.user_data}"
801+
end
802+
803+
# Assign partitions considering the network topology
804+
...
805+
end
806+
end
807+
```
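
To show the full round trip of user data, the following sketch has each consumer advertise a capacity and lets `call` read every member's value back from `metadata.user_data`. This is an illustration under assumptions, not part of the library: the class name `CapacityAssignmentStrategy`, the JSON encoding, and the `capacity` key are hypothetical, user data is assumed to be a string, capacities are assumed to be positive, and `Metadata` and `Partition` are local stand-ins for `Kafka::Protocol::JoinGroupResponse::Metadata` and `Kafka::ConsumerGroup::Assignor::Partition`.

```ruby
require "json"

# Local stand-ins so the sketch runs without a broker.
Metadata = Struct.new(:version, :topics, :user_data)
Partition = Struct.new(:topic, :partition_id)

# Hypothetical strategy: each consumer advertises its capacity as JSON.
class CapacityAssignmentStrategy
  def initialize(capacity)
    @capacity = capacity
  end

  # Returned as a string; it is passed along when the consumer joins the group.
  def user_data
    JSON.generate("capacity" => @capacity)
  end

  def call(cluster:, members:, partitions:)
    # Read back the capacity every member advertised.
    capacities = members.transform_values { |m| JSON.parse(m.user_data).fetch("capacity") }
    assignment = members.keys.map { |id| [id, []] }.to_h
    # Give each partition to the member with the lowest load relative to
    # its capacity; ties are broken by member ID.
    partitions.each do |partition|
      target = assignment.keys.min_by { |id| [assignment[id].size.fdiv(capacities[id]), id] }
      assignment[target] << partition
    end
    assignment
  end
end

members = {
  "a" => Metadata.new(0, ["events"], CapacityAssignmentStrategy.new(3).user_data),
  "b" => Metadata.new(0, ["events"], CapacityAssignmentStrategy.new(1).user_data),
}
partitions = Array.new(4) { |i| Partition.new("events", i) }
result = CapacityAssignmentStrategy.new(3).call(cluster: nil, members: members, partitions: partitions)
result.each { |id, ps| puts "#{id}: #{ps.size} partition(s)" }
```

Only the group leader runs `call`, so the leader sees the user data of all members, while each member only ever produces its own.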
808+
809+
Note that the strategy uses the class name as the default protocol name. You can change it by defining the method `protocol_name`:
810+
811+
```ruby
812+
class NetworkTopologyAssignmentStrategy
813+
def protocol_name
814+
"networktopology"
815+
end
816+
817+
def user_data
818+
Socket.ip_address_list.find(&:ipv4_private?).ip_address
819+
end
820+
821+
def call(cluster:, members:, partitions:)
822+
...
823+
end
824+
end
825+
```
826+
827+
As the method `call` might receive different user data from what it expects, you should avoid using the same protocol name as another strategy that uses different user data.
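
The protocol name fallback described above can be reproduced in isolation. The helper `protocol_name_for` below is a hypothetical name that mirrors the `respond_to?` check in `Kafka::ConsumerGroup::Assignor#protocol_name` from this commit; the two strategy classes are placeholders.

```ruby
# Mirrors the fallback in Assignor#protocol_name: use the strategy's own
# protocol_name if it defines one, otherwise use its class name.
def protocol_name_for(strategy)
  strategy.respond_to?(:protocol_name) ? strategy.protocol_name : strategy.class.to_s
end

# Placeholder strategy without an explicit protocol name.
class ImplicitNameStrategy
  def call(cluster:, members:, partitions:)
    {}
  end
end

# Placeholder strategy with an explicit protocol name.
class ExplicitNameStrategy
  def protocol_name
    "networktopology"
  end

  def call(cluster:, members:, partitions:)
    {}
  end
end

puts protocol_name_for(ImplicitNameStrategy.new) # prints "ImplicitNameStrategy"
puts protocol_name_for(ExplicitNameStrategy.new) # prints "networktopology"
```

One consequence of the class-name default is that a lambda passed as the strategy would advertise `Proc` as its protocol name, so an explicit strategy class with its own `protocol_name` is likely the safer choice when several strategies coexist.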
828+
746829

747830
### Thread Safety
748831

lib/kafka/client.rb

+5-1
@@ -357,6 +357,8 @@ def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size:
357357
# If it is n (n > 0), the topic list will be refreshed every n seconds
358358
# @param interceptors [Array<Object>] a list of consumer interceptors that implement
359359
# `call(Kafka::FetchedBatch)`.
360+
# @param assignment_strategy [Object] a partition assignment strategy that
361+
# implements `call(cluster:, members:, partitions:)` and, optionally, `protocol_name()` and `user_data()`
360362
# @return [Consumer]
361363
def consumer(
362364
group_id:,
@@ -368,7 +370,8 @@ def consumer(
368370
offset_retention_time: nil,
369371
fetcher_max_queue_size: 100,
370372
refresh_topic_interval: 0,
371-
interceptors: []
373+
interceptors: [],
374+
assignment_strategy: nil
372375
)
373376
cluster = initialize_cluster
374377

@@ -387,6 +390,7 @@ def consumer(
387390
rebalance_timeout: rebalance_timeout,
388391
retention_time: retention_time,
389392
instrumenter: instrumenter,
393+
assignment_strategy: assignment_strategy
390394
)
391395

392396
fetcher = Fetcher.new(

lib/kafka/consumer_group.rb

+10-4
@@ -1,13 +1,14 @@
11
# frozen_string_literal: true
22

33
require "set"
4+
require "kafka/consumer_group/assignor"
45
require "kafka/round_robin_assignment_strategy"
56

67
module Kafka
78
class ConsumerGroup
89
attr_reader :assigned_partitions, :generation_id, :group_id
910

10-
def initialize(cluster:, logger:, group_id:, session_timeout:, rebalance_timeout:, retention_time:, instrumenter:)
11+
def initialize(cluster:, logger:, group_id:, session_timeout:, rebalance_timeout:, retention_time:, instrumenter:, assignment_strategy:)
1112
@cluster = cluster
1213
@logger = TaggedLogger.new(logger)
1314
@group_id = group_id
@@ -19,7 +20,10 @@ def initialize(cluster:, logger:, group_id:, session_timeout:, rebalance_timeout
1920
@members = {}
2021
@topics = Set.new
2122
@assigned_partitions = {}
22-
@assignment_strategy = RoundRobinAssignmentStrategy.new(cluster: @cluster)
23+
@assignor = Assignor.new(
24+
cluster: cluster,
25+
strategy: assignment_strategy || RoundRobinAssignmentStrategy.new
26+
)
2327
@retention_time = retention_time
2428
end
2529

@@ -147,6 +151,8 @@ def join_group
147151
rebalance_timeout: @rebalance_timeout,
148152
member_id: @member_id,
149153
topics: @topics,
154+
protocol_name: @assignor.protocol_name,
155+
user_data: @assignor.user_data,
150156
)
151157

152158
Protocol.handle_error(response.error_code)
@@ -183,8 +189,8 @@ def synchronize
183189
if group_leader?
184190
@logger.info "Chosen as leader of group `#{@group_id}`"
185191

186-
group_assignment = @assignment_strategy.assign(
187-
members: @members.keys,
192+
group_assignment = @assignor.assign(
193+
members: @members,
188194
topics: @topics,
189195
)
190196
end

lib/kafka/consumer_group/assignor.rb

+63
@@ -0,0 +1,63 @@
1+
# frozen_string_literal: true
2+
3+
require "kafka/protocol/member_assignment"
4+
5+
module Kafka
6+
class ConsumerGroup
7+
8+
# A consumer group partition assignor
9+
class Assignor
10+
Partition = Struct.new(:topic, :partition_id)
11+
12+
# @param cluster [Kafka::Cluster]
13+
# @param strategy [Object] an object that implements #call and,
14+
# optionally, #protocol_name and #user_data.
15+
def initialize(cluster:, strategy:)
16+
@cluster = cluster
17+
@strategy = strategy
18+
end
19+
20+
def protocol_name
21+
@strategy.respond_to?(:protocol_name) ? @strategy.protocol_name : @strategy.class.to_s
22+
end
23+
24+
def user_data
25+
@strategy.user_data if @strategy.respond_to?(:user_data)
26+
end
27+
28+
# Assign the topic partitions to the group members.
29+
#
30+
# @param members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash
31+
# mapping member ids to metadata.
32+
# @param topics [Array<String>] topics
33+
# @return [Hash<String, Kafka::Protocol::MemberAssignment>] a hash mapping member
34+
# ids to assignments.
35+
def assign(members:, topics:)
36+
topic_partitions = topics.flat_map do |topic|
37+
begin
38+
partition_ids = @cluster.partitions_for(topic).map(&:partition_id)
39+
rescue UnknownTopicOrPartition
40+
raise UnknownTopicOrPartition, "unknown topic #{topic}"
41+
end
42+
partition_ids.map {|partition_id| Partition.new(topic, partition_id) }
43+
end
44+
45+
group_assignment = {}
46+
47+
members.each_key do |member_id|
48+
group_assignment[member_id] = Protocol::MemberAssignment.new
49+
end
50+
@strategy.call(cluster: @cluster, members: members, partitions: topic_partitions).each do |member_id, partitions|
51+
Array(partitions).each do |partition|
52+
group_assignment[member_id].assign(partition.topic, [partition.partition_id])
53+
end
54+
end
55+
56+
group_assignment
57+
rescue Kafka::LeaderNotAvailable
58+
sleep 1
59+
retry
60+
end
61+
end
62+
end
63+
end

lib/kafka/protocol/join_group_request.rb

+2-2
@@ -7,14 +7,14 @@ module Protocol
77
class JoinGroupRequest
88
PROTOCOL_TYPE = "consumer"
99

10-
def initialize(group_id:, session_timeout:, rebalance_timeout:, member_id:, topics: [])
10+
def initialize(group_id:, session_timeout:, rebalance_timeout:, member_id:, topics: [], protocol_name:, user_data: nil)
1111
@group_id = group_id
1212
@session_timeout = session_timeout * 1000 # Kafka wants ms.
1313
@rebalance_timeout = rebalance_timeout * 1000 # Kafka wants ms.
1414
@member_id = member_id || ""
1515
@protocol_type = PROTOCOL_TYPE
1616
@group_protocols = {
17-
"roundrobin" => ConsumerGroupProtocol.new(topics: topics),
17+
protocol_name => ConsumerGroupProtocol.new(topics: topics, user_data: user_data),
1818
}
1919
end
2020

lib/kafka/protocol/join_group_response.rb

+9-1
@@ -3,6 +3,8 @@
33
module Kafka
44
module Protocol
55
class JoinGroupResponse
6+
Metadata = Struct.new(:version, :topics, :user_data)
7+
68
attr_reader :error_code
79

810
attr_reader :generation_id, :group_protocol
@@ -25,7 +27,13 @@ def self.decode(decoder)
2527
group_protocol: decoder.string,
2628
leader_id: decoder.string,
2729
member_id: decoder.string,
28-
members: Hash[decoder.array { [decoder.string, decoder.bytes] }],
30+
members: Hash[
31+
decoder.array do
32+
member_id = decoder.string
33+
d = Decoder.from_string(decoder.bytes)
34+
[member_id, Metadata.new(d.int16, d.array { d.string }, d.bytes)]
35+
end
36+
],
2937
)
3038
end
3139
end
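
The decoder calls in this hunk read the member metadata as an int16 version, an array of topic strings, and a bytes field holding the user data. The sketch below reproduces that layout with `pack` and `unpack1` so it can be inspected without the library. The helpers `encode_metadata` and `decode_metadata` are hypothetical names, and the big-endian length prefixes (int32 for arrays and bytes, int16 for strings, -1 for null bytes) are an assumption based on Kafka's wire format rather than something shown in this diff.

```ruby
require "stringio"

# Local stand-in for Kafka::Protocol::JoinGroupResponse::Metadata.
Metadata = Struct.new(:version, :topics, :user_data)

# Hypothetical helper: builds the metadata bytes a member would send.
def encode_metadata(version, topics, user_data)
  buf = [version].pack("s>")                                  # int16 version
  buf << [topics.size].pack("l>")                             # int32 array length
  topics.each { |t| buf << [t.bytesize].pack("s>") << t.b }   # int16 length + string
  if user_data.nil?
    buf << [-1].pack("l>")                                    # null bytes: length -1
  else
    buf << [user_data.bytesize].pack("l>") << user_data.b     # int32 length + bytes
  end
  buf
end

# Hypothetical helper: reads the fields in the same order as the hunk above.
def decode_metadata(bytes)
  io = StringIO.new(bytes)
  version = io.read(2).unpack1("s>")
  count = io.read(4).unpack1("l>")
  topics = Array.new(count) { io.read(io.read(2).unpack1("s>")) }
  size = io.read(4).unpack1("l>")
  user_data = size < 0 ? nil : io.read(size)
  Metadata.new(version, topics, user_data)
end

metadata = decode_metadata(encode_metadata(0, ["events"], "10.0.0.5"))
puts metadata.topics.inspect
puts metadata.user_data
```

Because the user data travels as opaque bytes, the strategy itself decides how to serialize and parse it.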

lib/kafka/round_robin_assignment_strategy.rb

+15-38
@@ -1,54 +1,31 @@
11
# frozen_string_literal: true
22

3-
require "kafka/protocol/member_assignment"
4-
53
module Kafka
64

75
# A consumer group partition assignment strategy that assigns partitions to
86
# consumers in a round-robin fashion.
97
class RoundRobinAssignmentStrategy
10-
def initialize(cluster:)
11-
@cluster = cluster
8+
def protocol_name
9+
"roundrobin"
1210
end
1311

1412
# Assign the topic partitions to the group members.
1513
#
16-
# @param members [Array<String>] member ids
17-
# @param topics [Array<String>] topics
18-
# @return [Hash<String, Protocol::MemberAssignment>] a hash mapping member
19-
# ids to assignments.
20-
def assign(members:, topics:)
21-
group_assignment = {}
22-
23-
members.each do |member_id|
24-
group_assignment[member_id] = Protocol::MemberAssignment.new
25-
end
26-
27-
topic_partitions = topics.flat_map do |topic|
28-
begin
29-
partitions = @cluster.partitions_for(topic).map(&:partition_id)
30-
rescue UnknownTopicOrPartition
31-
raise UnknownTopicOrPartition, "unknown topic #{topic}"
32-
end
33-
Array.new(partitions.count) { topic }.zip(partitions)
34-
end
35-
36-
partitions_per_member = topic_partitions.group_by.with_index do |_, index|
37-
index % members.count
38-
end.values
39-
40-
members.zip(partitions_per_member).each do |member_id, member_partitions|
41-
unless member_partitions.nil?
42-
member_partitions.each do |topic, partition|
43-
group_assignment[member_id].assign(topic, [partition])
44-
end
45-
end
14+
# @param cluster [Kafka::Cluster]
15+
# @param members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash
16+
# mapping member ids to metadata
17+
# @param partitions [Array<Kafka::ConsumerGroup::Assignor::Partition>] a list of
18+
# partitions the consumer group processes
19+
# @return [Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>>] a hash
20+
# mapping member ids to partitions.
21+
def call(cluster:, members:, partitions:)
22+
member_ids = members.keys
23+
partitions_per_member = Hash.new {|h, k| h[k] = [] }
24+
partitions.each_with_index do |partition, index|
25+
partitions_per_member[member_ids[index % member_ids.count]] << partition
4626
end
4727

48-
group_assignment
49-
rescue Kafka::LeaderNotAvailable
50-
sleep 1
51-
retry
28+
partitions_per_member
5229
end
5330
end
5431
end
