forked from zendesk/ruby-kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathassignor.rb
63 lines (53 loc) · 1.99 KB
/
assignor.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# frozen_string_literal: true
require "kafka/protocol/member_assignment"
module Kafka
  class ConsumerGroup
    # Distributes the partitions of a set of topics among the members of a
    # consumer group by delegating the actual decision to a pluggable strategy.
    class Assignor
      # A single topic partition, identified by its topic and partition id.
      Partition = Struct.new(:topic, :partition_id)

      # @param cluster [Kafka::Cluster] used to look up the partitions of each
      #   topic via #partitions_for.
      # @param strategy [Object] an object that implements
      #   #call(cluster:, members:, partitions:). It may optionally implement
      #   #protocol_name and #user_data.
      def initialize(cluster:, strategy:)
        @strategy = strategy
        @cluster = cluster
      end

      # The name of the assignment protocol.
      #
      # @return [Object] the strategy's own #protocol_name when it defines one,
      #   otherwise the strategy's class name as a String.
      def protocol_name
        return @strategy.protocol_name if @strategy.respond_to?(:protocol_name)

        @strategy.class.to_s
      end

      # Extra data supplied by the strategy.
      #
      # @return [Object, nil] the strategy's #user_data, or nil when the
      #   strategy does not respond to #user_data.
      def user_data
        return unless @strategy.respond_to?(:user_data)

        @strategy.user_data
      end

      # Assign the topic partitions to the group members.
      #
      # Every member id in `members` gets an entry in the result, even when the
      # strategy hands it no partitions. If Kafka::LeaderNotAvailable is raised
      # at any point, the method sleeps for one second and starts over; there is
      # no upper bound on the number of attempts.
      #
      # @param members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash
      #   mapping member ids to metadata.
      # @param topics [Array<String>] topics
      # @return [Hash<String, Kafka::Protocol::MemberAssignment>] a hash mapping member
      #   ids to assignments.
      # @raise [UnknownTopicOrPartition] if the cluster lookup for a topic
      #   raises that error.
      def assign(members:, topics:)
        available = topics.flat_map do |topic|
          partition_ids_for(topic).map { |id| Partition.new(topic, id) }
        end

        # Start every member off with an empty assignment.
        assignments = {}
        members.each_key { |member_id| assignments[member_id] = Protocol::MemberAssignment.new }

        plan = @strategy.call(cluster: @cluster, members: members, partitions: available)
        plan.each do |member_id, assigned|
          # Array() lets the strategy return nil for a member with nothing assigned.
          Array(assigned).each do |partition|
            assignments[member_id].assign(partition.topic, [partition.partition_id])
          end
        end

        assignments
      rescue Kafka::LeaderNotAvailable
        sleep 1
        retry
      end

      private

      # Looks up the partition ids of a topic, re-raising an unknown-topic
      # error with a message that names the topic.
      def partition_ids_for(topic)
        @cluster.partitions_for(topic).map(&:partition_id)
      rescue UnknownTopicOrPartition
        raise UnknownTopicOrPartition, "unknown topic #{topic}"
      end
    end
  end
end