GH-2252: Keep offset metadata in case of batch reprocessing #2253

Merged 3 commits on May 9, 2022.
spring-kafka-docs/src/main/asciidoc/kafka.adoc (4 additions, 0 deletions)
@@ -2471,6 +2471,10 @@ Useful when the consumer code cannot determine that an `ErrorHandlingDeserialize
|`null`
|When present and `syncCommits` is `false` a callback invoked after the commit completes.

+ |[[offsetAndMetadataProvider]]<<offsetAndMetadataProvider,`offsetAndMetadataProvider`>>
+ |`null`
+ |A provider for `OffsetAndMetadata`; by default, the container creates the `OffsetAndMetadata` with empty metadata. The provider gives a way to customize the metadata.

|[[commitLogLevel]]<<commitLogLevel,`commitLogLevel`>>
|DEBUG
|The logging level for logs pertaining to committing offsets.
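As a usage sketch for the new `offsetAndMetadataProvider` property (the topic name and metadata format below are illustrative assumptions, not taken from this PR), a provider can be registered through `ContainerProperties`:

// Illustrative only: the topic name and metadata contents are assumptions.
ContainerProperties containerProps = new ContainerProperties("my-topic");
// Stamp every committed offset with the listener id so that the metadata
// survives a rebalance and remains visible during batch reprocessing.
containerProps.setOffsetAndMetadataProvider((listenerMetadata, offset) ->
		new OffsetAndMetadata(offset, "listenerId: " + listenerMetadata.getListenerId()));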
@@ -376,13 +376,8 @@ public String getListenerId() {
return this.beanName; // the container factory sets the bean name to the id attribute
}

- /**
- * Get arbitrary static information that will be added to the
- * {@link KafkaHeaders#LISTENER_INFO} header of all records.
- * @return the info.
- * @since 2.8.4
- */
@Nullable
+ @Override
public byte[] getListenerInfo() {
return this.listenerInfo != null ? Arrays.copyOf(this.listenerInfo, this.listenerInfo.length) : null;
}
@@ -44,6 +44,7 @@
* @param <V> the value type.
*
* @author Gary Russell
+ * @author Francois Rosiere
*
* @since 1.3.5
*
@@ -139,7 +140,8 @@ && isCommitRecovered() && this.kafkaTemplate.isTransactional()) {
ConsumerRecord<K, V> skipped = records.get(0);
this.kafkaTemplate.sendOffsetsToTransaction(
Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
- new OffsetAndMetadata(skipped.offset() + 1)), consumer.groupMetadata());
+ createOffsetAndMetadata(container, skipped.offset() + 1)
+ ), consumer.groupMetadata());
}

if (!recoverable && this.backOff != null) {
@@ -165,4 +167,10 @@ public void clearThreadState() {
this.lastIntervals.remove();
}

+ private static OffsetAndMetadata createOffsetAndMetadata(@Nullable MessageListenerContainer container, long offset) {
+ if (container == null) {
+ return new OffsetAndMetadata(offset);
+ }
+ return ListenerUtils.createOffsetAndMetadata(container, offset);
+ }
}
@@ -0,0 +1,53 @@ (new file)
/*
* Copyright 2017-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
* Default implementation for {@link ListenerMetadata}.
* @author Francois Rosiere
* @since 2.8.6
*/
class DefaultListenerMetadata implements ListenerMetadata {

private final MessageListenerContainer container;

DefaultListenerMetadata(MessageListenerContainer container) {
Assert.notNull(container, "'container' must not be null");
this.container = container;
}

@Override
@Nullable
public String getListenerId() {
return this.container.getListenerId();
}

@Override
@Nullable
public String getGroupId() {
return this.container.getGroupId();
}

@Override
@Nullable
public byte[] getListenerInfo() {
return this.container.getListenerInfo();
}
}
@@ -47,6 +47,7 @@
* fallback handler.
*
* @author Gary Russell
+ * @author Francois Rosiere
* @since 2.8
*
*/
@@ -138,7 +139,7 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
}
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
toCommit.forEach(rec -> offsets.compute(new TopicPartition(rec.topic(), rec.partition()),
- (key, val) -> new OffsetAndMetadata(rec.offset() + 1)));
+ (key, val) -> ListenerUtils.createOffsetAndMetadata(container, rec.offset() + 1)));
if (offsets.size() > 0) {
commit(consumer, container, offsets);
}
@@ -149,7 +150,7 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
ConsumerRecord<?, ?> recovered = remaining.get(0);
commit(consumer, container,
Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
- new OffsetAndMetadata(recovered.offset() + 1)));
+ ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1)));
if (remaining.size() > 1) {
throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
}
@@ -144,6 +144,7 @@
* @author Tom van den Berge
* @author Lukasz Kaminski
* @author Tomaz Fernandes
+ * @author Francois Rosiere
*/
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
extends AbstractMessageListenerContainer<K, V> {
@@ -560,7 +561,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
? (listenerMetadata, offset) -> new OffsetAndMetadata(offset)
: this.containerProperties.getOffsetAndMetadataProvider();

- private final ConsumerAwareListenerMetadata consumerAwareListenerMetadata = new ConsumerAwareListenerMetadata();
+ private final ListenerMetadata listenerMetadata = new DefaultListenerMetadata(KafkaMessageListenerContainer.this);

private final Consumer<K, V> consumer;

@@ -3144,32 +3145,7 @@ public String toString() {
}

private OffsetAndMetadata createOffsetAndMetadata(long offset) {
- return this.offsetAndMetadataProvider.provide(this.consumerAwareListenerMetadata, offset);
- }
-
- private final class ConsumerAwareListenerMetadata implements ListenerMetadata {
-
- ConsumerAwareListenerMetadata() {
- }
-
- @Override
- @Nullable
- public String getListenerId() {
- return getBeanName();
- }
-
- @Override
- @Nullable
- public String getGroupId() {
- return ListenerConsumer.this.consumerGroupId;
- }
-
- @Override
- @Nullable
- public byte[] getListenerInfo() {
- return ListenerConsumer.this.listenerinfo;
- }
-
+ return this.offsetAndMetadataProvider.provide(this.listenerMetadata, offset);
}

private final class ConsumerAcknowledgment implements Acknowledgment {
@@ -22,6 +22,7 @@
import java.io.ObjectStreamClass;

import org.apache.kafka.clients.consumer.ConsumerRecord;
+ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -37,6 +38,7 @@
* Listener utilities.
*
* @author Gary Russell
+ * @author Francois Rosiere
* @since 2.0
*
*/
@@ -194,5 +196,21 @@ public static void stoppableSleep(MessageListenerContainer container, long inter
while (System.currentTimeMillis() < timeout);
}

+ /**
+ * Create a new {@link OffsetAndMetadata} using the given container and offset.
+ * @param container a container.
+ * @param offset an offset.
+ * @return an offset and metadata.
+ * @since 2.8.6
+ */
+ public static OffsetAndMetadata createOffsetAndMetadata(MessageListenerContainer container,
+ long offset) {
+ final OffsetAndMetadataProvider metadataProvider = container.getContainerProperties()
+ .getOffsetAndMetadataProvider();
+ if (metadataProvider != null) {
+ return metadataProvider.provide(new DefaultListenerMetadata(container), offset);
+ }
+ return new OffsetAndMetadata(offset);
+ }
}

@@ -25,6 +25,7 @@

import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.SmartLifecycle;
+ import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.lang.Nullable;

/**
@@ -35,6 +36,7 @@
* @author Gary Russell
* @author Vladimir Tsanev
* @author Tomaz Fernandes
+ * @author Francois Rosiere
*/
public interface MessageListenerContainer extends SmartLifecycle, DisposableBean {

@@ -194,6 +196,17 @@ default String getListenerId() {
throw new UnsupportedOperationException("This container does not support retrieving the listener id");
}

+ /**
+ * Get arbitrary static information that will be added to the
+ * {@link KafkaHeaders#LISTENER_INFO} header of all records.
+ * @return the info.
+ * @since 2.8.6
+ */
+ @Nullable
+ default byte[] getListenerInfo() {
+ throw new UnsupportedOperationException("This container does not support retrieving the listener info");
+ }

/**
* If this container has child containers, return true if at least one child is running. If there are no
* child containers, returns {@link #isRunning()}.
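As a consumption sketch (not part of this diff), a listener can read the info back via the `KafkaHeaders.LISTENER_INFO` header; the `id`, `topics` and `info` values below are assumptions:

// Illustrative listener wiring; the id, topic and info values are assumptions.
@KafkaListener(id = "myListener", topics = "my-topic", info = "billing consumer")
void listen(String payload,
		@Header(name = KafkaHeaders.LISTENER_INFO, required = false) String listenerInfo) {
	// The header value originates from the bytes returned by the container's getListenerInfo().
	System.out.println(payload + " received by listener with info [" + listenerInfo + "]");
}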
@@ -19,9 +19,8 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;

/**
- Provider for {@link OffsetAndMetadata}. In case of async commits of the offsets,
- the provider can be used in combination with an {@link org.apache.kafka.clients.consumer.OffsetCommitCallback} to
- have more granularity in the way to create an {@link OffsetAndMetadata}.
+ Provider for {@link OffsetAndMetadata}. The provider gives finer-grained control over the creation of an
+ {@link OffsetAndMetadata}, and is used for both sync and async commits of the offsets.
*
* @author Francois Rosiere
* @since 2.8.5
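For example, a minimal sketch of a custom provider (the metadata format is an assumption) that uses the `ListenerMetadata` passed to `provide`:

// Illustrative provider: encode the group and listener ids into the commit metadata.
OffsetAndMetadataProvider provider = (listenerMetadata, offset) ->
		new OffsetAndMetadata(offset, String.format("groupId: %s, listenerId: %s",
				listenerMetadata.getGroupId(), listenerMetadata.getListenerId()));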
@@ -44,6 +44,7 @@
* Seek utilities.
*
* @author Gary Russell
+ * @author Francois Rosiere
* @since 2.2
*
*/
@@ -211,7 +212,7 @@ public static void seekOrRecover(Exception thrownException, @Nullable List<Consu
ConsumerRecord<?, ?> record = records.get(0);
Map<TopicPartition, OffsetAndMetadata> offsetToCommit = Collections.singletonMap(
new TopicPartition(record.topic(), record.partition()),
- new OffsetAndMetadata(record.offset() + 1));
+ ListenerUtils.createOffsetAndMetadata(container, record.offset() + 1));
if (container.getContainerProperties().isSyncCommits()) {
consumer.commitSync(offsetToCommit, container.getContainerProperties().getSyncCommitTimeout());
}
@@ -48,6 +48,7 @@

/**
* @author Gary Russell
+ * @author Francois Rosiere
* @since 2.3.1
*
*/
@@ -75,6 +76,7 @@ void testClassifier() {
Consumer<String, String> consumer = mock(Consumer.class);
given(consumer.groupMetadata()).willReturn(new ConsumerGroupMetadata("foo"));
MessageListenerContainer container = mock(MessageListenerContainer.class);
+ given(container.getContainerProperties()).willReturn(new ContainerProperties("foo"));
processor.process(records, consumer, container, illegalState, true, EOSMode.V2);
processor.process(records, consumer, container,
new DeserializationException("intended", null, false, illegalState), true, EOSMode.V2);
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 the original author or authors.
+ * Copyright 2021-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,10 +20,12 @@
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

+ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.junit.jupiter.api.Test;

/**
* @author Gary Russell
+ * @author Francois Rosiere
* @since 2.7.1
*
*/
@@ -38,4 +40,24 @@ void stoppableSleep() throws InterruptedException {
assertThat(System.currentTimeMillis() - t1).isGreaterThanOrEqualTo(500);
}

+ @Test
+ void testCreationOfOffsetAndMetadataWithoutProvider() {
+ final MessageListenerContainer container = mock(MessageListenerContainer.class);
+ given(container.getContainerProperties()).willReturn(new ContainerProperties("foo"));
+ final OffsetAndMetadata offsetAndMetadata = ListenerUtils.createOffsetAndMetadata(container, 1L);
+ assertThat(offsetAndMetadata.offset()).isEqualTo(1);
+ assertThat(offsetAndMetadata.metadata()).isEmpty();
+ }

+ @Test
+ void testCreationOfOffsetAndMetadataWithProvider() {
+ final MessageListenerContainer container = mock(MessageListenerContainer.class);
+ final ContainerProperties properties = new ContainerProperties("foo");
+ properties.setOffsetAndMetadataProvider((listenerMetadata, offset) -> new OffsetAndMetadata(offset, "my-metadata"));
+ given(container.getContainerProperties()).willReturn(properties);
+ final OffsetAndMetadata offsetAndMetadata = ListenerUtils.createOffsetAndMetadata(container, 1L);
+ assertThat(offsetAndMetadata.offset()).isEqualTo(1);
+ assertThat(offsetAndMetadata.metadata()).isEqualTo("my-metadata");
+ }
}