Skip to content

Commit e91cbd9

Browse files
committed
feat(plugin): add tasks.file.status.storage.topic.creation.enable props
This commit adds a new boolean property to indicate if the status storage topic should be automatically created.
1 parent 9f578f7 commit e91cbd9

File tree

4 files changed

+62
-11
lines changed

4 files changed

+62
-11
lines changed

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/storage/KafkaStateBackingStore.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public class KafkaStateBackingStore<T> implements StateBackingStore<T> {
6262
* @param groupId the group attached to the backing topic (i.e., the connector-name).
6363
* @param producerProps the kafka producer properties.
6464
* @param consumerProps the kafka consumer properties.
65-
* @param serde the state serdes.
65+
* @param serde the {@link StateSerde}.
6666
*/
6767
public KafkaStateBackingStore(final String topic,
6868
final String keyPrefix,

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/state/KafkaFileObjectStateBackingStore.java

+11-9
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,17 @@ public void configure(final Map<String, ?> props) {
6262
config.getTaskStorageConsumerEnabled()
6363
);
6464

65-
try (AdminClient client = AdminClient.create(config.getAdminClientTaskStorageConfigs())) {
66-
Map<String, String> topicConfig = new HashMap<>();
67-
topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
68-
final NewTopic newTopic = new NewTopic(
69-
config.getTaskStorageTopic(),
70-
config.getTopicPartitions(),
71-
config.getReplicationFactor()
72-
).configs(topicConfig);
73-
createTopic(client, newTopic);
65+
if (config.isTopicCreationEnable()) {
66+
try (AdminClient client = AdminClient.create(config.getAdminClientTaskStorageConfigs())) {
67+
Map<String, String> topicConfig = new HashMap<>();
68+
topicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
69+
final NewTopic newTopic = new NewTopic(
70+
config.getTaskStorageTopic(),
71+
config.getTopicPartitions(),
72+
config.getReplicationFactor()
73+
).configs(topicConfig);
74+
createTopic(client, newTopic);
75+
}
7476
}
7577
}
7678

connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/state/KafkaFileObjectStateBackingStoreConfig.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ public final class KafkaFileObjectStateBackingStoreConfig extends AbstractConfig
5050
public static final String TASKS_FILE_STATUS_STORAGE_TOPIC_REPLICATION_FACTOR_CONFIG = TASKS_FILE_STATUS_STORAGE_PREFIX + "topic.replication.factor";
5151
public static final String TASKS_FILE_STATUS_STORAGE_TOPIC_REPLICATION_FACTOR_DOC = "The replication factor to be used for the status storage topic.";
5252

53+
public static final String TASKS_FILE_STATUS_STORAGE_TOPIC_CREATION_ENABLE_CONFIG = TASKS_FILE_STATUS_STORAGE_PREFIX + "topic.creation.enable";
54+
public static final String TASKS_FILE_STATUS_STORAGE_TOPIC_CREATION_ENABLE_DOC = "Boolean to indicate if the status storage topic should be automatically created.";
55+
56+
5357
/**
5458
* Creates a new {@link KafkaFileObjectStateBackingStoreConfig} instance.
5559
*
@@ -70,6 +74,9 @@ public String getTaskStorageTopic() {
7074
public String getTaskStorageName() {
7175
return this.getString(TASKS_FILE_STATUS_STORAGE_NAME_CONFIG);
7276
}
77+
public boolean isTopicCreationEnable() {
78+
return this.getBoolean(TASKS_FILE_STATUS_STORAGE_TOPIC_CREATION_ENABLE_CONFIG);
79+
}
7380

7481
public Map<String, Object> getConsumerTaskStorageConfigs() {
7582
final Map<String, Object> configs = new HashMap<>();
@@ -100,7 +107,7 @@ private Map<String, Object> getInternalKafkaAdminClientConfigs() {
100107
Map<String, Object> originals = originalsWithPrefix(TASKS_FILE_STATUS_STORAGE_PREFIX, true);
101108
Map<String, Object> adminClientConfigs = new HashMap<>(originals);
102109
adminClientConfigs.putAll(originalsWithPrefix(TASKS_FILE_STATUS_STORAGE_PREFIX + "admin."));
103-
return KafkaUtils.getAdminClientConfigs(originals);
110+
return KafkaUtils.getAdminClientConfigs(adminClientConfigs);
104111
}
105112

106113
private Map<String, Object> getInternalKafkaConsumerConfigs() {
@@ -149,6 +156,17 @@ static ConfigDef configDef() {
149156
ConfigDef.Width.NONE,
150157
TASKS_FILE_STATUS_STORAGE_BOOTSTRAP_SERVERS_CONFIG
151158
)
159+
.define(
160+
TASKS_FILE_STATUS_STORAGE_TOPIC_CREATION_ENABLE_CONFIG,
161+
ConfigDef.Type.BOOLEAN,
162+
true,
163+
ConfigDef.Importance.HIGH,
164+
TASKS_FILE_STATUS_STORAGE_TOPIC_CREATION_ENABLE_DOC,
165+
GROUP,
166+
groupCounter++,
167+
ConfigDef.Width.NONE,
168+
TASKS_FILE_STATUS_STORAGE_TOPIC_CREATION_ENABLE_CONFIG
169+
)
152170
.define(
153171
TASKS_FILE_STATUS_STORAGE_TOPIC_REPLICATION_FACTOR_CONFIG,
154172
ConfigDef.Type.SHORT,

connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/state/KafkaFileObjectStateBackingStoreConfigTest.java

+31
Original file line numberDiff line numberDiff line change
@@ -114,4 +114,35 @@ void should_return_producer_configs_given_valid_props() {
114114
Assertions.assertNotNull(producerConfigs.get(ProducerConfig.ACKS_CONFIG));
115115
}
116116

117+
@Test
118+
void should_return_topic_creation_disable_configs_given_false_props() {
119+
// GIVEN
120+
var config = new KafkaFileObjectStateBackingStoreConfig(Map.of(
121+
KafkaFileObjectStateBackingStoreConfig.TASKS_FILE_STATUS_STORAGE_NAME_CONFIG, "???",
122+
KafkaFileObjectStateBackingStoreConfig.TASKS_FILE_STATUS_STORAGE_BOOTSTRAP_SERVERS_CONFIG, "???",
123+
KafkaFileObjectStateBackingStoreConfig.TASKS_FILE_STATUS_STORAGE_TOPIC_CREATION_ENABLE_CONFIG, false
124+
));
125+
126+
// WHEN
127+
boolean enable = config.isTopicCreationEnable();
128+
129+
// THEN
130+
Assertions.assertFalse(enable);
131+
}
132+
133+
@Test
134+
void should_return_topic_creation_enable_configs_given_true_props() {
135+
// GIVEN
136+
var config = new KafkaFileObjectStateBackingStoreConfig(Map.of(
137+
KafkaFileObjectStateBackingStoreConfig.TASKS_FILE_STATUS_STORAGE_NAME_CONFIG, "???",
138+
KafkaFileObjectStateBackingStoreConfig.TASKS_FILE_STATUS_STORAGE_BOOTSTRAP_SERVERS_CONFIG, "???",
139+
KafkaFileObjectStateBackingStoreConfig.TASKS_FILE_STATUS_STORAGE_TOPIC_CREATION_ENABLE_CONFIG, true
140+
));
141+
142+
// WHEN
143+
boolean enable = config.isTopicCreationEnable();
144+
145+
// THEN
146+
Assertions.assertTrue(enable);
147+
}
117148
}

0 commit comments

Comments
 (0)