diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java index 9fd5a4d4f0..818f9200fd 100755 --- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java @@ -265,6 +265,8 @@ public class TopicConfig { public static final String TABLE_TOPIC_COMMIT_INTERVAL_DOC = "The table topic commit interval(ms)"; public static final String TABLE_TOPIC_NAMESPACE_CONFIG = "automq.table.topic.namespace"; public static final String TABLE_TOPIC_NAMESPACE_DOC = "The table topic table namespace"; + public static final String TABLE_TOPIC_SCHEMA_TYPE_CONFIG = "automq.table.topic.schema.type"; + public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "The table topic schema type, support schemaless, schema"; // AutoMQ inject end } diff --git a/core/src/main/java/kafka/automq/AutoMQConfig.java b/core/src/main/java/kafka/automq/AutoMQConfig.java index aa9a7e5dad..a9d02881ba 100644 --- a/core/src/main/java/kafka/automq/AutoMQConfig.java +++ b/core/src/main/java/kafka/automq/AutoMQConfig.java @@ -230,6 +230,10 @@ public class AutoMQConfig { public static final String S3_TELEMETRY_OPS_ENABLED_CONFIG = "s3.telemetry.ops.enabled"; public static final String S3_TELEMETRY_OPS_ENABLED_DOC = "[DEPRECATED] use s3.telemetry.metrics.uri instead."; + + public static final String TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG = "automq.table.topic.schema.registry.url"; + private static final String TABLE_TOPIC_SCHEMA_REGISTRY_URL_DOC = "The schema registry url for table topic"; + // Deprecated config end public static void define(ConfigDef configDef) { @@ -282,7 +286,8 @@ public static void define(ConfigDef configDef) { .define(AutoMQConfig.S3_TELEMETRY_EXPORTER_OTLP_PROTOCOL_CONFIG, STRING, S3_EXPORTER_OTLPPROTOCOL, MEDIUM, AutoMQConfig.S3_TELEMETRY_EXPORTER_OTLP_PROTOCOL_DOC) 
.define(AutoMQConfig.S3_TELEMETRY_EXPORTER_OTLP_COMPRESSION_ENABLE_CONFIG, BOOLEAN, false, MEDIUM, AutoMQConfig.S3_TELEMETRY_EXPORTER_OTLP_COMPRESSION_ENABLE_DOC) .define(AutoMQConfig.S3_METRICS_EXPORTER_PROM_HOST_CONFIG, STRING, "localhost", MEDIUM, AutoMQConfig.S3_METRICS_EXPORTER_PROM_HOST_DOC) - .define(AutoMQConfig.S3_METRICS_EXPORTER_PROM_PORT_CONFIG, INT, 9090, MEDIUM, AutoMQConfig.S3_METRICS_EXPORTER_PROM_PORT_DOC); + .define(AutoMQConfig.S3_METRICS_EXPORTER_PROM_PORT_CONFIG, INT, 9090, MEDIUM, AutoMQConfig.S3_METRICS_EXPORTER_PROM_PORT_DOC) + .define(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG, STRING, null, MEDIUM, AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_DOC); } private List dataBuckets; diff --git a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala index 49083e3a0c..a6ed6b82e3 100644 --- a/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala +++ b/core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala @@ -17,16 +17,16 @@ package kafka.server -import java.util -import java.util.Properties import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRICS, TOPIC} -import org.apache.kafka.controller.ConfigurationValidator import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException} import org.apache.kafka.common.internals.Topic +import org.apache.kafka.controller.ConfigurationValidator import org.apache.kafka.server.metrics.ClientMetricsConfigs import org.apache.kafka.storage.internals.log.LogConfig +import java.util +import java.util.Properties import scala.collection.mutable /** @@ -110,6 +110,11 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu } LogConfig.validate(oldConfigs, properties, kafkaConfig.extractLogConfigMap, 
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false) + + // AutoMQ inject start + LogConfig.validateTableTopicSchemaConfigValues(properties, kafkaConfig.tableTopicSchemaRegistryUrl) + // AutoMQ inject end + case BROKER => validateBrokerName(resource.name()) case CLIENT_METRICS => val properties = new Properties() diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 715e9e7688..4d77d0b273 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -796,6 +796,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) val s3BackPressureEnabled = getBoolean(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG) val s3BackPressureCooldownMs = getLong(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG) val tableTopicNamespace = getString(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG) + val tableTopicSchemaRegistryUrl = getString(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG); // AutoMQ inject end /** Internal Configurations **/ @@ -1253,6 +1254,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) if (tableTopicNamespace != null) { logProps.put(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG, tableTopicNamespace) } + if (tableTopicSchemaRegistryUrl != null) { + logProps.put(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG, tableTopicSchemaRegistryUrl) + } // AutoMQ inject end logProps diff --git a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTableTest.scala b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTableTest.scala new file mode 100644 index 0000000000..ebe4f64f79 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTableTest.scala @@ -0,0 +1,51 @@ +/* + * Copyright 2024, AutoMQ HK Limited. 
+ * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +import kafka.automq.AutoMQConfig +import kafka.server.{ControllerConfigurationValidator, KafkaConfig} +import kafka.utils.TestUtils +import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.config.ConfigResource.Type.TOPIC +import org.apache.kafka.common.config.TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG +import org.apache.kafka.common.errors.InvalidConfigurationException +import org.apache.kafka.server.record.TableTopicSchemaType +import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows} +import org.junit.jupiter.api.{Tag, Test} + +import java.util + +@Tag("S3Unit") +class ControllerConfigurationValidatorTableTest { + val config = new KafkaConfig(TestUtils.createDummyBrokerConfig()) + val validator = new ControllerConfigurationValidator(config) + + @Test + def testInvalidTableTopicSchemaConfig(): Unit = { + val config = new util.TreeMap[String, String]() + config.put(TABLE_TOPIC_SCHEMA_TYPE_CONFIG, TableTopicSchemaType.SCHEMA.name) + + // Test without schema registry URL configured + val exception = assertThrows(classOf[InvalidConfigurationException], () => { + validator.validate(new ConfigResource(TOPIC, "foo"), config, config) + }) + assertEquals("Table topic schema type is set to SCHEMA but schema registry URL is not configured", exception.getMessage) + + // Test with schema registry URL configured + val brokerConfigWithSchemaRegistry = TestUtils.createDummyBrokerConfig() + brokerConfigWithSchemaRegistry.put(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081") + + val kafkaConfigWithSchemaRegistry = new KafkaConfig(brokerConfigWithSchemaRegistry) + val 
validatorWithSchemaRegistry = new ControllerConfigurationValidator(kafkaConfigWithSchemaRegistry) + + // No exception should be thrown when schema registry URL is configured properly + validatorWithSchemaRegistry.validate(new ConfigResource(TOPIC, "foo"), config, config) + } +} diff --git a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala index 4cf5ad70ce..d13adbe70c 100644 --- a/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala @@ -17,12 +17,14 @@ package kafka.server +import kafka.automq.AutoMQConfig import kafka.utils.TestUtils import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER, CLIENT_METRICS, TOPIC} -import org.apache.kafka.common.config.TopicConfig.{REMOTE_LOG_STORAGE_ENABLE_CONFIG, SEGMENT_BYTES_CONFIG, SEGMENT_JITTER_MS_CONFIG, SEGMENT_MS_CONFIG} +import org.apache.kafka.common.config.TopicConfig.{REMOTE_LOG_STORAGE_ENABLE_CONFIG, SEGMENT_BYTES_CONFIG, SEGMENT_JITTER_MS_CONFIG, SEGMENT_MS_CONFIG, TABLE_TOPIC_SCHEMA_TYPE_CONFIG} import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException, InvalidTopicException} import org.apache.kafka.server.metrics.ClientMetricsConfigs +import org.apache.kafka.server.record.TableTopicSchemaType import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows} import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest @@ -174,4 +176,26 @@ class ControllerConfigurationValidatorTest { assertThrows(classOf[InvalidConfigurationException], () => validator.validate( new ConfigResource(CLIENT_METRICS, "subscription-1"), config, emptyMap())). 
getMessage) } + + @Test + def testInvalidTableTopicSchemaConfig(): Unit = { + val config = new util.TreeMap[String, String]() + config.put(TABLE_TOPIC_SCHEMA_TYPE_CONFIG, TableTopicSchemaType.SCHEMA.name) + + // Test without schema registry URL configured + val exception = assertThrows(classOf[InvalidConfigurationException], () => { + validator.validate(new ConfigResource(TOPIC, "foo"), config, config) + }) + assertEquals("Table topic schema type is set to SCHEMA but schema registry URL is not configured", exception.getMessage) + + // Test with schema registry URL configured + val brokerConfigWithSchemaRegistry = TestUtils.createDummyBrokerConfig() + brokerConfigWithSchemaRegistry.put(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081") + + val kafkaConfigWithSchemaRegistry = new KafkaConfig(brokerConfigWithSchemaRegistry) + val validatorWithSchemaRegistry = new ControllerConfigurationValidator(kafkaConfigWithSchemaRegistry) + + // No exception should be thrown when schema registry URL is configured properly + validatorWithSchemaRegistry.validate(new ConfigResource(TOPIC, "foo"), config, config) + } } diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java index 33a802cf29..fea99d8dd9 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java @@ -88,6 +88,7 @@ public final class ServerTopicConfigSynonyms { sameName(TopicConfig.TABLE_TOPIC_ENABLE_CONFIG), sameName(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG), sameName(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG), + sameName(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG), // AutoMQ inject end sameNameWithLogPrefix(TopicConfig.PREALLOCATE_CONFIG), diff --git 
a/server-common/src/main/java/org/apache/kafka/server/record/TableTopicSchemaType.java b/server-common/src/main/java/org/apache/kafka/server/record/TableTopicSchemaType.java new file mode 100644 index 0000000000..6683694233 --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/record/TableTopicSchemaType.java @@ -0,0 +1,41 @@ +/* + * Copyright 2024, AutoMQ HK Limited. + * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package org.apache.kafka.server.record; + +import java.util.List; +import java.util.Locale; +import java.util.stream.Collectors; + +import static java.util.Arrays.asList; + +public enum TableTopicSchemaType { + SCHEMALESS("schemaless"), + SCHEMA("schema"); + + public final String name; + private static final List VALUES = asList(values()); + + TableTopicSchemaType(String name) { + this.name = name; + } + + public static List names() { + return VALUES.stream().map(v -> v.name).collect(Collectors.toList()); + } + + public static TableTopicSchemaType forName(String n) { + String name = n.toLowerCase(Locale.ROOT); + return VALUES.stream().filter(v -> v.name.equals(name)).findFirst().orElseThrow(() -> + new IllegalArgumentException("Unknown table topic type name: " + name) + ); + } +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index fcb21a3488..981925f38f 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -38,6 +38,7 @@ import org.apache.kafka.server.config.ServerLogConfigs; import 
org.apache.kafka.server.config.ServerTopicConfigSynonyms; import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.server.record.TableTopicSchemaType; import java.util.Collections; import java.util.HashMap; @@ -340,6 +341,7 @@ public Optional serverConfigName(String configName) { .define(TopicConfig.TABLE_TOPIC_ENABLE_CONFIG, BOOLEAN, false, null, MEDIUM, TopicConfig.TABLE_TOPIC_ENABLE_DOC) .define(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG, LONG, TimeUnit.MINUTES.toMillis(5), between(1, TimeUnit.MINUTES.toMillis(15)), MEDIUM, TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_DOC) .define(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_NAMESPACE_DOC) + .define(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG, STRING, TableTopicSchemaType.SCHEMALESS.name, in(TableTopicSchemaType.names().toArray(new String[0])), MEDIUM, TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_DOC) // AutoMQ inject end .define(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_COPY_DISABLE_DOC) .define(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_DOC); @@ -393,6 +395,7 @@ public Optional serverConfigName(String configName) { public final boolean tableTopicEnable; public final long tableTopicCommitInterval; public final String tableTopicNamespace; + public final TableTopicSchemaType tableTopicSchemaType; // AutoMQ inject end private final int maxMessageSize; @@ -449,6 +452,7 @@ public LogConfig(Map props, Set overriddenConfigs) { this.tableTopicEnable = getBoolean(TopicConfig.TABLE_TOPIC_ENABLE_CONFIG); this.tableTopicCommitInterval = getLong(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG); this.tableTopicNamespace = getString(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG); + this.tableTopicSchemaType = TableTopicSchemaType.forName(getString(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG)); // AutoMQ inject end remoteLogConfig = new 
RemoteLogConfig(this); @@ -782,6 +786,16 @@ public static void validate(Map existingConfigs, } } + // AutoMQ inject start + public static void validateTableTopicSchemaConfigValues(Properties props, String tableTopicSchemaRegistryUrl) { + String schemaType = props.getProperty(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG); + if (TableTopicSchemaType.SCHEMA.name.equals(schemaType) && tableTopicSchemaRegistryUrl == null) { + throw new InvalidConfigurationException("Table topic schema type is set to SCHEMA but schema registry URL is not configured"); + } + } + + // AutoMQ inject end + @Override public String toString() { return "LogConfig{" +