Skip to content

Commit e90b92c

Browse files
committed
spring-projectsGH-2252: Keep offset metadata in case of batch reprocessing
1 parent e1160fa commit e90b92c

11 files changed

+119
-41
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -376,13 +376,8 @@ public String getListenerId() {
376376
return this.beanName; // the container factory sets the bean name to the id attribute
377377
}
378378

379-
/**
380-
* Get arbitrary static information that will be added to the
381-
* {@link KafkaHeaders#LISTENER_INFO} header of all records.
382-
* @return the info.
383-
* @since 2.8.4
384-
*/
385379
@Nullable
380+
@Override
386381
public byte[] getListenerInfo() {
387382
return this.listenerInfo != null ? Arrays.copyOf(this.listenerInfo, this.listenerInfo.length) : null;
388383
}

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,8 @@ && isCommitRecovered() && this.kafkaTemplate.isTransactional()) {
139139
ConsumerRecord<K, V> skipped = records.get(0);
140140
this.kafkaTemplate.sendOffsetsToTransaction(
141141
Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
142-
new OffsetAndMetadata(skipped.offset() + 1)), consumer.groupMetadata());
142+
createOffsetAndMetadata(container, skipped.offset() + 1)
143+
), consumer.groupMetadata());
143144
}
144145

145146
if (!recoverable && this.backOff != null) {
@@ -165,4 +166,10 @@ public void clearThreadState() {
165166
this.lastIntervals.remove();
166167
}
167168

169+
private OffsetAndMetadata createOffsetAndMetadata(@Nullable MessageListenerContainer container, long offset) {
170+
if (container == null) {
171+
return new OffsetAndMetadata(offset);
172+
}
173+
return ListenerUtils.createOffsetAndMetadata(container, offset);
174+
}
168175
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2017-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import org.springframework.lang.Nullable;
20+
21+
/**
22+
* Default implementation for {@link ListenerMetadata}.
23+
* @author Francois Rosiere
24+
* @since 2.8.6
25+
*/
26+
public class DefaultListenerMetadata implements ListenerMetadata {
27+
28+
private final MessageListenerContainer container;
29+
30+
public DefaultListenerMetadata(MessageListenerContainer container) {
31+
this.container = container;
32+
}
33+
34+
@Override
35+
@Nullable
36+
public String getListenerId() {
37+
return this.container.getListenerId();
38+
}
39+
40+
@Override
41+
@Nullable
42+
public String getGroupId() {
43+
return this.container.getGroupId();
44+
}
45+
46+
@Override
47+
@Nullable
48+
public byte[] getListenerInfo() {
49+
return this.container.getListenerInfo();
50+
}
51+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
138138
}
139139
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
140140
toCommit.forEach(rec -> offsets.compute(new TopicPartition(rec.topic(), rec.partition()),
141-
(key, val) -> new OffsetAndMetadata(rec.offset() + 1)));
141+
(key, val) -> ListenerUtils.createOffsetAndMetadata(container, rec.offset() + 1)));
142142
if (offsets.size() > 0) {
143143
commit(consumer, container, offsets);
144144
}
@@ -149,7 +149,7 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
149149
ConsumerRecord<?, ?> recovered = remaining.get(0);
150150
commit(consumer, container,
151151
Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
152-
new OffsetAndMetadata(recovered.offset() + 1)));
152+
ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1)));
153153
if (remaining.size() > 1) {
154154
throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
155155
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 2 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -560,7 +560,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
560560
? (listenerMetadata, offset) -> new OffsetAndMetadata(offset)
561561
: this.containerProperties.getOffsetAndMetadataProvider();
562562

563-
private final ConsumerAwareListenerMetadata consumerAwareListenerMetadata = new ConsumerAwareListenerMetadata();
563+
private final ListenerMetadata listenerMetadata = new DefaultListenerMetadata(KafkaMessageListenerContainer.this);
564564

565565
private final Consumer<K, V> consumer;
566566

@@ -3144,32 +3144,7 @@ public String toString() {
31443144
}
31453145

31463146
private OffsetAndMetadata createOffsetAndMetadata(long offset) {
3147-
return this.offsetAndMetadataProvider.provide(this.consumerAwareListenerMetadata, offset);
3148-
}
3149-
3150-
private final class ConsumerAwareListenerMetadata implements ListenerMetadata {
3151-
3152-
ConsumerAwareListenerMetadata() {
3153-
}
3154-
3155-
@Override
3156-
@Nullable
3157-
public String getListenerId() {
3158-
return getBeanName();
3159-
}
3160-
3161-
@Override
3162-
@Nullable
3163-
public String getGroupId() {
3164-
return ListenerConsumer.this.consumerGroupId;
3165-
}
3166-
3167-
@Override
3168-
@Nullable
3169-
public byte[] getListenerInfo() {
3170-
return ListenerConsumer.this.listenerinfo;
3171-
}
3172-
3147+
return this.offsetAndMetadataProvider.provide(this.listenerMetadata, offset);
31733148
}
31743149

31753150
private final class ConsumerAcknowledgment implements Acknowledgment {

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.io.ObjectStreamClass;
2323

2424
import org.apache.kafka.clients.consumer.ConsumerRecord;
25+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2526
import org.apache.kafka.common.header.Header;
2627
import org.apache.kafka.common.header.Headers;
2728
import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -194,5 +195,21 @@ public static void stoppableSleep(MessageListenerContainer container, long inter
194195
while (System.currentTimeMillis() < timeout);
195196
}
196197

198+
/**
199+
* Create a new {@link OffsetAndMetadata} using the given container and offset.
200+
* @param container a container.
201+
* @param offset an offset.
202+
* @return an offset and metadata.
203+
* @since 2.8.6
204+
*/
205+
public static OffsetAndMetadata createOffsetAndMetadata(MessageListenerContainer container,
206+
long offset) {
207+
final OffsetAndMetadataProvider metadataProvider = container.getContainerProperties()
208+
.getOffsetAndMetadataProvider();
209+
if (metadataProvider != null) {
210+
return metadataProvider.provide(new DefaultListenerMetadata(container), offset);
211+
}
212+
return new OffsetAndMetadata(offset);
213+
}
197214
}
198215

spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import org.springframework.beans.factory.DisposableBean;
2727
import org.springframework.context.SmartLifecycle;
28+
import org.springframework.kafka.support.KafkaHeaders;
2829
import org.springframework.lang.Nullable;
2930

3031
/**
@@ -194,6 +195,17 @@ default String getListenerId() {
194195
throw new UnsupportedOperationException("This container does not support retrieving the listener id");
195196
}
196197

198+
/**
199+
* Get arbitrary static information that will be added to the
200+
* {@link KafkaHeaders#LISTENER_INFO} header of all records.
201+
* @return the info.
202+
* @since 2.8.4
203+
*/
204+
@Nullable
205+
default byte[] getListenerInfo() {
206+
throw new UnsupportedOperationException("This container does not support retrieving the listener info");
207+
}
208+
197209
/**
198210
* If this container has child containers, return true if at least one child is running. If there are not
199211
* child containers, returns {@link #isRunning()}.

spring-kafka/src/main/java/org/springframework/kafka/listener/OffsetAndMetadataProvider.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,8 @@
1919
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2020

2121
/**
22-
* Provider for {@link OffsetAndMetadata}. In case of async commits of the offsets,
23-
* the provider can be used in combination with an {@link org.apache.kafka.clients.consumer.OffsetCommitCallback} to
24-
* have more granularity in the way to create an {@link OffsetAndMetadata}.
22+
* Provider for {@link OffsetAndMetadata}. The provider can be used to have more granularity when creating an
23+
* {@link OffsetAndMetadata}. The provider is used for both sync and async commits of the offsets.
2524
*
2625
* @author Francois Rosiere
2726
* @since 2.8.5

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ public static void seekOrRecover(Exception thrownException, @Nullable List<Consu
211211
ConsumerRecord<?, ?> record = records.get(0);
212212
Map<TopicPartition, OffsetAndMetadata> offsetToCommit = Collections.singletonMap(
213213
new TopicPartition(record.topic(), record.partition()),
214-
new OffsetAndMetadata(record.offset() + 1));
214+
ListenerUtils.createOffsetAndMetadata(container, record.offset() + 1));
215215
if (container.getContainerProperties().isSyncCommits()) {
216216
consumer.commitSync(offsetToCommit, container.getContainerProperties().getSyncCommitTimeout());
217217
}

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ void testClassifier() {
7575
Consumer<String, String> consumer = mock(Consumer.class);
7676
given(consumer.groupMetadata()).willReturn(new ConsumerGroupMetadata("foo"));
7777
MessageListenerContainer container = mock(MessageListenerContainer.class);
78+
given(container.getContainerProperties()).willReturn(new ContainerProperties("foo"));
7879
processor.process(records, consumer, container, illegalState, true, EOSMode.V2);
7980
processor.process(records, consumer, container,
8081
new DeserializationException("intended", null, false, illegalState), true, EOSMode.V2);

spring-kafka/src/test/java/org/springframework/kafka/listener/ListenerUtilsTests.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020
import static org.mockito.BDDMockito.given;
2121
import static org.mockito.Mockito.mock;
2222

23+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2324
import org.junit.jupiter.api.Test;
2425

2526
/**
@@ -38,4 +39,24 @@ void stoppableSleep() throws InterruptedException {
3839
assertThat(System.currentTimeMillis() - t1).isGreaterThanOrEqualTo(500);
3940
}
4041

42+
@Test
43+
void testCreationOfOffsetAndMetadataWithoutProvider() {
44+
final MessageListenerContainer container = mock(MessageListenerContainer.class);
45+
given(container.getContainerProperties()).willReturn(new ContainerProperties("foo"));
46+
final OffsetAndMetadata offsetAndMetadata = ListenerUtils.createOffsetAndMetadata(container, 1L);
47+
assertThat(offsetAndMetadata.offset()).isEqualTo(1);
48+
assertThat(offsetAndMetadata.metadata()).isEmpty();
49+
}
50+
51+
@Test
52+
void testCreationOfOffsetAndMetadataWithProvider() {
53+
final MessageListenerContainer container = mock(MessageListenerContainer.class);
54+
final ContainerProperties properties = new ContainerProperties("foo");
55+
properties.setOffsetAndMetadataProvider((listenerMetadata, offset) -> new OffsetAndMetadata(offset, "my-metadata"));
56+
given(container.getContainerProperties()).willReturn(properties);
57+
final OffsetAndMetadata offsetAndMetadata = ListenerUtils.createOffsetAndMetadata(container, 1L);
58+
assertThat(offsetAndMetadata.offset()).isEqualTo(1);
59+
assertThat(offsetAndMetadata.metadata()).isEqualTo("my-metadata");
60+
}
4161
}
62+

0 commit comments

Comments
 (0)