From e90b92c17404a076b4e86c29b5fb8731820d8a75 Mon Sep 17 00:00:00 2001 From: Francois Rosiere Date: Wed, 4 May 2022 00:46:51 +0200 Subject: [PATCH 1/3] GH-2252: Keep offset metadata in case of batch reprocessing --- .../AbstractMessageListenerContainer.java | 7 +-- .../DefaultAfterRollbackProcessor.java | 9 +++- .../listener/DefaultListenerMetadata.java | 51 +++++++++++++++++++ .../kafka/listener/FailedBatchProcessor.java | 4 +- .../KafkaMessageListenerContainer.java | 29 +---------- .../kafka/listener/ListenerUtils.java | 17 +++++++ .../listener/MessageListenerContainer.java | 12 +++++ .../listener/OffsetAndMetadataProvider.java | 5 +- .../kafka/listener/SeekUtils.java | 2 +- .../DefaultAfterRollbackProcessorTests.java | 1 + .../kafka/listener/ListenerUtilsTests.java | 23 ++++++++- 11 files changed, 119 insertions(+), 41 deletions(-) create mode 100644 spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultListenerMetadata.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index 81c9794b34..76a7d6281f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -376,13 +376,8 @@ public String getListenerId() { return this.beanName; // the container factory sets the bean name to the id attribute } - /** - * Get arbitrary static information that will be added to the - * {@link KafkaHeaders#LISTENER_INFO} header of all records. - * @return the info. - * @since 2.8.4 - */ @Nullable + @Override public byte[] getListenerInfo() { return this.listenerInfo != null ? Arrays.copyOf(this.listenerInfo, this.listenerInfo.length) : null; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java index 32959fbf6d..212293fdb8 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java @@ -139,7 +139,8 @@ && isCommitRecovered() && this.kafkaTemplate.isTransactional()) { ConsumerRecord skipped = records.get(0); this.kafkaTemplate.sendOffsetsToTransaction( Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()), - new OffsetAndMetadata(skipped.offset() + 1)), consumer.groupMetadata()); + createOffsetAndMetadata(container, skipped.offset() + 1) + ), consumer.groupMetadata()); } if (!recoverable && this.backOff != null) { @@ -165,4 +166,10 @@ public void clearThreadState() { this.lastIntervals.remove(); } + private OffsetAndMetadata createOffsetAndMetadata(@Nullable MessageListenerContainer container, long offset) { + if (container == null) { + return new OffsetAndMetadata(offset); + } + return ListenerUtils.createOffsetAndMetadata(container, offset); + } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultListenerMetadata.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultListenerMetadata.java new file mode 100644 index 0000000000..85f0da20d5 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultListenerMetadata.java @@ -0,0 +1,51 @@ +/* + * Copyright 2017-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.listener; + +import org.springframework.lang.Nullable; + +/** + * Default implementation for {@link ListenerMetadata}. + * @author Francois Rosiere + * @since 2.8.6 + */ +public class DefaultListenerMetadata implements ListenerMetadata { + + private final MessageListenerContainer container; + + public DefaultListenerMetadata(MessageListenerContainer container) { + this.container = container; + } + + @Override + @Nullable + public String getListenerId() { + return this.container.getListenerId(); + } + + @Override + @Nullable + public String getGroupId() { + return this.container.getGroupId(); + } + + @Override + @Nullable + public byte[] getListenerInfo() { + return this.container.getListenerInfo(); + } +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java index 566d944f57..91b6c0b70e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java @@ -138,7 +138,7 @@ private ConsumerRecords seekOrRecover(Exception thrownException, @N } Map offsets = new HashMap<>(); toCommit.forEach(rec -> offsets.compute(new TopicPartition(rec.topic(), rec.partition()), - (key, val) -> new OffsetAndMetadata(rec.offset() + 1))); + (key, val) -> ListenerUtils.createOffsetAndMetadata(container, rec.offset() + 1))); if (offsets.size() > 0) { commit(consumer, container, offsets); } @@ -149,7 +149,7 @@ private ConsumerRecords seekOrRecover(Exception thrownException, @N ConsumerRecord recovered = remaining.get(0); commit(consumer, container, Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()), - new OffsetAndMetadata(recovered.offset() + 1))); + ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1))); if (remaining.size() > 1) { throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 24c903df38..65a0c0d20a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -560,7 +560,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume ? (listenerMetadata, offset) -> new OffsetAndMetadata(offset) : this.containerProperties.getOffsetAndMetadataProvider(); - private final ConsumerAwareListenerMetadata consumerAwareListenerMetadata = new ConsumerAwareListenerMetadata(); + private final ListenerMetadata listenerMetadata = new DefaultListenerMetadata(KafkaMessageListenerContainer.this); private final Consumer consumer; @@ -3144,32 +3144,7 @@ public String toString() { } private OffsetAndMetadata createOffsetAndMetadata(long offset) { - return this.offsetAndMetadataProvider.provide(this.consumerAwareListenerMetadata, offset); - } - - private final class ConsumerAwareListenerMetadata implements ListenerMetadata { - - ConsumerAwareListenerMetadata() { - } - - @Override - @Nullable - public String getListenerId() { - return getBeanName(); - } - - @Override - @Nullable - public String getGroupId() { - return ListenerConsumer.this.consumerGroupId; - } - - @Override - @Nullable - public byte[] getListenerInfo() { - return ListenerConsumer.this.listenerinfo; - } - + return this.offsetAndMetadataProvider.provide(this.listenerMetadata, offset); } private final class ConsumerAcknowledgment implements Acknowledgment { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java index 68afa00110..ac8b68ea25 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java @@ -22,6 +22,7 @@ import java.io.ObjectStreamClass; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -194,5 +195,21 @@ public static void stoppableSleep(MessageListenerContainer container, long inter while (System.currentTimeMillis() < timeout); } + /** + * Create a new {@link OffsetAndMetadata} using the given container and offset. + * @param container a container. + * @param offset an offset. + * @return an offset and metadata. + * @since 2.8.6 + */ + public static OffsetAndMetadata createOffsetAndMetadata(MessageListenerContainer container, + long offset) { + final OffsetAndMetadataProvider metadataProvider = container.getContainerProperties() + .getOffsetAndMetadataProvider(); + if (metadataProvider != null) { + return metadataProvider.provide(new DefaultListenerMetadata(container), offset); + } + return new OffsetAndMetadata(offset); + } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java index 27b5f76214..c20497ba7a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java @@ -25,6 +25,7 @@ import org.springframework.beans.factory.DisposableBean; import org.springframework.context.SmartLifecycle; +import org.springframework.kafka.support.KafkaHeaders; import org.springframework.lang.Nullable; /** @@ -194,6 +195,17 @@ default String getListenerId() { throw new UnsupportedOperationException("This container does not support retrieving the listener id"); } + /** + * Get arbitrary static information that will be added to the + * {@link KafkaHeaders#LISTENER_INFO} header of all records. + * @return the info. + * @since 2.8.4 + */ + @Nullable + default byte[] getListenerInfo() { + throw new UnsupportedOperationException("This container does not support retrieving the listener info"); + } + /** * If this container has child containers, return true if at least one child is running. If there are not * child containers, returns {@link #isRunning()}. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/OffsetAndMetadataProvider.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/OffsetAndMetadataProvider.java index eb1f49148e..a67cba1ce0 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/OffsetAndMetadataProvider.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/OffsetAndMetadataProvider.java @@ -19,9 +19,8 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata; /** - * Provider for {@link OffsetAndMetadata}. In case of async commits of the offsets, - * the provider can be used in combination with an {@link org.apache.kafka.clients.consumer.OffsetCommitCallback} to - * have more granularity in the way to create an {@link OffsetAndMetadata}. + * Provider for {@link OffsetAndMetadata}. The provider can be used to have more granularity when creating an + * {@link OffsetAndMetadata}. The provider is used for both sync and async commits of the offsets. * * @author Francois Rosiere * @since 2.8.5 diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java index 7bfde1fb28..6549b70f31 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java @@ -211,7 +211,7 @@ public static void seekOrRecover(Exception thrownException, @Nullable List record = records.get(0); Map offsetToCommit = Collections.singletonMap( new TopicPartition(record.topic(), record.partition()), - new OffsetAndMetadata(record.offset() + 1)); + ListenerUtils.createOffsetAndMetadata(container, record.offset() + 1)); if (container.getContainerProperties().isSyncCommits()) { consumer.commitSync(offsetToCommit, container.getContainerProperties().getSyncCommitTimeout()); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java index efcde9dec9..847a560ad8 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java @@ -75,6 +75,7 @@ void testClassifier() { Consumer consumer = mock(Consumer.class); given(consumer.groupMetadata()).willReturn(new ConsumerGroupMetadata("foo")); MessageListenerContainer container = mock(MessageListenerContainer.class); + given(container.getContainerProperties()).willReturn(new ContainerProperties("foo")); processor.process(records, consumer, container, illegalState, true, EOSMode.V2); processor.process(records, consumer, container, new DeserializationException("intended", null, false, illegalState), true, EOSMode.V2); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ListenerUtilsTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ListenerUtilsTests.java index 6bbca30f94..787a0fb9dd 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ListenerUtilsTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ListenerUtilsTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021 the original author or authors. + * Copyright 2021-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.junit.jupiter.api.Test; /** @@ -38,4 +39,24 @@ void stoppableSleep() throws InterruptedException { assertThat(System.currentTimeMillis() - t1).isGreaterThanOrEqualTo(500); } + @Test + void testCreationOfOffsetAndMetadataWithoutProvider() { + final MessageListenerContainer container = mock(MessageListenerContainer.class); + given(container.getContainerProperties()).willReturn(new ContainerProperties("foo")); + final OffsetAndMetadata offsetAndMetadata = ListenerUtils.createOffsetAndMetadata(container, 1L); + assertThat(offsetAndMetadata.offset()).isEqualTo(1); + assertThat(offsetAndMetadata.metadata()).isEmpty(); + } + + @Test + void testCreationOfOffsetAndMetadataWithProvider() { + final MessageListenerContainer container = mock(MessageListenerContainer.class); + final ContainerProperties properties = new ContainerProperties("foo"); + properties.setOffsetAndMetadataProvider((listenerMetadata, offset) -> new OffsetAndMetadata(offset, "my-metadata")); + given(container.getContainerProperties()).willReturn(properties); + final OffsetAndMetadata offsetAndMetadata = ListenerUtils.createOffsetAndMetadata(container, 1L); + assertThat(offsetAndMetadata.offset()).isEqualTo(1); + assertThat(offsetAndMetadata.metadata()).isEqualTo("my-metadata"); + } } + From 9057aee2b3e0755f792ac4127f8b53f65d594f34 Mon Sep 17 00:00:00 2001 From: Francois Rosiere Date: Wed, 4 May 2022 23:03:37 +0200 Subject: [PATCH 2/3] GH-2252: Applied code review --- spring-kafka-docs/src/main/asciidoc/kafka.adoc | 4 ++++ .../kafka/listener/DefaultAfterRollbackProcessor.java | 1 + .../kafka/listener/DefaultListenerMetadata.java | 6 ++++-- .../kafka/listener/FailedBatchProcessor.java | 1 + .../kafka/listener/KafkaMessageListenerContainer.java | 1 + .../org/springframework/kafka/listener/ListenerUtils.java | 1 + .../kafka/listener/MessageListenerContainer.java | 3 ++- .../java/org/springframework/kafka/listener/SeekUtils.java | 1 + .../kafka/listener/DefaultAfterRollbackProcessorTests.java | 1 + .../springframework/kafka/listener/ListenerUtilsTests.java | 1 + 10 files changed, 17 insertions(+), 3 deletions(-) diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index d531822d02..88d32ce568 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -2471,6 +2471,10 @@ Useful when the consumer code cannot determine that an `ErrorHandlingDeserialize |`null` |When present and `syncCommits` is `false` a callback invoked after the commit completes. +|[[offsetAndMetadataProvider]]<> +|`null` +|A provider for `OffsetAndMetadata`; by default, the provider creates an offset and metadata with empty metadata. The provider gives a way to customize the metadata. + |[[commitLogLevel]]<> |DEBUG |The logging level for logs pertaining to committing offsets. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java index 212293fdb8..3be537ceb2 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java @@ -44,6 +44,7 @@ * @param the value type. * * @author Gary Russell + * @author Francois Rosiere * * @since 1.3.5 * diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultListenerMetadata.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultListenerMetadata.java index 85f0da20d5..52e744ac3d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultListenerMetadata.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultListenerMetadata.java @@ -17,17 +17,19 @@ package org.springframework.kafka.listener; import org.springframework.lang.Nullable; +import org.springframework.util.Assert; /** * Default implementation for {@link ListenerMetadata}. * @author Francois Rosiere * @since 2.8.6 */ -public class DefaultListenerMetadata implements ListenerMetadata { +class DefaultListenerMetadata implements ListenerMetadata { private final MessageListenerContainer container; - public DefaultListenerMetadata(MessageListenerContainer container) { + DefaultListenerMetadata(MessageListenerContainer container) { + Assert.notNull(container, "'container' must not be null"); this.container = container; } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java index 91b6c0b70e..178c23ee39 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java @@ -47,6 +47,7 @@ * fallback handler. * * @author Gary Russell + * @author Francois Rosiere * @since 2.8 * */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 65a0c0d20a..7604eb4923 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -144,6 +144,7 @@ * @author Tom van den Berge * @author Lukasz Kaminski * @author Tomaz Fernandes + * @author Francois Rosiere */ public class KafkaMessageListenerContainer // NOSONAR line count extends AbstractMessageListenerContainer { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java index ac8b68ea25..d6f5652826 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java @@ -38,6 +38,7 @@ * Listener utilities. * * @author Gary Russell + * @author Francois Rosiere * @since 2.0 * */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java index c20497ba7a..5c3f06eb0d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java @@ -36,6 +36,7 @@ * @author Gary Russell * @author Vladimir Tsanev * @author Tomaz Fernandes + * @author Francois Rosiere */ public interface MessageListenerContainer extends SmartLifecycle, DisposableBean { @@ -199,7 +200,7 @@ default String getListenerId() { * Get arbitrary static information that will be added to the * {@link KafkaHeaders#LISTENER_INFO} header of all records. * @return the info. - * @since 2.8.4 + * @since 2.8.6 */ @Nullable default byte[] getListenerInfo() { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java index 6549b70f31..60e83f2c50 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java @@ -44,6 +44,7 @@ * Seek utilities. * * @author Gary Russell + * @author Francois Rosiere * @since 2.2 * */ diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java index 847a560ad8..9080a47d92 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java @@ -48,6 +48,7 @@ /** * @author Gary Russell + * @author Francois Rosiere * @since 2.3.1 * */ diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ListenerUtilsTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ListenerUtilsTests.java index 787a0fb9dd..c60ce19827 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ListenerUtilsTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ListenerUtilsTests.java @@ -25,6 +25,7 @@ /** * @author Gary Russell + * @author Francois Rosiere * @since 2.7.1 * */ From e72fed3f0d7070caf88bfff1c5a0583e7be42793 Mon Sep 17 00:00:00 2001 From: Francois Rosiere Date: Thu, 5 May 2022 20:51:54 +0200 Subject: [PATCH 3/3] GH-2252: Added missing static keyword --- .../kafka/listener/DefaultAfterRollbackProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java index 3be537ceb2..87ef5ca5cf 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java @@ -167,7 +167,7 @@ public void clearThreadState() { this.lastIntervals.remove(); } - private OffsetAndMetadata createOffsetAndMetadata(@Nullable MessageListenerContainer container, long offset) { + private static OffsetAndMetadata createOffsetAndMetadata(@Nullable MessageListenerContainer container, long offset) { if (container == null) { return new OffsetAndMetadata(offset); }