diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index d531822d02..88d32ce568 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -2471,6 +2471,10 @@ Useful when the consumer code cannot determine that an `ErrorHandlingDeserialize |`null` |When present and `syncCommits` is `false` a callback invoked after the commit completes. +|[[offsetAndMetadataProvider]]<> +|`null` +|A provider for `OffsetAndMetadata`; by default, the provider creates an offset and metadata with empty metadata. The provider gives a way to customize the metadata. + |[[commitLogLevel]]<> |DEBUG |The logging level for logs pertaining to committing offsets. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index 81c9794b34..76a7d6281f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -376,13 +376,8 @@ public String getListenerId() { return this.beanName; // the container factory sets the bean name to the id attribute } - /** - * Get arbitrary static information that will be added to the - * {@link KafkaHeaders#LISTENER_INFO} header of all records. - * @return the info. - * @since 2.8.4 - */ @Nullable + @Override public byte[] getListenerInfo() { return this.listenerInfo != null ? Arrays.copyOf(this.listenerInfo, this.listenerInfo.length) : null; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java index 32959fbf6d..87ef5ca5cf 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java @@ -44,6 +44,7 @@ * @param the value type. * * @author Gary Russell + * @author Francois Rosiere * * @since 1.3.5 * @@ -139,7 +140,8 @@ && isCommitRecovered() && this.kafkaTemplate.isTransactional()) { ConsumerRecord skipped = records.get(0); this.kafkaTemplate.sendOffsetsToTransaction( Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()), - new OffsetAndMetadata(skipped.offset() + 1)), consumer.groupMetadata()); + createOffsetAndMetadata(container, skipped.offset() + 1) + ), consumer.groupMetadata()); } if (!recoverable && this.backOff != null) { @@ -165,4 +167,10 @@ public void clearThreadState() { this.lastIntervals.remove(); } + private static OffsetAndMetadata createOffsetAndMetadata(@Nullable MessageListenerContainer container, long offset) { + if (container == null) { + return new OffsetAndMetadata(offset); + } + return ListenerUtils.createOffsetAndMetadata(container, offset); + } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultListenerMetadata.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultListenerMetadata.java new file mode 100644 index 0000000000..52e744ac3d --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultListenerMetadata.java @@ -0,0 +1,53 @@ +/* + * Copyright 2017-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +/** + * Default implementation for {@link ListenerMetadata}. + * @author Francois Rosiere + * @since 2.8.6 + */ +class DefaultListenerMetadata implements ListenerMetadata { + + private final MessageListenerContainer container; + + DefaultListenerMetadata(MessageListenerContainer container) { + Assert.notNull(container, "'container' must not be null"); + this.container = container; + } + + @Override + @Nullable + public String getListenerId() { + return this.container.getListenerId(); + } + + @Override + @Nullable + public String getGroupId() { + return this.container.getGroupId(); + } + + @Override + @Nullable + public byte[] getListenerInfo() { + return this.container.getListenerInfo(); + } +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java index 566d944f57..178c23ee39 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java @@ -47,6 +47,7 @@ * fallback handler. * * @author Gary Russell + * @author Francois Rosiere * @since 2.8 * */ @@ -138,7 +139,7 @@ private ConsumerRecords seekOrRecover(Exception thrownException, @N } Map offsets = new HashMap<>(); toCommit.forEach(rec -> offsets.compute(new TopicPartition(rec.topic(), rec.partition()), - (key, val) -> new OffsetAndMetadata(rec.offset() + 1))); + (key, val) -> ListenerUtils.createOffsetAndMetadata(container, rec.offset() + 1))); if (offsets.size() > 0) { commit(consumer, container, offsets); } @@ -149,7 +150,7 @@ private ConsumerRecords seekOrRecover(Exception thrownException, @N ConsumerRecord recovered = remaining.get(0); commit(consumer, container, Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()), - new OffsetAndMetadata(recovered.offset() + 1))); + ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1))); if (remaining.size() > 1) { throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 24c903df38..7604eb4923 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -144,6 +144,7 @@ * @author Tom van den Berge * @author Lukasz Kaminski * @author Tomaz Fernandes + * @author Francois Rosiere */ public class KafkaMessageListenerContainer // NOSONAR line count extends AbstractMessageListenerContainer { @@ -560,7 +561,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume ? (listenerMetadata, offset) -> new OffsetAndMetadata(offset) : this.containerProperties.getOffsetAndMetadataProvider(); - private final ConsumerAwareListenerMetadata consumerAwareListenerMetadata = new ConsumerAwareListenerMetadata(); + private final ListenerMetadata listenerMetadata = new DefaultListenerMetadata(KafkaMessageListenerContainer.this); private final Consumer consumer; @@ -3144,32 +3145,7 @@ public String toString() { } private OffsetAndMetadata createOffsetAndMetadata(long offset) { - return this.offsetAndMetadataProvider.provide(this.consumerAwareListenerMetadata, offset); - } - - private final class ConsumerAwareListenerMetadata implements ListenerMetadata { - - ConsumerAwareListenerMetadata() { - } - - @Override - @Nullable - public String getListenerId() { - return getBeanName(); - } - - @Override - @Nullable - public String getGroupId() { - return ListenerConsumer.this.consumerGroupId; - } - - @Override - @Nullable - public byte[] getListenerInfo() { - return ListenerConsumer.this.listenerinfo; - } - + return this.offsetAndMetadataProvider.provide(this.listenerMetadata, offset); } private final class ConsumerAcknowledgment implements Acknowledgment { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java index 68afa00110..d6f5652826 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java @@ -22,6 +22,7 @@ import java.io.ObjectStreamClass; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -37,6 +38,7 @@ * Listener utilities. * * @author Gary Russell + * @author Francois Rosiere * @since 2.0 * */ @@ -194,5 +196,21 @@ public static void stoppableSleep(MessageListenerContainer container, long inter while (System.currentTimeMillis() < timeout); } + /** + * Create a new {@link OffsetAndMetadata} using the given container and offset. + * @param container a container. + * @param offset an offset. + * @return an offset and metadata. + * @since 2.8.6 + */ + public static OffsetAndMetadata createOffsetAndMetadata(MessageListenerContainer container, + long offset) { + final OffsetAndMetadataProvider metadataProvider = container.getContainerProperties() + .getOffsetAndMetadataProvider(); + if (metadataProvider != null) { + return metadataProvider.provide(new DefaultListenerMetadata(container), offset); + } + return new OffsetAndMetadata(offset); + } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java index 27b5f76214..5c3f06eb0d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java @@ -25,6 +25,7 @@ import org.springframework.beans.factory.DisposableBean; import org.springframework.context.SmartLifecycle; +import org.springframework.kafka.support.KafkaHeaders; import org.springframework.lang.Nullable; /** @@ -35,6 +36,7 @@ * @author Gary Russell * @author Vladimir Tsanev * @author Tomaz Fernandes + * @author Francois Rosiere */ public interface MessageListenerContainer extends SmartLifecycle, DisposableBean { @@ -194,6 +196,17 @@ default String getListenerId() { throw new UnsupportedOperationException("This container does not support retrieving the listener id"); } + /** + * Get arbitrary static information that will be added to the + * {@link KafkaHeaders#LISTENER_INFO} header of all records. + * @return the info. + * @since 2.8.6 + */ + @Nullable + default byte[] getListenerInfo() { + throw new UnsupportedOperationException("This container does not support retrieving the listener info"); + } + /** * If this container has child containers, return true if at least one child is running. If there are not * child containers, returns {@link #isRunning()}. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/OffsetAndMetadataProvider.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/OffsetAndMetadataProvider.java index eb1f49148e..a67cba1ce0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/OffsetAndMetadataProvider.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/OffsetAndMetadataProvider.java @@ -19,9 +19,8 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; /** - * Provider for {@link OffsetAndMetadata}. In case of async commits of the offsets, - * the provider can be used in combination with an {@link org.apache.kafka.clients.consumer.OffsetCommitCallback} to - * have more granularity in the way to create an {@link OffsetAndMetadata}. + * Provider for {@link OffsetAndMetadata}. The provider can be used to have more granularity when creating an + * {@link OffsetAndMetadata}. The provider is used for both sync and async commits of the offsets. * * @author Francois Rosiere * @since 2.8.5 diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java index 7bfde1fb28..60e83f2c50 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java @@ -44,6 +44,7 @@ * Seek utilities. * * @author Gary Russell + * @author Francois Rosiere * @since 2.2 * */ @@ -211,7 +212,7 @@ public static void seekOrRecover(Exception thrownException, @Nullable List record = records.get(0); Map offsetToCommit = Collections.singletonMap( new TopicPartition(record.topic(), record.partition()), - new OffsetAndMetadata(record.offset() + 1)); + ListenerUtils.createOffsetAndMetadata(container, record.offset() + 1)); if (container.getContainerProperties().isSyncCommits()) { consumer.commitSync(offsetToCommit, container.getContainerProperties().getSyncCommitTimeout()); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java index efcde9dec9..9080a47d92 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java @@ -48,6 +48,7 @@ /** * @author Gary Russell + * @author Francois Rosiere * @since 2.3.1 * */ @@ -75,6 +76,7 @@ void testClassifier() { Consumer consumer = mock(Consumer.class); given(consumer.groupMetadata()).willReturn(new ConsumerGroupMetadata("foo")); MessageListenerContainer container = mock(MessageListenerContainer.class); + given(container.getContainerProperties()).willReturn(new ContainerProperties("foo")); processor.process(records, consumer, container, illegalState, true, EOSMode.V2); processor.process(records, consumer, container, new DeserializationException("intended", null, false, illegalState), true, EOSMode.V2); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ListenerUtilsTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ListenerUtilsTests.java index 6bbca30f94..c60ce19827 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ListenerUtilsTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ListenerUtilsTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,10 +20,12 @@ import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.junit.jupiter.api.Test; /** * @author Gary Russell + * @author Francois Rosiere * @since 2.7.1 * */ @@ -38,4 +40,24 @@ void stoppableSleep() throws InterruptedException { assertThat(System.currentTimeMillis() - t1).isGreaterThanOrEqualTo(500); } + @Test + void testCreationOfOffsetAndMetadataWithoutProvider() { + final MessageListenerContainer container = mock(MessageListenerContainer.class); + given(container.getContainerProperties()).willReturn(new ContainerProperties("foo")); + final OffsetAndMetadata offsetAndMetadata = ListenerUtils.createOffsetAndMetadata(container, 1L); + assertThat(offsetAndMetadata.offset()).isEqualTo(1); + assertThat(offsetAndMetadata.metadata()).isEmpty(); + } + + @Test + void testCreationOfOffsetAndMetadataWithProvider() { + final MessageListenerContainer container = mock(MessageListenerContainer.class); + final ContainerProperties properties = new ContainerProperties("foo"); + properties.setOffsetAndMetadataProvider((listenerMetadata, offset) -> new OffsetAndMetadata(offset, "my-metadata")); + given(container.getContainerProperties()).willReturn(properties); + final OffsetAndMetadata offsetAndMetadata = ListenerUtils.createOffsetAndMetadata(container, 1L); + assertThat(offsetAndMetadata.offset()).isEqualTo(1); + assertThat(offsetAndMetadata.metadata()).isEqualTo("my-metadata"); + } } +