diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb index 66a18afd5..999aa1115 100644 --- a/lib/kafka/client.rb +++ b/lib/kafka/client.rb @@ -467,8 +467,7 @@ def create_topic(name, num_partitions: 1, replication_factor: 1, timeout: 30) # # @return [Array] the list of topic names. def topics - @cluster.clear_target_topics - @cluster.topics + @cluster.list_topics end def has_topic?(topic) diff --git a/lib/kafka/cluster.rb b/lib/kafka/cluster.rb index 6f7b553ec..2134aa77c 100644 --- a/lib/kafka/cluster.rb +++ b/lib/kafka/cluster.rb @@ -232,6 +232,12 @@ def topics cluster_info.topics.map(&:topic_name) end + # Lists all topics in the cluster. + def list_topics + response = random_broker.fetch_metadata(topics: nil) + response.topics.map(&:topic_name) + end + def disconnect @broker_pool.close end diff --git a/lib/kafka/protocol/encoder.rb b/lib/kafka/protocol/encoder.rb index d8af797fa..2f7fcd06b 100644 --- a/lib/kafka/protocol/encoder.rb +++ b/lib/kafka/protocol/encoder.rb @@ -74,8 +74,13 @@ def write_int64(int) # @param array [Array] # @return [nil] def write_array(array, &block) - write_int32(array.size) - array.each(&block) + if array.nil? + # An array can be null, which is different from it being empty. + write_int32(-1) + else + write_int32(array.size) + array.each(&block) + end end # Writes a string to the IO object. diff --git a/spec/functional/client_spec.rb b/spec/functional/client_spec.rb index d685072a1..a2dceb910 100644 --- a/spec/functional/client_spec.rb +++ b/spec/functional/client_spec.rb @@ -4,11 +4,13 @@ let!(:topic) { create_random_topic(num_partitions: 3) } example "listing all topics in the cluster" do - expect(kafka.has_topic?(topic)).to eq true + # Use a clean Kafka instance to avoid hitting caches. + kafka = Kafka.new(seed_brokers: KAFKA_BROKERS, logger: LOGGER) - topic2 = create_random_topic + topics = kafka.topics - expect(kafka.has_topic?(topic2)).to eq true + expect(topics).to include topic + expect(kafka.has_topic?(topic)).to eq true end example "fetching the partition count for a topic" do