Skip to content

Support custom assignment strategy #846

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ Changes and additions to the library will be listed here.

## Unreleased

- Support custom assignment strategy (#846).

## 1.2.0

- Add producer consumer interceptors (#837).
Expand Down
83 changes: 83 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ A Ruby client library for [Apache Kafka](http://kafka.apache.org/), a distribute
4. [Shutting Down a Consumer](#shutting-down-a-consumer)
5. [Consuming Messages in Batches](#consuming-messages-in-batches)
6. [Balancing Throughput and Latency](#balancing-throughput-and-latency)
7. [Customizing Partition Assignment Strategy](#customizing-partition-assignment-strategy)
4. [Thread Safety](#thread-safety)
5. [Logging](#logging)
6. [Instrumentation](#instrumentation)
Expand Down Expand Up @@ -743,6 +744,88 @@ consumer.each_message do |message|
end
```

#### Customizing Partition Assignment Strategy

In some cases, you might want to assign more partitions to some consumers. For example, in applications inserting some records to a database, the consumers running on hosts nearby the database can process more messages than the consumers running on other hosts.
You can use a custom assignment strategy by passing an object that implements `#call` as the argument `assignment_strategy` like below:

```ruby
class CustomAssignmentStrategy
def initialize(user_data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def initialize(user_data)
def initialize(cluster, user_data)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@user_data = user_data
end

# Assign the topic partitions to the group members.
#
# @param cluster [Kafka::Cluster]
# @param members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash
# mapping member ids to metadata
# @param partitions [Array<Kafka::ConsumerGroup::Assignor::Partition>] a list of
# partitions the consumer group processes
# @return [Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>] a hash
# mapping member ids to partitions.
def call(cluster:, members:, partitions:)
...
end
end

strategy = CustomAssignmentStrategy.new("some-host-information")
consumer = kafka.consumer(group_id: "some-group", assignment_strategy: strategy)
```

`members` is a hash mapping member IDs to metadata, and partitions is a list of partitions the consumer group processes. The method `call` must return a hash mapping member IDs to partitions. For example, the following strategy assigns partitions randomly:

```ruby
class RandomAssignmentStrategy
def call(cluster:, members:, partitions:)
member_ids = members.keys
partitions.each_with_object(Hash.new {|h, k| h[k] = [] }) do |partition, partitions_per_member|
partitions_per_member[member_ids[rand(member_ids.count)]] << partition
end
end
end
```

If the strategy needs user data, you should define the method `user_data` that returns user data on each consumer. For example, the following strategy uses the consumers' IP addresses as user data:

```ruby
class NetworkTopologyAssignmentStrategy
def user_data
Socket.ip_address_list.find(&:ipv4_private?).ip_address
end

def call(cluster:, members:, partitions:)
# Display the pair of the member ID and IP address
members.each do |id, metadata|
puts "#{id}: #{metadata.user_data}"
end

# Assign partitions considering the network topology
...
end
end
```

Note that the strategy uses the class name as the default protocol name. You can change it by defining the method `protocol_name`:

```ruby
class NetworkTopologyAssignmentStrategy
def protocol_name
"networktopology"
end

def user_data
Socket.ip_address_list.find(&:ipv4_private?).ip_address
end

def call(cluster:, members:, partitions:)
...
end
end
```

As the method `call` might receive different user data from what it expects, you should avoid using the same protocol name as another strategy that uses different user data.


### Thread Safety

Expand Down
6 changes: 5 additions & 1 deletion lib/kafka/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size:
# If it is n (n > 0), the topic list will be refreshed every n seconds
# @param interceptors [Array<Object>] a list of consumer interceptors that implement
# `call(Kafka::FetchedBatch)`.
# @param assignment_strategy [Object] a partition assignment strategy that
# implements `protocol_type()`, `user_data()`, and `assign(members:, partitions:)`
# @return [Consumer]
def consumer(
group_id:,
Expand All @@ -368,7 +370,8 @@ def consumer(
offset_retention_time: nil,
fetcher_max_queue_size: 100,
refresh_topic_interval: 0,
interceptors: []
interceptors: [],
assignment_strategy: nil
)
cluster = initialize_cluster

Expand All @@ -387,6 +390,7 @@ def consumer(
rebalance_timeout: rebalance_timeout,
retention_time: retention_time,
instrumenter: instrumenter,
assignment_strategy: assignment_strategy
)

fetcher = Fetcher.new(
Expand Down
14 changes: 10 additions & 4 deletions lib/kafka/consumer_group.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# frozen_string_literal: true

require "set"
require "kafka/consumer_group/assignor"
require "kafka/round_robin_assignment_strategy"

module Kafka
class ConsumerGroup
attr_reader :assigned_partitions, :generation_id, :group_id

def initialize(cluster:, logger:, group_id:, session_timeout:, rebalance_timeout:, retention_time:, instrumenter:)
def initialize(cluster:, logger:, group_id:, session_timeout:, rebalance_timeout:, retention_time:, instrumenter:, assignment_strategy:)
@cluster = cluster
@logger = TaggedLogger.new(logger)
@group_id = group_id
Expand All @@ -19,7 +20,10 @@ def initialize(cluster:, logger:, group_id:, session_timeout:, rebalance_timeout
@members = {}
@topics = Set.new
@assigned_partitions = {}
@assignment_strategy = RoundRobinAssignmentStrategy.new(cluster: @cluster)
@assignor = Assignor.new(
cluster: cluster,
strategy: assignment_strategy || RoundRobinAssignmentStrategy.new
)
@retention_time = retention_time
end

Expand Down Expand Up @@ -144,6 +148,8 @@ def join_group
rebalance_timeout: @rebalance_timeout,
member_id: @member_id,
topics: @topics,
protocol_name: @assignor.protocol_name,
user_data: @assignor.user_data,
)

Protocol.handle_error(response.error_code)
Expand Down Expand Up @@ -180,8 +186,8 @@ def synchronize
if group_leader?
@logger.info "Chosen as leader of group `#{@group_id}`"

group_assignment = @assignment_strategy.assign(
members: @members.keys,
group_assignment = @assignor.assign(
members: @members,
topics: @topics,
)
end
Expand Down
63 changes: 63 additions & 0 deletions lib/kafka/consumer_group/assignor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# frozen_string_literal: true

require "kafka/protocol/member_assignment"

module Kafka
class ConsumerGroup

# A consumer group partition assignor
class Assignor
Partition = Struct.new(:topic, :partition_id)

# @param cluster [Kafka::Cluster]
# @param strategy [Object] an object that implements #protocol_type,
# #user_data, and #assign.
def initialize(cluster:, strategy:)
@cluster = cluster
@strategy = strategy
end

def protocol_name
@strategy.respond_to?(:protocol_name) ? @strategy.protocol_name : @strategy.class.to_s
end

def user_data
@strategy.user_data if @strategy.respond_to?(:user_data)
end

# Assign the topic partitions to the group members.
#
# @param members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash
# mapping member ids to metadata.
# @param topics [Array<String>] topics
# @return [Hash<String, Kafka::Protocol::MemberAssignment>] a hash mapping member
# ids to assignments.
def assign(members:, topics:)
topic_partitions = topics.flat_map do |topic|
begin
partition_ids = @cluster.partitions_for(topic).map(&:partition_id)
rescue UnknownTopicOrPartition
raise UnknownTopicOrPartition, "unknown topic #{topic}"
end
partition_ids.map {|partition_id| Partition.new(topic, partition_id) }
end

group_assignment = {}

members.each_key do |member_id|
group_assignment[member_id] = Protocol::MemberAssignment.new
end
@strategy.call(cluster: @cluster, members: members, partitions: topic_partitions).each do |member_id, partitions|
Array(partitions).each do |partition|
group_assignment[member_id].assign(partition.topic, [partition.partition_id])
end
end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call to split the boilerplate from the real assignment strategy, given it's sufficiently flexible to handle real world strategies.


group_assignment
rescue Kafka::LeaderNotAvailable
sleep 1
retry
end
end
end
end
4 changes: 2 additions & 2 deletions lib/kafka/protocol/join_group_request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ module Protocol
class JoinGroupRequest
PROTOCOL_TYPE = "consumer"

def initialize(group_id:, session_timeout:, rebalance_timeout:, member_id:, topics: [])
def initialize(group_id:, session_timeout:, rebalance_timeout:, member_id:, topics: [], protocol_name:, user_data: nil)
@group_id = group_id
@session_timeout = session_timeout * 1000 # Kafka wants ms.
@rebalance_timeout = rebalance_timeout * 1000 # Kafka wants ms.
@member_id = member_id || ""
@protocol_type = PROTOCOL_TYPE
@group_protocols = {
"roundrobin" => ConsumerGroupProtocol.new(topics: topics),
protocol_name => ConsumerGroupProtocol.new(topics: topics, user_data: user_data),
}
end

Expand Down
10 changes: 9 additions & 1 deletion lib/kafka/protocol/join_group_response.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
module Kafka
module Protocol
class JoinGroupResponse
Metadata = Struct.new(:version, :topics, :user_data)

attr_reader :error_code

attr_reader :generation_id, :group_protocol
Expand All @@ -25,7 +27,13 @@ def self.decode(decoder)
group_protocol: decoder.string,
leader_id: decoder.string,
member_id: decoder.string,
members: Hash[decoder.array { [decoder.string, decoder.bytes] }],
members: Hash[
decoder.array do
member_id = decoder.string
d = Decoder.from_string(decoder.bytes)
[member_id, Metadata.new(d.int16, d.array { d.string }, d.bytes)]
end
],
)
end
end
Expand Down
53 changes: 15 additions & 38 deletions lib/kafka/round_robin_assignment_strategy.rb
Original file line number Diff line number Diff line change
@@ -1,54 +1,31 @@
# frozen_string_literal: true

require "kafka/protocol/member_assignment"

module Kafka

# A consumer group partition assignment strategy that assigns partitions to
# consumers in a round-robin fashion.
class RoundRobinAssignmentStrategy
def initialize(cluster:)
@cluster = cluster
def protocol_name
"roundrobin"
end

# Assign the topic partitions to the group members.
#
# @param members [Array<String>] member ids
# @param topics [Array<String>] topics
# @return [Hash<String, Protocol::MemberAssignment>] a hash mapping member
# ids to assignments.
def assign(members:, topics:)
group_assignment = {}

members.each do |member_id|
group_assignment[member_id] = Protocol::MemberAssignment.new
end

topic_partitions = topics.flat_map do |topic|
begin
partitions = @cluster.partitions_for(topic).map(&:partition_id)
rescue UnknownTopicOrPartition
raise UnknownTopicOrPartition, "unknown topic #{topic}"
end
Array.new(partitions.count) { topic }.zip(partitions)
end

partitions_per_member = topic_partitions.group_by.with_index do |_, index|
index % members.count
end.values

members.zip(partitions_per_member).each do |member_id, member_partitions|
unless member_partitions.nil?
member_partitions.each do |topic, partition|
group_assignment[member_id].assign(topic, [partition])
end
end
# @param cluster [Kafka::Cluster]
# @param members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash
# mapping member ids to metadata
# @param partitions [Array<Kafka::ConsumerGroup::Assignor::Partition>] a list of
# partitions the consumer group processes
# @return [Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>] a hash
# mapping member ids to partitions.
def call(cluster:, members:, partitions:)
member_ids = members.keys
partitions_per_member = Hash.new {|h, k| h[k] = [] }
partitions.each_with_index do |partition, index|
partitions_per_member[member_ids[index % member_ids.count]] << partition
end

group_assignment
rescue Kafka::LeaderNotAvailable
sleep 1
retry
partitions_per_member
end
end
end
Loading