Skip to content

Commit 74d9d38

Browse files
committed
feat(config): add table topic conversion type configuration (#2203)
* feat(config): add table topic conversion type configurations * feat(config): rename table topic type to schema type and update related configurations * feat(config): add table topic schema registry URL configuration and validation * test(config): add unit tests for ControllerConfigurationValidator table topic schema configuration * fix(tests): update exception type in ControllerConfigurationValidatorTableTest for schema validation * feat(config): polish code
1 parent 8125b80 commit 74d9d38

File tree

9 files changed

+148
-6
lines changed

9 files changed

+148
-6
lines changed

Diff for: clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java

+2
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,8 @@ public class TopicConfig {
265265
public static final String TABLE_TOPIC_COMMIT_INTERVAL_DOC = "The table topic commit interval(ms)";
266266
public static final String TABLE_TOPIC_NAMESPACE_CONFIG = "automq.table.topic.namespace";
267267
public static final String TABLE_TOPIC_NAMESPACE_DOC = "The table topic table namespace";
268+
public static final String TABLE_TOPIC_SCHEMA_TYPE_CONFIG = "automq.table.topic.schema.type";
269+
public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "The table topic schema type, support schemaless, schema";
268270
// AutoMQ inject end
269271

270272
}

Diff for: core/src/main/java/kafka/automq/AutoMQConfig.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,10 @@ public class AutoMQConfig {
230230

231231
public static final String S3_TELEMETRY_OPS_ENABLED_CONFIG = "s3.telemetry.ops.enabled";
232232
public static final String S3_TELEMETRY_OPS_ENABLED_DOC = "[DEPRECATED] use s3.telemetry.metrics.uri instead.";
233+
234+
public static final String TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG = "automq.table.topic.schema.registry.url";
235+
private static final String TABLE_TOPIC_SCHEMA_REGISTRY_URL_DOC = "The schema registry url for table topic";
236+
233237
// Deprecated config end
234238

235239
public static void define(ConfigDef configDef) {
@@ -282,7 +286,8 @@ public static void define(ConfigDef configDef) {
282286
.define(AutoMQConfig.S3_TELEMETRY_EXPORTER_OTLP_PROTOCOL_CONFIG, STRING, S3_EXPORTER_OTLPPROTOCOL, MEDIUM, AutoMQConfig.S3_TELEMETRY_EXPORTER_OTLP_PROTOCOL_DOC)
283287
.define(AutoMQConfig.S3_TELEMETRY_EXPORTER_OTLP_COMPRESSION_ENABLE_CONFIG, BOOLEAN, false, MEDIUM, AutoMQConfig.S3_TELEMETRY_EXPORTER_OTLP_COMPRESSION_ENABLE_DOC)
284288
.define(AutoMQConfig.S3_METRICS_EXPORTER_PROM_HOST_CONFIG, STRING, "localhost", MEDIUM, AutoMQConfig.S3_METRICS_EXPORTER_PROM_HOST_DOC)
285-
.define(AutoMQConfig.S3_METRICS_EXPORTER_PROM_PORT_CONFIG, INT, 9090, MEDIUM, AutoMQConfig.S3_METRICS_EXPORTER_PROM_PORT_DOC);
289+
.define(AutoMQConfig.S3_METRICS_EXPORTER_PROM_PORT_CONFIG, INT, 9090, MEDIUM, AutoMQConfig.S3_METRICS_EXPORTER_PROM_PORT_DOC)
290+
.define(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG, STRING, null, MEDIUM, AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_DOC);
286291
}
287292

288293
private List<BucketURI> dataBuckets;

Diff for: core/src/main/scala/kafka/server/ControllerConfigurationValidator.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,16 @@
1717

1818
package kafka.server
1919

20-
import java.util
21-
import java.util.Properties
2220
import org.apache.kafka.common.config.ConfigResource
2321
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRICS, TOPIC}
24-
import org.apache.kafka.controller.ConfigurationValidator
2522
import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException}
2623
import org.apache.kafka.common.internals.Topic
24+
import org.apache.kafka.controller.ConfigurationValidator
2725
import org.apache.kafka.server.metrics.ClientMetricsConfigs
2826
import org.apache.kafka.storage.internals.log.LogConfig
2927

28+
import java.util
29+
import java.util.Properties
3030
import scala.collection.mutable
3131

3232
/**
@@ -108,7 +108,7 @@ class ControllerConfigurationValidator(kafkaConfig: KafkaConfig) extends Configu
108108
throw new InvalidConfigurationException("Null value not supported for topic configs: " +
109109
nullTopicConfigs.mkString(","))
110110
}
111-
LogConfig.validate(oldConfigs, properties, kafkaConfig.extractLogConfigMap,
111+
LogConfig.validate(properties, kafkaConfig.extractLogConfigMap,
112112
kafkaConfig.remoteLogManagerConfig.isRemoteStorageSystemEnabled(), false)
113113
case BROKER => validateBrokerName(resource.name())
114114
case CLIENT_METRICS =>

Diff for: core/src/main/scala/kafka/server/KafkaConfig.scala

+4
Original file line numberDiff line numberDiff line change
@@ -796,6 +796,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
796796
val s3BackPressureEnabled = getBoolean(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG)
797797
val s3BackPressureCooldownMs = getLong(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)
798798
val tableTopicNamespace = getString(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG)
799+
val tableTopicSchemaRegistryUrl = getString(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG);
799800
// AutoMQ inject end
800801

801802
/** Internal Configurations **/
@@ -1253,6 +1254,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
12531254
if (tableTopicNamespace != null) {
12541255
logProps.put(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG, tableTopicNamespace)
12551256
}
1257+
if (tableTopicSchemaRegistryUrl != null) {
1258+
logProps.put(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG, tableTopicSchemaRegistryUrl)
1259+
}
12561260
// AutoMQ inject end
12571261

12581262
logProps
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2024, AutoMQ HK Limited.
3+
*
4+
* The use of this file is governed by the Business Source License,
5+
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
6+
*
7+
* As of the Change Date specified in that file, in accordance with
8+
* the Business Source License, use of this software will be governed
9+
* by the Apache License, Version 2.0
10+
*/
11+
12+
import kafka.automq.AutoMQConfig
13+
import kafka.server.{ControllerConfigurationValidator, KafkaConfig}
14+
import kafka.utils.TestUtils
15+
import org.apache.kafka.common.config.ConfigResource
16+
import org.apache.kafka.common.config.ConfigResource.Type.TOPIC
17+
import org.apache.kafka.common.config.TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG
18+
import org.apache.kafka.common.errors.InvalidConfigurationException
19+
import org.apache.kafka.server.record.TableTopicSchemaType
20+
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
21+
import org.junit.jupiter.api.{Tag, Test}
22+
23+
import java.util
24+
25+
@Tag("S3Unit")
26+
class ControllerConfigurationValidatorTableTest {
27+
val config = new KafkaConfig(TestUtils.createDummyBrokerConfig())
28+
val validator = new ControllerConfigurationValidator(config)
29+
30+
@Test
31+
def testInvalidTableTopicSchemaConfig(): Unit = {
32+
val config = new util.TreeMap[String, String]()
33+
config.put(TABLE_TOPIC_SCHEMA_TYPE_CONFIG, TableTopicSchemaType.SCHEMA.name)
34+
35+
// Test without schema registry URL configured
36+
val exception = assertThrows(classOf[InvalidConfigurationException], () => {
37+
validator.validate(new ConfigResource(TOPIC, "foo"), config)
38+
})
39+
assertEquals("Table topic schema type is set to SCHEMA but schema registry URL is not configured", exception.getMessage)
40+
41+
// Test with schema registry URL configured
42+
val brokerConfigWithSchemaRegistry = TestUtils.createDummyBrokerConfig()
43+
brokerConfigWithSchemaRegistry.put(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")
44+
45+
val kafkaConfigWithSchemaRegistry = new KafkaConfig(brokerConfigWithSchemaRegistry)
46+
val validatorWithSchemaRegistry = new ControllerConfigurationValidator(kafkaConfigWithSchemaRegistry)
47+
48+
// No exception should be thrown when schema registry URL is configured properly
49+
validatorWithSchemaRegistry.validate(new ConfigResource(TOPIC, "foo"), config)
50+
}
51+
}

Diff for: core/src/test/scala/unit/kafka/server/ControllerConfigurationValidatorTest.scala

+25-1
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717

1818
package kafka.server
1919

20+
import kafka.automq.AutoMQConfig
2021
import kafka.utils.TestUtils
2122
import org.apache.kafka.common.config.ConfigResource
2223
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER, CLIENT_METRICS, TOPIC}
23-
import org.apache.kafka.common.config.TopicConfig.{REMOTE_LOG_STORAGE_ENABLE_CONFIG, SEGMENT_BYTES_CONFIG, SEGMENT_JITTER_MS_CONFIG, SEGMENT_MS_CONFIG}
24+
import org.apache.kafka.common.config.TopicConfig.{REMOTE_LOG_STORAGE_ENABLE_CONFIG, SEGMENT_BYTES_CONFIG, SEGMENT_JITTER_MS_CONFIG, SEGMENT_MS_CONFIG, TABLE_TOPIC_SCHEMA_TYPE_CONFIG}
2425
import org.apache.kafka.common.errors.{InvalidConfigurationException, InvalidRequestException, InvalidTopicException}
2526
import org.apache.kafka.server.metrics.ClientMetricsConfigs
27+
import org.apache.kafka.server.record.TableTopicSchemaType
2628
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
2729
import org.junit.jupiter.api.Test
2830
import org.junit.jupiter.params.ParameterizedTest
@@ -174,4 +176,26 @@ class ControllerConfigurationValidatorTest {
174176
assertThrows(classOf[InvalidConfigurationException], () => validator.validate(
175177
new ConfigResource(CLIENT_METRICS, "subscription-1"), config, emptyMap())). getMessage)
176178
}
179+
180+
@Test
181+
def testInvalidTableTopicSchemaConfig(): Unit = {
182+
val config = new util.TreeMap[String, String]()
183+
config.put(TABLE_TOPIC_SCHEMA_TYPE_CONFIG, TableTopicSchemaType.SCHEMA.name)
184+
185+
// Test without schema registry URL configured
186+
val exception = assertThrows(classOf[InvalidRequestException], () => {
187+
validator.validate(new ConfigResource(TOPIC, "foo"), config)
188+
})
189+
assertEquals("Table topic schema type is set to SCHEMA but schema registry URL is not configured", exception.getMessage)
190+
191+
// Test with schema registry URL configured
192+
val brokerConfigWithSchemaRegistry = TestUtils.createDummyBrokerConfig()
193+
brokerConfigWithSchemaRegistry.put(AutoMQConfig.TABLE_TOPIC_SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")
194+
195+
val kafkaConfigWithSchemaRegistry = new KafkaConfig(brokerConfigWithSchemaRegistry)
196+
val validatorWithSchemaRegistry = new ControllerConfigurationValidator(kafkaConfigWithSchemaRegistry)
197+
198+
// No exception should be thrown when schema registry URL is configured properly
199+
validatorWithSchemaRegistry.validate(new ConfigResource(TOPIC, "foo"), config)
200+
}
177201
}

Diff for: server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java

+1
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public final class ServerTopicConfigSynonyms {
8888
sameName(TopicConfig.TABLE_TOPIC_ENABLE_CONFIG),
8989
sameName(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG),
9090
sameName(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG),
91+
sameName(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG),
9192
// AutoMQ inject end
9293

9394
sameNameWithLogPrefix(TopicConfig.PREALLOCATE_CONFIG),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2024, AutoMQ HK Limited.
3+
*
4+
* The use of this file is governed by the Business Source License,
5+
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
6+
*
7+
* As of the Change Date specified in that file, in accordance with
8+
* the Business Source License, use of this software will be governed
9+
* by the Apache License, Version 2.0
10+
*/
11+
12+
package org.apache.kafka.server.record;
13+
14+
import java.util.List;
15+
import java.util.Locale;
16+
import java.util.stream.Collectors;
17+
18+
import static java.util.Arrays.asList;
19+
20+
public enum TableTopicSchemaType {
21+
SCHEMALESS("schemaless"),
22+
SCHEMA("schema");
23+
24+
public final String name;
25+
private static final List<TableTopicSchemaType> VALUES = asList(values());
26+
27+
TableTopicSchemaType(String name) {
28+
this.name = name;
29+
}
30+
31+
public static List<String> names() {
32+
return VALUES.stream().map(v -> v.name).collect(Collectors.toList());
33+
}
34+
35+
public static TableTopicSchemaType forName(String n) {
36+
String name = n.toLowerCase(Locale.ROOT);
37+
return VALUES.stream().filter(v -> v.name.equals(name)).findFirst().orElseThrow(() ->
38+
new IllegalArgumentException("Unknown table topic type name: " + name)
39+
);
40+
}
41+
}

Diff for: storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java

+14
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.kafka.server.config.ServerLogConfigs;
3939
import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
4040
import org.apache.kafka.server.record.BrokerCompressionType;
41+
import org.apache.kafka.server.record.TableTopicSchemaType;
4142

4243
import java.util.Collections;
4344
import java.util.HashMap;
@@ -340,6 +341,7 @@ public Optional<String> serverConfigName(String configName) {
340341
.define(TopicConfig.TABLE_TOPIC_ENABLE_CONFIG, BOOLEAN, false, null, MEDIUM, TopicConfig.TABLE_TOPIC_ENABLE_DOC)
341342
.define(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG, LONG, TimeUnit.MINUTES.toMillis(5), between(1, TimeUnit.MINUTES.toMillis(15)), MEDIUM, TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_DOC)
342343
.define(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_NAMESPACE_DOC)
344+
.define(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG, STRING, TableTopicSchemaType.SCHEMALESS.name, in(TableTopicSchemaType.names().toArray(new String[0])), MEDIUM, TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_DOC)
343345
// AutoMQ inject end
344346
.define(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_COPY_DISABLE_DOC)
345347
.define(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_DOC);
@@ -393,6 +395,7 @@ public Optional<String> serverConfigName(String configName) {
393395
public final boolean tableTopicEnable;
394396
public final long tableTopicCommitInterval;
395397
public final String tableTopicNamespace;
398+
public final TableTopicSchemaType tableTopicSchemaType;
396399
// AutoMQ inject end
397400

398401
private final int maxMessageSize;
@@ -449,6 +452,7 @@ public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs) {
449452
this.tableTopicEnable = getBoolean(TopicConfig.TABLE_TOPIC_ENABLE_CONFIG);
450453
this.tableTopicCommitInterval = getLong(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG);
451454
this.tableTopicNamespace = getString(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG);
455+
this.tableTopicSchemaType = TableTopicSchemaType.forName(getString(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG));
452456
// AutoMQ inject end
453457

454458
remoteLogConfig = new RemoteLogConfig(this);
@@ -782,6 +786,16 @@ public static void validate(Map<String, String> existingConfigs,
782786
}
783787
}
784788

789+
// AutoMQ inject start
790+
public static void validateTableTopicSchemaConfigValues(Properties props, String tableTopicSchemaRegistryUrl) {
791+
String schemaType = props.getProperty(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG);
792+
if (TableTopicSchemaType.SCHEMA.name.equals(schemaType) && tableTopicSchemaRegistryUrl == null) {
793+
throw new InvalidConfigurationException("Table topic schema type is set to SCHEMA but schema registry URL is not configured");
794+
}
795+
}
796+
797+
// AutoMQ inject end
798+
785799
@Override
786800
public String toString() {
787801
return "LogConfig{" +

0 commit comments

Comments
 (0)