Skip to content

Commit dc997f6

Browse files
author
Eduardo Poleo
committed
ISSUE-525/764: New strategy should support all previous use cases
The new assignment strategy is a more general solution, therefore it should still be able to support the old use cases where all consumer within the same consumer group have identical subscriptions. We're moving all tests pertaining to the old strategy here to prove that this is the case.
1 parent e0147ac commit dc997f6

File tree

1 file changed

+135
-0
lines changed

1 file changed

+135
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
# frozen_string_literal: true
2+
3+
describe Kafka::MultiSubscriptionRoundRobinAssignmentStrategy do
4+
let(:strategy) { described_class.new }
5+
6+
# We need to ensure that the new strategy is backwards compatible
7+
# with the previous one. The following tests were backported from the
8+
# RoundRobinAssignmentStrategy specs.
9+
context 'RoundRobinAssignmentStrategy specs' do
10+
it "assigns all partitions" do
11+
members = Hash[(0...10).map {|i| ["member#{i}", double(topics: ['greetings'])] }]
12+
partitions = (0...30).map {|i| double(:"partition#{i}", topic: "greetings", partition_id: i) }
13+
14+
assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
15+
16+
partitions.each do |partition|
17+
member = assignments.values.find {|assigned_partitions|
18+
assigned_partitions.find {|assigned_partition|
19+
assigned_partition == partition
20+
}
21+
}
22+
23+
expect(member).to_not be_nil
24+
end
25+
end
26+
27+
it "spreads all partitions between members" do
28+
topics = ["topic1", "topic2"]
29+
members = Hash[(0...10).map {|i| ["member#{i}", double(topics: topics)] }]
30+
partitions = topics.product((0...5).to_a).map {|topic, i|
31+
double(:"partition#{i}", topic: topic, partition_id: i)
32+
}
33+
34+
assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
35+
36+
partitions.each do |partition|
37+
member = assignments.values.find {|assigned_partitions|
38+
assigned_partitions.find {|assigned_partition|
39+
assigned_partition == partition
40+
}
41+
}
42+
43+
expect(member).to_not be_nil
44+
end
45+
46+
num_partitions_assigned = assignments.values.map do |assigned_partitions|
47+
assigned_partitions.count
48+
end
49+
50+
expect(num_partitions_assigned).to all eq(1)
51+
end
52+
53+
Metadata = Struct.new(:topics)
54+
[
55+
{
56+
name: "uneven topics",
57+
topics: { "topic1" => [0], "topic2" => (0..50).to_a },
58+
members: {
59+
"member1" => Metadata.new(["topic1", "topic2"]),
60+
"member2" => Metadata.new(["topic1", "topic2"])
61+
},
62+
},
63+
{
64+
name: "only one partition",
65+
topics: { "topic1" => [0] },
66+
members: {
67+
"member1" => Metadata.new(["topic1"]),
68+
"member2" => Metadata.new(["topic1"])
69+
},
70+
},
71+
{
72+
name: "lots of partitions",
73+
topics: { "topic1" => (0..100).to_a },
74+
members: { "member1" => Metadata.new(["topic1"]) },
75+
},
76+
{
77+
name: "lots of members",
78+
topics: { "topic1" => (0..10).to_a, "topic2" => (0..10).to_a },
79+
members: Hash[(0..50).map { |i| ["member#{i}", Metadata.new(["topic1", "topic2"])] }]
80+
},
81+
{
82+
name: "odd number of partitions",
83+
topics: { "topic1" => (0..14).to_a },
84+
members: {
85+
"member1" => Metadata.new(["topic1"]),
86+
"member2" => Metadata.new(["topic1"])
87+
},
88+
},
89+
{
90+
name: "five topics, 10 partitions, 3 consumers",
91+
topics: { "topic1" => [0, 1], "topic2" => [0, 1], "topic3" => [0, 1], "topic4" => [0, 1], "topic5" => [0, 1] },
92+
members: {
93+
"member1" => Metadata.new(["topic1", "topic2", "topic3", "topic4", "topic5"]),
94+
"member2" => Metadata.new(["topic1", "topic2", "topic3", "topic4", "topic5"]),
95+
"member3" => Metadata.new(["topic1", "topic2", "topic3", "topic4", "topic5"])
96+
},
97+
}
98+
].each do |options|
99+
name, topics, members = options[:name], options[:topics], options[:members]
100+
it name do
101+
partitions = topics.flat_map {|topic, partition_ids|
102+
partition_ids.map {|i|
103+
double(:"partition#{i}", topic: topic, partition_id: i)
104+
}
105+
}
106+
107+
assignments = strategy.call(cluster: nil, members: members, partitions: partitions)
108+
109+
expect_all_partitions_assigned(topics, assignments)
110+
expect_even_assignments(topics, assignments)
111+
end
112+
end
113+
114+
def expect_all_partitions_assigned(topics, assignments)
115+
topics.each do |topic, partition_ids|
116+
partition_ids.each do |partition_id|
117+
assigned = assignments.values.find do |assigned_partitions|
118+
assigned_partitions.find {|assigned_partition|
119+
assigned_partition.topic == topic && assigned_partition.partition_id == partition_id
120+
}
121+
end
122+
expect(assigned).to_not be_nil
123+
end
124+
end
125+
end
126+
127+
def expect_even_assignments(topics, assignments)
128+
num_partitions = topics.values.flatten.count
129+
assignments.values.each do |assigned_partition|
130+
num_assigned = assigned_partition.count
131+
expect(num_assigned).to be_within(1).of(num_partitions.to_f / assignments.count)
132+
end
133+
end
134+
end
135+
end

0 commit comments

Comments
 (0)