diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb index 6c4d298c9..1ecf9d0f1 100644 --- a/lib/kafka/client.rb +++ b/lib/kafka/client.rb @@ -694,6 +694,14 @@ def partitions_for(topic) @cluster.partitions_for(topic).count end + # Counts the number of replicas for a topic's partition + # + # @param topic [String] + # @return [Integer] the number of replica nodes for the topic's partition + def replica_count_for(topic) + @cluster.partitions_for(topic).first.replicas.count + end + # Retrieve the offset of the last message in a partition. If there are no # messages in the partition -1 is returned. # diff --git a/lib/kafka/protocol/metadata_response.rb b/lib/kafka/protocol/metadata_response.rb index fbfc90c01..90b246bab 100644 --- a/lib/kafka/protocol/metadata_response.rb +++ b/lib/kafka/protocol/metadata_response.rb @@ -34,7 +34,7 @@ module Protocol # class MetadataResponse class PartitionMetadata - attr_reader :partition_id, :leader + attr_reader :partition_id, :leader, :replicas attr_reader :partition_error_code diff --git a/spec/functional/client_spec.rb b/spec/functional/client_spec.rb index 009455b7f..eec4f4627 100644 --- a/spec/functional/client_spec.rb +++ b/spec/functional/client_spec.rb @@ -130,6 +130,23 @@ expect(kafka.partitions_for(topic)).to be > 0 end + example "fetching the replica count for a topic" do + expect(kafka.replica_count_for(topic)).to eq 1 + end + + example "fetching the replica count for a topic that doesn't yet exist" do + topic = "unknown-topic-#{SecureRandom.uuid}" + + expect { kafka.replica_count_for(topic) }.to raise_exception(Kafka::LeaderNotAvailable) + + # Eventually the call should succeed. + expect { + 10.times { kafka.replica_count_for(topic) rescue nil } + }.not_to raise_exception + + expect(kafka.replica_count_for(topic)).to be > 0 + end + example "delivering a message to a topic" do kafka.deliver_message("yolo", topic: topic, key: "xoxo", partition: 0)