Skip to content

Commit ab17537

Browse files
committed
spring-projectsGH-2528: DLPR - Support Header Replacement
Resolves spring-projects#2528. When adding headers in a DLPR headers function, header values accumulate because Kafka headers support multiple values. Provide a mechanism to allow adding a header to replace any existing header with that name. **cherry-pick to 2.9.x**
1 parent 5ddc0b8 commit ab17537

File tree

3 files changed

+69
-3
lines changed

3 files changed

+69
-3
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,9 @@ protected void configureCustomizers(CustomizersConfigurer customizersConfigurer)
621621

622622
Starting with version 2.8.4, if you wish to add custom headers (in addition to the retry information headers added by the factory), you can add a `headersFunction` to the factory - `factory.setHeadersFunction((rec, ex) -> { ... })`
623623

624+
By default, any headers added will be cumulative - Kafka headers can contain multiple values.
625+
Starting with version 2.9.5, if the `Headers` returned by the function contains a header of type `DeadLetterPublishingRecoverer.SingleRecordHeader`, then any existing values for that header will be removed and only the new single value will remain.
626+
624627
[[retry-topic-combine-blocking]]
625628
==== Combining Blocking and Non-Blocking Retries
626629

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,6 +41,7 @@
4141
import org.apache.kafka.common.PartitionInfo;
4242
import org.apache.kafka.common.TopicPartition;
4343
import org.apache.kafka.common.header.Headers;
44+
import org.apache.kafka.common.header.internals.RecordHeader;
4445
import org.apache.kafka.common.header.internals.RecordHeaders;
4546

4647
import org.springframework.core.log.LogAccessor;
@@ -707,7 +708,12 @@ private void enhanceHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?> record, E
707708
maybeAddOriginalHeaders(kafkaHeaders, record, exception);
708709
Headers headers = this.headersFunction.apply(record, exception);
709710
if (headers != null) {
710-
headers.forEach(kafkaHeaders::add);
711+
headers.forEach(header -> {
712+
if (header instanceof SingleRecordHeader) {
713+
kafkaHeaders.remove(header.key());
714+
}
715+
kafkaHeaders.add(header);
716+
});
711717
}
712718
}
713719

@@ -1374,4 +1380,33 @@ public interface ExceptionHeadersCreator {
13741380

13751381
}
13761382

1383+
/**
1384+
* A marker header class to indicate that this header should replace any existing headers
1385+
* with this name, rather than being appended to the headers, which is the normal behavior.
1386+
*
1387+
* @since 2.9.5
1388+
*/
1389+
public static class SingleRecordHeader extends RecordHeader {
1390+
1391+
/**
1392+
* Construct an instance.
1393+
* @param key the key.
1394+
* @param value the value.
1395+
*/
1396+
public SingleRecordHeader(String key, byte[] value) {
1397+
super(key, value);
1398+
}
1399+
1400+
/**
1401+
* Construct an instance.
1402+
* @param keyBuffer the key buffer.
1403+
* @param valueBuffer the value buffer.
1404+
*/
1405+
public SingleRecordHeader(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
1406+
super(keyBuffer, valueBuffer);
1407+
}
1408+
1409+
1410+
}
1411+
13771412
}

spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2022 the original author or authors.
2+
* Copyright 2020-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -68,6 +68,7 @@
6868
import org.springframework.kafka.core.KafkaOperations.OperationsCallback;
6969
import org.springframework.kafka.core.ProducerFactory;
7070
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer.HeaderNames;
71+
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer.SingleRecordHeader;
7172
import org.springframework.kafka.support.KafkaHeaders;
7273
import org.springframework.kafka.support.SendResult;
7374
import org.springframework.kafka.support.converter.ConversionException;
@@ -878,6 +879,33 @@ void immutableHeaders() {
878879
assertThat(KafkaTestUtils.getPropertyValue(headers, "headers", List.class)).hasSize(12);
879880
}
880881

882+
@SuppressWarnings({ "unchecked", "rawtypes" })
883+
@Test
884+
void replaceNotAppendHeader() {
885+
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
886+
CompletableFuture future = mock(CompletableFuture.class);
887+
given(template.send(any(ProducerRecord.class))).willReturn(future);
888+
Headers headers = new RecordHeaders().add(new RecordHeader("foo", "orig".getBytes()));
889+
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE,
890+
-1, -1, null, "bar", headers, Optional.empty());
891+
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
892+
recoverer.setHeadersFunction((rec, ex) -> {
893+
RecordHeaders toReplace = new RecordHeaders(
894+
new RecordHeader[] { new SingleRecordHeader("foo", "one".getBytes()) });
895+
return toReplace;
896+
});
897+
recoverer.accept(record, new ListenerExecutionFailedException("test", "group", new RuntimeException()));
898+
ArgumentCaptor<ProducerRecord> producerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
899+
verify(template).send(producerRecordCaptor.capture());
900+
ProducerRecord outRecord = producerRecordCaptor.getValue();
901+
Headers outHeaders = outRecord.headers();
902+
assertThat(KafkaTestUtils.getPropertyValue(outHeaders, "headers", List.class)).hasSize(11);
903+
Iterator<Header> iterator = outHeaders.headers("foo").iterator();
904+
assertThat(iterator.hasNext()).isTrue();
905+
assertThat(iterator.next().value()).isEqualTo("one".getBytes());
906+
assertThat(iterator.hasNext()).isFalse();
907+
}
908+
881909
@SuppressWarnings("unchecked")
882910
@Test
883911
void nonCompliantProducerFactory() throws Exception {

0 commit comments

Comments
 (0)