Skip to content

feat(config): add table topic conversion type configuration (#2203) #2240

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ public class TopicConfig {
public static final String TABLE_TOPIC_COMMIT_INTERVAL_DOC = "The table topic commit interval(ms)";
public static final String TABLE_TOPIC_NAMESPACE_CONFIG = "automq.table.topic.namespace";
public static final String TABLE_TOPIC_NAMESPACE_DOC = "The table topic table namespace";
public static final String TABLE_TOPIC_SCHEMA_TYPE_CONFIG = "automq.table.topic.schema.type";
public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "The table topic schema type, support schemaless, schema";
// AutoMQ inject end

}
7 changes: 6 additions & 1 deletion core/src/main/java/kafka/automq/AutoMQConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ public class AutoMQConfig {

public static final String S3_TELEMETRY_OPS_ENABLED_CONFIG = "s3.telemetry.ops.enabled";
public static final String S3_TELEMETRY_OPS_ENABLED_DOC = "[DEPRECATED] use s3.telemetry.metrics.uri instead.";

public static final String TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG = "automq.table.topic.schema.registry.url";
private static final String TABLE_TOPIC_SCHEMA_REGISTRY_URL_DOC = "The schema registry url for table topic";

// Deprecated config end

public static void define(ConfigDef configDef) {
Expand Down Expand Up @@ -282,7 +286,8 @@ public static void define(ConfigDef configDef) {
.define(AutoMQConfig.S3_TELEMETRY_EXPORTER_OTLP_PROTOCOL_CONFIG, STRING, S3_EXPORTER_OTLPPROTOCOL, MEDIUM, AutoMQConfig.S3_TELEMETRY_EXPORTER_OTLP_PROTOCOL_DOC)
.define(AutoMQConfig.S3_TELEMETRY_EXPORTER_OTLP_COMPRESSION_ENABLE_CONFIG, BOOLEAN, false, MEDIUM, AutoMQConfig.S3_TELEMETRY_EXPORTER_OTLP_COMPRESSION_ENABLE_DOC)
.define(AutoMQConfig.S3_METRICS_EXPORTER_PROM_HOST_CONFIG, STRING, "localhost", MEDIUM, AutoMQConfig.S3_METRICS_EXPORTER_PROM_HOST_DOC)
.define(AutoMQConfig.S3_METRICS_EXPORTER_PROM_PORT_CONFIG, INT, 9090, MEDIUM, AutoMQConfig.S3_METRICS_EXPORTER_PROM_PORT_DOC);
.define(AutoMQConfig.S3_METRICS_EXPORTER_PROM_PORT_CONFIG, INT, 9090, MEDIUM, AutoMQConfig.S3_METRICS_EXPORTER_PROM_PORT_DOC)
.define(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG, STRING, null, MEDIUM, AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_DOC);
}

private List<BucketURI> dataBuckets;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@

package kafka.server

import java.util
import java.util.Properties
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRICS, TOPIC}
import org.apache.kafka.controller.ConfigurationValidator
import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.controller.ConfigurationValidator
import org.apache.kafka.server.metrics.ClientMetricsConfigs
import org.apache.kafka.storage.internals.log.LogConfig

import java.util
import java.util.Properties
import scala.collection.mutable

/**
Expand Down Expand Up @@ -110,6 +110,11 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
}
LogConfig.validate(oldConfigs, properties, kafkaConfig.extractLogConfigMap,
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false)

// AutoMQ inject start
LogConfig.validateTableTopicSchemaConfigValues(properties, kafkaConfig.tableTopicSchemaRegistryUrl)
// AutoMQ inject end

case BROKER => validateBrokerName(resource.name())
case CLIENT_METRICS =>
val properties = new Properties()
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val s3BackPressureEnabled = getBoolean(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG)
val s3BackPressureCooldownMs = getLong(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)
val tableTopicNamespace = getString(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG)
val tableTopicSchemaRegistryUrl = getString(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG);
// AutoMQ inject end

/** Internal Configurations **/
Expand Down Expand Up @@ -1253,6 +1254,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
if (tableTopicNamespace != null) {
logProps.put(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG, tableTopicNamespace)
}
if (tableTopicSchemaRegistryUrl != null) {
logProps.put(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG, tableTopicSchemaRegistryUrl)
}
// AutoMQ inject end

logProps
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

import kafka.automq.AutoMQConfig
import kafka.server.{ControllerConfigurationValidator, KafkaConfig}
import kafka.utils.TestUtils
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.TOPIC
import org.apache.kafka.common.config.TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG
import org.apache.kafka.common.errors.InvalidConfigurationException
import org.apache.kafka.server.record.TableTopicSchemaType
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.{Tag, Test}

import java.util

@Tag("S3Unit")
class ControllerConfigurationValidatorTableTest {
  val config = new KafkaConfig(TestUtils.createDummyBrokerConfig())
  val validator = new ControllerConfigurationValidator(config)

  @Test
  def testInvalidTableTopicSchemaConfig(): Unit = {
    // Topic-level config requesting the SCHEMA table-topic type.
    val topicConfigs = new util.TreeMap[String, String]()
    topicConfigs.put(TABLE_TOPIC_SCHEMA_TYPE_CONFIG, TableTopicSchemaType.SCHEMA.name)

    // Without a broker-level schema registry URL, enabling the SCHEMA type must be rejected.
    val thrown = assertThrows(classOf[InvalidConfigurationException], () => {
      validator.validate(new ConfigResource(TOPIC, "foo"), topicConfigs, topicConfigs)
    })
    assertEquals("Table topic schema type is set to SCHEMA but schema registry URL is not configured", thrown.getMessage)

    // With the registry URL configured on the broker, the same topic config validates cleanly.
    val brokerProps = TestUtils.createDummyBrokerConfig()
    brokerProps.put(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")

    val registryAwareConfig = new KafkaConfig(brokerProps)
    val registryAwareValidator = new ControllerConfigurationValidator(registryAwareConfig)

    // No exception expected when the schema registry URL is present.
    registryAwareValidator.validate(new ConfigResource(TOPIC, "foo"), topicConfigs, topicConfigs)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

package kafka.server

import kafka.automq.AutoMQConfig
import kafka.utils.TestUtils
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER, CLIENT_METRICS, TOPIC}
import org.apache.kafka.common.config.TopicConfig.{REMOTE_LOG_STORAGE_ENABLE_CONFIG, SEGMENT_BYTES_CONFIG, SEGMENT_JITTER_MS_CONFIG, SEGMENT_MS_CONFIG}
import org.apache.kafka.common.config.TopicConfig.{REMOTE_LOG_STORAGE_ENABLE_CONFIG, SEGMENT_BYTES_CONFIG, SEGMENT_JITTER_MS_CONFIG, SEGMENT_MS_CONFIG, TABLE_TOPIC_SCHEMA_TYPE_CONFIG}
import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException, InvalidTopicException}
import org.apache.kafka.server.metrics.ClientMetricsConfigs
import org.apache.kafka.server.record.TableTopicSchemaType
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
Expand Down Expand Up @@ -174,4 +176,26 @@ class ControllerConfigurationValidatorTest {
assertThrows(classOf[InvalidConfigurationException], () => validator.validate(
new ConfigResource(CLIENT_METRICS, "subscription-1"), config, emptyMap())). getMessage)
}

@Test
def testInvalidTableTopicSchemaConfig(): Unit = {
  val config = new util.TreeMap[String, String]()
  config.put(TABLE_TOPIC_SCHEMA_TYPE_CONFIG, TableTopicSchemaType.SCHEMA.name)

  // Test without schema registry URL configured.
  // NOTE(review): the validation path (LogConfig.validateTableTopicSchemaConfigValues)
  // throws InvalidConfigurationException, not InvalidRequestException, and the asserted
  // message below is the InvalidConfigurationException message — so that is the
  // exception type this test must expect.
  val exception = assertThrows(classOf[InvalidConfigurationException], () => {
    validator.validate(new ConfigResource(TOPIC, "foo"), config, config)
  })
  assertEquals("Table topic schema type is set to SCHEMA but schema registry URL is not configured", exception.getMessage)

  // Test with schema registry URL configured
  val brokerConfigWithSchemaRegistry = TestUtils.createDummyBrokerConfig()
  brokerConfigWithSchemaRegistry.put(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")

  val kafkaConfigWithSchemaRegistry = new KafkaConfig(brokerConfigWithSchemaRegistry)
  val validatorWithSchemaRegistry = new ControllerConfigurationValidator(kafkaConfigWithSchemaRegistry)

  // No exception should be thrown when schema registry URL is configured properly
  validatorWithSchemaRegistry.validate(new ConfigResource(TOPIC, "foo"), config, config)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public final class ServerTopicConfigSynonyms {
sameName(TopicConfig.TABLE_TOPIC_ENABLE_CONFIG),
sameName(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG),
sameName(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG),
sameName(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG),
// AutoMQ inject end

sameNameWithLogPrefix(TopicConfig.PREALLOCATE_CONFIG),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package org.apache.kafka.server.record;

import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;

/**
 * The conversion type of a table topic: {@code schemaless} (raw records) or
 * {@code schema} (records interpreted via a registered schema).
 */
public enum TableTopicSchemaType {
    SCHEMALESS("schemaless"),
    SCHEMA("schema");

    /** The lowercase external name used in configuration values. */
    public final String name;

    private static final List<TableTopicSchemaType> VALUES = asList(values());

    TableTopicSchemaType(String name) {
        this.name = name;
    }

    /**
     * Returns the external names of all types, in declaration order.
     */
    public static List<String> names() {
        return VALUES.stream().map(type -> type.name).collect(Collectors.toList());
    }

    /**
     * Looks up a type by its external name, case-insensitively.
     *
     * @throws IllegalArgumentException if no type matches {@code n}
     */
    public static TableTopicSchemaType forName(String n) {
        String lookup = n.toLowerCase(Locale.ROOT);
        for (TableTopicSchemaType candidate : VALUES) {
            if (candidate.name.equals(lookup)) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("Unknown table topic type name: " + lookup);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.record.TableTopicSchemaType;

import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -340,6 +341,7 @@ public Optional<String> serverConfigName(String configName) {
.define(TopicConfig.TABLE_TOPIC_ENABLE_CONFIG, BOOLEAN, false, null, MEDIUM, TopicConfig.TABLE_TOPIC_ENABLE_DOC)
.define(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG, LONG, TimeUnit.MINUTES.toMillis(5), between(1, TimeUnit.MINUTES.toMillis(15)), MEDIUM, TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_DOC)
.define(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_NAMESPACE_DOC)
.define(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG, STRING, TableTopicSchemaType.SCHEMALESS.name, in(TableTopicSchemaType.names().toArray(new String[0])), MEDIUM, TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_DOC)
// AutoMQ inject end
.define(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_COPY_DISABLE_DOC)
.define(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_DOC);
Expand Down Expand Up @@ -393,6 +395,7 @@ public Optional<String> serverConfigName(String configName) {
public final boolean tableTopicEnable;
public final long tableTopicCommitInterval;
public final String tableTopicNamespace;
public final TableTopicSchemaType tableTopicSchemaType;
// AutoMQ inject end

private final int maxMessageSize;
Expand Down Expand Up @@ -449,6 +452,7 @@ public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs) {
this.tableTopicEnable = getBoolean(TopicConfig.TABLE_TOPIC_ENABLE_CONFIG);
this.tableTopicCommitInterval = getLong(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG);
this.tableTopicNamespace = getString(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG);
this.tableTopicSchemaType = TableTopicSchemaType.forName(getString(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG));
// AutoMQ inject end

remoteLogConfig = new RemoteLogConfig(this);
Expand Down Expand Up @@ -782,6 +786,16 @@ public static void validate(Map<String, String> existingConfigs,
}
}

// AutoMQ inject start
/**
 * Ensures that a topic configured with the {@code schema} table-topic type also has a
 * schema registry URL configured at the broker level.
 *
 * @param props the topic-level properties being validated
 * @param tableTopicSchemaRegistryUrl the broker-level schema registry URL, or {@code null} if unset
 * @throws InvalidConfigurationException if the schema type is {@code schema} but no registry URL is configured
 */
public static void validateTableTopicSchemaConfigValues(Properties props, String tableTopicSchemaRegistryUrl) {
    String configuredType = props.getProperty(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG);
    boolean requiresRegistry = TableTopicSchemaType.SCHEMA.name.equals(configuredType);
    if (requiresRegistry && tableTopicSchemaRegistryUrl == null) {
        throw new InvalidConfigurationException("Table topic schema type is set to SCHEMA but schema registry URL is not configured");
    }
}

// AutoMQ inject end

@Override
public String toString() {
return "LogConfig{" +
Expand Down
Loading