Skip to content

Commit f94620a

Browse files
authored
spring-projectsGH-2252: Keep offset metadata in case of batch reprocessing (spring-projects#2253)
* spring-projectsGH-2252: Keep offset metadata in case of batch reprocessing * spring-projectsGH-2252: Applied code review * spring-projectsGH-2252: Added missing static keyword
1 parent 731af32 commit f94620a

12 files changed

+133
-41
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2474,6 +2474,10 @@ Useful when the consumer code cannot determine that an `ErrorHandlingDeserialize
24742474
|`null`
24752475
|When present and `syncCommits` is `false` a callback invoked after the commit completes.
24762476

2477+
|[[offsetAndMetadataProvider]]<<offsetAndMetadataProvider,`offsetAndMetadataProvider`>>
2478+
|`null`
2479+
|A provider for `OffsetAndMetadata`; by default, the provider creates an offset and metadata with empty metadata. The provider gives a way to customize the metadata.
2480+
24772481
|[[commitLogLevel]]<<commitLogLevel,`commitLogLevel`>>
24782482
|DEBUG
24792483
|The logging level for logs pertaining to committing offsets.

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -376,13 +376,8 @@ public String getListenerId() {
376376
return this.beanName; // the container factory sets the bean name to the id attribute
377377
}
378378

379-
/**
380-
* Get arbitrary static information that will be added to the
381-
* {@link KafkaHeaders#LISTENER_INFO} header of all records.
382-
* @return the info.
383-
* @since 2.8.4
384-
*/
385379
@Nullable
380+
@Override
386381
public byte[] getListenerInfo() {
387382
return this.listenerInfo != null ? Arrays.copyOf(this.listenerInfo, this.listenerInfo.length) : null;
388383
}

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
* @param <V> the value type.
4545
*
4646
* @author Gary Russell
47+
* @author Francois Rosiere
4748
*
4849
* @since 1.3.5
4950
*
@@ -139,7 +140,8 @@ && isCommitRecovered() && this.kafkaTemplate.isTransactional()) {
139140
ConsumerRecord<K, V> skipped = records.get(0);
140141
this.kafkaTemplate.sendOffsetsToTransaction(
141142
Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
142-
new OffsetAndMetadata(skipped.offset() + 1)), consumer.groupMetadata());
143+
createOffsetAndMetadata(container, skipped.offset() + 1)
144+
), consumer.groupMetadata());
143145
}
144146

145147
if (!recoverable && this.backOff != null) {
@@ -165,4 +167,10 @@ public void clearThreadState() {
165167
this.lastIntervals.remove();
166168
}
167169

170+
private static OffsetAndMetadata createOffsetAndMetadata(@Nullable MessageListenerContainer container, long offset) {
171+
if (container == null) {
172+
return new OffsetAndMetadata(offset);
173+
}
174+
return ListenerUtils.createOffsetAndMetadata(container, offset);
175+
}
168176
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2017-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import org.springframework.lang.Nullable;
20+
import org.springframework.util.Assert;
21+
22+
/**
23+
* Default implementation for {@link ListenerMetadata}.
24+
* @author Francois Rosiere
25+
* @since 2.8.6
26+
*/
27+
class DefaultListenerMetadata implements ListenerMetadata {
28+
29+
private final MessageListenerContainer container;
30+
31+
DefaultListenerMetadata(MessageListenerContainer container) {
32+
Assert.notNull(container, "'container' must not be null");
33+
this.container = container;
34+
}
35+
36+
@Override
37+
@Nullable
38+
public String getListenerId() {
39+
return this.container.getListenerId();
40+
}
41+
42+
@Override
43+
@Nullable
44+
public String getGroupId() {
45+
return this.container.getGroupId();
46+
}
47+
48+
@Override
49+
@Nullable
50+
public byte[] getListenerInfo() {
51+
return this.container.getListenerInfo();
52+
}
53+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
* fallback handler.
4848
*
4949
* @author Gary Russell
50+
* @author Francois Rosiere
5051
* @since 2.8
5152
*
5253
*/
@@ -138,7 +139,7 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
138139
}
139140
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
140141
toCommit.forEach(rec -> offsets.compute(new TopicPartition(rec.topic(), rec.partition()),
141-
(key, val) -> new OffsetAndMetadata(rec.offset() + 1)));
142+
(key, val) -> ListenerUtils.createOffsetAndMetadata(container, rec.offset() + 1)));
142143
if (offsets.size() > 0) {
143144
commit(consumer, container, offsets);
144145
}
@@ -149,7 +150,7 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
149150
ConsumerRecord<?, ?> recovered = remaining.get(0);
150151
commit(consumer, container,
151152
Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
152-
new OffsetAndMetadata(recovered.offset() + 1)));
153+
ListenerUtils.createOffsetAndMetadata(container, recovered.offset() + 1)));
153154
if (remaining.size() > 1) {
154155
throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
155156
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@
144144
* @author Tom van den Berge
145145
* @author Lukasz Kaminski
146146
* @author Tomaz Fernandes
147+
* @author Francois Rosiere
147148
*/
148149
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
149150
extends AbstractMessageListenerContainer<K, V> {
@@ -560,7 +561,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
560561
? (listenerMetadata, offset) -> new OffsetAndMetadata(offset)
561562
: this.containerProperties.getOffsetAndMetadataProvider();
562563

563-
private final ConsumerAwareListenerMetadata consumerAwareListenerMetadata = new ConsumerAwareListenerMetadata();
564+
private final ListenerMetadata listenerMetadata = new DefaultListenerMetadata(KafkaMessageListenerContainer.this);
564565

565566
private final Consumer<K, V> consumer;
566567

@@ -3151,32 +3152,7 @@ public String toString() {
31513152
}
31523153

31533154
private OffsetAndMetadata createOffsetAndMetadata(long offset) {
3154-
return this.offsetAndMetadataProvider.provide(this.consumerAwareListenerMetadata, offset);
3155-
}
3156-
3157-
private final class ConsumerAwareListenerMetadata implements ListenerMetadata {
3158-
3159-
ConsumerAwareListenerMetadata() {
3160-
}
3161-
3162-
@Override
3163-
@Nullable
3164-
public String getListenerId() {
3165-
return getBeanName();
3166-
}
3167-
3168-
@Override
3169-
@Nullable
3170-
public String getGroupId() {
3171-
return ListenerConsumer.this.consumerGroupId;
3172-
}
3173-
3174-
@Override
3175-
@Nullable
3176-
public byte[] getListenerInfo() {
3177-
return ListenerConsumer.this.listenerinfo;
3178-
}
3179-
3155+
return this.offsetAndMetadataProvider.provide(this.listenerMetadata, offset);
31803156
}
31813157

31823158
private final class ConsumerAcknowledgment implements Acknowledgment {

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.io.ObjectStreamClass;
2323

2424
import org.apache.kafka.clients.consumer.ConsumerRecord;
25+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2526
import org.apache.kafka.common.header.Header;
2627
import org.apache.kafka.common.header.Headers;
2728
import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -37,6 +38,7 @@
3738
* Listener utilities.
3839
*
3940
* @author Gary Russell
41+
* @author Francois Rosiere
4042
* @since 2.0
4143
*
4244
*/
@@ -194,5 +196,21 @@ public static void stoppableSleep(MessageListenerContainer container, long inter
194196
while (System.currentTimeMillis() < timeout);
195197
}
196198

199+
/**
200+
* Create a new {@link OffsetAndMetadata} using the given container and offset.
201+
* @param container a container.
202+
* @param offset an offset.
203+
* @return an offset and metadata.
204+
* @since 2.8.6
205+
*/
206+
public static OffsetAndMetadata createOffsetAndMetadata(MessageListenerContainer container,
207+
long offset) {
208+
final OffsetAndMetadataProvider metadataProvider = container.getContainerProperties()
209+
.getOffsetAndMetadataProvider();
210+
if (metadataProvider != null) {
211+
return metadataProvider.provide(new DefaultListenerMetadata(container), offset);
212+
}
213+
return new OffsetAndMetadata(offset);
214+
}
197215
}
198216

spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import org.springframework.beans.factory.DisposableBean;
2727
import org.springframework.context.SmartLifecycle;
28+
import org.springframework.kafka.support.KafkaHeaders;
2829
import org.springframework.lang.Nullable;
2930

3031
/**
@@ -35,6 +36,7 @@
3536
* @author Gary Russell
3637
* @author Vladimir Tsanev
3738
* @author Tomaz Fernandes
39+
* @author Francois Rosiere
3840
*/
3941
public interface MessageListenerContainer extends SmartLifecycle, DisposableBean {
4042

@@ -194,6 +196,17 @@ default String getListenerId() {
194196
throw new UnsupportedOperationException("This container does not support retrieving the listener id");
195197
}
196198

199+
/**
200+
* Get arbitrary static information that will be added to the
201+
* {@link KafkaHeaders#LISTENER_INFO} header of all records.
202+
* @return the info.
203+
* @since 2.8.6
204+
*/
205+
@Nullable
206+
default byte[] getListenerInfo() {
207+
throw new UnsupportedOperationException("This container does not support retrieving the listener info");
208+
}
209+
197210
/**
198211
* If this container has child containers, return true if at least one child is running. If there are not
199212
* child containers, returns {@link #isRunning()}.

spring-kafka/src/main/java/org/springframework/kafka/listener/OffsetAndMetadataProvider.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,8 @@
1919
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2020

2121
/**
22-
* Provider for {@link OffsetAndMetadata}. In case of async commits of the offsets,
23-
* the provider can be used in combination with an {@link org.apache.kafka.clients.consumer.OffsetCommitCallback} to
24-
* have more granularity in the way to create an {@link OffsetAndMetadata}.
22+
* Provider for {@link OffsetAndMetadata}. The provider can be used to have more granularity when creating an
23+
* {@link OffsetAndMetadata}. The provider is used for both sync and async commits of the offsets.
2524
*
2625
* @author Francois Rosiere
2726
* @since 2.8.5

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
* Seek utilities.
4545
*
4646
* @author Gary Russell
47+
* @author Francois Rosiere
4748
* @since 2.2
4849
*
4950
*/
@@ -211,7 +212,7 @@ public static void seekOrRecover(Exception thrownException, @Nullable List<Consu
211212
ConsumerRecord<?, ?> record = records.get(0);
212213
Map<TopicPartition, OffsetAndMetadata> offsetToCommit = Collections.singletonMap(
213214
new TopicPartition(record.topic(), record.partition()),
214-
new OffsetAndMetadata(record.offset() + 1));
215+
ListenerUtils.createOffsetAndMetadata(container, record.offset() + 1));
215216
if (container.getContainerProperties().isSyncCommits()) {
216217
consumer.commitSync(offsetToCommit, container.getContainerProperties().getSyncCommitTimeout());
217218
}

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848

4949
/**
5050
* @author Gary Russell
51+
* @author Francois Rosiere
5152
* @since 2.3.1
5253
*
5354
*/
@@ -75,6 +76,7 @@ void testClassifier() {
7576
Consumer<String, String> consumer = mock(Consumer.class);
7677
given(consumer.groupMetadata()).willReturn(new ConsumerGroupMetadata("foo"));
7778
MessageListenerContainer container = mock(MessageListenerContainer.class);
79+
given(container.getContainerProperties()).willReturn(new ContainerProperties("foo"));
7880
processor.process(records, consumer, container, illegalState, true, EOSMode.V2);
7981
processor.process(records, consumer, container,
8082
new DeserializationException("intended", null, false, illegalState), true, EOSMode.V2);

spring-kafka/src/test/java/org/springframework/kafka/listener/ListenerUtilsTests.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,10 +20,12 @@
2020
import static org.mockito.BDDMockito.given;
2121
import static org.mockito.Mockito.mock;
2222

23+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2324
import org.junit.jupiter.api.Test;
2425

2526
/**
2627
* @author Gary Russell
28+
* @author Francois Rosiere
2729
* @since 2.7.1
2830
*
2931
*/
@@ -38,4 +40,24 @@ void stoppableSleep() throws InterruptedException {
3840
assertThat(System.currentTimeMillis() - t1).isGreaterThanOrEqualTo(500);
3941
}
4042

43+
@Test
44+
void testCreationOfOffsetAndMetadataWithoutProvider() {
45+
final MessageListenerContainer container = mock(MessageListenerContainer.class);
46+
given(container.getContainerProperties()).willReturn(new ContainerProperties("foo"));
47+
final OffsetAndMetadata offsetAndMetadata = ListenerUtils.createOffsetAndMetadata(container, 1L);
48+
assertThat(offsetAndMetadata.offset()).isEqualTo(1);
49+
assertThat(offsetAndMetadata.metadata()).isEmpty();
50+
}
51+
52+
@Test
53+
void testCreationOfOffsetAndMetadataWithProvider() {
54+
final MessageListenerContainer container = mock(MessageListenerContainer.class);
55+
final ContainerProperties properties = new ContainerProperties("foo");
56+
properties.setOffsetAndMetadataProvider((listenerMetadata, offset) -> new OffsetAndMetadata(offset, "my-metadata"));
57+
given(container.getContainerProperties()).willReturn(properties);
58+
final OffsetAndMetadata offsetAndMetadata = ListenerUtils.createOffsetAndMetadata(container, 1L);
59+
assertThat(offsetAndMetadata.offset()).isEqualTo(1);
60+
assertThat(offsetAndMetadata.metadata()).isEqualTo("my-metadata");
61+
}
4162
}
63+

0 commit comments

Comments
 (0)