forked from zendesk/ruby-kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathround_robin_assignment_strategy.rb
31 lines (27 loc) · 1.04 KB
/
round_robin_assignment_strategy.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# frozen_string_literal: true
module Kafka
  # A consumer group partition assignment strategy that assigns partitions to
  # consumers in a round-robin fashion.
  class RoundRobinAssignmentStrategy
    # @return [String] the name this strategy reports, "roundrobin".
    def protocol_name
      "roundrobin"
    end

    # Assign the topic partitions to the group members.
    #
    # Partitions are dealt out in the order given: the partition at index i
    # goes to the member at position (i modulo member count) in the key order
    # of +members+. The returned hash creates an empty array the first time a
    # missing key is looked up, so a member that received no partitions has no
    # entry until it is accessed.
    #
    # @param cluster [Kafka::Cluster] not used by this strategy
    # @param members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash
    #   mapping member ids to metadata
    # @param partitions [Array<Kafka::ConsumerGroup::Assignor::Partition>] a list of
    #   partitions the consumer group processes
    # @return [Hash<String, Array<Kafka::ConsumerGroup::Assignor::Partition>>] a hash
    #   mapping member ids to partitions; empty when there are no members.
    def call(cluster:, members:, partitions:)
      member_ids = members.keys
      partitions_per_member = Hash.new { |h, k| h[k] = [] }

      # With no members there is nobody to assign to; return before taking the
      # modulo below, which would otherwise raise ZeroDivisionError.
      return partitions_per_member if member_ids.empty?

      member_count = member_ids.size
      partitions.each_with_index do |partition, index|
        partitions_per_member[member_ids[index % member_count]] << partition
      end

      partitions_per_member
    end
  end
end