diff --git a/CHANGELOG.md b/CHANGELOG.md index 143608f17..820cfd804 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ Changes and additions to the library will be listed here. ## Unreleased +- Add support for config entries in the topic creation API. + ## v0.5.3 - Add support for the topic deletion API (#528). diff --git a/lib/kafka/client.rb b/lib/kafka/client.rb index c58c218e8..2894cd99b 100644 --- a/lib/kafka/client.rb +++ b/lib/kafka/client.rb @@ -457,10 +457,16 @@ def each_message(topic:, start_from_beginning: true, max_wait_time: 5, min_bytes # @param replication_factor [Integer] the replication factor of the topic. # @param timeout [Integer] a duration of time to wait for the topic to be # completely created. + # @param config_entries [Hash] topic-level configs to use for the topic. + # See https://kafka.apache.org/documentation/#topicconfigs. # @raise [Kafka::TopicAlreadyExists] if the topic already exists. # @return [nil] - def create_topic(name, num_partitions: 1, replication_factor: 1, timeout: 30) - @cluster.create_topic(name, num_partitions: num_partitions, replication_factor: replication_factor, timeout: timeout) + def create_topic(name, num_partitions: 1, replication_factor: 1, timeout: 30, config_entries: {}) + @cluster.create_topic(name, + num_partitions: num_partitions, + replication_factor: replication_factor, + timeout: timeout, + config_entries: config_entries) end # Delete a topic in the cluster. diff --git a/lib/kafka/cluster.rb b/lib/kafka/cluster.rb index a470c85af..b027abc2a 100644 --- a/lib/kafka/cluster.rb +++ b/lib/kafka/cluster.rb @@ -156,12 +156,13 @@ def partitions_for(topic) raise end - def create_topic(name, num_partitions:, replication_factor:, timeout:) + def create_topic(name, num_partitions:, replication_factor:, timeout:, config_entries:) options = { topics: { name => { num_partitions: num_partitions, replication_factor: replication_factor, + config_entries: config_entries, } }, timeout: timeout, diff --git a/lib/kafka/protocol/create_topics_request.rb b/lib/kafka/protocol/create_topics_request.rb index 81fb76626..5c6d5e230 100644 --- a/lib/kafka/protocol/create_topics_request.rb +++ b/lib/kafka/protocol/create_topics_request.rb @@ -28,7 +28,10 @@ def encode(encoder) encoder.write_array([]) # Config entries. We don't care. - encoder.write_array([]) + encoder.write_array(config.fetch(:config_entries)) do |config_name, config_value| + encoder.write_string(config_name) + encoder.write_string(config_value) + end end # Timeout is in ms. diff --git a/spec/functional/topic_management_spec.rb b/spec/functional/topic_management_spec.rb index 7d1599d0e..e9a7ecf25 100644 --- a/spec/functional/topic_management_spec.rb +++ b/spec/functional/topic_management_spec.rb @@ -10,6 +10,20 @@ expect(partitions).to eq 3 end + example "creating a topic with config entries" do + unless kafka.supports_api?(Kafka::Protocol::DESCRIBE_CONFIGS_API) + skip("This Kafka version not support ") + end + + topic = generate_topic_name + expect(kafka.topics).not_to include(topic) + + kafka.create_topic(topic, num_partitions: 3, config_entries: { 'cleanup.policy' => 'compact' }) + + configs = kafka.describe_topic(topic, %w(cleanup.policy)) + expect(configs['cleanup.policy']).to eq('compact') + end + example "deleting topics" do topic = generate_topic_name diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 7d9afa4c9..8ef6044eb 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -59,8 +59,11 @@ def create_random_topic(*args) topic end - def create_topic(name, num_partitions: 1, num_replicas: 1) - kafka.create_topic(name, num_partitions: num_partitions, replication_factor: num_replicas) + def create_topic(name, num_partitions: 1, num_replicas: 1, config_entries: {}) + kafka.create_topic(name, + num_partitions: num_partitions, + replication_factor: num_replicas, + config_entries: config_entries) end end