Skip to content

Commit 6a845f1

Browse files
committed
feat(config): rename table topic type to schema type and update related configurations
1 parent 96f11b0 commit 6a845f1

File tree

4 files changed

+12
-12
lines changed

4 files changed

+12
-12
lines changed

Diff for: clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -264,8 +264,8 @@ public class TopicConfig {
264264
public static final String TABLE_TOPIC_COMMIT_INTERVAL_DOC = "The table topic commit interval(ms)";
265265
public static final String TABLE_TOPIC_NAMESPACE_CONFIG = "automq.table.topic.namespace";
266266
public static final String TABLE_TOPIC_NAMESPACE_DOC = "The table topic table namespace";
267-
public static final String TABLE_TOPIC_TYPE_CONFIG = "automq.table.topic.type";
268-
public static final String TABLE_TOPIC_TYPE_DOC = "The table topic type, support schemaless, registry_schema";
267+
public static final String TABLE_TOPIC_SCHEMA_TYPE_CONFIG = "automq.table.topic.schema.type";
268+
public static final String TABLE_TOPIC_SCHEMA_TYPE_DOC = "The table topic schema type, support schemaless, schema";
269269
// AutoMQ inject end
270270

271271
}

Diff for: server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public final class ServerTopicConfigSynonyms {
8888
sameName(TopicConfig.TABLE_TOPIC_ENABLE_CONFIG),
8989
sameName(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG),
9090
sameName(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG),
91-
sameName(TopicConfig.TABLE_TOPIC_TYPE_CONFIG),
91+
sameName(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG),
9292
// AutoMQ inject end
9393

9494
sameNameWithLogPrefix(TopicConfig.PREALLOCATE_CONFIG),

Diff for: server-common/src/main/java/org/apache/kafka/server/record/TableTopicType.java renamed to server-common/src/main/java/org/apache/kafka/server/record/TableTopicSchemaType.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,22 @@
1717

1818
import static java.util.Arrays.asList;
1919

20-
public enum TableTopicType {
20+
public enum TableTopicSchemaType {
2121
SCHEMALESS("schemaless"),
22-
REGISTRY_SCHEMA("registry_schema");
22+
SCHEMA("schema");
2323

2424
public final String name;
25-
private static final List<TableTopicType> VALUES = asList(values());
25+
private static final List<TableTopicSchemaType> VALUES = asList(values());
2626

27-
TableTopicType(String name) {
27+
TableTopicSchemaType(String name) {
2828
this.name = name;
2929
}
3030

3131
public static List<String> names() {
3232
return VALUES.stream().map(v -> v.name).collect(Collectors.toList());
3333
}
3434

35-
public static TableTopicType forName(String n) {
35+
public static TableTopicSchemaType forName(String n) {
3636
String name = n.toLowerCase(Locale.ROOT);
3737
return VALUES.stream().filter(v -> v.name.equals(name)).findFirst().orElseThrow(() ->
3838
new IllegalArgumentException("Unknown table topic type name: " + name)

Diff for: storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
import org.apache.kafka.server.config.ServerLogConfigs;
4141
import org.apache.kafka.server.config.ServerTopicConfigSynonyms;
4242
import org.apache.kafka.server.record.BrokerCompressionType;
43-
import org.apache.kafka.server.record.TableTopicType;
43+
import org.apache.kafka.server.record.TableTopicSchemaType;
4444

4545
import java.util.Collections;
4646
import java.util.HashMap;
@@ -337,7 +337,7 @@ public Optional<String> serverConfigName(String configName) {
337337
.define(TopicConfig.TABLE_TOPIC_ENABLE_CONFIG, BOOLEAN, false, null, MEDIUM, TopicConfig.TABLE_TOPIC_ENABLE_DOC)
338338
.define(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG, LONG, TimeUnit.MINUTES.toMillis(5), between(1, TimeUnit.MINUTES.toMillis(15)), MEDIUM, TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_DOC)
339339
.define(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG, STRING, null, null, MEDIUM, TopicConfig.TABLE_TOPIC_NAMESPACE_DOC)
340-
.define(TopicConfig.TABLE_TOPIC_TYPE_CONFIG, STRING, null, in(TableTopicType.names().toArray(new String[0])), MEDIUM, TopicConfig.TABLE_TOPIC_TYPE_DOC)
340+
.define(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG, STRING, TableTopicSchemaType.SCHEMALESS.name, in(TableTopicSchemaType.names().toArray(new String[0])), MEDIUM, TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_DOC)
341341
// AutoMQ inject end
342342
.define(TopicConfig.REMOTE_LOG_DISABLE_POLICY_CONFIG, STRING, TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN,
343343
in(TopicConfig.REMOTE_LOG_DISABLE_POLICY_RETAIN, TopicConfig.REMOTE_LOG_DISABLE_POLICY_DELETE),
@@ -392,7 +392,7 @@ public Optional<String> serverConfigName(String configName) {
392392
public final boolean tableTopicEnable;
393393
public final long tableTopicCommitInterval;
394394
public final String tableTopicNamespace;
395-
public final TableTopicType tableTopicType;
395+
public final TableTopicSchemaType tableTopicSchemaType;
396396
// AutoMQ inject end
397397

398398
private final int maxMessageSize;
@@ -449,7 +449,7 @@ public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs) {
449449
this.tableTopicEnable = getBoolean(TopicConfig.TABLE_TOPIC_ENABLE_CONFIG);
450450
this.tableTopicCommitInterval = getLong(TopicConfig.TABLE_TOPIC_COMMIT_INTERVAL_CONFIG);
451451
this.tableTopicNamespace = getString(TopicConfig.TABLE_TOPIC_NAMESPACE_CONFIG);
452-
this.tableTopicType = TableTopicType.forName(getString(TopicConfig.TABLE_TOPIC_TYPE_CONFIG));
452+
this.tableTopicSchemaType = TableTopicSchemaType.forName(getString(TopicConfig.TABLE_TOPIC_SCHEMA_TYPE_CONFIG));
453453
// AutoMQ inject end
454454

455455
remoteLogConfig = new RemoteLogConfig(this);

0 commit comments

Comments
 (0)