
Commit a6c1ef4

Remove Kafka::ConsumerGroup::Assignor.register_strategy
See zendesk#846 (comment) for the reason.
1 parent f1a8162 commit a6c1ef4

File tree

7 files changed

+166
-127
lines changed

README.md

+58-19
Original file line number | Diff line number | Diff line change
@@ -747,47 +747,86 @@ end
747747
#### Customizing Partition Assignment Strategy
748748

749749
In some cases, you might want to assign more partitions to some consumers. For example, in applications that insert records into a database, the consumers running on hosts near the database can process more messages than the consumers running on other hosts.
750-
You can use a custom assignment strategy like below:
750+
You can use a custom assignment strategy by passing an object that implements `#call` as the `assignment_strategy` argument, as shown below:
751751

752752
```ruby
753-
Kafka::ConsumerGroup::Assignor.register_strategy(:custom) do |cluster:, members:, partitions:|
754-
# strategy goes here
753+
class CustomAssignmentStrategy
754+
def initialize(user_data)
755+
@user_data = user_data
756+
end
757+
758+
# Assign the topic partitions to the group members.
759+
#
760+
# @param cluster [Kafka::Cluster]
761+
# @param members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash
762+
# mapping member ids to metadata
763+
# @param partitions [Array<Kafka::ConsumerGroup::Assignor::Partition>] a list of
764+
# partitions the consumer group processes
765+
# @return [Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>>] a hash
766+
# mapping member ids to partitions.
767+
def call(cluster:, members:, partitions:)
768+
...
769+
end
755770
end
756-
consumer = kafka.consumer(group_id: "some-group", assignment_strategy: :custom)
771+
772+
strategy = CustomAssignmentStrategy.new("some-host-information")
773+
consumer = kafka.consumer(group_id: "some-group", assignment_strategy: strategy)
757774
```
758775

759-
`members` is a hash mapping member ids to metadata, and `partitions` is a list of partitions the consumer group processes. The block should return a hash mapping member ids to partitions.
760-
For example, the following strategy assigns partitions randomly:
776+
`members` is a hash mapping member IDs to metadata, and `partitions` is a list of partitions the consumer group processes. The `call` method must return a hash mapping member IDs to partitions. For example, the following strategy assigns partitions randomly:
761777

762778
```ruby
763-
Kafka::ConsumerGroup::Assignor.register_strategy(:custom) do |cluster:, members:, partitions:|
764-
member_ids = members.keys
765-
partitions.each_with_object(Hash.new {|h, k| h[k] = [] }) do |partition, partitions_per_member|
766-
partitions_per_member[member_ids[rand(member_ids.count)]] << partition
779+
class RandomAssignmentStrategy
780+
def call(cluster:, members:, partitions:)
781+
member_ids = members.keys
782+
partitions.each_with_object(Hash.new {|h, k| h[k] = [] }) do |partition, partitions_per_member|
783+
partitions_per_member[member_ids[rand(member_ids.count)]] << partition
784+
end
767785
end
768786
end
769787
```
770788

771-
If the strategy needs user data, you can pass the option `user_data`, a proc that returns user data on each consumer:
789+
If the strategy needs user data, define a `user_data` method that returns the user data for each consumer. For example, the following strategy uses the consumers' IP addresses as user data:
772790

773791
```ruby
774-
Kafka::ConsumerGroup::Assignor.register_strategy(:custom, user_data: ->{ ... }) do |cluster:, members:, partitions:|
775-
# strategy goes here
792+
class NetworkTopologyAssignmentStrategy
793+
def user_data
794+
Socket.ip_address_list.find(&:ipv4_private?).ip_address
795+
end
796+
797+
def call(cluster:, members:, partitions:)
798+
# Display the pair of the member ID and IP address
799+
members.each do |id, metadata|
800+
puts "#{id}: #{metadata.user_data}"
801+
end
802+
803+
# Assign partitions considering the network topology
804+
...
805+
end
776806
end
777807
```
778808

779-
For example, the following strategy displays the IP address of each consumer:
809+
Note that the strategy uses the class name as the default protocol name. You can change it by defining the method `protocol_name`:
780810

781811
```ruby
782-
user_data_proc = ->{ Socket.ip_address_list.find(&:ipv4_private?).ip_address }
783-
Kafka::ConsumerGroup::Assignor.register_strategy(:custom, user_data: user_data_proc) do |cluster:, members:, partitions:|
784-
members.each do |id, metadata|
785-
puts "#{id}: #{metadata.user_data}"
812+
class NetworkTopologyAssignmentStrategy
813+
def protocol_name
814+
"networktopology"
815+
end
816+
817+
def user_data
818+
Socket.ip_address_list.find(&:ipv4_private?).ip_address
819+
end
820+
821+
def call(cluster:, members:, partitions:)
822+
...
786823
end
787-
...
788824
end
789825
```
790826

827+
Because `call` might receive user data that differs from what it expects, avoid using the same protocol name as another strategy that uses different user data.
828+
829+
791830
### Thread Safety
792831

793832
You typically don't want to share a Kafka client object between threads, since the network communication is not synchronized. Furthermore, you should avoid using threads in a consumer unless you're very careful about waiting for all work to complete before returning from the `#each_message` or `#each_batch` block. This is because _checkpointing_ assumes that returning from the block means that the messages that have been yielded have been successfully processed.
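To illustrate the strategy interface documented in the README hunk above (`#call`, plus the optional `#user_data` and `#protocol_name`), here is a minimal sketch that runs without the library. `WeightedAssignmentStrategy`, `DemoPartition`, and `DemoMetadata` are hypothetical names introduced only for this example, and the idea of encoding an integer weight as the user data string is an assumption, not something the commit prescribes.

```ruby
# Hypothetical stand-ins for Kafka::ConsumerGroup::Assignor::Partition and the
# join-group metadata, so the sketch runs without the library loaded.
DemoPartition = Struct.new(:topic, :partition_id)
DemoMetadata  = Struct.new(:user_data)

# Hypothetical strategy: each consumer advertises an integer weight as its
# user data, and partitions are dealt out in proportion to those weights.
class WeightedAssignmentStrategy
  def initialize(weight)
    @weight = weight
  end

  # A fixed protocol name, so every consumer running this strategy agrees on it.
  def protocol_name
    "weighted"
  end

  # The weight is sent as a string (assumption: user data is string-like).
  def user_data
    @weight.to_s
  end

  def call(cluster:, members:, partitions:)
    # Repeat each member ID once per unit of weight, then deal the partitions
    # round-robin over the resulting slots.
    slots = members.flat_map { |id, metadata| [id] * Integer(metadata.user_data) }
    assignment = Hash.new { |h, k| h[k] = [] }
    partitions.each_with_index do |partition, index|
      assignment[slots[index % slots.size]] << partition
    end
    assignment
  end
end

members = {
  "near-db"  => DemoMetadata.new("2"),
  "far-away" => DemoMetadata.new("1"),
}
partitions = (0...6).map { |i| DemoPartition.new("greetings", i) }

assignment = WeightedAssignmentStrategy.new(1).call(
  cluster: nil, members: members, partitions: partitions
)
```

With a real client, an instance of such a class would be passed as `assignment_strategy:` to `kafka.consumer`, as in the README example. The sketch does not handle the case where every weight is zero.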

lib/kafka/consumer_group.rb

+1-1
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,7 @@ def initialize(cluster:, logger:, group_id:, session_timeout:, rebalance_timeout
2222
@assigned_partitions = {}
2323
@assignor = Assignor.new(
2424
cluster: cluster,
25-
strategy: assignment_strategy || :roundrobin
25+
strategy: assignment_strategy || RoundRobinAssignmentStrategy.new
2626
)
2727
@retention_time = retention_time
2828
end

lib/kafka/consumer_group/assignor.rb

+9-38
Original file line number | Diff line number | Diff line change
@@ -7,50 +7,22 @@ class ConsumerGroup
77

88
# A consumer group partition assignor
99
class Assignor
10-
Strategy = Struct.new(:assign, :user_data)
1110
Partition = Struct.new(:topic, :partition_id)
1211

13-
# @param name [String]
14-
# @param user_data [Proc, nil] a proc that returns user data as a string
15-
# @yield [cluster:, members:, partitions:]
16-
# @yieldparam cluster [Kafka::Cluster]
17-
# @yieldparam members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>]
18-
# a hash mapping member ids to metadata
19-
# @yieldparam partitions [Array<Kafka::ConsumerGroup::Assignor::Partition>]
20-
# a list of partitions the consumer group processes
21-
# @yieldreturn [Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>]
22-
# a hash mapping member ids to partitions.
23-
def self.register_strategy(name, user_data: nil, &block)
24-
if strategies[name.to_s]
25-
raise ArgumentError, "The strategy '#{name}' is already registered."
26-
end
27-
28-
unless block_given?
29-
raise ArgumentError, "The block must be given."
30-
end
31-
32-
strategies[name.to_s] = Strategy.new(block, user_data)
33-
end
34-
35-
def self.strategies
36-
@strategies ||= {}
37-
end
38-
39-
attr_reader :protocol_name
40-
4112
# @param cluster [Kafka::Cluster]
42-
# @param strategy [String, Symbol, Object] a string, a symbol, or an object that implements
43-
# #user_data and #assign.
13+
# @param strategy [Object] an object that implements #call and, optionally,
14+
# #protocol_name and #user_data.
4415
def initialize(cluster:, strategy:)
4516
@cluster = cluster
46-
@protocol_name = strategy.to_s
47-
@strategy = self.class.strategies.fetch(@protocol_name) do
48-
raise ArgumentError, "The strategy '#{@protocol_name}' is not registered."
49-
end
17+
@strategy = strategy
18+
end
19+
20+
def protocol_name
21+
@strategy.respond_to?(:protocol_name) ? @strategy.protocol_name : @strategy.class.to_s
5022
end
5123

5224
def user_data
53-
@strategy.user_data.call if @strategy.user_data
25+
@strategy.user_data if @strategy.respond_to?(:user_data)
5426
end
5527

5628
# Assign the topic partitions to the group members.
@@ -75,8 +47,7 @@ def assign(members:, topics:)
7547
members.each_key do |member_id|
7648
group_assignment[member_id] = Protocol::MemberAssignment.new
7749
end
78-
79-
@strategy.assign.call(cluster: @cluster, members: members, partitions: topic_partitions).each do |member_id, partitions|
50+
@strategy.call(cluster: @cluster, members: members, partitions: topic_partitions).each do |member_id, partitions|
8051
Array(partitions).each do |partition|
8152
group_assignment[member_id].assign(partition.topic, [partition.partition_id])
8253
end
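The fallback behaviour introduced in this file can be seen in isolation with a stripped-down stand-in. The sketch below copies the two `respond_to?` checks from the diff into a hypothetical `MiniAssignor` class; `BareStrategy` and `NamedStrategy` are likewise invented for illustration and are not part of the library.

```ruby
# Hypothetical, stripped-down stand-in for the assignor's dispatch logic.
class MiniAssignor
  def initialize(strategy)
    @strategy = strategy
  end

  # Falls back to the strategy's class name when #protocol_name is not defined.
  def protocol_name
    @strategy.respond_to?(:protocol_name) ? @strategy.protocol_name : @strategy.class.to_s
  end

  # Returns nil when the strategy does not define #user_data.
  def user_data
    @strategy.user_data if @strategy.respond_to?(:user_data)
  end
end

# Implements only the required #call method.
class BareStrategy
  def call(cluster:, members:, partitions:)
    {}
  end
end

# Adds the two optional methods on top of #call.
class NamedStrategy < BareStrategy
  def protocol_name
    "named"
  end

  def user_data
    "rack-1"
  end
end

bare  = MiniAssignor.new(BareStrategy.new)
named = MiniAssignor.new(NamedStrategy.new)

puts bare.protocol_name   # the class name of the strategy
puts named.protocol_name  # the explicitly defined name
```

One consequence of the class-name fallback is that an anonymous class (`Class.new.new`) yields a name like `#<Class:0x...>`, which is what the spec in this commit checks for.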

lib/kafka/round_robin_assignment_strategy.rb

+5-7
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,10 @@ module Kafka
55
# A consumer group partition assignment strategy that assigns partitions to
66
# consumers in a round-robin fashion.
77
class RoundRobinAssignmentStrategy
8+
def protocol_name
9+
"roundrobin"
10+
end
11+
812
# Assign the topic partitions to the group members.
913
#
1014
# @param cluster [Kafka::Cluster]
@@ -14,7 +18,7 @@ class RoundRobinAssignmentStrategy
1418
# partitions the consumer group processes
1519
# @return [Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>>] a hash
1620
# mapping member ids to partitions.
17-
def assign(cluster:, members:, partitions:)
21+
def call(cluster:, members:, partitions:)
1822
member_ids = members.keys
1923
partitions_per_member = Hash.new {|h, k| h[k] = [] }
2024
partitions.each_with_index do |partition, index|
@@ -25,9 +29,3 @@ def assign(cluster:, members:, partitions:)
2529
end
2630
end
2731
end
28-
29-
require "kafka/consumer_group/assignor"
30-
strategy = Kafka::RoundRobinAssignmentStrategy.new
31-
Kafka::ConsumerGroup::Assignor.register_strategy(:roundrobin) do |cluster:, members:, partitions:|
32-
strategy.assign(cluster: cluster, members: members, partitions: partitions)
33-
end
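The diff above elides most of the `call` body, so the following is only a sketch of how a round-robin strategy can satisfy the new `#call` interface, not a copy of the library's implementation. `RoundRobinSketch` and `SketchPartition` are hypothetical names, and the modulo-based dealing is an assumption about what "round-robin" means here.

```ruby
# Hypothetical stand-in for Kafka::ConsumerGroup::Assignor::Partition.
SketchPartition = Struct.new(:topic, :partition_id)

# Hypothetical round-robin strategy exposing the same interface as the diff:
# an optional #protocol_name and a required #call.
class RoundRobinSketch
  def protocol_name
    "roundrobin-sketch"
  end

  def call(cluster:, members:, partitions:)
    member_ids = members.keys
    partitions_per_member = Hash.new { |h, k| h[k] = [] }
    # Deal partitions to members in turn, wrapping around with modulo.
    partitions.each_with_index do |partition, index|
      partitions_per_member[member_ids[index % member_ids.count]] << partition
    end
    partitions_per_member
  end
end

sketch_members    = { "a" => nil, "b" => nil }
sketch_partitions = (0...5).map { |i| SketchPartition.new("greetings", i) }

round_robin = RoundRobinSketch.new.call(
  cluster: nil, members: sketch_members, partitions: sketch_partitions
)
```

Because the strategy is now a plain object, no registration step is needed: the consumer group can instantiate it directly as its default, as the `consumer_group.rb` hunk does.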

spec/consumer_group/assignor_spec.rb

+51-25
Original file line number | Diff line number | Diff line change
@@ -1,46 +1,72 @@
11
# frozen_string_literal: true
22

33
describe Kafka::ConsumerGroup::Assignor do
4-
describe ".register_strategy" do
5-
context "when another strategy is already registered with the same name" do
6-
around do |example|
7-
described_class.register_strategy(:custom) { |**kwargs| [] }
8-
example.run
9-
described_class.strategies.delete("custom")
4+
let(:cluster) { double(:cluster) }
5+
let(:assignor) { described_class.new(cluster: cluster, strategy: strategy) }
6+
7+
describe "#protocol_name" do
8+
subject(:protocol_name) { assignor.protocol_name }
9+
10+
context "when the strategy has the method" do
11+
let(:strategy) { double(protocol_name: "custom") }
12+
13+
it "returns the return value" do
14+
expect(protocol_name).to eq "custom"
1015
end
16+
end
17+
18+
context "when the strategy does not have the method" do
19+
let(:strategy) { Class.new.new }
1120

12-
it do
13-
expect { described_class.register_strategy(:custom) { |**kwargs| [] } }.to raise_error(ArgumentError)
21+
it "returns the class name" do
22+
expect(protocol_name).to match(/\A#<Class:0x[0-9a-f]+>\z/)
1423
end
1524
end
1625
end
1726

18-
describe "#assign" do
19-
let(:cluster) { double(:cluster) }
20-
let(:assignor) { described_class.new(cluster: cluster, strategy: :custom) }
27+
describe "#user_data" do
28+
subject(:user_data) { assignor.user_data }
2129

22-
let(:members) { Hash[(0...10).map {|i| ["member#{i}", nil] }] }
23-
let(:topics) { ["greetings"] }
24-
let(:partitions) { (0...30).map {|i| double(:"partition#{i}", partition_id: i) } }
30+
context "when the strategy has the method" do
31+
let(:strategy) { double(user_data: "user_data") }
2532

26-
before do
27-
allow(cluster).to receive(:partitions_for) { partitions }
33+
it "returns the return value" do
34+
expect(user_data).to eq "user_data"
35+
end
36+
end
2837

29-
described_class.register_strategy(:custom) do |cluster:, members:, partitions:|
30-
assignment = {}
31-
partition_count_per_member = (partitions.count.to_f / members.count).ceil
32-
partitions.each_slice(partition_count_per_member).with_index do |chunk, index|
33-
assignment[members.keys[index]] = chunk
34-
end
38+
context "when the strategy does not have the method" do
39+
let(:strategy) { Class.new.new }
3540

36-
assignment
41+
it "returns nil" do
42+
expect(user_data).to be_nil
3743
end
3844
end
39-
after do
40-
described_class.strategies.delete("custom")
45+
end
46+
47+
describe "#assign" do
48+
let(:strategy) do
49+
klass = Class.new do
50+
def call(cluster:, members:, partitions:)
51+
assignment = {}
52+
partition_count_per_member = (partitions.count.to_f / members.count).ceil
53+
partitions.each_slice(partition_count_per_member).with_index do |chunk, index|
54+
assignment[members.keys[index]] = chunk
55+
end
56+
57+
assignment
58+
end
59+
end
60+
klass.new
4161
end
4262

4363
it "assigns all partitions" do
64+
members = Hash[(0...10).map {|i| ["member#{i}", nil] }]
65+
topics = ["greetings"]
66+
partitions = (0...30).map {|i| double(:"partition#{i}", partition_id: i) }
67+
68+
allow(cluster).to receive(:partitions_for) { partitions }
69+
4470
assignments = assignor.assign(members: members, topics: topics)
4571

4672
partitions.each do |partition|
