diff --git a/build.gradle b/build.gradle index bf86b4d892..5c9eae6758 100644 --- a/build.gradle +++ b/build.gradle @@ -73,6 +73,7 @@ ext { springRetryVersion = '2.0.5' springVersion = '6.1.5' zookeeperVersion = '3.8.4' + parallelConsumerVersion = '0.5.2.8' idPrefix = 'kafka' @@ -289,6 +290,9 @@ project ('spring-kafka') { exclude group: 'org.jetbrains.kotlin' } + // Parallel Consumer + api "io.confluent.parallelconsumer:parallel-consumer-core:$parallelConsumerVersion" + // Spring Data projection message binding support optionalApi ("org.springframework.data:spring-data-commons") { exclude group: 'org.springframework' diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableParallelConsumer.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableParallelConsumer.java new file mode 100644 index 0000000000..0cf4e7a5e5 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableParallelConsumer.java @@ -0,0 +1,41 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.annotation; + +import org.springframework.context.annotation.Import; +import org.springframework.kafka.config.ParallelConsumerConfiguration; +import org.springframework.kafka.config.ParallelConsumerImportSelector; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * If you want to import {@link ParallelConsumerConfiguration} to your application, + * you just annotated {@link EnableParallelConsumer} to your spring application. + * + * @author Sanghyeok An + * + * @since 3.3 + */ + +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Import(ParallelConsumerImportSelector.class) +public @interface EnableParallelConsumer { +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/OnMissingParallelConsumerOptionsProviderCondition.java b/spring-kafka/src/main/java/org/springframework/kafka/config/OnMissingParallelConsumerOptionsProviderCondition.java new file mode 100644 index 0000000000..615ac24ffc --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/OnMissingParallelConsumerOptionsProviderCondition.java @@ -0,0 +1,41 @@ +/* + * Copyright 2016-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.config; + +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Condition; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.core.type.AnnotatedTypeMetadata; +import org.springframework.kafka.core.parallelconsumer.ParallelConsumerOptionsProvider; + +/** + * This class is to provide {@link ParallelConsumerOptionsProvider} as default. + * {@link ParallelConsumerConfiguration} use this function when {@link ApplicationContext} started. + * If there is no spring bean in {@link ApplicationContext} such as {@link ParallelConsumerOptionsProvider}, + * {@link ParallelConsumerConfiguration} will register default {@link ParallelConsumerOptionsProvider}. + * + * @author Sanghyeok An + * + * @since 3.3 + */ +public class OnMissingParallelConsumerOptionsProviderCondition implements Condition { + + @Override + public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) { + return context.getBeanFactory().getBean(ParallelConsumerOptionsProvider.class) == null; + } +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java new file mode 100644 index 0000000000..a7eb9734b9 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfig.java @@ -0,0 +1,163 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.config; + + +import org.apache.kafka.clients.consumer.Consumer; + +import io.confluent.parallelconsumer.ParallelConsumerOptions; + +import org.apache.kafka.clients.producer.Producer; +import org.springframework.kafka.core.parallelconsumer.ParallelConsumerOptionsProvider; +import org.springframework.kafka.annotation.EnableParallelConsumer; + +// It would be better to be migrated to org.springframework.boot.autoconfigure.kafka.KafkaProperties. + +/** + * ParallelConsumerConfig is for config of {@link io.confluent.parallelconsumer}. + * This will be registered as Spring Bean when {@link EnableParallelConsumer} is annotated to your spring application. + * + * @author Sanghyeok An + * + * @since 3.3 + */ + +public class ParallelConsumerConfig { + + public static final String DEFAULT_BEAN_NAME = "parallelConsumerConfig"; + private final ParallelConsumerOptionsProvider provider; + + public ParallelConsumerConfig(ParallelConsumerOptionsProvider provider) { + this.provider = provider; + } + + public ParallelConsumerOptions toConsumerOptions( + ParallelConsumerOptions.ParallelConsumerOptionsBuilder builder, + Consumer consumer, + Producer producer) { + + builder.producer(producer); + return toConsumerOptions(builder, consumer); + } + + public ParallelConsumerOptions toConsumerOptions( + ParallelConsumerOptions.ParallelConsumerOptionsBuilder builder, + Consumer consumer) { + builder.consumer(consumer); + return buildRemainOptions(builder); + } + + private ParallelConsumerOptions buildRemainOptions(ParallelConsumerOptions.ParallelConsumerOptionsBuilder builder) { + if (this.provider.managedExecutorService() != null){ + builder.managedExecutorService(this.provider.managedExecutorService()); + } + + if (this.provider.managedThreadFactory() != null){ + builder.managedThreadFactory(this.provider.managedThreadFactory()); + } + + if (this.provider.meterRegistry() != null){ + builder.meterRegistry(this.provider.meterRegistry()); + } + + if (this.provider.pcInstanceTag() != null){ + builder.pcInstanceTag(this.provider.pcInstanceTag()); + } + + if (this.provider.metricsTags() != null){ + builder.metricsTags(this.provider.metricsTags()); + } + + if (this.provider.allowEagerProcessingDuringTransactionCommit() != null){ + builder.allowEagerProcessingDuringTransactionCommit(this.provider.allowEagerProcessingDuringTransactionCommit()); + } + + if (this.provider.commitLockAcquisitionTimeout() != null){ + builder.commitLockAcquisitionTimeout(this.provider.commitLockAcquisitionTimeout()); + } + + if (this.provider.produceLockAcquisitionTimeout() != null){ + builder.produceLockAcquisitionTimeout(this.provider.produceLockAcquisitionTimeout()); + } + + if (this.provider.commitInterval() != null){ + builder.commitInterval(this.provider.commitInterval()); + } + + if (this.provider.ordering() != null){ + builder.ordering(this.provider.ordering()); + } + + if (this.provider.commitMode() != null){ + builder.commitMode(this.provider.commitMode()); + } + + if (this.provider.maxConcurrency() != null){ + builder.maxConcurrency(this.provider.maxConcurrency()); + } + + if (this.provider.invalidOffsetMetadataPolicy() != null){ + builder.invalidOffsetMetadataPolicy(this.provider.invalidOffsetMetadataPolicy()); + } + + if (this.provider.retryDelayProvider() != null){ + builder.retryDelayProvider(this.provider.retryDelayProvider()); + } + + if (this.provider.sendTimeout() != null){ + builder.sendTimeout(this.provider.sendTimeout()); + } + + if (this.provider.offsetCommitTimeout() != null){ + builder.offsetCommitTimeout(this.provider.offsetCommitTimeout()); + } + + if (this.provider.batchSize() != null){ + builder.batchSize(this.provider.batchSize()); + } + + if (this.provider.thresholdForTimeSpendInQueueWarning() != null){ + builder.thresholdForTimeSpendInQueueWarning(this.provider.thresholdForTimeSpendInQueueWarning()); + } + + if (this.provider.maxFailureHistory() != null){ + builder.maxFailureHistory(this.provider.maxFailureHistory()); + } + + if (this.provider.shutdownTimeout() != null){ + builder.shutdownTimeout(this.provider.shutdownTimeout()); + } + + if (this.provider.drainTimeout() != null){ + builder.drainTimeout(this.provider.drainTimeout()); + } + + if (this.provider.messageBufferSize() != null){ + builder.messageBufferSize(this.provider.messageBufferSize()); + } + + if (this.provider.initialLoadFactor() != null){ + builder.initialLoadFactor(this.provider.initialLoadFactor()); + } + if (this.provider.maximumLoadFactor() != null){ + builder.maximumLoadFactor(this.provider.maximumLoadFactor()); + } + + return builder.build(); + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java new file mode 100644 index 0000000000..ea6557fde0 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerConfiguration.java @@ -0,0 +1,67 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Conditional; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.parallelconsumer.ParallelConsumerRootInterface; +import org.springframework.kafka.core.ParallelConsumerFactory; +import org.springframework.kafka.annotation.EnableParallelConsumer; +import org.springframework.kafka.core.parallelconsumer.ParallelConsumerOptionsProvider; + +/** + * If User decide to use parallelConsumer on SpringKafka, User should import this class to their ComponentScan scopes. + * If so, this class will register both {@link ParallelConsumerContext} and {@link ParallelConsumerFactory} as Spring Bean. + * User has responsibility + * 1. annotated {@link EnableParallelConsumer} on their spring application + * 2. register ConcreteClass of {@link ParallelConsumerRootInterface}. + * + * @author Sanghyoek An + * + * @since 3.3 + */ + +public class ParallelConsumerConfiguration { + + @Bean + @Conditional(OnMissingParallelConsumerOptionsProviderCondition.class) + public ParallelConsumerOptionsProvider parallelConsumerOptionsProvider() { + return new ParallelConsumerOptionsProvider() {}; + } + + @Bean(name = ParallelConsumerConfig.DEFAULT_BEAN_NAME) + public ParallelConsumerConfig parallelConsumerConfig(ParallelConsumerOptionsProvider parallelConsumerOptionsProvider) { + return new ParallelConsumerConfig(parallelConsumerOptionsProvider); + } + + @Bean(name = ParallelConsumerContext.DEFAULT_BEAN_NAME) + public ParallelConsumerContext parallelConsumerContext(ParallelConsumerConfig parallelConsumerConfig, + ParallelConsumerRootInterface parallelConsumerCallback) { + return new ParallelConsumerContext(parallelConsumerConfig, + parallelConsumerCallback); + } + + @Bean(name = ParallelConsumerFactory.DEFAULT_BEAN_NAME) + public ParallelConsumerFactory parallelConsumerFactory(DefaultKafkaConsumerFactory consumerFactory, + DefaultKafkaProducerFactory producerFactory, + ParallelConsumerContext parallelConsumerContext) { + return new ParallelConsumerFactory(parallelConsumerContext, consumerFactory, producerFactory); + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java new file mode 100644 index 0000000000..205be1e5f2 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerContext.java @@ -0,0 +1,61 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.config; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; +import org.springframework.kafka.core.parallelconsumer.ParallelConsumerRootInterface; + +import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.ParallelConsumerOptions.ParallelConsumerOptionsBuilder; + +/** + * This class is for collecting all related with ParallelConsumer. + * + * @author Sanghyeok An + * + * @since 3.3 + */ + + +public class ParallelConsumerContext { + + public static final String DEFAULT_BEAN_NAME = "parallelConsumerContext"; + private final ParallelConsumerConfig parallelConsumerConfig; + private final ParallelConsumerRootInterface parallelConsumerCallback; + + public ParallelConsumerContext(ParallelConsumerConfig parallelConsumerConfig, + ParallelConsumerRootInterface callback) { + this.parallelConsumerConfig = parallelConsumerConfig; + this.parallelConsumerCallback = callback; + } + + public ParallelConsumerRootInterface parallelConsumerCallback() { + return this.parallelConsumerCallback; + } + + public ParallelConsumerOptions getParallelConsumerOptions(Consumer consumer) { + final ParallelConsumerOptionsBuilder builder = ParallelConsumerOptions.builder(); + return parallelConsumerConfig.toConsumerOptions(builder, consumer); + } + + public ParallelConsumerOptions getParallelConsumerOptions(Consumer consumer, Producer producer) { + final ParallelConsumerOptionsBuilder builder = ParallelConsumerOptions.builder(); + return parallelConsumerConfig.toConsumerOptions(builder, consumer, producer); + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerImportSelector.java b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerImportSelector.java new file mode 100644 index 0000000000..938addcae1 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/ParallelConsumerImportSelector.java @@ -0,0 +1,38 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.config; + +import org.springframework.context.annotation.ImportSelector; +import org.springframework.core.type.AnnotationMetadata; +import org.springframework.kafka.annotation.EnableParallelConsumer; + +/** + * ParallelConsumerImportSelector is to register {@link ParallelConsumerConfiguration}. + * If you want to import {@link ParallelConsumerConfiguration} to your application, + * you just annotated {@link EnableParallelConsumer} to your spring application. + * + * @author Sanghyeok An + * + * @since 3.3 + */ + +public class ParallelConsumerImportSelector implements ImportSelector { + @Override + public String[] selectImports(AnnotationMetadata importingClassMetadata) { + return new String[]{ParallelConsumerConfiguration.class.getName()}; + } +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java new file mode 100644 index 0000000000..5a22685f15 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/ParallelConsumerFactory.java @@ -0,0 +1,181 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core; + +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.regex.Pattern; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.producer.Producer; +import org.springframework.context.SmartLifecycle; +import org.springframework.kafka.config.ParallelConsumerContext; +import org.springframework.kafka.core.parallelconsumer.ParallelConsumerRootInterface; +import org.springframework.kafka.core.parallelconsumer.PollAndProduce; +import org.springframework.kafka.core.parallelconsumer.PollAndProduceMany; +import org.springframework.kafka.core.parallelconsumer.PollAndProduceManyResult; +import org.springframework.kafka.core.parallelconsumer.PollAndProduceResult; +import org.springframework.kafka.core.parallelconsumer.Poll; +import org.springframework.kafka.core.parallelconsumer.ParallelConsumerResultInterface; + +import io.confluent.parallelconsumer.ParallelConsumerOptions; +import io.confluent.parallelconsumer.ParallelStreamProcessor; +import io.confluent.parallelconsumer.internal.DrainingCloseable.DrainingMode; + +/** + * ParallelConsumerFactory will be started and closed by Spring LifeCycle. + * This class is quite simple, because ParallelConsumer requires delegating the situation to itself. + * + * @author Sanghyeok An + * + * @since 3.3 + */ + +public class ParallelConsumerFactory implements SmartLifecycle { + + public static final String DEFAULT_BEAN_NAME = "parallelConsumerFactory"; + + private final DefaultKafkaConsumerFactory defaultKafkaConsumerFactory; + private final DefaultKafkaProducerFactory defaultKafkaProducerFactory; + private final ParallelConsumerContext parallelConsumerContext; + private final ParallelStreamProcessor parallelConsumer; + private final ParallelConsumerOptions parallelConsumerOptions; + private boolean running; + + public ParallelConsumerFactory(ParallelConsumerContext parallelConsumerContext, + DefaultKafkaConsumerFactory defaultKafkaConsumerFactory, + DefaultKafkaProducerFactory defaultKafkaProducerFactory) { + this.parallelConsumerContext = parallelConsumerContext; + this.defaultKafkaConsumerFactory = defaultKafkaConsumerFactory; + this.defaultKafkaProducerFactory = defaultKafkaProducerFactory; + + final Consumer kafkaConsumer = defaultKafkaConsumerFactory.createConsumer(); + final Producer kafkaProducer = defaultKafkaProducerFactory.createProducer(); + this.parallelConsumerOptions = parallelConsumerOptions(kafkaConsumer, kafkaProducer); + this.parallelConsumer = ParallelStreamProcessor.createEosStreamProcessor(this.parallelConsumerOptions); + } + + + private ParallelConsumerOptions parallelConsumerOptions(Consumer consumer, + Producer producer) { + final ParallelConsumerRootInterface callback = parallelConsumerContext.parallelConsumerCallback(); + if (callback instanceof PollAndProduceMany || + callback instanceof PollAndProduce) { + return parallelConsumerContext.getParallelConsumerOptions(consumer, producer); + } else { + return parallelConsumerContext.getParallelConsumerOptions(consumer); + } + } + + @Override + public void start() { + subscribe(); + + final ParallelConsumerRootInterface callback0 = parallelConsumerContext.parallelConsumerCallback(); + + if (callback0 instanceof ParallelConsumerResultInterface) { + if (callback0 instanceof PollAndProduceManyResult) { + final PollAndProduceManyResult callback = + (PollAndProduceManyResult) callback0; + + this.parallelConsumer.pollAndProduceMany(callback::accept, callback::resultConsumer); + } + else if (callback0 instanceof PollAndProduce) { + final PollAndProduceResult callback = + (PollAndProduceResult) callback0; + + this.parallelConsumer.pollAndProduce(callback::accept, callback::resultConsumer); + } + else { + throw new UnsupportedOperationException(); + } + } else { + if (callback0 instanceof PollAndProduceMany) { + final PollAndProduceMany callback = + (PollAndProduceMany) callback0; + + this.parallelConsumer.pollAndProduceMany(callback::accept); + } + else if (callback0 instanceof PollAndProduce) { + final PollAndProduce callback = + (PollAndProduce) callback0; + + this.parallelConsumer.pollAndProduce(callback::accept); + } + else if (callback0 instanceof Poll) { + final Poll callback = (Poll) callback0; + + this.parallelConsumer.poll(callback::accept); + } + else { + throw new UnsupportedOperationException(); + } + } + this.running = true; + } + + @Override + public void stop() { + final ParallelConsumerRootInterface callback = + this.parallelConsumerContext.parallelConsumerCallback(); + final DrainingMode drainingMode = callback.drainingMode(); + final Duration duration = callback.drainTimeOut(); + + this.parallelConsumer.close(duration, drainingMode); + this.running = false; + } + + @Override + public boolean isRunning() { + return this.running; + } + + private void subscribe() { + final ParallelConsumerRootInterface callback = this.parallelConsumerContext.parallelConsumerCallback(); + + final List topics = callback.getTopics(); + final ConsumerRebalanceListener rebalanceListener = callback.getRebalanceListener(); + + if (topics != null && !topics.isEmpty()) { + subscribe(topics, rebalanceListener); + } + else { + subscribe(callback.getSubscribeTopicsPattern(), rebalanceListener); + } + } + + private void subscribe(Collection topics, ConsumerRebalanceListener listenerCallback){ + if (listenerCallback == null) { + this.parallelConsumer.subscribe(topics); + } + else { + this.parallelConsumer.subscribe(topics, listenerCallback); + } + } + + private void subscribe(Pattern pattern, ConsumerRebalanceListener listenerCallback) { + if (listenerCallback == null) { + this.parallelConsumer.subscribe(pattern); + } + else { + this.parallelConsumer.subscribe(pattern, listenerCallback); + } + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerOptionsProvider.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerOptionsProvider.java new file mode 100644 index 0000000000..5d56e0c6a5 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerOptionsProvider.java @@ -0,0 +1,167 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core.parallelconsumer; + +import java.time.Duration; +import java.util.function.Function; + +import javax.annotation.Nullable; + +import io.confluent.parallelconsumer.ParallelConsumer; +import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode; +import io.confluent.parallelconsumer.ParallelConsumerOptions.InvalidOffsetMetadataHandlingPolicy; +import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder; +import io.confluent.parallelconsumer.RecordContext; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; + +/** + * User can configure options of {@link ParallelConsumer} via {@link ParallelConsumerOptionsProvider}. + * If user want to configure options of {@link ParallelConsumer}, user should implement {@link ParallelConsumerOptionsProvider} + * and register it as spring bean. + * + * User don't need to implement all of methods. + * Note : If a method returns null, that option will use the default value of the {@link ParallelConsumer}. + * + * @author Sanghyeok An + * + * @since 3.3 + */ + +public interface ParallelConsumerOptionsProvider { + + @Nullable + default String managedExecutorService() { + return null; + } + + @Nullable + default String managedThreadFactory() { + return null; + } + + @Nullable + default MeterRegistry meterRegistry() { + return null; + } + + @Nullable + default String pcInstanceTag() { + return null; + } + + @Nullable + default Iterable metricsTags() { + return null; + } + + @Nullable + default Boolean allowEagerProcessingDuringTransactionCommit() { + return null; + } + + @Nullable + default Duration commitLockAcquisitionTimeout() { + return null; + } + + @Nullable + default Duration produceLockAcquisitionTimeout() { + return null; + } + + @Nullable + default Duration commitInterval() { + return null; + } + + @Nullable + default ProcessingOrder ordering() { + return null; + } + + @Nullable + default CommitMode commitMode() { + return null; + } + + @Nullable + default Integer maxConcurrency() { + return null; + } + + @Nullable + default InvalidOffsetMetadataHandlingPolicy invalidOffsetMetadataPolicy() { + return null; + } + + @Nullable + default Function, Duration> retryDelayProvider() { + return null; + } + + @Nullable + default Duration sendTimeout() { + return null; + } + + @Nullable + default Duration offsetCommitTimeout() { + return null; + } + + @Nullable + default Integer batchSize() { + return null; + } + + @Nullable + default Duration thresholdForTimeSpendInQueueWarning () { + return null; + } + + @Nullable + default Integer maxFailureHistory() { + return null; + } + + @Nullable + default Duration shutdownTimeout() { + return null; + } + + @Nullable + default Duration drainTimeout() { + return null; + } + + @Nullable + default Integer messageBufferSize() { + return null; + } + + @Nullable + default Integer initialLoadFactor() { + return null; + } + + @Nullable + default Integer maximumLoadFactor() { + return null; + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerResultInterface.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerResultInterface.java new file mode 100644 index 0000000000..481065d910 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerResultInterface.java @@ -0,0 +1,35 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core.parallelconsumer; + +import java.util.function.Consumer; + +import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult; + +/** + * This interface is an interface that marks whether there is a Callback for {@link ConsumeProduceResult}. + * Users should implement one of {@link Poll}, {@link PollAndProduce}, {@link PollAndProduceResult} + * , {@link PollAndProduceMany}, {@link PollAndProduceManyResult} instead of {@link ParallelConsumerResultInterface}. + * + * @author Sanghyeok An + * + * @since 3.3 + */ + +public interface ParallelConsumerResultInterface { + Consumer> resultConsumer(ConsumeProduceResult result); +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerRootInterface.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerRootInterface.java new file mode 100644 index 0000000000..9b6380c240 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/ParallelConsumerRootInterface.java @@ -0,0 +1,58 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core.parallelconsumer; + +import java.time.Duration; +import java.util.List; +import java.util.regex.Pattern; + +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; + +import io.confluent.parallelconsumer.internal.DrainingCloseable.DrainingMode; + +/** + * This interface provides a common interface for sub-interfaces. + * Users should not implement this interface. + * Users should implement one of {@link Poll}, {@link PollAndProduce}, {@link PollAndProduceResult} + * , {@link PollAndProduceMany}, {@link PollAndProduceManyResult} instead of {@link ParallelConsumerRootInterface}. + + * @author Sanghyeok An + * + * @since 3.3 + */ + +public interface ParallelConsumerRootInterface { + + /** + * ... + */ + List getTopics(); + default Pattern getSubscribeTopicsPattern(){ + return null; + } + default ConsumerRebalanceListener getRebalanceListener(){ + return null; + } + default DrainingMode drainingMode() { + return DrainingMode.DONT_DRAIN; + } + + default Duration drainTimeOut() { + return Duration.ofMillis(0); + } + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/Poll.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/Poll.java new file mode 100644 index 0000000000..f3a399d979 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/Poll.java @@ -0,0 +1,48 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core.parallelconsumer; + +import java.util.function.Consumer; + +import org.springframework.kafka.config.ParallelConsumerContext; +import org.springframework.kafka.core.ParallelConsumerFactory; + +import io.confluent.parallelconsumer.ParallelStreamProcessor; +import io.confluent.parallelconsumer.PollContext; + +/** + * This interface is intended for use when the user does not use the producer after consuming. + * User should implement {@link Poll} and register it as Spring Bean. + * {@link Poll#accept(PollContext)} will be called by {@link ParallelStreamProcessor#poll(Consumer)} + * when {@link ParallelConsumerFactory} started. + * + * @author Sanghyeok An + * + * @since 3.3 + */ + +public interface Poll extends ParallelConsumerRootInterface { + + /** + * This is for {@link ParallelConsumerFactory} and {@link ParallelConsumerContext}. + * ParallelConsumer will process the consumed messages using this callback. + * @param context context which Parallel Consumer produce + * @return void. + */ + void accept(PollContext context); + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduce.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduce.java new file mode 100644 index 0000000000..1eb171d24e --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduce.java @@ -0,0 +1,45 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core.parallelconsumer; + +import java.util.function.Function; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.kafka.core.ParallelConsumerFactory; + +import io.confluent.parallelconsumer.ParallelStreamProcessor; +import io.confluent.parallelconsumer.PollContext; + +/** + * This interface is intended for use when the user use the producer after consuming. + * User should implement {@link PollAndProduce} and register it as Spring Bean. + * {@link PollAndProduce#accept(PollContext)} will be called by {@link ParallelStreamProcessor#pollAndProduce(Function)} + * when {@link ParallelConsumerFactory} started. + * + * @author Sanghyeok An + * + * @since 3.3 + */ + +public interface PollAndProduce extends ParallelConsumerRootInterface { + + /** + * ... + */ + ProducerRecord accept(PollContext context); + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceMany.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceMany.java new file mode 100644 index 0000000000..5d7f20c04f --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceMany.java @@ -0,0 +1,45 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core.parallelconsumer; + +import java.util.List; +import java.util.function.Function; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.kafka.core.ParallelConsumerFactory; + +import io.confluent.parallelconsumer.ParallelStreamProcessor; +import io.confluent.parallelconsumer.PollContext; + +/** + * This interface is intended for use when the user use the producer after consuming. + * User should implement {@link PollAndProduceMany} and register it as Spring Bean. + * {@link PollAndProduceMany#accept(PollContext)} will be called by {@link ParallelStreamProcessor#pollAndProduceMany(Function)} + * when {@link ParallelConsumerFactory} started. + * @author Sanghyeok An + * + * @since 3.3 + */ + +public interface PollAndProduceMany extends ParallelConsumerRootInterface { + + /** + * ... + */ + List> accept(PollContext context); + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceManyResult.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceManyResult.java new file mode 100644 index 0000000000..810c8af97b --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceManyResult.java @@ -0,0 +1,43 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core.parallelconsumer; + +import java.util.function.Consumer; +import java.util.function.Function; + +import org.springframework.kafka.core.ParallelConsumerFactory; + +import io.confluent.parallelconsumer.ParallelStreamProcessor; +import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult; +import io.confluent.parallelconsumer.PollContext; + +/** + * This interface is intended for use when the user use the producer after consuming. + * User should implement {@link PollAndProduce} and register it as Spring Bean. + * {@link PollAndProduceManyResult#accept(PollContext)} and {@link PollAndProduceManyResult#resultConsumer(ConsumeProduceResult)} + * will be called by {@link ParallelStreamProcessor#pollAndProduceMany(Function, Consumer)} + * when {@link ParallelConsumerFactory} started. + * + * @author Sanghyeok An + * + * @since 3.3 + */ + +public interface PollAndProduceManyResult extends PollAndProduceMany, + ParallelConsumerResultInterface { + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceResult.java b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceResult.java new file mode 100644 index 0000000000..fbf9c4c48c --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/parallelconsumer/PollAndProduceResult.java @@ -0,0 +1,43 @@ +/* + * Copyright 2014-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core.parallelconsumer; + +import java.util.function.Consumer; +import java.util.function.Function; + +import org.springframework.kafka.core.ParallelConsumerFactory; + +import io.confluent.parallelconsumer.ParallelStreamProcessor; +import io.confluent.parallelconsumer.ParallelStreamProcessor.ConsumeProduceResult; +import io.confluent.parallelconsumer.PollContext; + +/** + * This interface is intended for use when the user use the producer after consuming. + * User should implement {@link PollAndProduceResult} and register it as Spring Bean. + * Both {@link PollAndProduceResult#accept(PollContext)} {@link PollAndProduceResult#resultConsumer(ConsumeProduceResult)} + * will be called by {@link ParallelStreamProcessor#pollAndProduce(Function, Consumer)} + * when {@link ParallelConsumerFactory} started. + * + * @author Sanghyeok An + * + * @since 3.3 + */ + +public interface PollAndProduceResult extends PollAndProduce, + ParallelConsumerResultInterface { + +}