Skip to content

Commit 425f837

Browse files
authored
GH-2528: DLPR - Support Header Replacement (#2529)
Resolves #2528. When adding headers in a DLPR headers function, header values accumulate because Kafka headers support multiple values. Provide a mechanism to allow adding a header to replace any existing header with that name. **cherry-pick to 2.9.x** * Fix javadoc.
1 parent 5ddc0b8 commit 425f837

File tree

3 files changed

+78
-5
lines changed

3 files changed

+78
-5
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

+3
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,9 @@ protected void configureCustomizers(CustomizersConfigurer customizersConfigurer)
621621

622622
Starting with version 2.8.4, if you wish to add custom headers (in addition to the retry information headers added by the factory), you can add a `headersFunction` to the factory - `factory.setHeadersFunction((rec, ex) -> { ... })`
623623

624+
By default, any headers added will be cumulative - Kafka headers can contain multiple values.
625+
Starting with version 2.9.5, if the `Headers` returned by the function contains a header of type `DeadLetterPublishingRecoverer.SingleRecordHeader`, then any existing values for that header will be removed and only the new single value will remain.
626+
624627
[[retry-topic-combine-blocking]]
625628
==== Combining Blocking and Non-Blocking Retries
626629

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

+46-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -40,7 +40,9 @@
4040
import org.apache.kafka.clients.producer.ProducerRecord;
4141
import org.apache.kafka.common.PartitionInfo;
4242
import org.apache.kafka.common.TopicPartition;
43+
import org.apache.kafka.common.header.Header;
4344
import org.apache.kafka.common.header.Headers;
45+
import org.apache.kafka.common.header.internals.RecordHeader;
4446
import org.apache.kafka.common.header.internals.RecordHeaders;
4547

4648
import org.springframework.core.log.LogAccessor;
@@ -224,7 +226,9 @@ public void setRetainExceptionHeader(boolean retainExceptionHeader) {
224226

225227
/**
226228
* Set a function which will be called to obtain additional headers to add to the
227-
* published record.
229+
* published record. If a {@link Header} returned is an instance of
230+
* {@link SingleRecordHeader}, then that header will replace any existing header of
231+
* that name, rather than being appended as a new value.
228232
* @param headersFunction the headers function.
229233
* @since 2.5.4
230234
* @see #addHeadersFunction(BiFunction)
@@ -411,7 +415,10 @@ public void includeHeader(HeaderNames.HeadersToAdd... headers) {
411415
/**
412416
* Add a function which will be called to obtain additional headers to add to the
413417
* published record. Functions are called in the order that they are added, and after
414-
* any function passed into {@link #setHeadersFunction(BiFunction)}.
418+
* any function passed into {@link #setHeadersFunction(BiFunction)}. If a
419+
* {@link Header} returned is an instance of {@link SingleRecordHeader}, then that
420+
* header will replace any existing header of that name, rather than being appended as
421+
* a new value.
415422
* @param headersFunction the headers function.
416423
* @since 2.8.4
417424
* @see #setHeadersFunction(BiFunction)
@@ -707,7 +714,12 @@ private void enhanceHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?> record, E
707714
maybeAddOriginalHeaders(kafkaHeaders, record, exception);
708715
Headers headers = this.headersFunction.apply(record, exception);
709716
if (headers != null) {
710-
headers.forEach(kafkaHeaders::add);
717+
headers.forEach(header -> {
718+
if (header instanceof SingleRecordHeader) {
719+
kafkaHeaders.remove(header.key());
720+
}
721+
kafkaHeaders.add(header);
722+
});
711723
}
712724
}
713725

@@ -1374,4 +1386,34 @@ public interface ExceptionHeadersCreator {
13741386

13751387
}
13761388

1389+
/**
1390+
* A {@link Header} that indicates that this header should replace any existing headers
1391+
* with this name, rather than being appended to the headers, which is the normal behavior.
1392+
*
1393+
* @since 2.9.5
1394+
* @see DeadLetterPublishingRecoverer#setHeadersFunction(BiFunction)
1395+
* @see DeadLetterPublishingRecoverer#addHeadersFunction(BiFunction)
1396+
*/
1397+
public static class SingleRecordHeader extends RecordHeader {
1398+
1399+
/**
1400+
* Construct an instance.
1401+
* @param key the key.
1402+
* @param value the value.
1403+
*/
1404+
public SingleRecordHeader(String key, byte[] value) {
1405+
super(key, value);
1406+
}
1407+
1408+
/**
1409+
* Construct an instance.
1410+
* @param keyBuffer the key buffer.
1411+
* @param valueBuffer the value buffer.
1412+
*/
1413+
public SingleRecordHeader(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
1414+
super(keyBuffer, valueBuffer);
1415+
}
1416+
1417+
}
1418+
13771419
}

spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java

+29-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2022 the original author or authors.
2+
* Copyright 2020-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -68,6 +68,7 @@
6868
import org.springframework.kafka.core.KafkaOperations.OperationsCallback;
6969
import org.springframework.kafka.core.ProducerFactory;
7070
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer.HeaderNames;
71+
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer.SingleRecordHeader;
7172
import org.springframework.kafka.support.KafkaHeaders;
7273
import org.springframework.kafka.support.SendResult;
7374
import org.springframework.kafka.support.converter.ConversionException;
@@ -878,6 +879,33 @@ void immutableHeaders() {
878879
assertThat(KafkaTestUtils.getPropertyValue(headers, "headers", List.class)).hasSize(12);
879880
}
880881

882+
@SuppressWarnings({ "unchecked", "rawtypes" })
883+
@Test
884+
void replaceNotAppendHeader() {
885+
KafkaOperations<?, ?> template = mock(KafkaOperations.class);
886+
CompletableFuture future = mock(CompletableFuture.class);
887+
given(template.send(any(ProducerRecord.class))).willReturn(future);
888+
Headers headers = new RecordHeaders().add(new RecordHeader("foo", "orig".getBytes()));
889+
ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE,
890+
-1, -1, null, "bar", headers, Optional.empty());
891+
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
892+
recoverer.setHeadersFunction((rec, ex) -> {
893+
RecordHeaders toReplace = new RecordHeaders(
894+
new RecordHeader[] { new SingleRecordHeader("foo", "one".getBytes()) });
895+
return toReplace;
896+
});
897+
recoverer.accept(record, new ListenerExecutionFailedException("test", "group", new RuntimeException()));
898+
ArgumentCaptor<ProducerRecord> producerRecordCaptor = ArgumentCaptor.forClass(ProducerRecord.class);
899+
verify(template).send(producerRecordCaptor.capture());
900+
ProducerRecord outRecord = producerRecordCaptor.getValue();
901+
Headers outHeaders = outRecord.headers();
902+
assertThat(KafkaTestUtils.getPropertyValue(outHeaders, "headers", List.class)).hasSize(11);
903+
Iterator<Header> iterator = outHeaders.headers("foo").iterator();
904+
assertThat(iterator.hasNext()).isTrue();
905+
assertThat(iterator.next().value()).isEqualTo("one".getBytes());
906+
assertThat(iterator.hasNext()).isFalse();
907+
}
908+
881909
@SuppressWarnings("unchecked")
882910
@Test
883911
void nonCompliantProducerFactory() throws Exception {

0 commit comments

Comments
 (0)