forked from zendesk/ruby-kafka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmetadata_response.rb
185 lines (151 loc) · 5.56 KB
/
metadata_response.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# frozen_string_literal: true
module Kafka
module Protocol
# A response to a {MetadataRequest}.
#
# The response contains information on the brokers, topics, and partitions in
# the cluster.
#
# * For each broker a node id, host, and port is provided.
# * For each topic partition the node id of the broker acting as partition leader,
# as well as a list of node ids for the set of replicas, are given. The `isr` list is
# the subset of replicas that are "in sync", i.e. have fully caught up with the
# leader.
#
# ## API Specification
#
# MetadataResponse => [Broker][TopicMetadata]
# Broker => NodeId Host Port (any number of brokers may be returned)
# NodeId => int32
# Host => string
# Port => int32
#
# TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
# TopicErrorCode => int16
#
# PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
# PartitionErrorCode => int16
# PartitionId => int32
# Leader => int32
# Replicas => [int32]
# Isr => [int32]
#
class MetadataResponse
class PartitionMetadata
attr_reader :partition_id, :leader, :replicas
attr_reader :partition_error_code
def initialize(partition_error_code:, partition_id:, leader:, replicas: [], isr: [])
@partition_error_code = partition_error_code
@partition_id = partition_id
@leader = leader
@replicas = replicas
@isr = isr
end
end
class TopicMetadata
# @return [String] the name of the topic
attr_reader :topic_name
# @return [Array<PartitionMetadata>] the partitions in the topic.
attr_reader :partitions
attr_reader :topic_error_code
def initialize(topic_error_code: 0, topic_name:, partitions:)
@topic_error_code = topic_error_code
@topic_name = topic_name
@partitions = partitions
end
end
# @return [Array<Kafka::BrokerInfo>] the list of brokers in the cluster.
attr_reader :brokers
# @return [Array<TopicMetadata>] the list of topics in the cluster.
attr_reader :topics
# @return [Integer] The broker id of the controller broker.
attr_reader :controller_id
def initialize(brokers:, controller_id:, topics:)
@brokers = brokers
@controller_id = controller_id
@topics = topics
end
# Finds the node id of the broker that is acting as leader for the given topic
# and partition per this metadata.
#
# @param topic [String] the name of the topic.
# @param partition [Integer] the partition number.
# @return [Integer] the node id of the leader.
def find_leader_id(topic, partition)
topic_info = @topics.find {|t| t.topic_name == topic }
if topic_info.nil?
raise UnknownTopicOrPartition, "no topic #{topic}"
end
Protocol.handle_error(topic_info.topic_error_code)
partition_info = topic_info.partitions.find {|p| p.partition_id == partition }
if partition_info.nil?
raise UnknownTopicOrPartition, "no partition #{partition} in topic #{topic}"
end
begin
Protocol.handle_error(partition_info.partition_error_code)
rescue ReplicaNotAvailable
# This error can be safely ignored per the protocol specification.
end
partition_info.leader
end
# Finds the broker info for the given node id.
#
# @param node_id [Integer] the node id of the broker.
# @return [Kafka::BrokerInfo] information about the broker.
def find_broker(node_id)
broker = @brokers.find {|b| b.node_id == node_id }
raise Kafka::NoSuchBroker, "No broker with id #{node_id}" if broker.nil?
broker
end
def controller_broker
find_broker(controller_id)
end
def partitions_for(topic_name)
topic = @topics.find {|t| t.topic_name == topic_name }
if topic.nil?
raise UnknownTopicOrPartition, "unknown topic #{topic_name}"
end
Protocol.handle_error(topic.topic_error_code)
topic.partitions
end
# Decodes a MetadataResponse from a {Decoder} containing response data.
#
# @param decoder [Decoder]
# @return [MetadataResponse] the metadata response.
def self.decode(decoder)
brokers = decoder.array do
node_id = decoder.int32
host = decoder.string
port = decoder.int32
_rack = decoder.string
BrokerInfo.new(
node_id: node_id,
host: host,
port: port
)
end
controller_id = decoder.int32
topics = decoder.array do
topic_error_code = decoder.int16
topic_name = decoder.string
_is_internal = decoder.boolean
partitions = decoder.array do
PartitionMetadata.new(
partition_error_code: decoder.int16,
partition_id: decoder.int32,
leader: decoder.int32,
replicas: decoder.array { decoder.int32 },
isr: decoder.array { decoder.int32 },
)
end
TopicMetadata.new(
topic_error_code: topic_error_code,
topic_name: topic_name,
partitions: partitions,
)
end
new(brokers: brokers, controller_id: controller_id, topics: topics)
end
end
end
end